Files
meijiaka-zy/python-api/mark_timeout_failed.py
T

43 lines
1.0 KiB
Python

#!/usr/bin/env python3
"""
Manually mark a stuck (timed-out) shot of a job as failed and release its slot.
"""
import redis.asyncio as redis
import json
import asyncio
from app.config import get_settings
async def main(
    task_id: str = "task_09285f973d814364",
    shot_id: str = "3",
) -> None:
    """Mark one stuck shot of a job as failed and release its video slot.

    Reads the ``params`` field of the Redis hash ``job:<task_id>``, looks for
    the shot whose ``id`` equals ``shot_id`` and whose ``status`` is still
    ``"submitted"``, sets it to ``"failed"`` with an explanatory
    ``error_message``, and writes the JSON back. Afterwards the member
    ``<task_id>:<shot_id>`` is removed from the ``kling:video_slots`` set.

    Args:
        task_id: Identifier used to build the job hash key and the slot
            member. Defaults to the task this script was originally written
            for.
        shot_id: ``id`` value of the shot to fail (compared as a string).

    Raises:
        KeyError: If the stored params have no ``"shots"`` entry, or a shot
            lacks ``"id"`` / ``"status"``.
        json.JSONDecodeError: If the stored ``params`` field is not valid JSON.
    """
    settings = get_settings()
    r = redis.from_url(settings.REDIS_URL)
    try:
        job_key = f"job:{task_id}"
        params_raw = await r.hget(job_key, "params")
        if not params_raw:
            print("Job not found")
            return

        params = json.loads(params_raw)

        # Find the target shot and mark it failed, but only while it is still
        # waiting on a result ("submitted").
        for shot in params["shots"]:
            if shot["id"] == shot_id and shot["status"] == "submitted":
                shot["status"] = "failed"
                shot["error_message"] = "生成超时(手动标记失败)"
                print(f"Marked shot {shot_id} as failed")
                # Write back to Redis only when something actually changed, so
                # an unchanged snapshot never overwrites a concurrent update.
                await r.hset(job_key, "params", json.dumps(params))
                break

        # Release the slot; done regardless of whether the shot matched, as
        # removing an absent set member is a no-op.
        await r.srem("kling:video_slots", f"{task_id}:{shot_id}")
        print("Done")
    finally:
        # Close the connection on every exit path, including the early
        # "Job not found" return and any exception above.
        await r.aclose()
# Script entry point: drive the coroutine only when this file is executed
# directly, never as a side effect of importing it.
if __name__ == "__main__":
    asyncio.run(main())