Files
meijiaka-zy/python-api/app/api/v1/points.py
T
小鱼开发 8e5174c58c fix: 修复轮询接口 CORS 头丢失 + CRUD 类型不匹配
- main.py: 自定义 exception_handler 手动添加 CORS 头,避免 500 响应被浏览器拦截
- crud/base.py: CRUDBase.get 的 id 参数改为 Any,兼容 int/BigInt 主键
- api/v1/points.py: query_recharge_status 去掉 str() 转换,直接传 int order_id
2026-05-08 21:56:56 +08:00

538 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
积分系统 API 路由
=================
提供积分查询、充值、消费、流水查询等功能。
"""
import logging
from datetime import UTC, datetime
import httpx
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_user, get_db
from app.crud.point_recharge_order import point_recharge_order
from app.crud.point_transaction import point_transaction
from app.models.user import User
from app.schemas.common import ApiResponse, PaginationParams, success_response
from app.schemas.point import (
ConsumeFreezeRequest,
ConsumeFreezeResponse,
ConsumeRefundRequest,
ConsumeResultResponse,
ConsumeSettleRequest,
PointBalanceResponse,
PointTransactionItem,
PointTransactionListResponse,
RechargeOrderItem,
RechargeOrderListResponse,
RechargeRequest,
RechargeResponse,
)
from app.services import point_service
from app.services.wxpay_service import WechatPayError, get_wxpay_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/points", tags=["Points"])
# ── 余额查询 ──────────────────────────────────────────
@router.get("/balance", response_model=ApiResponse[PointBalanceResponse])
async def get_balance(
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""获取当前用户积分余额"""
balance = await point_service.get_user_balance(db, user_id=current_user.id)
return success_response(data=PointBalanceResponse(**balance))
# ── 流水查询 ──────────────────────────────────────────
@router.get("/transactions", response_model=ApiResponse[PointTransactionListResponse])
async def list_transactions(
pagination: PaginationParams = Depends(),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""获取当前用户积分流水记录"""
items = await point_transaction.get_by_user_id(
db,
user_id=current_user.id,
skip=pagination.offset,
limit=pagination.page_size,
)
# 由于 point_transaction CRUD 没有 count 方法,这里用 items 长度近似
# 实际需要时可以在 CRUD 加 count 方法
total = len(items)
return success_response(
data=PointTransactionListResponse(
items=[PointTransactionItem.model_validate(t) for t in items],
total=total,
skip=pagination.offset,
limit=pagination.page_size,
)
)
# ── 充值 ──────────────────────────────────────────────
@router.post("/recharge", response_model=ApiResponse[RechargeResponse])
async def create_recharge_order(
request: RechargeRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
创建积分充值订单(微信支付 Native 扫码)
1. 创建本地订单记录
2. 调用微信统一下单获取二维码链接 code_url
3. 前端用 code_url 生成二维码,用户微信扫码支付
4. 支付成功后微信异步通知 /recharge/notify
"""
logger.info(
f"[Points] 用户 {current_user.id} 发起充值: {request.points} 积分, "
f"金额: {request.amount_rmb}"
)
# 创建待支付订单
from app.models.point_recharge_order import PointRechargeOrder
order = PointRechargeOrder(
user_id=current_user.id,
points=request.points,
amount_rmb=request.amount_rmb,
status="pending",
trade_type="NATIVE",
)
db.add(order)
await db.flush()
# 生成商户订单号
out_trade_no = f"MJZ{order.id:012d}"
order.out_trade_no = out_trade_no
# 调用微信支付统一下单
try:
from app.services.wxpay_service import WechatPayError, get_wxpay_service
wxpay = get_wxpay_service()
# 从 request 获取 http_clientFastAPI 依赖注入方式获取不了,改用直接引用)
# 这里用默认的 httpx client
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0, connect=10.0)) as client:
wx_result = await wxpay.native_order(
client,
description=f"积分充值 {request.points} 积分",
out_trade_no=out_trade_no,
amount=request.amount_rmb,
attach=str(current_user.id),
)
# 记录微信返回
order.request_response = str(wx_result)
code_url = wx_result.get("code_url")
if not code_url:
logger.error(f"[Points] 微信统一下单未返回 code_url: {wx_result}")
order.status = "failed"
order.error_msg = "微信未返回二维码链接"
raise HTTPException(status_code=500, detail="微信支付下单失败")
order.prepay_id = wx_result.get("prepay_id")
return success_response(
data=RechargeResponse(
order_id=order.id,
points=request.points,
amount_rmb=request.amount_rmb,
code_url=code_url,
),
message="充值订单已创建,请扫描微信二维码完成支付",
)
except WechatPayError as e:
logger.error(f"[Points] 微信统一下单失败: {e}")
order.status = "failed"
order.error_code = e.code
order.error_msg = str(e)
raise HTTPException(status_code=500, detail=f"微信支付下单失败: {e}")
except HTTPException:
raise
except Exception as e:
logger.exception(f"[Points] 创建充值订单异常: {e}")
order.status = "failed"
order.error_msg = str(e)
raise HTTPException(status_code=500, detail="创建充值订单失败")
@router.post("/recharge/notify")
async def handle_wxpay_notify(
request: Request,
db: AsyncSession = Depends(get_db),
):
"""
微信支付回调通知
微信服务器在支付成功后推送通知到此端点。
验证签名 → 更新订单 → 给用户充值积分。
必须返回 200 且 body 为 {"code": "SUCCESS"},否则微信会重试。
注意:此接口不设置 response_model,直接返回原始 JSON
避免 FastAPI 用 ApiResponse 包装导致微信无法识别。
"""
def _wx_response() -> JSONResponse:
"""构造微信要求的回调响应"""
return JSONResponse(content={"code": "SUCCESS", "message": "OK"})
# 读取回调原始 bodyAPIv2 为 XML 格式)
body_bytes = await request.body()
body_str = body_bytes.decode("utf-8")
logger.info(f"[WechatPay] 收到回调通知: body={body_str[:200]}")
wxpay = get_wxpay_service()
# 解析 XML
try:
notify_data = wxpay._xml_to_dict(body_str)
except Exception as e:
logger.error(f"[WechatPay] XML 解析失败: {e}, body={body_str[:200]}")
return _wx_response()
# 验签(APIv2:从 XML 中的 sign 字段验 MD5 签名)
try:
verified = wxpay.verify_notify(notify_data)
except WechatPayError as e:
logger.error(f"[WechatPay] 回调验签失败: {e}")
return _wx_response()
if not verified:
logger.error("[WechatPay] 回调签名验证未通过")
return _wx_response()
# APIv2 回调不加密,直接提取字段
out_trade_no = notify_data.get("out_trade_no")
wx_order_no = notify_data.get("transaction_id")
trade_state = notify_data.get("result_code", "")
if not out_trade_no:
logger.error("[WechatPay] 回调缺少 out_trade_no")
return _wx_response()
# 查找订单
order = await point_recharge_order.get_by_out_trade_no(
db, out_trade_no=out_trade_no
)
if not order:
logger.error(f"[WechatPay] 回调订单不存在: {out_trade_no}")
return _wx_response()
# 记录回调原始内容
order.notify_raw = body_str
order.notify_verified = True
order.wx_order_no = wx_order_no
# 只处理支付成功状态
if trade_state != "SUCCESS":
logger.info(f"[WechatPay] 订单 {out_trade_no} 状态: {trade_state},暂不处理")
if trade_state in ("CLOSED", "REVOKED"):
order.status = "closed"
await db.commit()
return _wx_response()
# 幂等:已处理过的订单不再重复充值
if order.status == "paid":
logger.info(f"[WechatPay] 订单 {out_trade_no} 已处理,跳过")
await db.commit() # 提交 notify_raw 等记录
return _wx_response()
# 更新订单状态并充值积分(同一事务)
try:
order.status = "paid"
order.paid_at = datetime.now(UTC)
# 给用户充值积分
await point_service.recharge(
db,
user_id=order.user_id,
points=order.points,
source="wxpay",
description=f"微信支付充值 {order.points} 积分",
order_id=order.id,
)
await db.commit()
logger.info(
f"[WechatPay] 订单 {out_trade_no} 处理完成,"
f"用户 {order.user_id} 充值 {order.points} 积分"
)
except Exception as e:
await db.rollback()
logger.exception(f"[WechatPay] 订单 {out_trade_no} 充值积分失败: {e}")
# 记录错误但不抛出,返回 SUCCESS 避免微信重试
order.error_msg = f"充值积分失败: {e}"
await db.commit()
return _wx_response()
@router.get("/recharge/query/{order_id}", response_model=ApiResponse[dict])
async def query_recharge_status(
order_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
查询充值订单支付状态(前端轮询用)
如果订单仍在 pending 状态,会主动查询微信支付确认最新状态。
"""
order = await point_recharge_order.get(db, id=order_id)
if not order or order.user_id != current_user.id:
raise HTTPException(status_code=404, detail="订单不存在")
# 如果是 pending,主动查询微信支付
if order.status == "pending" and order.out_trade_no:
try:
from app.services.wxpay_service import WechatPayError, get_wxpay_service
wxpay = get_wxpay_service()
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0, connect=10.0)) as client:
wx_result = await wxpay.query_order(
client, out_trade_no=order.out_trade_no
)
order.query_result = str(wx_result)
trade_state = wx_result.get("trade_state", "")
if trade_state == "SUCCESS" and order.status != "paid":
# 支付成功但本地未处理,补发积分
order.status = "paid"
order.paid_at = datetime.now(UTC)
order.wx_order_no = wx_result.get("transaction_id")
await point_service.recharge(
db,
user_id=order.user_id,
points=order.points,
source="wxpay",
description=f"微信支付充值 {order.points} 积分(主动查询补单)",
order_id=order.id,
)
logger.info(
f"[Points] 订单 {order.out_trade_no} 通过主动查询确认支付成功,"
f"补发 {order.points} 积分"
)
elif trade_state in ("CLOSED", "REVOKED"):
order.status = "closed"
except WechatPayError as e:
logger.warning(f"[Points] 主动查询订单 {order.out_trade_no} 失败: {e}")
except Exception as e:
logger.exception(f"[Points] 主动查询订单异常: {e}")
return success_response(
data={
"order_id": order.id,
"status": order.status,
"points": order.points,
"amount_rmb": order.amount_rmb,
"paid_at": order.paid_at.isoformat() if order.paid_at else None,
"wx_order_no": order.wx_order_no,
}
)
# ── 消费 ──────────────────────────────────────────────
@router.post("/consume/freeze", response_model=ApiResponse[ConsumeFreezeResponse])
async def freeze_points(
request: ConsumeFreezeRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
消费预扣积分
在调用 AI 服务前预扣积分,确保用户有足够余额。
返回预扣结果,后续需要调用结算接口确认或退回。
"""
try:
tx, _ = await point_service.freeze_for_consumption(
db,
user_id=current_user.id,
source_type=request.source_type,
source_id=request.source_id,
param=request.param,
description=request.description,
)
# 预扣金额(取绝对值)
frozen_points = abs(tx.amount)
# 获取当前可用余额
balance = await point_service.get_user_balance(db, user_id=current_user.id)
return success_response(
data=ConsumeFreezeResponse(
transaction_id=tx.id,
frozen_points=frozen_points,
available_balance=balance["available"],
),
message=f"预扣 {frozen_points} 积分成功",
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/consume/settle", response_model=ApiResponse[ConsumeResultResponse])
async def settle_consumption(
request: ConsumeSettleRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
消费结算
AI 服务调用成功后调用此接口,根据实际消耗结算积分。
如果实际消耗少于预扣金额,差额自动退回。
"""
try:
refund_tx = await point_service.settle_consumption(
db,
user_id=current_user.id,
source_type=request.source_type,
source_id=request.source_id,
actual_points=request.actual_points,
description=request.description,
)
if refund_tx:
return success_response(
data=ConsumeResultResponse(
success=True,
transaction_id=refund_tx.id,
refunded_points=refund_tx.amount,
message=f"结算成功,退回 {refund_tx.amount} 积分",
),
message="消费结算完成",
)
return success_response(
data=ConsumeResultResponse(success=True),
message="消费结算完成",
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/consume/refund", response_model=ApiResponse[ConsumeResultResponse])
async def refund_consumption(
request: ConsumeRefundRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
消费失败退款
AI 服务调用失败后调用此接口,全额退还预扣积分。
"""
try:
tx = await point_service.refund_consumption(
db,
user_id=current_user.id,
source_type=request.source_type,
source_id=request.source_id,
description=request.description,
)
return success_response(
data=ConsumeResultResponse(
success=True,
transaction_id=tx.id,
refunded_points=tx.amount,
message=f"退款 {tx.amount} 积分成功",
),
message="消费退款完成",
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# ── 充值订单查询 ──────────────────────────────────────
@router.get("/orders", response_model=ApiResponse[RechargeOrderListResponse])
async def list_recharge_orders(
pagination: PaginationParams = Depends(),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""获取当前用户充值订单列表"""
items = await point_recharge_order.get_by_user_id(
db,
user_id=current_user.id,
skip=pagination.offset,
limit=pagination.page_size,
)
total = len(items)
return success_response(
data=RechargeOrderListResponse(
items=[RechargeOrderItem.model_validate(o) for o in items],
total=total,
skip=pagination.offset,
limit=pagination.page_size,
)
)
# ── 内部管理接口(后续可加管理员权限检查)─────────────
@router.post("/admin/recharge", response_model=ApiResponse[dict])
async def admin_recharge(
user_id: str,
points: int,
source: str = "compensation",
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
管理员直接充值(用于补偿、活动赠送等)
需要管理员权限,当前先做接口占位。
"""
# TODO: 加管理员权限检查
try:
tx = await point_service.recharge(
db,
user_id=user_id,
points=points,
source=source,
description=f"管理员操作: {source}",
)
return success_response(
data={"transaction_id": tx.id, "points": points},
message=f"成功充值 {points} 积分",
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))