30536276ba
核心变更:
- 统一第三方接口架构:所有服务走 PlatformGateway(call_sync/submit_task/query_task/handle_webhook)
- 视频生成(Vidu 对口型)纳入 Async Engine,与 script/subtitle/tts 统一为 POST /tasks/{task_type} 模式
- 新增 VideoHandler、TTSHandler,完善 ScriptHandler/SubtitleHandler
- PlatformGateway 生成 internal_task_id,建立 Redis 双向映射,callback 场景传入 Async Engine task_id 保证映射一致
- SlotManager 新增 acquire_ctx 上下文管理器,所有 Handler 统一使用
- ViduAdapter 状态映射归一化(normalize_state/denormalize_state)
- 移除 ViduService Semaphore 和 tenacity 重试,并发控制完全交予 SlotManager
- nonce 防重放下沉到 CallbackCapable 协议
- Service 层错误统一为 PlatformError,路由层错误信息脱敏
- 废弃 /voice/lip-sync,清理 vidu.py 遗留路由
Bug 修复:
- VideoHandler 轮询阶段后添加 continue,防止已提交任务重复创建
- voice.py synthesize_to_file 变量名冲突(request vs request_body)
- PlatformGateway.submit_task 空 data 防护
- ScriptHandler 动态导入 asyncio 改为模块级导入
- SubtitleHandler 完成时补充 progress=100
文档:
- 更新 AGENTS.md 核心功能、运行时架构、异步调度描述
122 lines
4.3 KiB
Python
122 lines
4.3 KiB
Python
"""
|
|
Async Engine 核心调度器
|
|
=======================
|
|
|
|
驱动所有 Handler 的 Tick 循环,批量查询、批量更新。
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any
|
|
|
|
from app.core.redis_client import get_redis_client
|
|
from app.scheduler.handlers.base import AsyncHandler
|
|
from app.scheduler.models import StateChange
|
|
from app.scheduler.registry import TaskRegistry
|
|
from app.scheduler.slot_manager import SlotManager
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AsyncEngine:
    """Unified scheduling engine for asynchronous jobs.

    On every tick the engine loads the IDs of the running tasks from the
    registry, groups the task records by ``task_type``, ticks every registered
    handler concurrently, writes the state changes the handlers return to
    Redis, and finally removes finished tasks from the running set.
    """

    # Statuses that mark a task as finished. Kept as a tuple so membership is
    # decided by equality only, exactly like the original inline literal.
    _FINISHED_STATUSES = ("completed", "failed")

    def __init__(self, handlers: list[AsyncHandler] | None = None):
        """Create the engine together with its registry and slot manager.

        Args:
            handlers: Optional handlers to register up front. Each handler is
                stored under its ``name``; a later handler with the same name
                replaces an earlier one.
        """
        self.redis = get_redis_client()
        self.registry = TaskRegistry(self.redis)
        self.slots = SlotManager(self.redis)
        self.handlers: dict[str, AsyncHandler] = {h.name: h for h in handlers or ()}
        # Task types already reported as having no handler, so the warning is
        # emitted once per type instead of on every tick.
        self._unhandled_types_reported: set[str] = set()

    def register(self, handler: AsyncHandler) -> None:
        """Register a handler under ``handler.name``, replacing any previous one."""
        self.handlers[handler.name] = handler
        # The type is handled from now on; allow a fresh warning should the
        # handler ever disappear again.
        self._unhandled_types_reported.discard(handler.name)
        logger.info("Registered handler: %s", handler.name)

    async def tick(self) -> None:
        """Run one complete scheduling tick.

        Any exception raised by a step is logged and swallowed, so a failing
        tick never propagates into the caller's loop. The elapsed time is
        logged at debug level in every case.
        """
        # get_running_loop() is the documented way to reach the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        tick_start = loop.time()

        try:
            # 1. Load the IDs of all running tasks.
            running_ids = await self.registry.get_running_task_ids()
            if not running_ids:
                logger.debug("Tick: no running tasks")
                return

            # 2. Group the task records by task_type.
            tasks_by_type = await self._load_tasks_by_type(running_ids)
            self._report_unhandled_types(tasks_by_type)

            # 3. Tick all handlers concurrently. A handler without running
            #    tasks of its type still gets called, with an empty list.
            results = await asyncio.gather(
                *(
                    self._safe_tick(name, handler, tasks_by_type.get(name, []))
                    for name, handler in self.handlers.items()
                )
            )

            # 4. Apply the state changes reported by each handler.
            for changes in results:
                if changes:
                    await self._apply_changes(changes)

            # 5. Drop finished tasks from the running set.
            await self._cleanup_finished()

        except Exception:
            logger.exception("Scheduler tick failed")
        finally:
            logger.debug("Tick completed in %.2fs", loop.time() - tick_start)

    async def _load_tasks_by_type(self, task_ids) -> dict[str, list[Any]]:
        """Fetch the record of every given task ID and group them by task_type.

        IDs whose record can no longer be loaded are removed from the running
        set and skipped.

        Args:
            task_ids: Task IDs as returned by
                ``registry.get_running_task_ids()``.

        Returns:
            Mapping of ``task_type`` to the list of records of that type.
        """
        grouped: dict[str, list[Any]] = {}
        for task_id in task_ids:
            record = await self.registry.get(task_id)
            if not record:
                await self.registry.remove_running(task_id)
                continue
            grouped.setdefault(record.task_type, []).append(record)
        return grouped

    def _report_unhandled_types(self, tasks_by_type: dict[str, list[Any]]) -> None:
        """Warn once per task type that has running tasks but no handler.

        Handlers are looked up by name, so tasks of such a type are never
        passed to any handler by this engine.
        """
        for task_type, tasks in tasks_by_type.items():
            if task_type in self.handlers or task_type in self._unhandled_types_reported:
                continue
            self._unhandled_types_reported.add(task_type)
            logger.warning(
                "No handler registered for task type %s; %d running task(s) are not ticked",
                task_type,
                len(tasks),
            )

    async def _safe_tick(
        self, name: str, handler: AsyncHandler, tasks: list[Any]
    ) -> list[StateChange]:
        """Tick one handler, isolating its failures from the other handlers.

        Args:
            name: Handler name, used only for logging.
            handler: The handler to tick.
            tasks: Running task records whose type matches this handler.

        Returns:
            The state changes returned by the handler, or an empty list when
            the handler raised (the exception is logged).
        """
        try:
            return await handler.tick(tasks, self.registry, self.slots)
        except Exception:
            logger.exception("Handler tick failed: %s", name)
            return []

    async def _apply_changes(self, changes: list[StateChange]) -> None:
        """Write a batch of state changes to Redis through one pipeline.

        Each change supplies a ``(key, field, value)`` triple that is queued
        as an ``HSET``; the pipeline is executed once after all are queued.
        Nothing is sent when ``changes`` is empty.
        """
        if not changes:
            return

        pipe = self.redis.pipeline()
        for change in changes:
            key, field, value = change.to_redis_command()
            pipe.hset(key, field, value)
        await pipe.execute()

    async def _cleanup_finished(self) -> None:
        """Remove finished or vanished tasks from the running set.

        The running IDs are re-read here because the changes applied earlier
        in the tick may have moved tasks into a finished status.
        """
        running_ids = await self.registry.get_running_task_ids()
        for task_id in running_ids:
            record = await self.registry.get(task_id)
            if not record:
                await self.registry.remove_running(task_id)
                continue
            if record.status in self._FINISHED_STATUSES:
                await self.registry.remove_running(task_id)
                logger.info("Task moved to finished: %s (%s)", task_id, record.status)

    async def run_forever(self, interval: float = 10.0, min_interval: float = 2.0) -> None:
        """Run ticks in an endless loop.

        Args:
            interval: Target number of seconds between the starts of two
                consecutive ticks.
            min_interval: Lower bound for the pause after a tick, applied
                when a tick takes longer than ``interval - min_interval``.
        """
        logger.info("Async Engine started")
        loop = asyncio.get_running_loop()
        while True:
            tick_start = loop.time()
            await self.tick()
            elapsed = loop.time() - tick_start
            # Sleep for the rest of the interval, but never less than
            # min_interval, so slow ticks do not run back to back.
            await asyncio.sleep(max(interval - elapsed, min_interval))
|