Files
meijiaka-zy/python-api/app/api/v1/vidu.py
T
小鱼开发 a2106cbfb3 feat(log): Vidu 回调增加详细日志
- 记录回调请求体(前500字符,防日志过大)
- 记录 handle_webhook 解析结果(state/result/error)
- 记录反查 internal_task_id 前后的 platform_task_id 和结果
- 更新成功时记录 video_url
2026-05-05 23:03:45 +08:00

126 lines
4.5 KiB
Python

"""
Vidu 回调路由
============
仅保留 Vidu 对口型任务完成回调接口。
视频生成任务统一走 /tasks/video 创建,/tasks/{task_id} 轮询。
Vidu 任务完成后主动 POST 通知此接口。
回调更新 Async Engine TaskRegistry,供前端统一轮询。
"""
import logging
from fastapi import APIRouter, HTTPException, Request
from app.core.exceptions import PlatformError
from app.core.redis_client import get_redis_client
from app.platform_gateway import PlatformGateway
from app.schemas.common import success_response
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/vidu", tags=["Vidu"])
def get_platform_gateway(request: Request) -> PlatformGateway:
"""从 app.state 获取 PlatformGateway"""
return request.app.state.platform_gateway
@router.post("/callback")
async def vidu_callback(request: Request):
"""
Vidu 对口型任务完成回调
Vidu 任务完成后主动 POST 通知此接口。
无需登录校验(Vidu 外部调用),统一走 PlatformGateway 处理。
回调结果写入 Async Engine TaskRegistry,前端通过 /tasks/{task_id} 统一轮询。
"""
from app.config import get_settings
settings = get_settings()
secret_key = settings.VIDU_API_KEY
# 1. 统一走 PlatformGateway 处理回调(签名验证 + nonce 防重放)
gateway = request.app.state.platform_gateway
body_bytes = await request.body()
headers_dict = dict(request.headers)
logger.info(f"[Vidu] 收到回调: url={request.url}, body={body_bytes.decode('utf-8', errors='replace')[:500]}")
try:
task_status = await gateway.handle_webhook(
platform="vidu",
headers=headers_dict,
body=body_bytes,
secret=secret_key,
callback_url=str(request.url),
)
except PlatformError as e:
logger.warning(f"[Vidu] 回调验证失败: {e}")
raise HTTPException(status_code=401, detail="回调签名验证失败")
except Exception as e:
logger.error(f"[Vidu] 回调处理失败: {e}")
raise HTTPException(status_code=500, detail="回调处理失败,请稍后重试")
logger.info(f"[Vidu] 回调解析完成: state={task_status.state}, result={task_status.result}, error={task_status.error_message}")
# 2. 通过 platform_task_id 反查 Async Engine 内部 task_id,更新 TaskRegistry
platform_task_id = (
task_status.result.get("task_id") if task_status.result else None
)
video_url = (
task_status.result.get("video_url") if task_status.result else None
)
logger.info(f"[Vidu] 准备反查 internal_task_id: platform_task_id={platform_task_id}")
internal_task_id = None
if platform_task_id:
internal_task_id = await gateway.get_internal_task_id_by_platform_task_id(
"vidu", platform_task_id
)
if internal_task_id:
# 更新 Async Engine TaskRegistry,供前端统一轮询 /tasks/{task_id}
from app.scheduler.registry import TaskRegistry
registry = TaskRegistry(get_redis_client())
task_record = await registry.get(internal_task_id)
if task_record:
if task_status.state == "completed":
await registry.update(
internal_task_id,
status="completed",
progress=100,
message="视频生成完成",
completed=1,
total=1,
result={"video_url": video_url, "state": "success"},
)
elif task_status.state == "failed":
await registry.update(
internal_task_id,
status="failed",
message="视频生成失败",
error=task_status.error_message or "视频生成失败",
)
# 移除 running set(如果还在)
await registry.remove_running(internal_task_id)
logger.info(
f"[Vidu] 回调已更新 TaskRegistry: task={internal_task_id}, "
f"state={task_status.state}, video_url={video_url}"
)
else:
logger.warning(
f"[Vidu] 回调找不到对应任务记录: internal={internal_task_id}, "
f"platform={platform_task_id}"
)
else:
logger.warning(
f"[Vidu] 回调无法反查内部 task_id: platform={platform_task_id}"
)
return success_response(message="回调已接收")