Files
meijiaka-zy/python-api/app/scheduler/handlers/video_handler.py
T
小鱼开发 4cba598b17 feat: 视频生成积分按总时长一次性扣费 + 错误文案友好化 + 弹窗样式修复
- 视频生成积分规则:从按对口型实际时长计费改为按脚本规划总时长计费
- 前端 VideoGeneration:提交 lipSync 时传 plannedDuration + totalPlannedDuration + batchId
- 后端 video_handler:tick 预检用 planned_duration,扣费改为总时长一次性扣 + batch_id 幂等
- 后端 tasks.py:VideoParams 替换字段,余额检查用 planned_duration
- 前端按钮旁显示预计消耗积分
- 新增 errorMessage.ts:统一错误信息友好化转换
- ScriptCreation/VideoGeneration/VoiceSynthesis/SubtitleBurning/CoverDesign:弹窗错误文案改用友好提示
- ProgressModal.css:错误/成功文案添加折行样式
- ContentManagement.css:补全缺失的 settings-row 样式
- ScriptCreation:删除过时空状态文案和多余 Slider.css import
2026-05-12 12:36:39 +08:00

352 lines
15 KiB
Python

"""
Video 任务处理器
================
管理 Vidu 视频生成任务的提交与轮询。
采用提交 + 轮询两阶段设计。
"""
import logging
from typing import Any
from app.core.platform_config import get_platform_config_loader
from app.db.session import AsyncSessionLocal
from app.scheduler.handlers.base import AsyncHandler
from app.scheduler.models import StateChange
from app.scheduler.registry import TaskRegistry
from app.scheduler.slot_manager import SlotManager
from app.services import point_service as ps
from app.services.vidu_service import ViduService
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Key of the slot pool used for video submissions: it is the handler's
# ``slot_key`` and the first argument passed to ``SlotManager.acquire_ctx``.
SLOT_KEY = "vidu:video_slots"
def _get_video_max_slots() -> int:
    """Return the slot limit for video tasks, read from the platform config.

    The ``lip_sync`` method's ``rate_limit_qps`` of the ``vidu`` platform is
    preferred; when that method is not configured the platform-level
    ``rate_limit_qps`` is used instead. If the platform is missing, or reading
    or converting the value raises, the function falls back to 5.
    """
    fallback_slots = 5
    try:
        vidu = get_platform_config_loader().get_platform("vidu")
        if not vidu:
            return fallback_slots
        if "lip_sync" in vidu.methods:
            configured = vidu.methods["lip_sync"].rate_limit_qps
        else:
            configured = vidu.rate_limit_qps
        return int(configured)
    except Exception as e:
        # Best effort: a broken config must not prevent the module from loading.
        logger.warning(f"读取视频平台 rate_limit 配置失败: {e}")
        return fallback_slots
class VideoHandler(AsyncHandler):
    """Scheduler handler for Vidu lip-sync video generation tasks.

    A task is handled in two phases, told apart by whether
    ``params["vidu_task_id"]`` is set:

    * submit: pre-check the user's point balance, take a slot and create the
      Vidu task;
    * poll: use the result written by the callback if there is one, otherwise
      query Vidu directly.
    """

    name = "video"
    slot_key = SLOT_KEY
    # Evaluated once, when the class body is executed.
    max_slots = _get_video_max_slots()

    def __init__(self, service: ViduService | None = None):
        """Keep the (optional) Vidu service used for submission and polling."""
        self.service = service

    def _get_service(self) -> ViduService:
        """Return the injected service.

        Raises:
            RuntimeError: if no service was passed to the constructor.
        """
        if self.service is None:
            raise RuntimeError(
                "VideoHandler 需要通过构造函数传入 ViduService 实例"
            )
        return self.service

    @staticmethod
    def _make_change(task_id: Any, field_path: str, value: Any) -> StateChange:
        """Build a single ``StateChange`` for ``task_id``."""
        return StateChange(task_id=task_id, field_path=field_path, value=value)

    def _completed_changes(self, task_id: Any) -> list[StateChange]:
        """Return the changes that mark a task as successfully completed."""
        return [
            self._make_change(task_id, "status", "completed"),
            self._make_change(task_id, "message", "视频生成完成"),
            self._make_change(task_id, "completed", 1),
            self._make_change(task_id, "total", 1),
        ]

    def _failed_changes(
        self, task_id: Any, message: str, error: str
    ) -> list[StateChange]:
        """Return the changes that mark a task as failed."""
        return [
            self._make_change(task_id, "status", "failed"),
            self._make_change(task_id, "message", message),
            self._make_change(task_id, "error", error),
        ]

    async def _deduct_video_points(self, task: Any, params: dict[str, Any]) -> None:
        """Charge points for a finished video, once per batch.

        The cost is computed from ``params["total_planned_duration"]`` and the
        charge is recorded with ``params["batch_id"]`` as its source id. Nothing
        happens when either value is missing or the duration is not positive,
        or when a "video" transaction for this user and batch already exists.

        A failure while charging is logged and swallowed so that it does not
        block task completion; errors from the existence check propagate.
        """
        batch_id = params.get("batch_id")
        total_planned_duration = float(params.get("total_planned_duration", 0) or 0)
        if not batch_id or total_planned_duration <= 0:
            return

        async with AsyncSessionLocal() as db:
            from sqlalchemy import select
            from app.models.point_transaction import PointTransaction

            # Idempotency: has this batch been charged already?
            # limit(1) + first() instead of scalar_one_or_none(): the latter
            # raises MultipleResultsFound as soon as two rows exist for a
            # batch, which would abort the whole tick on every later poll.
            result = await db.execute(
                select(PointTransaction)
                .where(
                    PointTransaction.user_id == task.user_id,
                    PointTransaction.source_type == "video",
                    PointTransaction.source_id == batch_id,
                )
                .limit(1)
            )
            if result.scalars().first() is not None:
                return

            try:
                points = ps._calculate_cost("video", {"seconds": total_planned_duration})
                await ps.consume(
                    db,
                    user_id=task.user_id,
                    points=points,
                    source_type="video",
                    source_id=batch_id,
                    description="【视频生成】",
                    duration=total_planned_duration,
                )
                await db.commit()
            except Exception as e:
                # Same message as before, now with the traceback attached.
                logger.exception(f"[Video {task.task_id}] 扣费失败: {e}")

    async def _poll_submitted(
        self, task: Any, params: dict[str, Any], registry: TaskRegistry
    ) -> list[StateChange]:
        """Handle a task that already has a Vidu task id.

        Prefers the result delivered by the callback; if none has arrived and
        the task is not marked failed, queries Vidu directly. Errors raised by
        the direct query are logged and the task is left for the next tick.
        """
        task_id = task.task_id
        result_data = task.result or {}

        if result_data.get("video_url") or result_data.get("state") == "success":
            # The callback already stored the result on the task.
            # NOTE(review): unlike the direct-query path below, this branch
            # never calls registry.remove_running() - confirm that whoever
            # writes the callback result takes care of it.
            await self._deduct_video_points(task, params)
            return self._completed_changes(task_id)

        if task.status == "failed":
            # The callback marked the task as failed: just drop it from running.
            await registry.remove_running(task_id)
            return []

        # No callback yet: fall back to asking Vidu.
        changes: list[StateChange] = []
        try:
            service = self._get_service()
            # NOTE(review): the query uses the internal task id, not
            # params["vidu_task_id"] - presumably the service maps one to the
            # other; confirm against ViduService.lip_sync_query.
            vidu_status = await service.lip_sync_query(task_id)
            vidu_state = vidu_status.get("state", "")
            creations = vidu_status.get("creations", [])
            video_url = creations[0].get("url") if creations else None

            if vidu_state == "success" and video_url:
                await self._deduct_video_points(task, params)
                changes.extend(self._completed_changes(task_id))
                changes.append(
                    self._make_change(
                        task_id,
                        "result",
                        {"video_url": video_url, "state": "success"},
                    )
                )
                await registry.remove_running(task_id)
                logger.info(
                    f"[Video {task.task_id}] 主动查询 Vidu 任务已完成: "
                    f"video_url={video_url[:60]}..."
                )
            elif vidu_state == "failed":
                changes.extend(
                    self._failed_changes(
                        task_id,
                        "视频生成失败",
                        vidu_status.get("message") or "视频生成失败",
                    )
                )
                await registry.remove_running(task_id)
                logger.warning(
                    f"[Video {task.task_id}] 主动查询 Vidu 任务失败: "
                    f"{vidu_status.get('message')}"
                )
            else:
                # Still processing: keep waiting.
                logger.debug(
                    f"[Video {task.task_id}] 主动查询 Vidu 状态: {vidu_state},继续等待"
                )
        except Exception as e:
            logger.warning(
                f"[Video {task.task_id}] 主动查询 Vidu 失败: {e}"
            )
        return changes

    async def _precheck_points(
        self, task: Any, params: dict[str, Any], registry: TaskRegistry
    ) -> list[StateChange]:
        """Reject the task before submission when the balance is too low.

        Returns the failure changes when the user's balance does not cover the
        cost of ``params["planned_duration"]`` (the registry is updated as
        well), and an empty list otherwise. The check is skipped when the
        planned duration is missing or not positive. Errors during the check
        are logged and treated as "not rejected".
        """
        planned_duration = float(params.get("planned_duration", 0) or 0)
        if planned_duration <= 0:
            return []

        try:
            async with AsyncSessionLocal() as db:
                from sqlalchemy import select
                from app.models.user_point import UserPoint

                points = ps._calculate_cost("video", {"seconds": planned_duration})
                result = await db.execute(
                    select(UserPoint).where(UserPoint.user_id == task.user_id)
                )
                up = result.scalar_one_or_none()
                if not up or up.balance < points:
                    message = f"积分不足,需要 {points} 积分,当前余额 {up.balance if up else 0}"
                    await registry.update(
                        task.task_id,
                        status="failed",
                        progress=0,
                        message=message,
                    )
                    return [
                        self._make_change(task.task_id, "status", "failed"),
                        self._make_change(task.task_id, "message", message),
                    ]
        except Exception as e:
            logger.error(f"[Video {task.task_id}] 积分预检失败: {e}")
        return []

    async def _submit(self, task: Any, params: dict[str, Any]) -> list[StateChange]:
        """Create the Vidu task and return the resulting state changes.

        On success ``params["vidu_task_id"]`` is set (``params`` is mutated in
        place) and the updated params are emitted as a change. A missing
        service or any error during submission yields failure changes instead
        of raising.
        """
        task_id = task.task_id

        # Only a missing service should be reported as "service not ready";
        # a RuntimeError raised while submitting is an ordinary submit failure.
        try:
            service = self._get_service()
        except RuntimeError:
            logger.error(f"[Video {task.task_id}] service not initialized")
            return self._failed_changes(
                task_id, "视频处理服务未就绪", "视频处理服务未就绪"
            )

        try:
            # Build the callback address ourselves when the caller sent none.
            callback_url = params.get("callback_url")
            if not callback_url:
                from app.config import get_settings

                base_url = get_settings().app_base_url
                if base_url:
                    callback_url = f"{base_url}/api/v1/vidu/callback"
                else:
                    raise ValueError("callback_url 未配置且无法自动推断应用地址")

            # None-safe preview: a null video_url must not make the log line
            # itself raise and fail the task.
            video_url_preview = (params.get("video_url") or "")[:60]
            logger.info(
                f"[Video {task.task_id}] 准备提交 Vidu 视频生成: "
                f"callback_url={callback_url}, video_url={video_url_preview}..."
            )

            # Treat an explicit None like a missing key for numeric options.
            speed = params.get("speed")
            volume = params.get("volume")
            vidu_task_id = await service.lip_sync_create(
                video_url=params.get("video_url", ""),
                audio_url=params.get("audio_url"),
                text=params.get("text"),
                voice_id=params.get("voice_id"),
                speed=1.0 if speed is None else float(speed),
                volume=0 if volume is None else int(volume),
                ref_photo_url=params.get("ref_photo_url"),
                callback_url=callback_url,
                task_id=task_id,
            )
            if not vidu_task_id:
                raise ValueError("未返回任务ID")

            params["vidu_task_id"] = vidu_task_id
            return [
                self._make_change(task_id, "params", params),
                self._make_change(task_id, "message", "视频生成任务已提交"),
            ]
        except Exception as e:
            logger.error(f"[Video {task.task_id}] submit error: {e}")
            return self._failed_changes(
                task_id, "视频生成任务提交失败,请稍后重试", "视频生成任务提交失败"
            )

    async def tick(
        self, tasks: list[Any], registry: TaskRegistry, slots: SlotManager
    ) -> list[StateChange]:
        """Advance every task by one step and return the state changes.

        Tasks that already carry ``params["vidu_task_id"]`` are polled and never
        submitted again. All other tasks go through the point pre-check and are
        then submitted if a slot can be acquired; tasks that get no slot are
        left untouched for a later tick.
        """
        changes: list[StateChange] = []
        for task in tasks:
            params = task.params or {}

            if params.get("vidu_task_id"):
                changes.extend(await self._poll_submitted(task, params, registry))
                continue

            # Insufficient balance fails the task without creating a Vidu task.
            rejected = await self._precheck_points(task, params, registry)
            if rejected:
                changes.extend(rejected)
                continue

            # The slot is held only for the duration of the submission.
            async with slots.acquire_ctx(
                SLOT_KEY, task.task_id, self.max_slots
            ) as acquired:
                if acquired:
                    changes.extend(await self._submit(task, params))
        return changes