Files
meijiaka-zy/python-api/app/api/v1/klingai.py
T

1047 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
KlingAI (可灵 AI) API 路由
==========================
提供视频生成、图像生成、对口型等功能。
"""
import logging
from typing import Any
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from app.ai.providers.klingai_provider import KlingAIProvider
from app.config import get_settings
from app.core.config_loader import get_config_loader
from app.schemas.common import ApiResponse, success_response
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/klingai", tags=["KlingAI"])
# ============ 请求/响应 Schema ============
class OmniVideoRequest(BaseModel):
"""Omni-Video 视频生成请求(kling-v3-omni / kling-video-o1"""
prompt: str = Field(
...,
description="视频描述提示词(不超过2500字符),支持 <<<element_1>>>/<<<image_1>>>/<<<video_1>>> 引用语法",
example="一只<<<element_1>>>在花园里奔跑,阳光明媚",
)
model: str | None = Field("kling-v3-omni", description="模型: kling-v3-omni, kling-video-o1")
duration: int = Field(5, ge=3, le=15, description="视频时长(秒): 3/5/10/15Omni支持3-15秒")
aspect_ratio: str = Field("16:9", description="宽高比: 16:9, 9:16, 1:1")
mode: str = Field("pro", description="生成模式: pro(高质量) 或 std(标准)")
sound: str = Field("on", description="声音控制: on=音画同出, off=无声")
negative_prompt: str | None = Field(None, description="负面提示词")
# 多镜头参数
multi_shot: bool = Field(False, description="是否启用多镜头模式")
shot_type: str | None = Field(
None, description="分镜方式: customize=自定义, intelligence=智能分镜"
)
multi_prompt: list[dict | None] = Field(
None,
description="多镜头提示词列表,每个元素包含 index, prompt, duration,最多6个分镜",
)
# 参考资源
image_list: list[dict | None] = Field(None, description="参考图片列表,最多4张")
element_list: list[dict | None] = Field(
None, description="主体参考列表,格式: [{'elementId': 123}], 最多7个"
)
video_list: list[dict | None] = Field(None, description="参考视频列表")
callback_url: str | None = Field(None, description="回调通知地址")
external_task_id: str | None = Field(None, description="自定义任务ID")
class VideoGenerateRequest(BaseModel):
"""文生视频请求"""
prompt: str = Field(
...,
description="视频描述提示词(不超过2500字符)",
example="一只猫在花园里玩耍",
)
model: str | None = Field("kling-v2.6", description="视频模型: kling-v2.6, kling-v2.5-turbo")
duration: int = Field(5, ge=5, le=10, description="视频时长(秒): 5 或 10")
aspect_ratio: str = Field("16:9", description="宽高比: 16:9, 9:16, 1:1, 4:3, 3:4")
mode: str = Field("pro", description="生成模式: pro(高质量) 或 std(标准)")
negative_prompt: str | None = Field(None, description="负面提示词")
callback_url: str | None = Field(None, description="回调通知地址")
class VideoGenerateResponse(BaseModel):
"""视频生成响应"""
task_id: str
task_status: str
created_at: int
updated_at: int
class Image2VideoRequest(BaseModel):
"""图生视频请求"""
image_url: str = Field(..., description="输入图片 URL")
prompt: str | None = Field(None, description="视频运动描述提示词")
model: str | None = Field("kling-v2.6", description="视频模型")
duration: int = Field(5, ge=5, le=10, description="视频时长(秒)")
aspect_ratio: str | None = Field(None, description="宽高比")
mode: str = Field("pro", description="生成模式")
callback_url: str | None = Field(None, description="回调通知地址")
class IdentifyFaceRequest(BaseModel):
"""人脸识别请求"""
video_id: str | None = Field(None, description="KlingAI 生成的视频 ID")
video_url: str | None = Field(None, description="上传的视频 URL(与 videoId 二选一")
class IdentifyFaceResponse(BaseModel):
"""人脸识别响应"""
session_id: str
face_data: list[dict[str, Any]]
class FaceChooseItem(BaseModel):
"""新版对口型人脸配置"""
face_id: str = Field(..., description="人脸ID,由 identify-face 接口返回")
audio_id: str | None = Field(None, description="通过TTS生成的音频ID(与 sound_file 二选一)")
sound_file: str | None = Field(None, description="音频文件URL或Base64(与 audio_id 二选一)")
sound_start_time: int = Field(0, description="音频裁剪起点时间(ms")
sound_end_time: int = Field(..., description="音频裁剪终点时间(ms")
sound_insert_time: int = Field(0, description="裁剪后音频插入时间(ms")
class AdvancedLipSyncRequest(BaseModel):
"""新版对口型请求(advanced-lip-sync"""
session_id: str = Field(..., description="人脸识别返回的会话ID")
face_choose: list[FaceChooseItem] = Field(..., description="人脸对口型配置列表")
callback_url: str | None = Field(None, description="回调通知地址")
class OmniImageRequest(BaseModel):
"""Omni-Image 图像生成请求"""
prompt: str = Field(..., description="图像描述提示词")
model: str | None = Field("kling-image-o1", description="模型: kling-image-o1, kling-v3-omni")
aspect_ratio: str | None = Field("9:16", description="宽高比: 16:9/9:16/1:1/4:3/3:4")
resolution: str | None = Field("1k", description="清晰度: 1k/2k/4k")
result_type: str | None = Field("single", description="结果类型: single/series")
n: int | None = Field(1, description="生成数量 1-9")
element_list: list[dict[str, Any]] | None = Field(None, description="主体参考列表")
image_list: list[dict[str, Any]] | None = Field(None, description="参考图列表")
callback_url: str | None = Field(None, description="回调通知地址")
class ImageGenerateRequest(BaseModel):
"""图像生成请求"""
prompt: str = Field(..., description="图像描述提示词")
model: str | None = Field("kolors-v1", description="图像模型: kolors-v1")
width: int = Field(1024, description="图像宽度")
height: int = Field(1024, description="图像高度")
negative_prompt: str | None = Field(None, description="负面提示词")
callback_url: str | None = Field(None, description="回调通知地址")
class TaskStatusResponse(BaseModel):
"""任务状态响应"""
task_id: str
task_status: str # submitted, processing, succeed, failed
created_at: int
updated_at: int
video_url: str | None = None
image_url: str | None = None
error_message: str | None = None
class VirtualTryonRequest(BaseModel):
"""虚拟试穿请求"""
person_image_url: str = Field(..., description="人物图片 URL")
cloth_image_url: str = Field(..., description="衣服图片 URL")
callback_url: str | None = Field(None, description="回调通知地址")
# ============ 自定义音色 Schema ============
class CreateCustomVoiceRequest(BaseModel):
"""创建自定义音色请求"""
voice_name: str = Field(..., description="音色名称(最多20字符)", example="我的音色")
audio_url: str | None = Field(None, description="音频文件URLmp3/wav/mp4/mov")
video_url: str | None = Field(None, description="视频文件URL")
video_id: str | None = Field(
None, description="历史作品IDv2.6/sound=on/数字人/对口型生成的视频)"
)
callback_url: str | None = Field(None, description="回调通知地址")
external_task_id: str | None = Field(None, description="自定义任务ID")
class ElementImage(BaseModel):
"""主体参考图片(对应 KlingAI 官方格式 imageUrl"""
image_url: str = Field(..., description="图片URL")
name: str | None = Field(None, description="图片名称")
class ElementVideo(BaseModel):
"""主体参考视频(对应 KlingAI 官方格式 videoUrl"""
video_url: str = Field(..., description="视频URL")
name: str | None = Field(None, description="视频名称")
class CreateElementRequest(BaseModel):
"""创建主体请求"""
element_name: str = Field(..., description="主体名称(最多20字符)", example="我的小猫")
element_description: str = Field(
..., description="主体描述(最多100字符)", example="一只橘色的小猫,毛茸茸的"
)
reference_type: str = Field("image_refer", description="参考类型: image_refer 或 video_refer")
element_image_list: list[ElementImage] | None = Field(
None, description="图片参考列表(图片定制时必填,第一个作为正面图)"
)
element_video_list: list[ElementVideo] | None = Field(
None, description="视频参考列表(视频定制时必填,第一个作为正面视频)"
)
element_voice_id: str | None = Field(None, description="音色ID,绑定音色到主体")
callback_url: str | None = Field(None, description="回调通知地址")
class ElementResponse(BaseModel):
"""主体响应"""
element_id: int | None = None
element_name: str | None = None
element_description: str | None = None
element_type: str | None = None # image_refer / video_refer
status: str | None = None
task_id: str | None = None
task_status: str | None = None
created_at: int | None = None
updated_at: int | None = None
element_image_list: dict | None = None
element_video_list: dict | None = None
element_voice_info: dict | None = None
owned_by: str | None = None
class CreateCustomVoiceResponse(BaseModel):
"""创建自定义音色响应"""
task_id: str
task_status: str
created_at: int
updated_at: int
class VoiceInfo(BaseModel):
"""音色信息"""
voice_id: str
voice_name: str
trial_url: str | None = None
owned_by: str | None = None
status: str | None = None
# ============ 辅助函数 ============
async def get_klingai_provider() -> KlingAIProvider:
"""获取 KlingAI Provider 实例
API Key 从 Settings 读取(符合配置规范)
"""
settings = get_settings()
config_loader = get_config_loader()
platform = config_loader.get_platform("klingai")
if not platform:
raise HTTPException(status_code=404, detail="KlingAI 平台未配置")
# 从 Settings 读取 AK/SK(符合配置规范:.env → Settings → 服务层)
access_key = settings.KLINGAI_ACCESS_KEY
secret_key = settings.KLINGAI_SECRET_KEY
if not access_key or not secret_key:
raise HTTPException(
status_code=400,
detail="KlingAI Access Key 或 Secret Key 未配置,请设置环境变量 KLINGAI_ACCESS_KEY 和 KLINGAI_SECRET_KEY",
)
# 从 YAML 读取 base_url(模型配置)
base_url = platform.base_url if platform else None
return KlingAIProvider(
{
"access_key": access_key,
"secret_key": secret_key,
"base_url": base_url or "https://api-beijing.klingai.com",
}
)
# ============ API 路由 ============
@router.post("/videos/omni", response_model=ApiResponse[VideoGenerateResponse])
async def create_omni_video(data: OmniVideoRequest):
"""
Omni-Video 多模态视频生成
支持文本、图片、主体、视频等多种输入方式组合生成视频。
适用于 kling-v3-omni 和 kling-video-o1 模型。
**特性:**
- 支持 3-15 秒视频生成
- 支持多镜头和智能分镜 (shotType=intelligence)
- 支持引用主体、图片、视频作为参考
- 支持 <<<element_1>>>/<<<image_1>>>/<<<video_1>>> 语法引用提示词中的资源
"""
try:
provider = await get_klingai_provider()
result = await provider.generate_video_omni(
prompt=data.prompt,
model=data.model,
mode=data.mode,
aspect_ratio=data.aspect_ratio,
duration=data.duration,
sound=data.sound,
negative_prompt=data.negative_prompt,
multi_shot=data.multi_shot,
shot_type=data.shot_type,
multi_prompt=data.multi_prompt,
image_list=data.image_list,
element_list=data.element_list,
video_list=data.video_list,
callback_url=data.callback_url,
external_task_id=data.external_task_id,
)
return success_response(data=VideoGenerateResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"Omni-Video 生成失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/videos/omni/{task_id}", response_model=ApiResponse[TaskStatusResponse])
async def get_omni_video_task(task_id: str):
"""
查询 Omni-Video 任务状态
查询指定 Omni-Video 任务的执行状态和结果。
"""
try:
provider = await get_klingai_provider()
result = await provider.get_omni_video_task(task_id)
return success_response(data=TaskStatusResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"查询 Omni-Video 任务失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/videos/omni", response_model=ApiResponse[dict])
async def list_omni_video_tasks(
page: int = 1,
page_size: int = 30,
):
"""
查询 Omni-Video 任务列表
查询历史 Omni-Video 任务列表。
"""
try:
provider = await get_klingai_provider()
result = await provider.list_omni_video_tasks(
page=page,
page_size=page_size,
)
return success_response(data=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"查询 Omni-Video 任务列表失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/videos/image2video", response_model=ApiResponse[VideoGenerateResponse])
async def create_image_to_video(data: Image2VideoRequest):
"""
图生视频
根据输入图片生成视频。
"""
try:
provider = await get_klingai_provider()
result = await provider.generate_video_from_image(
image_url=data.image_url,
prompt=data.prompt,
model=data.model,
duration=data.duration,
aspect_ratio=data.aspect_ratio,
mode=data.mode,
callback_url=data.callback_url,
)
return success_response(data=VideoGenerateResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"图生视频失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/videos/extend", response_model=ApiResponse[VideoGenerateResponse])
async def extend_video(
video_id: str,
prompt: str | None = None,
duration: int = 5,
callback_url: str | None = None,
):
"""
视频延长
延长现有视频的时长。
"""
try:
provider = await get_klingai_provider()
result = await provider.extend_video(
video_id=video_id,
prompt=prompt,
duration=duration,
callback_url=callback_url,
)
return success_response(data=VideoGenerateResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"视频延长失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/videos/identify-face", response_model=ApiResponse[IdentifyFaceResponse])
async def identify_face(data: IdentifyFaceRequest):
"""
对口型前置:人脸识别
分析视频中的人脸信息,返回 sessionId 和 faceId,用于后续的 advanced-lip-sync。
"""
try:
if not data.videoId and not data.videoUrl:
raise HTTPException(status_code=400, detail="必须提供 videoId 或 videoUrl")
provider = await get_klingai_provider()
result = await provider.identify_face(video_id=data.videoId, video_url=data.videoUrl)
return success_response(data=IdentifyFaceResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"人脸识别失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/videos/advanced-lip-sync", response_model=ApiResponse[VideoGenerateResponse])
async def create_advanced_lip_sync(data: AdvancedLipSyncRequest):
"""
新版对口型视频生成
基于 KlingAI advanced-lip-sync 接口,先调用 /videos/identify-face 获取 sessionId 和 faceId
再传入本接口生成对口型视频。
支持 audio_idTTS 生成)或 soundFile(外部音频 URL)驱动口型。
"""
try:
provider = await get_klingai_provider()
face_choose = [item.model_dump() for item in data.faceChoose]
result = await provider.advanced_lip_sync(
session_id=data.sessionId,
face_choose=face_choose,
callback_url=data.callbackUrl,
)
return success_response(data=VideoGenerateResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"对口型生成失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/videos/advanced-lip-sync/{taskId}", response_model=ApiResponse[TaskStatusResponse])
async def get_advanced_lip_sync_task(taskId: str):
"""
查询新版对口型任务状态
"""
try:
provider = await get_klingai_provider()
result = await provider.get_advanced_lip_sync_task(taskId)
taskStatus = result.get("taskStatus", "unknown")
videos = result.get("task_result", {}).get("videos", [])
videoUrl = videos[0].get("url") if videos else None
return success_response(
data=TaskStatusResponse(
taskId=result.get("taskId", taskId),
taskStatus=taskStatus,
createdAt=result.get("createdAt", 0),
updatedAt=result.get("updatedAt", 0),
videoUrl=videoUrl,
errorMessage=result.get("taskStatus_msg"),
)
)
except HTTPException:
raise
except Exception as e:
logger.error(f"查询对口型任务失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/images/omni", response_model=ApiResponse[VideoGenerateResponse])
async def create_omni_image(data: OmniImageRequest):
"""
Omni-Image 图像生成
支持文本、主体、参考图等多种输入方式组合生成图像。
"""
try:
provider = await get_klingai_provider()
result = await provider.generate_omni_image(
prompt=data.prompt,
model=data.model,
aspect_ratio=data.aspect_ratio,
resolution=data.resolution,
result_type=data.result_type,
n=data.n,
element_list=data.element_list,
image_list=data.image_list,
callback_url=data.callback_url,
)
return success_response(data=VideoGenerateResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"Omni-Image 生成失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/images/omni/{task_id}", response_model=ApiResponse[TaskStatusResponse])
async def get_omni_image_task(task_id: str):
"""
查询 Omni-Image 任务状态
"""
try:
provider = await get_klingai_provider()
result = await provider.get_omni_image_task(task_id)
task_status = result.get("task_status", "unknown")
images = result.get("task_result", {}).get("images", [])
image_url = images[0].get("url") if images else None
return success_response(
data=TaskStatusResponse(
task_id=result.get("task_id", task_id),
task_status=task_status,
created_at=result.get("created_at", 0),
updated_at=result.get("updated_at", 0),
image_url=image_url,
error_message=result.get("task_status_msg"),
)
)
except HTTPException:
raise
except Exception as e:
logger.error(f"查询 Omni-Image 任务失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/images/generations", response_model=ApiResponse[VideoGenerateResponse])
async def create_image(data: ImageGenerateRequest):
"""
文生图
根据文本描述生成图像。
"""
try:
provider = await get_klingai_provider()
result = await provider.generate_image(
prompt=data.prompt,
model=data.model,
width=data.width,
height=data.height,
negative_prompt=data.negativePrompt,
callback_url=data.callbackUrl,
)
return success_response(data=VideoGenerateResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"图像生成失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/virtual-tryon", response_model=ApiResponse[VideoGenerateResponse])
async def create_virtual_tryon(data: VirtualTryonRequest):
"""
虚拟试穿
将衣服虚拟试穿到人物身上。
"""
try:
provider = await get_klingai_provider()
result = await provider.virtual_tryon(
person_image_url=data.person_imageUrl,
cloth_image_url=data.cloth_imageUrl,
callback_url=data.callbackUrl,
)
return success_response(data=VideoGenerateResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"虚拟试穿失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/tasks/{taskId}", response_model=ApiResponse[TaskStatusResponse])
async def get_taskStatus(
taskId: str,
task_type: str = "video",
):
"""
查询任务状态
查询指定任务的执行状态和结果。
Args:
taskId: 任务 ID
task_type: 任务类型 (video, image2video, image, lip-sync, virtual-tryon)
"""
try:
provider = await get_klingai_provider()
result = await provider.get_taskStatus(
task_id=taskId,
task_type=task_type,
)
return success_response(data=TaskStatusResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"查询任务状态失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/tasks", response_model=ApiResponse[dict])
async def list_tasks(
task_type: str = "video",
page: int = 1,
page_size: int = 10,
):
"""
查询任务列表
查询历史任务列表。
"""
try:
provider = await get_klingai_provider()
result = await provider.list_tasks(
task_type=task_type,
page=page,
page_size=page_size,
)
return success_response(data=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"查询任务列表失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============ 主体管理 API ============
@router.post("/elements", response_model=ApiResponse[ElementResponse])
async def create_element(data: CreateElementRequest):
"""
创建主体(自定义元素)
通过上传图片或视频创建可复用的主体,用于视频/图像生成时保持角色一致性。
图片要求:
- 格式:jpg, jpeg, png
- 大小:≤10MB
- 数量:正面图 + 1-3张其他角度
视频要求:
- 格式:mp4, mov
- 时长:3-8秒
- 分辨率:1080P
- 大小:≤200MB
"""
try:
provider = await get_klingai_provider()
imageList = None
if data.element_imageList:
imageList = {
"frontalImage": data.element_imageList[0].imageUrl,
"referImages": [
{"imageUrl": img.imageUrl, "name": img.name}
for img in data.element_imageList[1:]
if img.imageUrl
],
}
videoList = None
if data.element_videoList:
videoList = {
"frontal_video": data.element_videoList[0].videoUrl,
"referVideos": [
{"videoUrl": vid.videoUrl, "name": vid.name}
for vid in data.element_videoList[1:]
if vid.videoUrl
],
}
result = await provider.create_element(
element_name=data.elementName,
element_description=data.elementDescription,
reference_type=data.referenceType,
element_image_list=imageList,
element_video_list=videoList,
element_voice_id=data.element_voiceId,
callback_url=data.callbackUrl,
)
return success_response(data=ElementResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"创建主体失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/elements", response_model=ApiResponse[list[ElementResponse]])
async def list_elements():
"""
查询主体列表
获取所有已创建的主体(自定义元素)列表。
"""
try:
provider = await get_klingai_provider()
result = await provider.list_elements()
elements = [ElementResponse(**item) for item in result if isinstance(item, dict)]
return success_response(data=elements)
except HTTPException:
raise
except Exception as e:
logger.error(f"查询主体列表失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/elements/{elementId}", response_model=ApiResponse[ElementResponse])
async def get_element(elementId: str):
"""
查询单个主体详情
获取指定主体的详细信息。
"""
try:
provider = await get_klingai_provider()
result = await provider.get_element(elementId)
return success_response(data=ElementResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"查询主体详情失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/elements/{elementId}", response_model=ApiResponse[dict])
async def delete_element(elementId: str):
"""
删除主体
删除不再使用的主体(自定义元素)。
"""
try:
provider = await get_klingai_provider()
result = await provider.delete_element(elementId)
return success_response(data=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"删除主体失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============ 智能补全主体图 API ============
class AiMultiShotRequest(BaseModel):
"""智能补全主体图请求"""
frontal_image: str = Field(
..., description="主体正面参考图 URL", example="https://example.com/front.jpg"
)
callback_url: str | None = Field(None, description="回调通知地址")
class AiMultiShotResponse(BaseModel):
"""智能补全主体图响应"""
task_id: str
task_status: str
created_at: int
updated_at: int
@router.post("/elements/ai-multi-shot", response_model=ApiResponse[AiMultiShotResponse])
async def ai_multiShot(data: AiMultiShotRequest):
"""
智能补全主体不同角度图片
通过主体正面图,自动推理出该主体其他角度图片。
每次可生成3组结果供选择,每次扣减0.5积分。
使用流程:
1. 调用此接口传入正面图
2. 轮询查询任务状态
3. 获取生成的多组角度图片
4. 选择合适的图片创建主体
"""
try:
provider = await get_klingai_provider()
result = await provider.ai_multiShot(
frontal_image=data.frontalImage,
callback_url=data.callbackUrl,
)
return success_response(data=AiMultiShotResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"智能补全主体图失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/elements/ai-multi-shot/{taskId}", response_model=ApiResponse[dict])
async def get_ai_multiShot_task(taskId: str):
"""
查询智能补全主体图任务状态
获取指定任务的执行状态和生成的多角度图片结果。
"""
try:
provider = await get_klingai_provider()
result = await provider.get_ai_multiShot_task(taskId)
return success_response(data=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"查询智能补全任务失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ============ 自定义音色 API ============
@router.post("/voices/custom", response_model=ApiResponse[CreateCustomVoiceResponse])
async def create_custom_voice(data: CreateCustomVoiceRequest):
"""
创建自定义音色
通过上传音频文件或引用历史视频创建自定义音色,用于对口型视频。
音频要求:
- 格式:mp3, wav, mp4, mov
- 时长:5-30 秒
- 人声干净、无杂音、单一人声
"""
try:
provider = await get_klingai_provider()
result = await provider.create_custom_voice(
voice_name=data.voiceName,
audio_url=data.audioUrl,
video_url=data.videoUrl,
video_id=data.videoId,
callback_url=data.callbackUrl,
external_task_id=data.externalTaskId,
)
return success_response(data=CreateCustomVoiceResponse(**result))
except HTTPException:
raise
except Exception as e:
logger.error(f"创建自定义音色失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/voices/custom", response_model=ApiResponse[list[VoiceInfo]])
async def list_custom_voices():
"""
查询自定义音色列表
获取所有已创建的自定义音色。
"""
try:
provider = await get_klingai_provider()
result = await provider.list_custom_voices()
voices = []
for item in result:
if isinstance(item, dict) and "task_result" in item:
task_result = item.get("task_result", {})
voices_data = task_result.get("voices", [])
for voice in voices_data:
voices.append(VoiceInfo(**voice))
return success_response(data=voices)
except HTTPException:
raise
except Exception as e:
logger.error(f"查询自定义音色列表失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/voices/custom/{voiceId}", response_model=ApiResponse[dict])
async def get_custom_voice(voiceId: str):
"""
查询单个自定义音色
获取指定自定义音色的详细信息。
"""
try:
provider = await get_klingai_provider()
result = await provider.get_custom_voice(voiceId)
return success_response(data=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"查询自定义音色失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/voices/presets", response_model=ApiResponse[list[VoiceInfo]])
async def list_preset_voices():
"""
查询官方预设音色列表
获取 KlingAI 提供的官方音色列表。
"""
try:
provider = await get_klingai_provider()
result = await provider.list_preset_voices()
voices = []
for item in result:
if isinstance(item, dict) and "task_result" in item:
task_result = item.get("task_result", {})
voices_data = task_result.get("voices", [])
for voice in voices_data:
voices.append(VoiceInfo(**voice))
return success_response(data=voices)
except HTTPException:
raise
except Exception as e:
logger.error(f"查询官方音色列表失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/voices/custom/{voiceId}", response_model=ApiResponse[dict])
async def delete_custom_voice(voiceId: str):
"""
删除自定义音色
删除不再使用的自定义音色。
"""
try:
provider = await get_klingai_provider()
result = await provider.delete_custom_voice(voiceId)
return success_response(data=result)
except HTTPException:
raise
except Exception as e:
logger.error(f"删除自定义音色失败: {e}")
raise HTTPException(status_code=500, detail=str(e))