83b10945c8
后端: - CRUD 新增 sum_consumed_today() 方法,统计用户今日消费积分总和 - API 新增 GET /points/today-consumed 路由 前端: - 个人中心积分数字从 40px 改为 32px - 今日消耗从本地计算改为调用后端接口
558 lines
20 KiB
Python
558 lines
20 KiB
Python
"""
|
||
积分系统 API 路由
|
||
=================
|
||
|
||
提供积分查询、充值、消费、流水查询等功能。
|
||
"""
|
||
|
||
import logging
|
||
from datetime import UTC, datetime
|
||
|
||
import httpx
|
||
from fastapi import APIRouter, Depends, HTTPException, Request
|
||
from fastapi.responses import JSONResponse
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.api.deps import get_current_user, get_db
|
||
from app.crud.point_recharge_order import point_recharge_order
|
||
from app.crud.point_transaction import point_transaction
|
||
from app.models.user import User
|
||
from app.schemas.common import ApiResponse, PaginationParams, success_response
|
||
from app.schemas.point import (
|
||
ConsumeRequest,
|
||
PointBalanceResponse,
|
||
PointTransactionItem,
|
||
PointTransactionListResponse,
|
||
RechargeRequest,
|
||
RechargeResponse,
|
||
)
|
||
from app.services import point_service
|
||
from app.services.wxpay_service import WechatPayError, get_wxpay_service
|
||
|
||
logger = logging.getLogger(__name__)
|
||
router = APIRouter(prefix="/points", tags=["Points"])
|
||
|
||
|
||
# ── 余额查询 ──────────────────────────────────────────
|
||
|
||
@router.get("/balance", response_model=ApiResponse[PointBalanceResponse])
|
||
async def get_balance(
|
||
db: AsyncSession = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""获取当前用户积分余额"""
|
||
balance = await point_service.get_user_balance(db, user_id=current_user.id)
|
||
return success_response(data=PointBalanceResponse(**balance))
|
||
|
||
|
||
# ── 流水查询 ──────────────────────────────────────────
|
||
|
||
@router.get("/transactions", response_model=ApiResponse[PointTransactionListResponse])
|
||
async def list_transactions(
|
||
pagination: PaginationParams = Depends(),
|
||
tx_type: str | None = None,
|
||
category: str | None = None,
|
||
source_type: str | None = None,
|
||
start_time: str | None = None,
|
||
end_time: str | None = None,
|
||
db: AsyncSession = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
获取当前用户积分流水记录(支持筛选)
|
||
|
||
- tx_type: consume / recharge / expire
|
||
- category: 脚本生成 / 配音合成 / 视频生成 / 压制成片 / 字幕烧录 / 封面设计 / 充值
|
||
- source_type: script / polish / title / tts / voice_clone / video / compose / subtitle_burn / cover_design / wxpay
|
||
- start_time / end_time: ISO 8601 格式,时间范围最多 30 天
|
||
"""
|
||
from datetime import datetime, timedelta
|
||
|
||
# 解析时间范围
|
||
parsed_start: datetime | None = None
|
||
parsed_end: datetime | None = None
|
||
|
||
if start_time:
|
||
try:
|
||
parsed_start = datetime.fromisoformat(start_time.replace("Z", "+00:00"))
|
||
except ValueError:
|
||
raise HTTPException(status_code=400, detail="start_time 格式错误,应为 ISO 8601")
|
||
|
||
if end_time:
|
||
try:
|
||
parsed_end = datetime.fromisoformat(end_time.replace("Z", "+00:00"))
|
||
except ValueError:
|
||
raise HTTPException(status_code=400, detail="end_time 格式错误,应为 ISO 8601")
|
||
|
||
# 限制时间范围最多 30 天
|
||
if parsed_start and parsed_end:
|
||
if (parsed_end - parsed_start) > timedelta(days=30):
|
||
raise HTTPException(status_code=400, detail="时间范围最多 30 天")
|
||
elif parsed_start and not parsed_end:
|
||
parsed_end = parsed_start + timedelta(days=30)
|
||
elif parsed_end and not parsed_start:
|
||
parsed_start = parsed_end - timedelta(days=30)
|
||
|
||
items = await point_transaction.get_by_user_id(
|
||
db,
|
||
user_id=current_user.id,
|
||
skip=pagination.offset,
|
||
limit=pagination.page_size,
|
||
tx_type=tx_type,
|
||
category=category,
|
||
source_type=source_type,
|
||
start_time=parsed_start,
|
||
end_time=parsed_end,
|
||
)
|
||
|
||
total = await point_transaction.count_by_user_id(
|
||
db,
|
||
user_id=current_user.id,
|
||
tx_type=tx_type,
|
||
category=category,
|
||
source_type=source_type,
|
||
start_time=parsed_start,
|
||
end_time=parsed_end,
|
||
)
|
||
|
||
return success_response(
|
||
data=PointTransactionListResponse(
|
||
items=[PointTransactionItem.model_validate(t) for t in items],
|
||
total=total,
|
||
skip=pagination.offset,
|
||
limit=pagination.page_size,
|
||
)
|
||
)
|
||
|
||
|
||
# ── 充值 ──────────────────────────────────────────────
|
||
|
||
@router.post("/recharge", response_model=ApiResponse[RechargeResponse])
|
||
async def create_recharge_order(
|
||
request: RechargeRequest,
|
||
db: AsyncSession = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
http_request: Request = None,
|
||
):
|
||
"""
|
||
创建积分充值订单(微信支付 Native 扫码)
|
||
|
||
1. 创建本地订单记录
|
||
2. 调用微信统一下单获取二维码链接 code_url
|
||
3. 前端用 code_url 生成二维码,用户微信扫码支付
|
||
4. 支付成功后微信异步通知 /recharge/notify
|
||
"""
|
||
logger.info(
|
||
f"[Points] 用户 {current_user.id} 发起充值: {request.points} 积分, "
|
||
f"金额: {request.amount_rmb} 分"
|
||
)
|
||
|
||
# 创建待支付订单
|
||
from app.models.point_recharge_order import PointRechargeOrder
|
||
|
||
order = PointRechargeOrder(
|
||
user_id=current_user.id,
|
||
points=request.points,
|
||
amount_rmb=request.amount_rmb,
|
||
status="pending",
|
||
trade_type="NATIVE",
|
||
)
|
||
db.add(order)
|
||
await db.flush()
|
||
|
||
# 生成商户订单号
|
||
out_trade_no = f"MJZ{order.id:012d}"
|
||
order.out_trade_no = out_trade_no
|
||
|
||
# 二维码有效期 2 分钟(与前端轮询对齐)
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
# 前端倒计时用 UTC ISO 格式,不受服务器时区影响
|
||
expire_at = datetime.now(UTC) + timedelta(minutes=2)
|
||
# 微信 time_expire 要求北京时间(UTC+8),格式 yyyyMMddHHmmss
|
||
beijing_tz = timezone(timedelta(hours=8))
|
||
time_expire = expire_at.astimezone(beijing_tz).strftime("%Y%m%d%H%M%S")
|
||
|
||
# 获取客户端真实 IP(优先从 Nginx 转发头读取)
|
||
client_ip = None
|
||
if http_request:
|
||
xff = http_request.headers.get("x-forwarded-for")
|
||
if xff:
|
||
client_ip = xff.split(",")[0].strip()
|
||
else:
|
||
xri = http_request.headers.get("x-real-ip")
|
||
client_ip = xri or (http_request.client.host if http_request.client else None)
|
||
|
||
# 调用微信支付统一下单
|
||
try:
|
||
from app.services.wxpay_service import WechatPayError, get_wxpay_service
|
||
|
||
wxpay = get_wxpay_service()
|
||
|
||
# 从 request 获取 http_client(FastAPI 依赖注入方式获取不了,改用直接引用)
|
||
# 这里用默认的 httpx client
|
||
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0, connect=10.0)) as client:
|
||
wx_result = await wxpay.native_order(
|
||
client,
|
||
description=f"积分充值 {request.points} 积分",
|
||
out_trade_no=out_trade_no,
|
||
amount=request.amount_rmb,
|
||
attach=str(current_user.id),
|
||
time_expire=time_expire,
|
||
client_ip=client_ip,
|
||
)
|
||
|
||
# 记录微信返回
|
||
order.request_response = str(wx_result)
|
||
code_url = wx_result.get("code_url")
|
||
|
||
if not code_url:
|
||
logger.error(f"[Points] 微信统一下单未返回 code_url: {wx_result}")
|
||
order.status = "failed"
|
||
order.error_msg = "微信未返回二维码链接"
|
||
raise HTTPException(status_code=500, detail="微信支付下单失败")
|
||
|
||
order.prepay_id = wx_result.get("prepay_id")
|
||
await db.commit()
|
||
|
||
return success_response(
|
||
data=RechargeResponse(
|
||
order_id=order.id,
|
||
points=request.points,
|
||
amount_rmb=request.amount_rmb,
|
||
code_url=code_url,
|
||
expire_at=expire_at.isoformat(),
|
||
),
|
||
message="充值订单已创建,请扫描微信二维码完成支付",
|
||
)
|
||
|
||
except WechatPayError as e:
|
||
logger.error(f"[Points] 微信统一下单失败: {e}")
|
||
order.status = "failed"
|
||
order.error_code = e.code
|
||
order.error_msg = str(e)
|
||
await db.commit()
|
||
raise HTTPException(status_code=500, detail=f"微信支付下单失败: {e}")
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.exception(f"[Points] 创建充值订单异常: {e}")
|
||
order.status = "failed"
|
||
order.error_msg = str(e)
|
||
await db.commit()
|
||
raise HTTPException(status_code=500, detail="创建充值订单失败")
|
||
|
||
|
||
@router.post("/recharge/notify")
|
||
async def handle_wxpay_notify(
|
||
request: Request,
|
||
db: AsyncSession = Depends(get_db),
|
||
):
|
||
"""
|
||
微信支付回调通知
|
||
|
||
微信服务器在支付成功后推送通知到此端点。
|
||
验证签名 → 更新订单 → 给用户充值积分。
|
||
必须返回 200 且 body 为 {"code": "SUCCESS"},否则微信会重试。
|
||
|
||
注意:此接口不设置 response_model,直接返回原始 JSON,
|
||
避免 FastAPI 用 ApiResponse 包装导致微信无法识别。
|
||
"""
|
||
|
||
def _wx_response() -> JSONResponse:
|
||
"""构造微信要求的回调响应"""
|
||
return JSONResponse(content={"code": "SUCCESS", "message": "OK"})
|
||
|
||
# 读取回调原始 body(APIv2 为 XML 格式)
|
||
body_bytes = await request.body()
|
||
body_str = body_bytes.decode("utf-8")
|
||
|
||
logger.info(f"[WechatPay] 收到回调通知: body={body_str[:200]}")
|
||
|
||
wxpay = get_wxpay_service()
|
||
|
||
# 解析 XML
|
||
try:
|
||
notify_data = wxpay._xml_to_dict(body_str)
|
||
except Exception as e:
|
||
logger.error(f"[WechatPay] XML 解析失败: {e}, body={body_str[:200]}")
|
||
return _wx_response()
|
||
|
||
# 验签(APIv2:从 XML 中的 sign 字段验 MD5 签名)
|
||
try:
|
||
verified = wxpay.verify_notify(notify_data)
|
||
except WechatPayError as e:
|
||
logger.error(f"[WechatPay] 回调验签失败: {e}")
|
||
return _wx_response()
|
||
|
||
if not verified:
|
||
logger.error("[WechatPay] 回调签名验证未通过")
|
||
return _wx_response()
|
||
|
||
# APIv2 回调不加密,直接提取字段
|
||
out_trade_no = notify_data.get("out_trade_no")
|
||
wx_order_no = notify_data.get("transaction_id")
|
||
trade_state = notify_data.get("result_code", "")
|
||
|
||
if not out_trade_no:
|
||
logger.error("[WechatPay] 回调缺少 out_trade_no")
|
||
return _wx_response()
|
||
|
||
# 查找订单
|
||
order = await point_recharge_order.get_by_out_trade_no(
|
||
db, out_trade_no=out_trade_no
|
||
)
|
||
if not order:
|
||
logger.error(f"[WechatPay] 回调订单不存在: {out_trade_no}")
|
||
return _wx_response()
|
||
|
||
# 记录回调原始内容
|
||
order.notify_raw = body_str
|
||
order.notify_verified = True
|
||
order.wx_order_no = wx_order_no
|
||
|
||
# 只处理支付成功状态
|
||
if trade_state != "SUCCESS":
|
||
logger.info(f"[WechatPay] 订单 {out_trade_no} 状态: {trade_state},暂不处理")
|
||
if trade_state in ("CLOSED", "REVOKED"):
|
||
order.status = "closed"
|
||
await db.commit()
|
||
return _wx_response()
|
||
|
||
# 幂等:已处理过的订单不再重复充值
|
||
if order.status == "paid":
|
||
logger.info(f"[WechatPay] 订单 {out_trade_no} 已处理,跳过")
|
||
await db.commit() # 提交 notify_raw 等记录
|
||
return _wx_response()
|
||
|
||
# 更新订单状态并充值积分(同一事务)
|
||
try:
|
||
order.status = "paid"
|
||
order.paid_at = datetime.now(UTC)
|
||
|
||
# 给用户充值积分
|
||
await point_service.recharge(
|
||
db,
|
||
user_id=order.user_id,
|
||
points=order.points,
|
||
source="wxpay",
|
||
description=f"微信支付充值 {order.points} 积分",
|
||
order_id=order.id,
|
||
)
|
||
|
||
await db.commit()
|
||
|
||
logger.info(
|
||
f"[WechatPay] 订单 {out_trade_no} 处理完成,"
|
||
f"用户 {order.user_id} 充值 {order.points} 积分"
|
||
)
|
||
|
||
except Exception as e:
|
||
await db.rollback()
|
||
logger.exception(f"[WechatPay] 订单 {out_trade_no} 充值积分失败: {e}")
|
||
# 记录错误但不抛出,返回 SUCCESS 避免微信重试
|
||
order.error_msg = f"充值积分失败: {e}"
|
||
await db.commit()
|
||
|
||
return _wx_response()
|
||
|
||
|
||
@router.get("/recharge/query/{order_id}", response_model=ApiResponse[dict])
|
||
async def query_recharge_status(
|
||
order_id: int,
|
||
db: AsyncSession = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
查询充值订单支付状态(前端轮询用)
|
||
|
||
如果订单仍在 pending 状态,会主动查询微信支付确认最新状态。
|
||
"""
|
||
|
||
order = await point_recharge_order.get(db, id=order_id)
|
||
if not order or order.user_id != current_user.id:
|
||
raise HTTPException(status_code=404, detail="订单不存在")
|
||
|
||
# 如果是 pending,主动查询微信支付
|
||
if order.status == "pending" and order.out_trade_no:
|
||
try:
|
||
from app.services.wxpay_service import WechatPayError, get_wxpay_service
|
||
|
||
wxpay = get_wxpay_service()
|
||
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0, connect=10.0)) as client:
|
||
wx_result = await wxpay.query_order(
|
||
client, out_trade_no=order.out_trade_no
|
||
)
|
||
|
||
order.query_result = str(wx_result)
|
||
trade_state = wx_result.get("trade_state", "")
|
||
|
||
if trade_state == "SUCCESS" and order.status != "paid":
|
||
# 支付成功但本地未处理,补发积分
|
||
order.status = "paid"
|
||
order.paid_at = datetime.now(UTC)
|
||
order.wx_order_no = wx_result.get("transaction_id")
|
||
|
||
await point_service.recharge(
|
||
db,
|
||
user_id=order.user_id,
|
||
points=order.points,
|
||
source="wxpay",
|
||
description=f"微信支付充值 {order.points} 积分(主动查询补单)",
|
||
order_id=order.id,
|
||
)
|
||
logger.info(
|
||
f"[Points] 订单 {order.out_trade_no} 通过主动查询确认支付成功,"
|
||
f"补发 {order.points} 积分"
|
||
)
|
||
|
||
elif trade_state in ("CLOSED", "REVOKED"):
|
||
order.status = "closed"
|
||
|
||
await db.commit()
|
||
|
||
except WechatPayError as e:
|
||
logger.warning(f"[Points] 主动查询订单 {order.out_trade_no} 失败: {e}")
|
||
except Exception as e:
|
||
logger.exception(f"[Points] 主动查询订单异常: {e}")
|
||
|
||
return success_response(
|
||
data={
|
||
"order_id": order.id,
|
||
"status": order.status,
|
||
"points": order.points,
|
||
"amount_rmb": order.amount_rmb,
|
||
"paid_at": order.paid_at.isoformat() if order.paid_at else None,
|
||
"wx_order_no": order.wx_order_no,
|
||
}
|
||
)
|
||
|
||
|
||
# ── 充值档位查询 ──────────────────────────────────────
|
||
|
||
@router.get("/recharge-options", response_model=ApiResponse[list[dict]])
|
||
async def get_recharge_options(
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
获取充值档位配置(由后端控制,支持积分赠送)。
|
||
|
||
前端充值弹窗调用此接口展示可选档位,无需硬编码。
|
||
"""
|
||
options = point_service.get_recharge_options()
|
||
return success_response(data=options, message="获取充值档位成功")
|
||
|
||
|
||
# ── 扣费业务类型查询 ──────────────────────────────────
|
||
|
||
@router.get("/chargeable-types", response_model=ApiResponse[list[str]])
|
||
async def get_chargeable_types(
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
获取所有需要扣费的业务类型列表。
|
||
|
||
用于前端积分明细页面的类型筛选下拉框,只展示会产生积分消耗的业务。
|
||
免费业务(如字幕生成 caption)不包含在返回列表中。
|
||
"""
|
||
types = point_service.get_chargeable_source_types()
|
||
return success_response(data=types, message="获取扣费业务类型成功")
|
||
|
||
|
||
# ── 积分规则查询 ──────────────────────────────────────
|
||
|
||
@router.get("/rules", response_model=ApiResponse[list[dict]])
|
||
async def get_points_rules(
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
获取积分计费规则列表。
|
||
|
||
返回所有业务的积分消耗规则,供前端在操作按钮上展示积分提示。
|
||
"""
|
||
rules = []
|
||
for source_type, cfg in point_service.POINTS_CONFIG.items():
|
||
if source_type.startswith("_"):
|
||
continue
|
||
rule = {
|
||
"source_type": source_type,
|
||
"mode": cfg["mode"],
|
||
}
|
||
if cfg["mode"] == "fixed":
|
||
rule["points"] = cfg["points"]
|
||
elif cfg["mode"] == "duration":
|
||
rule["min_points"] = cfg.get("min_points", 1)
|
||
if "divisor" in cfg:
|
||
rule["unit"] = f"每 {cfg['divisor']} 秒"
|
||
rule["points_per_unit"] = 1
|
||
if "multiplier" in cfg:
|
||
rule["unit"] = "每秒"
|
||
rule["points_per_unit"] = cfg["multiplier"]
|
||
rules.append(rule)
|
||
return success_response(data=rules, message="获取积分规则成功")
|
||
|
||
|
||
# ── 积分预估查询 ──────────────────────────────────────
|
||
|
||
|
||
|
||
# ── 今日消费统计 ──────────────────────────────────────
|
||
|
||
@router.get("/today-consumed", response_model=ApiResponse[dict])
|
||
async def get_today_consumed(
|
||
db: AsyncSession = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""获取当前用户今日消费积分总额"""
|
||
total = await point_transaction.sum_consumed_today(db, user_id=current_user.id)
|
||
return success_response(data={"total": total})
|
||
|
||
|
||
# ── 直接消费扣费(前端/Rust 层调用)───────────────────
|
||
|
||
@router.post("/consume", response_model=ApiResponse[dict])
|
||
async def consume_points(
|
||
request: ConsumeRequest,
|
||
db: AsyncSession = Depends(get_db),
|
||
current_user: User = Depends(get_current_user),
|
||
):
|
||
"""
|
||
直接消费积分(不经过冻结)
|
||
|
||
用于 Rust/前端层业务在执行本地操作前扣费:
|
||
- compose(压制成片)
|
||
- subtitle_burn(字幕烧录)
|
||
- cover_design(封面设计)
|
||
|
||
余额不足时返回 402,前端应拦截并引导充值。
|
||
"""
|
||
try:
|
||
tx = await point_service.consume(
|
||
db,
|
||
user_id=current_user.id,
|
||
points=request.points,
|
||
source_type=request.source_type,
|
||
source_id=request.source_id,
|
||
description=f"【{request.description or request.source_type}】",
|
||
allow_negative=False,
|
||
)
|
||
except ValueError as e:
|
||
# 余额不足(在同一事务内判断,避免竞态)
|
||
if "积分不足" in str(e):
|
||
raise HTTPException(status_code=402, detail=str(e))
|
||
raise HTTPException(status_code=400, detail=str(e))
|
||
await db.commit()
|
||
|
||
return success_response(
|
||
data={
|
||
"transaction_id": tx.id,
|
||
"consumed_points": tx.amount,
|
||
"balance_after": tx.balance_after,
|
||
"source_type": tx.source_type,
|
||
},
|
||
message="消费成功",
|
||
)
|
||
|
||
|