Files
meijiaka-zy/python-api/app/scheduler/handlers/script_handler.py
T
小鱼开发 e262134148 refactor: 移除 KlingAI 和 MiniMax 相关代码
删除内容:
- KlingAI Provider、MiniMax Provider
- Kling 视频/图片/TTS/语音克隆/形象克隆 Service 和 Scheduler Handler
- 已废弃的 TTSService、VoiceCloneService
- config 中 KLINGAI_*/MINIMAX_* 配置项
- ai_models.yaml 中 klingai 平台和模型配置
- docker-compose 中相关环境变量
- .env.example 中相关配置示例
- deploy-test.sh 中相关检查
- Makefile 中 klingai 语义检查排除规则
- KlingTaskStatus 枚举

修改内容:
- model_router.py 移除 KlingAI 平台分支
- voice.py 重写,修复批量合成/文件保存中 service 未定义的 Bug
- vidu_service.py 移除 MiniMax 相关注释
- script_handler.py 更新注释
2026-05-02 23:16:14 +08:00

116 lines
3.9 KiB
Python

"""
Script 任务处理器
================
管理脚本生成的执行。
不占用 Volc 槽位,使用独立的 script 槽位池。
"""
import asyncio
import logging
from typing import Any

from app.scheduler.handlers.base import AsyncHandler
from app.scheduler.models import StateChange
from app.scheduler.registry import JobRegistry
from app.scheduler.slot_manager import SlotManager
from app.services.script_service import ScriptService
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Key of the slot pool used for script jobs; it is the first argument passed
# to SlotManager.acquire / SlotManager.release below.
SLOT_KEY = "script:slots"
# Limit passed to SlotManager.acquire together with SLOT_KEY.
# NOTE(review): presumably the maximum number of slots that may be held at
# once in this pool — confirm against SlotManager.
MAX_SLOTS = 10
class ScriptHandler(AsyncHandler):
    """Scheduler handler that executes script-generation jobs.

    For every job handed to :meth:`tick` the handler tries to take a slot
    from its own slot pool (``slot_key`` / ``max_slots``), runs the
    generation through ``ScriptService`` and reports the outcome as a list
    of ``StateChange`` records. The slot is always released afterwards.
    """

    # Handler name.
    name = "script"
    # Slot pool key and limit used when acquiring/releasing slots.
    slot_key = SLOT_KEY
    max_slots = MAX_SLOTS

    # Seconds to wait between the first ("analysing") and the second
    # ("drafting") progress update.
    # NOTE(review): presumably a cosmetic pause so the first progress message
    # is visible to clients — confirm it is still wanted.
    _ANALYSIS_PAUSE_SECONDS = 2

    @staticmethod
    def _failure_changes(job_id: Any, exc: BaseException) -> list[StateChange]:
        """Build the state changes that mark ``job_id`` as failed.

        Args:
            job_id: Identifier of the failed job.
            exc: The exception that caused the failure.

        Returns:
            ``status``, ``message`` (max 200 chars) and ``error``
            (max 500 chars) changes, in that order.
        """
        # Exceptions raised without arguments stringify to ""; fall back to
        # the class name so the recorded error is never blank.
        detail = str(exc) or type(exc).__name__
        return [
            StateChange(job_id=job_id, field_path="status", value="failed"),
            StateChange(job_id=job_id, field_path="message", value=detail[:200]),
            StateChange(job_id=job_id, field_path="error", value=detail[:500]),
        ]

    async def tick(
        self, jobs: list[Any], registry: JobRegistry, slots: SlotManager
    ) -> list[StateChange]:
        """Process the given jobs one after another.

        Args:
            jobs: Jobs to process; each must expose ``job_id`` and ``params``.
            registry: Registry used for intermediate progress updates.
            slots: Slot manager guarding the script slot pool.

        Returns:
            All state changes produced by the jobs that obtained a slot.
        """
        changes: list[StateChange] = []
        for job in jobs:
            if not await slots.acquire(self.slot_key, job.job_id, self.max_slots):
                # No slot obtained: skip this job without emitting any change.
                continue
            try:
                changes.extend(await self._process_job(job, registry, slots))
            except Exception as exc:
                # Reached when _process_job fails outside its own try block
                # (e.g. the initial registry update raises).
                logger.exception("[Script %s] failed", job.job_id)
                changes.extend(self._failure_changes(job.job_id, exc))
            finally:
                await slots.release(self.slot_key, job.job_id)
        return changes

    async def _process_job(
        self, job: Any, registry: JobRegistry, slots: SlotManager
    ) -> list[StateChange]:
        """Generate the script for a single job.

        Reads ``category``, ``subcategory``, ``style`` (default ``"default"``)
        and ``duration`` (default ``60``) from ``job.params``, pushes progress
        updates to ``registry`` and returns the terminal state changes.

        Args:
            job: Job to process.
            registry: Registry receiving the intermediate progress updates.
            slots: Unused here; kept for signature compatibility.

        Returns:
            State changes describing either completion (including the
            ``result`` payload) or failure.

        Raises:
            Exception: Whatever the initial ``registry.update`` call raises;
                errors after that point are converted into failure changes.
        """
        params = job.params or {}
        category = params.get("category", "")
        subcategory = params.get("subcategory", "")
        style = params.get("style", "default")
        duration = params.get("duration", 60)

        await registry.update(
            job.job_id,
            status="running",
            progress=10,
            message="分析需求中...",
            completed=0,
            total=1,
        )

        try:
            await asyncio.sleep(self._ANALYSIS_PAUSE_SECONDS)
            await registry.update(
                job.job_id,
                progress=40,
                message="构思脚本中...",
            )

            shots = await ScriptService().generate_script(
                category=category,
                subcategory=subcategory,
                duration=duration,
                script_type=style,
            )

            # Real total duration: sum over shots that carry a truthy duration.
            total_duration = sum(s.duration for s in shots if s.duration)
            result_data = {
                "title": f"{category}/{subcategory}",
                "scenes": [s.model_dump() for s in shots],
                "total_duration": total_duration,
                "style": style,
                "shot_count": len(shots),
            }
        except Exception as exc:
            logger.exception("[ScriptTask %s] Failed", job.job_id)
            return self._failure_changes(job.job_id, exc)

        # Dict insertion order defines the order of the emitted changes.
        final_fields = {
            "status": "completed",
            "progress": 100,
            "message": "脚本生成完成",
            "completed": 1,
            "total": 1,
            "result": result_data,
        }
        return [
            StateChange(job_id=job.job_id, field_path=field, value=value)
            for field, value in final_fields.items()
        ]