Files
meijiaka-zy/python-api/app/scheduler/handlers/script_handler.py
T

116 lines
3.9 KiB
Python

"""
Script 任务处理器
================
管理脚本生成的执行。
不占用 Kling/Volc 槽位,使用独立的 script 槽位池。
"""
import asyncio
import logging
from typing import Any

from app.scheduler.handlers.base import AsyncHandler
from app.scheduler.models import StateChange
from app.scheduler.registry import JobRegistry
from app.scheduler.slot_manager import SlotManager
from app.services.script_service import ScriptService
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Key identifying the script slot pool; passed to SlotManager.acquire/release.
SLOT_KEY = "script:slots"
# Limit passed to SlotManager.acquire together with SLOT_KEY
# (presumably the maximum number of concurrently held script slots -- confirm
# against SlotManager).
MAX_SLOTS = 10
class ScriptHandler(AsyncHandler):
    """Scheduler handler that executes script-generation jobs.

    Every job is gated by a slot obtained from the slot manager under
    ``slot_key`` (``"script:slots"`` by default), so script generation uses
    its own slot pool.
    """

    name = "script"
    slot_key = SLOT_KEY
    max_slots = MAX_SLOTS

    async def tick(
        self, jobs: list[Any], registry: JobRegistry, slots: SlotManager
    ) -> list[StateChange]:
        """Process the given jobs one after another and collect state changes.

        For each job a slot is requested via ``slots.acquire``. Jobs for which
        no slot is granted are skipped without emitting any change. The slot
        is always released once the job has been processed, whether it
        succeeded or failed.

        Args:
            jobs: Job objects exposing ``job_id`` and ``params``.
            registry: Job registry used to publish progress updates.
            slots: Slot manager guarding the script slot pool.

        Returns:
            The state changes produced by all processed jobs, in job order.
        """
        changes: list[StateChange] = []
        for job in jobs:
            # Use the class attributes so ``slot_key`` / ``max_slots`` are the
            # single source of truth for the pool this handler draws from.
            acquired = await slots.acquire(self.slot_key, job.job_id, self.max_slots)
            if not acquired:
                continue
            try:
                changes.extend(await self._process_job(job, registry, slots))
            except Exception as e:
                # Only reached for errors raised outside _process_job's own
                # try block (e.g. its initial registry.update call).
                logger.exception("[Script %s] failed", job.job_id)
                changes.append(
                    StateChange(job_id=job.job_id, field_path="status", value="failed")
                )
                changes.append(
                    StateChange(job_id=job.job_id, field_path="error", value=str(e)[:500])
                )
            finally:
                await slots.release(self.slot_key, job.job_id)
        return changes

    async def _process_job(
        self, job: Any, registry: JobRegistry, slots: SlotManager
    ) -> list[StateChange]:
        """Generate the script for a single job.

        Reads ``category``, ``subcategory``, ``style`` and ``duration`` from
        ``job.params`` (falling back to ``""``, ``""``, ``"default"`` and
        ``60``), publishes progress through ``registry.update`` and calls
        ``ScriptService.generate_script``.

        Args:
            job: Job object exposing ``job_id`` and ``params``.
            registry: Job registry used to publish progress updates.
            slots: Slot manager; not used by this method.

        Returns:
            State changes marking the job ``completed`` (with progress,
            message, counters and the result payload), or ``failed`` (with a
            truncated message and error text) if generation raised.

        Raises:
            Exception: Whatever the initial ``registry.update`` call raises;
                it runs before the internal ``try`` block.
        """
        changes: list[StateChange] = []
        params = job.params or {}
        category = params.get("category", "")
        subcategory = params.get("subcategory", "")
        style = params.get("style", "default")
        duration = params.get("duration", 60)

        await registry.update(
            job.job_id,
            status="running",
            progress=10,
            message="分析需求中...",
            completed=0,
            total=1,
        )

        try:
            # Fixed 2-second pause between the two progress updates.
            # NOTE(review): the reason is not evident from this file
            # (possibly UI pacing) -- confirm it is still required.
            await asyncio.sleep(2)
            await registry.update(
                job.job_id,
                progress=40,
                message="构思脚本中...",
            )

            service = ScriptService()
            shots = await service.generate_script(
                category=category,
                subcategory=subcategory,
                duration=duration,
                script_type=style,
            )

            # Compute the actual total duration of the shots; shots with a
            # falsy duration (None / 0) are skipped.
            total_duration = sum(s.duration for s in shots if s.duration)
            result_data = {
                "title": f"{category}/{subcategory}",
                "scenes": [s.model_dump() for s in shots],
                "total_duration": total_duration,
                "style": style,
                "shot_count": len(shots),
            }

            changes.append(
                StateChange(job_id=job.job_id, field_path="status", value="completed")
            )
            changes.append(StateChange(job_id=job.job_id, field_path="progress", value=100))
            changes.append(
                StateChange(job_id=job.job_id, field_path="message", value="脚本生成完成")
            )
            changes.append(StateChange(job_id=job.job_id, field_path="completed", value=1))
            changes.append(StateChange(job_id=job.job_id, field_path="total", value=1))
            changes.append(
                StateChange(job_id=job.job_id, field_path="result", value=result_data)
            )
        except Exception as exc:
            logger.exception("[ScriptTask %s] Failed", job.job_id)
            changes.append(
                StateChange(job_id=job.job_id, field_path="status", value="failed")
            )
            # The message is truncated harder (200 chars) than the error
            # field (500 chars).
            changes.append(
                StateChange(job_id=job.job_id, field_path="message", value=str(exc)[:200])
            )
            changes.append(
                StateChange(job_id=job.job_id, field_path="error", value=str(exc)[:500])
            )
        return changes