Files
meijiaka-zy/python-api/app/scheduler/handlers/video_handler.py
T
小鱼开发 773065536c refactor: 统一项目命名为 meijiaka-zy / 美家卡智影
- 中文产品名统一为 美家卡智影
- 代码目录/容器名/数据卷: meijiaka-zy
- 本地存储路径: Meijiaka-zy
- 数据库名: meijiaka_zy
- 七牛云资源前缀: meijiaka-zy
- 部署脚本指向新仓库 meijiaka-zy.git
2026-04-26 23:02:05 +08:00

426 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Video 任务处理器
===============
管理 Kling 视频生成(含 segment 和 empty_shot)的提交、轮询、下载。
占用全局槽位:18
"""
import asyncio
import contextlib
import json
import logging
import tempfile
import uuid
from pathlib import Path
from typing import Any
import aiohttp
from app.ai.providers.klingai_provider import KlingAIProvider, KlingPromptBuilder
from app.config import get_settings
from app.core.config_loader import get_config_loader
from app.scheduler.handlers.base import AsyncHandler
from app.scheduler.models import StateChange
from app.scheduler.registry import JobRegistry
from app.scheduler.slot_manager import SlotManager
from app.services.qiniu_service import get_qiniu_service
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Key passed to the slot manager when acquiring/releasing video-generation slots.
SLOT_KEY = "kling:video_slots"
# Slot limit passed to the slot manager together with SLOT_KEY when acquiring.
MAX_SLOTS = 18
class VideoHandler(AsyncHandler):
    """Scheduler handler for Kling video generation jobs.

    A job carries its shots in ``job.params["shots"]``. On every tick the
    handler, for each job:

    1. polls shots whose status is ``"submitted"`` and, when one succeeds,
       downloads the video locally and uploads it to Qiniu;
    2. submits shots whose status is ``"pending"`` (``segment`` shots first)
       while a slot can be acquired under ``SLOT_KEY``;
    3. emits ``StateChange`` records describing progress or the final result.

    Shot dictionaries are mutated in place; persistence happens only through
    the returned ``StateChange`` list.

    NOTE(review): submitting an ``empty_shot`` polls the text-to-image task
    inline (up to ``_IMAGE_POLL_TIMEOUT_S`` seconds) and the Qiniu uploads are
    plain synchronous calls, so a single tick can run for a long time —
    confirm this is acceptable for the scheduler.
    """

    name = "video"
    slot_key = SLOT_KEY
    max_slots = MAX_SLOTS

    # Give up on a shot after this many consecutive failed status queries.
    _MAX_QUERY_FAILURES = 5
    # A shot still "processing" this long after its creation time is failed.
    _PROCESSING_TIMEOUT_MS = 2 * 60 * 60 * 1000
    # Text-to-image polling: overall deadline and delay between polls (seconds).
    _IMAGE_POLL_TIMEOUT_S = 600
    _IMAGE_POLL_INTERVAL_S = 5
    # Chunk size (bytes) used when streaming downloads to disk.
    _DOWNLOAD_CHUNK_SIZE = 256 * 1024
    # Shot statuses that count as finished.
    _TERMINAL_STATUSES = ("completed", "failed")

    def __init__(self):
        # Created lazily by _get_provider().
        self._provider: KlingAIProvider | None = None

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _change(job: Any, field_path: str, value: Any) -> StateChange:
        """Build a ``StateChange`` for ``job`` on ``field_path``."""
        return StateChange(job_id=job.job_id, field_path=field_path, value=value)

    async def _get_provider(self) -> KlingAIProvider:
        """Return the cached Kling provider, building it on first use."""
        if self._provider is None:
            settings = get_settings()
            platform = get_config_loader().get_platform("klingai")
            self._provider = KlingAIProvider(
                {
                    "access_key": settings.KLINGAI_ACCESS_KEY or "",
                    "secret_key": settings.KLINGAI_SECRET_KEY or "",
                    "base_url": (
                        platform.base_url if platform else "https://api-beijing.klingai.com"
                    ),
                }
            )
        return self._provider

    def _get_project_video_dir(self, project_id: str) -> Path:
        """Return (creating it if needed) the local video directory of a project."""
        video_dir = Path.home() / "Documents" / "Meijiaka-zy" / "projects" / project_id / "videos"
        video_dir.mkdir(parents=True, exist_ok=True)
        return video_dir

    async def _download_to_path(self, url: str, local_path: Path) -> None:
        """Stream the body of ``url`` into ``local_path``.

        The response is written chunk by chunk instead of being buffered in
        memory. If anything fails, the partially written file is removed and
        the exception is re-raised.
        """
        try:
            async with aiohttp.ClientSession() as session, session.get(url) as resp:
                resp.raise_for_status()
                with local_path.open("wb") as fh:
                    async for chunk in resp.content.iter_chunked(self._DOWNLOAD_CHUNK_SIZE):
                        fh.write(chunk)
        except BaseException:
            # Never leave a truncated file behind (also on cancellation).
            with contextlib.suppress(OSError):
                local_path.unlink(missing_ok=True)
            raise

    async def _download_video(self, video_url: str, local_path: Path) -> None:
        """Download a video to ``local_path``; raises on HTTP or I/O errors."""
        await self._download_to_path(video_url, local_path)

    async def _download_image(self, image_url: str, local_path: Path) -> None:
        """Download an image to ``local_path``; raises on HTTP or I/O errors."""
        await self._download_to_path(image_url, local_path)

    async def _poll_image_task(self, provider: KlingAIProvider, image_task_id: str) -> str:
        """Poll a text-to-image task until it finishes and return the image URL.

        Raises:
            TimeoutError: the task did not finish within the polling deadline.
            RuntimeError: the task failed, or succeeded without an image URL.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._IMAGE_POLL_TIMEOUT_S
        while True:
            if loop.time() > deadline:
                raise TimeoutError("文生图轮询超时")
            result = await provider.get_image_task(image_task_id)
            status = result.get("task_status", "unknown")
            if status == "succeed":
                # `or` guards against keys that are present but None.
                images = (result.get("task_result") or {}).get("images") or []
                if images and images[0].get("url"):
                    return images[0]["url"]
                raise RuntimeError("文生图成功但未返回图片 URL")
            if status == "failed":
                raise RuntimeError(result.get("task_status_msg") or "文生图失败")
            await asyncio.sleep(self._IMAGE_POLL_INTERVAL_S)

    # ------------------------------------------------------------------
    # Scheduler entry point
    # ------------------------------------------------------------------
    async def tick(
        self, jobs: list[Any], registry: JobRegistry, slots: SlotManager
    ) -> list[StateChange]:
        """Process every job once and return all resulting state changes."""
        changes: list[StateChange] = []
        provider = await self._get_provider()
        for job in jobs:
            changes.extend(await self._process_job(job, registry, slots, provider))
        return changes

    async def _process_job(
        self, job: Any, registry: JobRegistry, slots: SlotManager, provider: KlingAIProvider
    ) -> list[StateChange]:
        """Advance one job: poll submitted shots, submit pending ones, summarize.

        Returns the list of ``StateChange`` records for this job. A job with
        no shots (or with a ``shots`` string that is not valid JSON) is
        marked ``failed`` immediately.
        """
        changes: list[StateChange] = []
        params = job.params or {}
        shots = params.get("shots", [])
        if isinstance(shots, str):
            try:
                shots = json.loads(shots)
            except json.JSONDecodeError as e:
                # Fail the job rather than raising out of the tick on every pass.
                changes.append(self._change(job, "status", "failed"))
                changes.append(self._change(job, "error", f"镜头数据解析失败: {e}"[:500]))
                return changes
            params["shots"] = shots
        if not shots:
            changes.append(self._change(job, "status", "failed"))
            changes.append(self._change(job, "error", "没有镜头数据"))
            return changes

        project_id = params.get("project_id", job.job_id)

        changes.extend(
            await self._poll_submitted_shots(job, params, shots, project_id, slots, provider)
        )
        changes.extend(await self._submit_pending_shots(job, params, shots, slots, provider))
        changes.extend(await self._summarize_job(job, params, shots, project_id))
        return changes

    # ------------------------------------------------------------------
    # Stage 1: poll shots that were already submitted
    # ------------------------------------------------------------------
    async def _poll_submitted_shots(
        self,
        job: Any,
        params: dict[str, Any],
        shots: list[dict[str, Any]],
        project_id: str,
        slots: SlotManager,
        provider: KlingAIProvider,
    ) -> list[StateChange]:
        """Query the provider for every ``submitted`` shot and update it in place."""
        import time

        changes: list[StateChange] = []
        for shot in shots:
            if shot.get("status") != "submitted":
                continue
            provider_task_id = shot.get("provider_task_id")
            if not provider_task_id:
                continue
            slot_id = f"{job.job_id}:{shot['id']}"

            try:
                if shot.get("type") == "segment":
                    result = await provider.get_omni_video_task(provider_task_id)
                else:
                    result = await provider.get_video_task(
                        provider_task_id, task_type="image2video"
                    )
                status = result.get("task_status", "unknown")
            except Exception as e:
                logger.error("[Video %s] Query shot %s error: %s", job.job_id, shot["id"], e)
                fail_count = (shot.get("query_fail_count") or 0) + 1
                shot["query_fail_count"] = fail_count
                if fail_count >= self._MAX_QUERY_FAILURES:
                    await slots.release(SLOT_KEY, slot_id)
                    shot["status"] = "failed"
                    shot["error_message"] = f"查询状态连续失败: {e}"[:500]
                changes.append(self._change(job, "params", params))
                continue

            # The limit is on *consecutive* failures (see the error message
            # above), so a successful query clears the counter.
            if shot.get("query_fail_count"):
                shot["query_fail_count"] = 0
                changes.append(self._change(job, "params", params))

            # created_at is compared against the current time in milliseconds;
            # the original comment states KlingAI returns a Unix ms timestamp.
            created_at = result.get("created_at") or 0
            now_ms = int(time.time() * 1000)
            timed_out = (
                status == "processing"
                and created_at > 0
                and (now_ms - created_at) > self._PROCESSING_TIMEOUT_MS
            )

            if timed_out:
                # Still processing after two hours: fail it and free the slot.
                await slots.release(SLOT_KEY, slot_id)
                shot["status"] = "failed"
                shot["error_message"] = "生成超时(超过 2 小时仍在处理中)"
                logger.warning(
                    "[Video %s] Shot %s timeout, marked as failed", job.job_id, shot["id"]
                )
                changes.append(self._change(job, "params", params))
            elif status == "succeed":
                await slots.release(SLOT_KEY, slot_id)
                videos = (result.get("task_result") or {}).get("videos") or []
                video_url = videos[0].get("url") if videos else None
                if video_url:
                    shot["video_url"] = video_url
                    shot["status"] = "completed"
                    # Download as soon as a shot finishes instead of waiting
                    # for the whole job.
                    logger.info(
                        "[Video %s] Shot %s completed, downloading...", job.job_id, shot["id"]
                    )
                    await self._download_and_upload(project_id, shot)
                else:
                    shot["status"] = "failed"
                    shot["error_message"] = "任务成功但未返回视频"
                changes.append(self._change(job, "params", params))
            elif status == "failed":
                await slots.release(SLOT_KEY, slot_id)
                shot["status"] = "failed"
                shot["error_message"] = str(result.get("task_status_msg") or "生成失败")[:500]
                changes.append(self._change(job, "params", params))
        return changes

    # ------------------------------------------------------------------
    # Stage 2: submit pending shots while slots are available
    # ------------------------------------------------------------------
    async def _submit_pending_shots(
        self,
        job: Any,
        params: dict[str, Any],
        shots: list[dict[str, Any]],
        slots: SlotManager,
        provider: KlingAIProvider,
    ) -> list[StateChange]:
        """Submit ``pending`` shots to the provider, ``segment`` shots first."""
        changes: list[StateChange] = []
        pending_shots = sorted(
            (s for s in shots if s.get("status") == "pending"),
            key=lambda s: 0 if s.get("type") == "segment" else 1,
        )
        for shot in pending_shots:
            slot_id = f"{job.job_id}:{shot['id']}"
            acquired = await slots.acquire(SLOT_KEY, slot_id, MAX_SLOTS)
            if not acquired:
                # Could not get a slot for this shot; try the next one and
                # retry this shot on a later tick.
                continue
            try:
                if shot.get("type") == "segment":
                    human_id = shot.get("human_id") or params.get("human_id")
                    if not human_id:
                        raise ValueError(f"分镜 {shot['id']} 缺少 human_id")
                    prompt = KlingPromptBuilder.omni_segment(
                        shot.get("scene", ""), shot.get("voiceover", "")
                    )
                    result = await provider.generate_video_omni(
                        prompt=prompt,
                        model="kling-v3-omni",
                        mode="pro",
                        aspect_ratio="9:16",
                        duration=shot.get("duration"),
                        sound="on",
                        multi_shot=False,
                        element_list=[{"element_id": str(human_id)}],
                    )
                else:
                    # empty_shot: text-to-image -> upload to Qiniu -> image-to-video
                    result = await self._submit_empty_shot(shot, provider)

                provider_task_id = result.get("task_id")
                if not provider_task_id:
                    raise ValueError(f"创建任务失败,未返回 provider_task_id: {result}")
                shot["provider_task_id"] = provider_task_id
                shot["status"] = "submitted"
                logger.info(
                    "[Video %s] Shot %s submitted: %s", job.job_id, shot["id"], provider_task_id
                )
            except Exception as e:
                # Submission failed: give the slot back and fail only this shot.
                await slots.release(SLOT_KEY, slot_id)
                shot["status"] = "failed"
                shot["error_message"] = str(e)[:500]
                logger.error("[Video %s] Submit shot %s failed: %s", job.job_id, shot["id"], e)
            changes.append(self._change(job, "params", params))
        return changes

    # ------------------------------------------------------------------
    # Stage 3: progress / final summary
    # ------------------------------------------------------------------
    async def _summarize_job(
        self,
        job: Any,
        params: dict[str, Any],
        shots: list[dict[str, Any]],
        project_id: str,
    ) -> list[StateChange]:
        """Emit progress changes, or the final status/result once all shots are done."""
        changes: list[StateChange] = []
        total = len(shots)
        all_done = all(s.get("status") in self._TERMINAL_STATUSES for s in shots)

        if not all_done:
            done_count = sum(1 for s in shots if s.get("status") in self._TERMINAL_STATUSES)
            changes.append(self._change(job, "status", "running"))
            changes.append(self._change(job, "message", f"{done_count}/{total} 个镜头处理中"))
            changes.append(self._change(job, "completed", done_count))
            changes.append(self._change(job, "total", total))
            return changes

        # Downloads normally happen as each shot completes; here we only retry
        # completed shots that have a video URL but no local file yet.
        retry_download_tasks = [
            self._download_and_upload(project_id, shot)
            for shot in shots
            if shot.get("status") == "completed"
            and shot.get("video_url")
            and not shot.get("local_path")
        ]
        if retry_download_tasks:
            logger.info(
                "[Video %s] Final retry downloading %s videos...",
                job.job_id,
                len(retry_download_tasks),
            )
            await asyncio.gather(*retry_download_tasks, return_exceptions=True)
            logger.info("[Video %s] Retry downloads finished", job.job_id)
            # The shot dicts were updated by _download_and_upload; write them back.
            changes.append(self._change(job, "params", params))

        # Count after the retries so download failures are reflected.
        completed = sum(1 for s in shots if s.get("status") == "completed")
        failed = sum(1 for s in shots if s.get("status") == "failed")

        if completed == 0 and failed > 0:
            errors = "; ".join(
                f"{s.get('id')}: {s.get('error_message')}"
                for s in shots
                if s.get("error_message")
            )
            changes.append(self._change(job, "status", "failed"))
            changes.append(self._change(job, "message", f"全部失败 ({failed}/{total})"))
            changes.append(self._change(job, "error", errors))
        else:
            changes.append(self._change(job, "status", "completed"))
            changes.append(
                self._change(job, "message", f"完成!成功 {completed},失败 {failed}")
            )
        changes.append(self._change(job, "completed", total))
        changes.append(self._change(job, "total", total))
        changes.append(self._change(job, "progress", 100))

        # The result field summarizes every shot, including the local_path /
        # qiniu_url filled in by the download step.
        result_data = {
            "project_id": project_id,
            "completed": completed,
            "failed": failed,
            "total": total,
            "shots": [
                {
                    "shot_id": s.get("id"),
                    "type": s.get("type"),
                    "status": s.get("status"),
                    "task_id": s.get("provider_task_id"),
                    "video_url": s.get("video_url"),
                    "local_path": s.get("local_path"),
                    "qiniu_url": s.get("qiniu_url"),
                    "error_message": s.get("error_message"),
                }
                for s in shots
            ],
        }
        changes.append(self._change(job, "result", result_data))
        return changes

    # ------------------------------------------------------------------
    # Provider / storage workflows
    # ------------------------------------------------------------------
    async def _submit_empty_shot(
        self, shot: dict[str, Any], provider: KlingAIProvider
    ) -> dict[str, Any]:
        """Submit an empty shot: text-to-image -> upload to Qiniu -> image-to-video.

        Returns the provider response of the image-to-video request.

        Raises:
            ValueError: the text-to-image request returned no task id.
            TimeoutError / RuntimeError: see ``_poll_image_task``.
            Exception: any download, upload or provider error is propagated.
        """
        qiniu = get_qiniu_service()

        # 1. Text-to-image
        image_result = await provider.generate_image(
            prompt=shot.get("scene", ""),
            model="kling-v3",
            aspect_ratio="9:16",
        )
        image_task_id = image_result.get("task_id")
        if not image_task_id:
            raise ValueError(f"文生图创建失败: {image_result}")

        # 2. Wait for the image
        image_url = await self._poll_image_task(provider, image_task_id)

        # 3-4. Download the image and upload it to Qiniu. The temporary file
        # is removed whether or not these steps succeed.
        temp_dir = Path(tempfile.gettempdir()) / "meijiaka-zy_empty_shot"
        temp_dir.mkdir(parents=True, exist_ok=True)
        temp_image_path = temp_dir / f"{image_task_id}.jpg"
        try:
            await self._download_image(image_url, temp_image_path)
            qiniu_result = qiniu.upload_file(
                local_path=str(temp_image_path),
                file_type="image",
                check_duplicate=True,
            )
            qiniu_image_url = qiniu_result["url"]
        finally:
            with contextlib.suppress(OSError):
                temp_image_path.unlink(missing_ok=True)

        # 5. Image-to-video
        voice_id = shot.get("voice_id") or get_settings().DEFAULT_EMPTY_SHOT_VOICE_ID
        prompt = KlingPromptBuilder.empty_shot(shot.get("scene", ""), shot.get("voiceover", ""))
        return await provider.generate_video_image2video(
            prompt=prompt,
            image_url=qiniu_image_url,
            model="kling-v2-6",
            mode="pro",
            duration=shot.get("duration"),
            voice_list=[{"voice_id": voice_id}],
            sound="on",
            negative_prompt="画外音没有标点的时候不要轻易断句",
        )

    async def _download_and_upload(self, project_id: str, shot: dict[str, Any]) -> None:
        """Download a shot's video locally and upload it to Qiniu.

        Only the passed ``shot`` dict is updated (``local_path``,
        ``qiniu_url``, and on failure ``status`` / ``error_message``); nothing
        is persisted here. A failed Qiniu upload is logged and leaves
        ``qiniu_url`` as ``None`` without failing the shot; a failed download
        marks the shot ``failed``. This method does not raise for
        download/upload errors.
        """
        import glob as stdlib_glob

        video_url = shot.get("video_url")
        if not video_url:
            shot["status"] = "failed"
            shot["error_message"] = "没有视频URL"
            return

        video_dir = self._get_project_video_dir(project_id)

        # Remove earlier videos of the same shot so a regenerated video is not
        # masked by a stale file on the front end.
        pattern = f"scene_{stdlib_glob.escape(str(shot['id']))}_*.mp4"
        for old_file in video_dir.glob(pattern):
            try:
                old_file.unlink()
                logger.info("[Video] Removed old file: %s", old_file)
            except Exception as e:
                logger.warning("[Video] Failed to remove old file %s: %s", old_file, e)

        # A random suffix changes the file path on every download so the front
        # end notices the change and reloads the video.
        local_path = video_dir / f"scene_{shot['id']}_{uuid.uuid4().hex[:6]}.mp4"
        try:
            await self._download_video(video_url, local_path)
            shot["local_path"] = str(local_path)
            try:
                qiniu = get_qiniu_service()
                qiniu_result = qiniu.upload_video(local_path=str(local_path))
                shot["qiniu_url"] = qiniu_result["url"]
            except Exception as e:
                # The local copy is still usable, so an upload failure is not fatal.
                logger.warning("[Video] Shot %s upload qiniu failed: %s", shot["id"], e)
                shot["qiniu_url"] = None
            logger.info("[Video] Shot %s download/upload done: %s", shot["id"], local_path)
        except Exception as e:
            logger.error("[Video] Shot %s download failed: %s", shot["id"], e)
            shot["status"] = "failed"
            shot["error_message"] = f"下载失败: {e}"[:500]