Files
meijiaka-zy/python-api/app/api/v1/vidu.py
T
小鱼开发 7550559aa0 refactor: 清理未使用IPC命令、修正point_service注释与扣费逻辑、修复camelToSnake正则、优化vidu import
- 删除8个未使用IPC命令,保留validate_media_path
- file.rs返回类型优化为ApiResponse<()>
- point_service.consume()注释与签名一致
- VideoGeneration改为拼接成功后扣费
- 添加漏扣费风险注释
- 删除过时测试文件
- 修复camelToSnake连续大写字母问题
- vidu.py import移至模块顶层

Refs: P1-1~P1-6 技术债务清理
2026-05-14 17:45:28 +08:00

125 lines
4.7 KiB
Python

"""
Vidu 回调路由
============
仅保留 Vidu 视频生成任务完成回调接口。
视频生成任务统一走 /tasks/video 创建,/tasks/{task_id} 轮询。
Vidu 任务完成后主动 POST 通知此接口。
回调更新 Async Engine TaskRegistry,供前端统一轮询。
"""
import logging
from fastapi import APIRouter, HTTPException, Request
from app.config import get_settings
from app.core.exceptions import PlatformError
from app.core.redis_client import get_redis_client
from app.platform_gateway import PlatformGateway
from app.schemas.common import success_response
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/vidu", tags=["Vidu"])
def get_platform_gateway(request: Request) -> PlatformGateway:
"""从 app.state 获取 PlatformGateway"""
return request.app.state.platform_gateway
@router.post("/callback")
async def vidu_callback(request: Request):
"""
Vidu 视频生成任务完成回调
Vidu 任务完成后主动 POST 通知此接口。
无需登录校验(Vidu 外部调用),统一走 PlatformGateway 处理。
回调结果写入 Async Engine TaskRegistry,前端通过 /tasks/{task_id} 统一轮询。
"""
# 1. 统一走 PlatformGateway 处理回调(签名验证 + nonce 防重放)
gateway = request.app.state.platform_gateway
body_bytes = await request.body()
headers_dict = dict(request.headers)
logger.info(f"[Vidu] 收到回调: url={request.url}, body={body_bytes.decode('utf-8', errors='replace')[:500]}")
try:
task_status = await gateway.handle_webhook(
platform="vidu",
headers=headers_dict,
body=body_bytes,
secret=get_settings().VIDU_API_KEY,
callback_url=str(request.url),
)
except PlatformError as e:
logger.warning(f"[Vidu] 回调验证失败: {e}")
raise HTTPException(status_code=401, detail="回调签名验证失败")
except Exception as e:
logger.error(f"[Vidu] 回调处理失败: {e}")
raise HTTPException(status_code=500, detail="回调处理失败,请稍后重试")
logger.info(f"[Vidu] 回调解析完成: state={task_status.state}, result={task_status.result}, error={task_status.error_message}")
# 2. 通过 platform_task_id 反查 Async Engine 内部 task_id,更新 TaskRegistry
platform_task_id = (
task_status.result.get("task_id") if task_status.result else None
)
video_url = (
task_status.result.get("video_url") if task_status.result else None
)
logger.info(f"[Vidu] 准备反查 internal_task_id: platform_task_id={platform_task_id}")
internal_task_id = None
if platform_task_id:
internal_task_id = await gateway.get_internal_task_id_by_platform_task_id(
"vidu", platform_task_id
)
if internal_task_id:
# 更新 Async Engine TaskRegistry,供前端统一轮询 /tasks/{task_id}
#
# ⚠️ 注意:此处仅更新任务状态,不执行积分扣费。
# 视频生成的扣费在前端 VideoGeneration.tsx 拼接完成后执行。
# 如果用户在 Vidu 任务提交后关闭应用,前端不会执行扣费,形成漏扣。
# 当前接受此风险,详情见前端注释。
from app.scheduler.registry import TaskRegistry
registry = TaskRegistry(get_redis_client())
task_record = await registry.get(internal_task_id)
if task_record:
if task_status.state == "completed":
await registry.update(
internal_task_id,
status="completed",
progress=100,
message="视频生成完成",
completed=1,
total=1,
result={"video_url": video_url, "state": "success"},
)
elif task_status.state == "failed":
await registry.update(
internal_task_id,
status="failed",
message="视频生成失败",
error=task_status.error_message or "视频生成失败",
)
logger.info(
f"[Vidu] 回调已更新 TaskRegistry: task={internal_task_id}, "
f"state={task_status.state}, video_url={video_url}"
)
else:
logger.warning(
f"[Vidu] 回调找不到对应任务记录: internal={internal_task_id}, "
f"platform={platform_task_id}"
)
else:
logger.warning(
f"[Vidu] 回调无法反查内部 task_id: platform={platform_task_id}"
)
return success_response(message="回调已接收")