Files
meijiaka-zy/python-api/app/services/voice_clone_service.py
T
小鱼开发 67e73b5a51 feat: 素材库重构、七牛上传修复、配音页面优化、MiniMax后端接入
- 素材库: VoiceMaterialLibrary 支持音频/视频分类、Modal弹窗、进度弹窗
- 列表布局: 紧凑单行、灰色图标按钮、重命名功能、删除ConfirmModal
- 生成配音: toast替换为ProgressModal
- 私有音色显示: 描述改为createdAt日期
- 七牛上传: 修复upload_stream参数、修正put_stream参数名
- MiniMax后端: 新增Provider+Service,TTS/克隆/音色列表切到MiniMax
- 前端默认音色: tianxin_xiaoling
- Rust: 新增voice命令、本地音频存储、配音生成功能
- 新增shot统计组件、脚本编辑器优化
2026-04-21 23:27:08 +08:00

264 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
语音克隆服务层
=============
封装 Kling AI 声音克隆 API,提供个性化音色克隆能力。
API 文档:https://klingai.com/document-api
"""
import asyncio
import logging
from enum import Enum
from app.ai.providers.klingai_provider import KlingAIProvider
from app.config import get_settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Clone task configuration
CLONE_TASK_TIMEOUT = 600 # Maximum time to wait for a clone task (seconds)
CLONE_POLL_INTERVAL = 5.0 # Interval between status polls (seconds)
def _get_kling_provider() -> KlingAIProvider:
    """Build a ``KlingAIProvider`` from the application settings.

    Reads ``KLINGAI_ACCESS_KEY`` and ``KLINGAI_SECRET_KEY`` from the settings
    object; a falsy value for either is replaced by an empty string.

    Returns:
        A new ``KlingAIProvider`` constructed with the credential mapping.
    """
    app_settings = get_settings()
    credentials = dict(
        access_key=app_settings.KLINGAI_ACCESS_KEY or "",
        secret_key=app_settings.KLINGAI_SECRET_KEY or "",
    )
    return KlingAIProvider(credentials)
class CloneTaskStatus(Enum):
    """Lifecycle states of a voice-clone task.

    This is a plain ``Enum`` whose member values are lowercase strings;
    compare against raw status strings through ``.value``.
    """

    # Task has been submitted and is waiting to be processed.
    PENDING = "pending"
    # Task is currently being processed.
    PROCESSING = "processing"
    # Task finished successfully.
    SUCCEEDED = "succeeded"
    # Task finished with an error.
    FAILED = "failed"
    # Waiting for the task gave up before it reached a final state.
    TIMEOUT = "timeout"
class VoiceCloneService:
    """Client for the Kling AI voice-cloning API.

    Deprecated: voice cloning has moved to ``MiniMaxTTSService``. This class
    is kept only for backward compatibility; new code should use
    ``MiniMaxTTSService`` instead.
    """

    def __init__(self) -> None:
        """Create the Kling provider and set the default wait timeout."""
        self.provider = _get_kling_provider()
        self.timeout = CLONE_TASK_TIMEOUT

    async def submit_clone_task(
        self,
        source_audio_url: str | None = None,
        source_video_url: str | None = None,
        video_id: str | None = None,
        voice_name: str | None = None,
        callback_url: str | None = None,
        external_task_id: str | None = None,
    ) -> str:
        """Submit a voice-clone task.

        At least one of ``source_audio_url``, ``source_video_url`` or
        ``video_id`` must be supplied.

        Args:
            source_audio_url: Source audio URL (5-30 seconds, mp3/wav,
                publicly accessible). Must start with http:// or https://.
            source_video_url: Source video URL (optional). Must start with
                http:// or https://.
            video_id: ID of an existing work to clone the voice from
                (optional).
            voice_name: Custom voice name, at most 20 characters. A default
                name is used when omitted or empty.
            callback_url: Callback URL forwarded to the provider.
            external_task_id: Caller-defined task ID forwarded to the
                provider.

        Returns:
            The clone task ID reported by the provider.

        Raises:
            ValueError: If argument validation fails or the provider response
                carries no ``task_id``.
        """
        if not (source_audio_url or source_video_url or video_id):
            raise ValueError("必须提供 source_audio_url、source_video_url 或 video_id 之一")

        url_schemes = ("http://", "https://")
        if source_audio_url and not source_audio_url.startswith(url_schemes):
            raise ValueError("source_audio_url 必须是有效的 URL")
        if source_video_url and not source_video_url.startswith(url_schemes):
            raise ValueError("source_video_url 必须是有效的 URL")
        if voice_name and len(voice_name) > 20:
            raise ValueError("voice_name 不能超过 20 字符")

        result = await self.provider.create_custom_voice(
            voice_name=voice_name or "自定义音色",
            audio_url=source_audio_url,
            video_url=source_video_url,
            video_id=video_id,
            callback_url=callback_url,
            external_task_id=external_task_id,
        )

        # A non-dict response is treated the same as a missing task_id rather
        # than surfacing as an AttributeError.
        task_id = result.get("task_id") if isinstance(result, dict) else None
        if not task_id:
            raise ValueError("提交克隆任务失败: 未返回 task_id")

        logger.info("[VoiceClone] 提交任务成功: task_id=%s", task_id)
        return task_id

    async def query_clone_task(self, task_id: str, blocking: bool = False) -> dict:
        """Query the state of a voice-clone task.

        Args:
            task_id: Task ID.
            blocking: When True and the task is still pending/processing,
                poll until it finishes or ``self.timeout`` elapses. When
                False, return the current state immediately.

        Returns:
            A dict with the keys:
            - task_id: the task ID
            - status: pending/processing/succeeded/failed/timeout (a provider
              status that is not recognised is passed through unchanged)
            - voice_id: cloned voice ID (only when succeeded)
            - trial_url: preview URL (only when succeeded)
            - error_message: error text (only when failed or timed out)
        """
        result = await self.provider.get_custom_voice_task(task_id)
        raw_status = result.get("task_status", "pending")

        # Translate provider status names to CloneTaskStatus values.
        # NOTE(review): Kling's API documents "submitted" as the initial task
        # state; it is mapped to pending so blocking waits cover it — confirm
        # the provider passes the raw status through.
        status_map = {
            "submitted": CloneTaskStatus.PENDING.value,
            "pending": CloneTaskStatus.PENDING.value,
            "processing": CloneTaskStatus.PROCESSING.value,
            "succeed": CloneTaskStatus.SUCCEEDED.value,
            "failed": CloneTaskStatus.FAILED.value,
        }
        mapped_status = status_map.get(raw_status, raw_status)

        ret = {
            "task_id": task_id,
            "status": mapped_status,
            "voice_id": None,
            "trial_url": None,
            "error_message": None,
        }

        if mapped_status == CloneTaskStatus.SUCCEEDED.value:
            # Only the first voice entry is reported; every level is
            # type-checked so a malformed payload leaves the fields as None.
            task_result = result.get("task_result")
            voices = task_result.get("voices") if isinstance(task_result, dict) else None
            first_voice = voices[0] if isinstance(voices, (list, tuple)) and voices else None
            if isinstance(first_voice, dict):
                ret["voice_id"] = first_voice.get("voice_id")
                ret["trial_url"] = first_voice.get("trial_url")
        elif mapped_status == CloneTaskStatus.FAILED.value:
            # NOTE(review): "task_status_msg" is assumed to be where Kling
            # reports the failure reason — confirm against the provider.
            ret["error_message"] = (
                result.get("message") or result.get("task_status_msg") or "任务失败"
            )

        still_running = mapped_status in (
            CloneTaskStatus.PENDING.value,
            CloneTaskStatus.PROCESSING.value,
        )
        if blocking and still_running:
            return await self._wait_for_completion(task_id)
        return ret

    async def _wait_for_completion(self, task_id: str, poll_interval: float = CLONE_POLL_INTERVAL) -> dict:
        """Poll a clone task until it succeeds, fails, or the wait times out.

        The deadline is measured on the event loop clock, so time spent in
        the status queries counts towards ``self.timeout``.

        Args:
            task_id: Task ID.
            poll_interval: Seconds between polls. A missing, zero or negative
                value falls back to ``CLONE_POLL_INTERVAL`` so the loop can
                never spin without sleeping.

        Returns:
            The final task state dict, or a dict with status ``timeout`` when
            the deadline passes first.
        """
        interval = poll_interval if poll_interval and poll_interval > 0 else CLONE_POLL_INTERVAL
        terminal_statuses = (CloneTaskStatus.SUCCEEDED.value, CloneTaskStatus.FAILED.value)

        loop = asyncio.get_running_loop()
        started_at = loop.time()
        deadline = started_at + self.timeout

        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            # Never sleep past the deadline; the last poll lands on it.
            await asyncio.sleep(min(interval, remaining))

            result = await self.query_clone_task(task_id, blocking=False)
            status = result.get("status", "pending")
            logger.debug(
                "[VoiceClone] task_id=%s, status=%s, elapsed=%.1fs",
                task_id,
                status,
                loop.time() - started_at,
            )
            if status in terminal_statuses:
                return result

        logger.warning("[VoiceClone] task_id=%s 等待超时", task_id)
        return {
            "task_id": task_id,
            "status": CloneTaskStatus.TIMEOUT.value,
            "voice_id": None,
            "trial_url": None,
            "error_message": f"等待超时({self.timeout}秒)",
        }

    async def wait_for_clone(
        self,
        source_audio_url: str | None = None,
        source_video_url: str | None = None,
        video_id: str | None = None,
        voice_name: str | None = None,
        poll_interval: float = CLONE_POLL_INTERVAL,
    ) -> dict:
        """Submit a clone task and wait for it to finish.

        Args:
            source_audio_url: Source audio URL.
            source_video_url: Source video URL.
            video_id: ID of an existing work.
            voice_name: Custom voice name.
            poll_interval: Seconds between status polls.

        Returns:
            The final task state dict. A timeout is reported through
            ``status == "timeout"`` in this dict; no exception is raised
            for it.

        Raises:
            ValueError: If submitting the task fails.
        """
        task_id = await self.submit_clone_task(
            source_audio_url=source_audio_url,
            source_video_url=source_video_url,
            video_id=video_id,
            voice_name=voice_name,
        )

        result = await self.query_clone_task(task_id, blocking=False)
        status = result.get("status", "pending")
        if status == CloneTaskStatus.SUCCEEDED.value:
            logger.info("[VoiceClone] 克隆成功: task_id=%s", task_id)
            return result
        if status == CloneTaskStatus.FAILED.value:
            # Already failed: return now instead of sleeping a full poll
            # interval only to fetch the same failure again.
            return result

        return await self._wait_for_completion(task_id, poll_interval=poll_interval)

    async def list_custom_voices(self) -> list[dict]:
        """Return the custom voice list as reported by the provider."""
        return await self.provider.list_custom_voices()

    async def delete_custom_voice(self, voice_id: str) -> bool:
        """Delete a custom voice.

        Args:
            voice_id: Voice ID.

        Returns:
            True when the provider response carries ``code == 0``,
            otherwise False.
        """
        result = await self.provider.delete_custom_voice(voice_id)
        return isinstance(result, dict) and result.get("code") == 0