Files
meijiaka-zy/python-api/tests/test_kling_tts.py
T
小鱼开发 bb08d0f586 refactor: 从智影 Fork 重构为智剪,独立 Docker 基础设施,开发模式认证兜底
主要变更:
- 修复 /tasks/script 路由 404(去掉重复 prefix)
- 开发模式自动认证兜底(无需登录即可测试流程)
- Docker 基础设施独立化(共用 db/redis)
- 前端 API 端口改为 8081
- 新增 TTS/语音克隆、视频粗剪、音频混音等智剪功能
- 删除智影专属模块(avatar、model_usage、qiniu 上传等)
2026-04-21 12:35:50 +08:00

300 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Kling TTS 服务单元测试
"""
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
import httpx
class TestTTSService:
    """Unit tests for TTSService, run against a mocked Kling provider."""

    @pytest.fixture
    def mock_provider(self):
        """Build a stand-in for KlingAIProvider with async TTS methods.

        ``generate_tts`` resolves to a payload carrying a task id, and
        ``get_tts_task`` resolves to a succeeded task with an audio URL.
        """
        mock = MagicMock()
        mock.generate_tts = AsyncMock(return_value={"task_id": "test-task-123"})
        mock.get_tts_task = AsyncMock(return_value={
            "status": "succeed",
            "task_result": {"audio_url": "https://example.com/audio.mp3"},
        })
        return mock

    @pytest.fixture
    def tts_service(self, mock_provider):
        """Build a TTSService instance wired to the mock provider.

        The instance is created with ``__new__`` so ``__init__`` never runs;
        the attributes the tests rely on are assigned by hand instead.
        """
        from app.services.tts_service import TTSService

        service = TTSService.__new__(TTSService)  # deliberately skip __init__
        service.provider = mock_provider
        service.default_voice_id = "829826751244537879"
        return service

    def test_tts_service_init(self, tts_service):
        """The fixture-built service exposes a provider and a default voice id."""
        assert tts_service is not None
        assert hasattr(tts_service, "provider")
        assert hasattr(tts_service, "default_voice_id")

    def test_preset_voices_loaded(self, tts_service):
        """The preset voice list is non-empty and contains the default voice."""
        voices = tts_service.get_preset_voices()
        assert len(voices) > 0
        assert any(v["voice_id"] == "829826751244537879" for v in voices)

    def test_get_voice_by_id_found(self, tts_service):
        """Looking up a known voice id returns that voice's record."""
        voice = tts_service.get_voice_by_id("829826751244537879")
        assert voice is not None
        assert voice["name"] == "温柔女声"

    def test_get_voice_by_id_not_found(self, tts_service):
        """Looking up an unknown voice id returns None."""
        voice = tts_service.get_voice_by_id("non-existent-id")
        assert voice is None

    @pytest.mark.asyncio
    async def test_synthesize_sync_success(self, tts_service):
        """synthesize_sync returns the audio URL once polling reports success."""
        # Successive poll results: the first query is still pending, the
        # second one has succeeded. side_effect hands them out one per call;
        # assigning return_value twice would silently drop the pending step.
        poll_results = [
            {"status": "pending"},
            {
                "status": "succeed",
                "task_result": {"audio_url": "https://kling.example.com/audio.mp3"},
            },
        ]
        with patch.object(tts_service.provider, "generate_tts", new_callable=AsyncMock) as mock_generate:
            mock_generate.return_value = {"task_id": "test-task-123"}
            with patch.object(tts_service.provider, "get_tts_task", new_callable=AsyncMock) as mock_get:
                mock_get.side_effect = poll_results
                # Neutralise any asyncio.sleep between polls so the test does
                # not wait in real time.
                # NOTE(review): assumes the service sleeps via asyncio.sleep - confirm.
                with patch("asyncio.sleep", new_callable=AsyncMock):
                    result = await tts_service.synthesize_sync(
                        text="你好世界",
                        voice_id="829826751244537879",
                        speed=1.0,
                        voice_language="zh",
                    )
                assert result == "https://kling.example.com/audio.mp3"
                # Both queued results must have been consumed, i.e. the
                # service really polled past the pending state.
                assert mock_get.call_count == 2

    @pytest.mark.asyncio
    async def test_synthesize_sync_empty_text(self, tts_service):
        """Empty text is rejected with a ValueError."""
        with pytest.raises(ValueError, match="text 不能为空"):
            await tts_service.synthesize_sync(text="", voice_id="test")

    @pytest.mark.asyncio
    async def test_synthesize_sync_text_too_long(self, tts_service):
        """Over-long text is rejected with a ValueError."""
        # Each Chinese character counts as one character, so this string is
        # 1201 characters long. The literal must not be empty: "" * 1201 is
        # still "", which would hit the empty-text check instead.
        # NOTE(review): the limit is presumably 1000 characters - confirm
        # against TTSService.
        long_text = "测" * 1201
        with pytest.raises(ValueError, match="text 不能超过"):
            await tts_service.synthesize_sync(text=long_text, voice_id="test")

    @pytest.mark.asyncio
    async def test_synthesize_sync_no_task_id(self, tts_service):
        """A submit response without a task_id raises a ValueError."""
        with patch.object(tts_service.provider, "generate_tts", new_callable=AsyncMock) as mock_generate:
            mock_generate.return_value = {}  # response carries no task_id
            with pytest.raises(ValueError, match="任务提交失败"):
                await tts_service.synthesize_sync(text="你好", voice_id="test")

    @pytest.mark.asyncio
    async def test_synthesize_sync_task_failed(self, tts_service):
        """A task that reports the failed status raises a ValueError."""
        with patch.object(tts_service.provider, "generate_tts", new_callable=AsyncMock) as mock_generate:
            mock_generate.return_value = {"task_id": "test-task-123"}
            with patch.object(tts_service.provider, "get_tts_task", new_callable=AsyncMock) as mock_get:
                mock_get.return_value = {"status": "failed", "message": "服务器错误"}
                with pytest.raises(ValueError, match="任务失败"):
                    await tts_service.synthesize_sync(text="你好", voice_id="test")

    def test_get_preset_voices_static(self):
        """get_preset_voices can be called on the class without an instance."""
        from app.services.tts_service import TTSService

        voices = TTSService.get_preset_voices()
        assert len(voices) == 5
        # The default voice must be among the presets.
        assert any(v["voice_id"] == "829826751244537879" for v in voices)
class TestVoiceCloneService:
    """Unit tests for VoiceCloneService with its provider methods patched."""

    @staticmethod
    def _stub_provider(service, method_name, payload):
        """Return a patcher that makes an async provider method resolve to *payload*."""
        return patch.object(
            service.provider,
            method_name,
            new_callable=AsyncMock,
            return_value=payload,
        )

    @pytest.fixture
    def clone_service(self):
        """Instantiate VoiceCloneService through its regular constructor."""
        from app.services.voice_clone_service import VoiceCloneService

        return VoiceCloneService()

    def test_voice_clone_service_init(self, clone_service):
        """A freshly built service exposes ``provider`` and ``timeout``."""
        assert clone_service is not None
        for attribute in ("provider", "timeout"):
            assert hasattr(clone_service, attribute)

    @pytest.mark.asyncio
    async def test_submit_clone_with_audio_url(self, clone_service):
        """Submitting with an audio URL forwards the URL and voice name."""
        stub = self._stub_provider(
            clone_service, "create_custom_voice", {"task_id": "clone-task-123"}
        )
        with stub as create_mock:
            returned_id = await clone_service.submit_clone_task(
                source_audio_url="https://example.com/source.mp3",
                voice_name="我的克隆音色",
            )

        assert returned_id == "clone-task-123"
        create_mock.assert_called_once()
        forwarded = create_mock.call_args.kwargs
        assert forwarded["audio_url"] == "https://example.com/source.mp3"
        assert forwarded["voice_name"] == "我的克隆音色"

    @pytest.mark.asyncio
    async def test_submit_clone_with_video_url(self, clone_service):
        """Submitting with a video URL forwards it as ``video_url``."""
        stub = self._stub_provider(
            clone_service, "create_custom_voice", {"task_id": "clone-task-456"}
        )
        with stub as create_mock:
            returned_id = await clone_service.submit_clone_task(
                source_video_url="https://example.com/source.mp4",
                voice_name="视频音色",
            )

        assert returned_id == "clone-task-456"
        create_mock.assert_called_once()
        forwarded = create_mock.call_args.kwargs
        assert forwarded["video_url"] == "https://example.com/source.mp4"

    @pytest.mark.asyncio
    async def test_submit_clone_no_source(self, clone_service):
        """Submitting without any source raises a ValueError."""
        with pytest.raises(ValueError, match="必须提供"):
            await clone_service.submit_clone_task()

    @pytest.mark.asyncio
    async def test_submit_clone_invalid_audio_url(self, clone_service):
        """An audio source that is not a URL raises a ValueError."""
        with pytest.raises(ValueError, match="有效的 URL"):
            await clone_service.submit_clone_task(source_audio_url="not-a-url")

    @pytest.mark.asyncio
    async def test_submit_clone_voice_name_too_long(self, clone_service):
        """A 25-character voice name raises a ValueError."""
        oversized_name = "a" * 25  # original note: longer than 20 characters
        with pytest.raises(ValueError, match="不能超过"):
            await clone_service.submit_clone_task(
                source_audio_url="https://example.com/audio.mp3",
                voice_name=oversized_name,
            )

    @pytest.mark.asyncio
    async def test_submit_clone_no_task_id(self, clone_service):
        """A provider response without a task_id raises a ValueError."""
        # An empty dict stands for a response that carries no task_id.
        with self._stub_provider(clone_service, "create_custom_voice", {}):
            with pytest.raises(ValueError, match="未返回 task_id"):
                await clone_service.submit_clone_task(
                    source_audio_url="https://example.com/audio.mp3"
                )

    @pytest.mark.asyncio
    async def test_query_clone_task_success(self, clone_service):
        """A succeeded provider task is reported with voice id and trial URL."""
        provider_payload = {
            "task_status": "succeed",
            "task_result": {
                "voices": [
                    {"voice_id": "custom-voice-001", "trial_url": "https://example.com/trial.mp3"}
                ]
            },
        }
        with self._stub_provider(clone_service, "get_custom_voice_task", provider_payload):
            outcome = await clone_service.query_clone_task("clone-task-123")

        assert outcome["task_id"] == "clone-task-123"
        assert outcome["status"] == "succeeded"
        assert outcome["voice_id"] == "custom-voice-001"
        assert outcome["trial_url"] == "https://example.com/trial.mp3"

    @pytest.mark.asyncio
    async def test_query_clone_task_pending(self, clone_service):
        """A pending provider task is reported as pending with no voice id."""
        with self._stub_provider(
            clone_service, "get_custom_voice_task", {"task_status": "pending"}
        ):
            outcome = await clone_service.query_clone_task("clone-task-123")

        assert outcome["task_id"] == "clone-task-123"
        assert outcome["status"] == "pending"
        assert outcome["voice_id"] is None

    @pytest.mark.asyncio
    async def test_query_clone_task_failed(self, clone_service):
        """A failed provider task surfaces the provider's message."""
        provider_payload = {"task_status": "failed", "message": "音频格式不支持"}
        with self._stub_provider(clone_service, "get_custom_voice_task", provider_payload):
            outcome = await clone_service.query_clone_task("clone-task-123")

        assert outcome["status"] == "failed"
        assert "音频格式不支持" in outcome["error_message"]

    @pytest.mark.asyncio
    async def test_delete_custom_voice_success(self, clone_service):
        """A code-0 provider response makes delete_custom_voice return True."""
        stub = self._stub_provider(
            clone_service, "delete_custom_voice", {"code": 0, "message": "success"}
        )
        with stub as delete_mock:
            deleted = await clone_service.delete_custom_voice("custom-voice-001")

        assert deleted is True
        delete_mock.assert_called_once_with("custom-voice-001")

    @pytest.mark.asyncio
    async def test_delete_custom_voice_failure(self, clone_service):
        """A code-400 provider response makes delete_custom_voice return False."""
        with self._stub_provider(
            clone_service, "delete_custom_voice", {"code": 400, "message": "Invalid voice id"}
        ):
            deleted = await clone_service.delete_custom_voice("non-existent")

        assert deleted is False

    @pytest.mark.asyncio
    async def test_wait_for_clone_immediate_success(self, clone_service):
        """wait_for_clone returns the voice when the first status query succeeds."""
        submit_stub = self._stub_provider(
            clone_service, "create_custom_voice", {"task_id": "clone-task-123"}
        )
        # The very first status query already reports success.
        query_stub = self._stub_provider(
            clone_service,
            "get_custom_voice_task",
            {
                "task_status": "succeed",
                "task_result": {"voices": [{"voice_id": "v1"}]},
            },
        )
        with submit_stub, query_stub:
            outcome = await clone_service.wait_for_clone(
                source_audio_url="https://example.com/audio.mp3"
            )

        assert outcome["status"] == "succeeded"
        assert outcome["voice_id"] == "v1"
class TestCloneTaskStatus:
    """Checks on the values of the CloneTaskStatus enumeration."""

    def test_clone_task_status_values(self):
        """Every status member carries the expected string value."""
        from app.services.voice_clone_service import CloneTaskStatus

        expected_values = {
            "PENDING": "pending",
            "PROCESSING": "processing",
            "SUCCEEDED": "succeeded",
            "FAILED": "failed",
            "TIMEOUT": "timeout",
        }
        for member_name, wire_value in expected_values.items():
            assert getattr(CloneTaskStatus, member_name).value == wire_value