98 lines
2.7 KiB
Python
98 lines
2.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
查询 KlingAI Omni-Video 任务状态
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
|
|
import aiohttp
|
|
from dotenv import load_dotenv
|
|
from jose import jwt
|
|
|
|
# 加载 .env 文件
|
|
load_dotenv()
|
|
|
|
access_key = os.getenv("KLINGAI_ACCESS_KEY", "")
|
|
secret_key = os.getenv("KLINGAI_SECRET_KEY", "")
|
|
base_url = "https://api-beijing.klingai.com"
|
|
|
|
if not access_key or not secret_key:
|
|
print("错误: 请在 .env 文件中配置 KLINGAI_ACCESS_KEY 和 KLINGAI_SECRET_KEY")
|
|
exit(1)
|
|
|
|
|
|
def generate_jwt_token() -> str:
|
|
"""生成 JWT Token"""
|
|
import time
|
|
|
|
headers = {"alg": "HS256", "typ": "JWT"}
|
|
current_time = int(time.time())
|
|
payload = {
|
|
"iss": access_key,
|
|
"exp": current_time + 1800,
|
|
"nbf": current_time - 5,
|
|
}
|
|
return jwt.encode(payload, secret_key, algorithm="HS256", headers=headers)
|
|
|
|
|
|
async def query_task(task_type: str, task_id: str) -> dict:
|
|
"""查询任务状态"""
|
|
token = generate_jwt_token()
|
|
url = f"{base_url}/v1/videos/{task_type}/{task_id}"
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
async with aiohttp.ClientSession() as session, session.get(url, headers=headers) as resp:
|
|
data = await resp.json()
|
|
return data
|
|
|
|
|
|
async def main():
|
|
task_type = "image2video"
|
|
task_id = "871485500765933601"
|
|
print(f"正在查询任务: {task_type}/{task_id}")
|
|
print("-" * 60)
|
|
|
|
result = await query_task(task_type, task_id)
|
|
code = result.get("code", -1)
|
|
message = result.get("message", "")
|
|
data = result.get("data", {})
|
|
|
|
print(f"响应 code: {code}")
|
|
print(f"响应 message: {message}")
|
|
print()
|
|
|
|
if code == 0 and data:
|
|
task_status = data.get("task_status", "")
|
|
if task_status == "succeed":
|
|
print("✅ 任务已完成!")
|
|
video_url = data.get("video_url", "")
|
|
duration = data.get("duration", "")
|
|
print(f"视频 URL: {video_url}")
|
|
print(f"时长: {duration}s")
|
|
elif task_status == "processing" or task_status == "submitted":
|
|
print(f"⏳ 任务处理中... 状态: {task_status}")
|
|
elif task_status == "failed":
|
|
print("❌ 任务失败")
|
|
print(f"失败原因: {data.get('err_msg', '未知错误')}")
|
|
else:
|
|
print(f"任务状态: {task_status}")
|
|
print()
|
|
print("完整数据:")
|
|
import json
|
|
|
|
print(json.dumps(data, indent=2, ensure_ascii=False))
|
|
else:
|
|
print("查询失败")
|
|
print("完整响应:")
|
|
import json
|
|
|
|
print(json.dumps(result, indent=2, ensure_ascii=False))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|