Files
meijiaka-zy/docs/third-party-integration-architecture.md
小鱼开发 7550559aa0 refactor: 清理未使用IPC命令、修正point_service注释与扣费逻辑、修复camelToSnake正则、优化vidu import
- 删除8个未使用IPC命令,保留validate_media_path
- file.rs返回类型优化为ApiResponse<()>
- point_service.consume()注释与签名一致
- VideoGeneration改为拼接成功后扣费
- 添加漏扣费风险注释
- 删除过时测试文件
- 修复camelToSnake连续大写字母问题
- vidu.py import移至模块顶层

Refs: P1-1~P1-6 技术债务清理
2026-05-14 17:45:28 +08:00

695 lines
21 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 第三方平台接入架构设计方案(最终版)
> 版本:v1.0 Final
> 适用范围:`python-api/` 所有第三方服务接入层
> 生效日期:2026-05-02
---
## 一、设计目标
| 目标 | 验收标准 |
|------|---------|
| 新增平台接入成本 < 30 分钟 | 提供 Adapter 模板,复制粘贴后填充 4 个方法即可 |
| 第三方故障不拖垮用户 | 单点故障时,用户 100ms 内收到明确错误,而非超时 30 秒 |
| 多用户同时使用无冲突 | 5 个用户同时生成 TTS/脚本时,不触发第三方 429 限流 |
| 任务状态可追踪 | 用户关闭应用后重开,能恢复进行中的视频生成/字幕任务 |
| 未来换平台无感知 | 换 TTS 供应商时,前端接口、存储层、用户历史记录全部无感知 |
---
## 二、整体架构
```
┌──────────────────────────────────────────────┐
│ RouterFastAPI
│ - 校验输入、序列化输出 │
│ - 统一错误中间件 │
│ - 不处理重试、不限流、不直接调第三方 │
│ - 回调入口:/webhooks/{platform} │
├──────────────────────────────────────────────┤
│ Application Service │
│ - ScriptService:编排脚本→TTS→视频生成 │
│ - VideoService:编排字幕→合成 │
│ - 只操作领域对象,不感知平台差异 │
├──────────────────────────────────────────────┤
│ Gateway Layer │
│ ┌─────────────────┐ ┌──────────────────┐ │
│ │ LLM Gateway │ │ Task Gateway │ │
│ │ - 模型路由 │ │ - 任务状态机 │ │
│ │ - Fallback │ │ - 轮询调度 │ │
│ │ - 流式代理 │ │ - 回调处理 │ │
│ └────────┬────────┘ └────────┬─────────┘ │
│ │ │ │
│ └────────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ Shared Infra │ │
│ │ - Token Bucket │ │
│ │ - CircuitBreaker│ │
│ │ - stamina retry │ │
│ │ - Structured Log│ │
│ └────────┬────────┘ │
├────────────────────┼─────────────────────────┤
│ Adapter Layer │ │
│ - VolcengineArkAdapter │
│ - OpenAIAdapter │
│ - ViduAdapter │
│ - VolcengineCaptionAdapter │
│ - MockAdapter │
│ 每个 AdapterProtocol 约定,无状态,可替换 │
├──────────────────────────────────────────────┤
│ Transport Layer │
│ - httpx.AsyncClient(所有 Raw HTTP
│ - 官方 SDK(仅 LLM 层:AsyncArk/AsyncOpenAI)│
│ - lifespan 显式创建、显式关闭 │
└────────────────────────────────────────────────┘
```
---
## 三、分层设计
### 3.1 Adapter 层
**职责**:纯翻译。把内部标准请求 ↔ 供应商特定请求,内部标准响应 ↔ 供应商特定响应。
**不职责**:重试、限流、业务逻辑、状态管理。
**Protocol 约定**
```python
class LLMAdapter(Protocol):
platform_id: str
async def chat(self, messages, model, **params) -> AdapterResponse: ...
async def chat_stream(self, messages, model, **params): ... # AsyncIterator
async def health(self) -> AdapterResponse: ...
async def close(self) -> None: ...
class TaskAdapter(Protocol):
platform_id: str
async def submit(self, task_type, payload, callback_url) -> AdapterResponse: ...
async def query(self, platform_task_id) -> TaskStatus: ...
async def parse_callback(self, body) -> TaskStatus: ...
async def verify_signature(self, headers, body, secret) -> bool: ...
async def extract_nonce(self, headers) -> str | None: ...
async def health(self) -> AdapterResponse: ...
async def close(self) -> None: ...
```
**AdapterResponse 标准格式**
```python
@dataclass(frozen=True)
class AdapterResponse:
success: bool
data: dict | None = None
error_code: str | None = None
error_message: str | None = None
retryable: bool = False # Gateway 据此决定是否重试
```
**TaskStatus 标准格式**
```python
@dataclass(frozen=True)
class TaskStatus:
task_id: str # 供应商 task_id
state: str # "pending" | "processing" | "completed" | "failed"
result: dict | None = None
error_message: str | None = None
```
**Client 统一**
- 所有 Raw HTTP 用 `httpx.AsyncClient`
- LLM 官方 SDKAsyncArk、AsyncOpenAI)保留,但 lifespan shutdown 时显式 `close()`
- 每个 Adapter 独立 Client,独立连接池,互不干扰。
---
### 3.2 Gateway 层
#### 3.2.1 LLM Gateway
```python
class LLMGateway:
def __init__(self, adapters: dict[str, LLMAdapter], runtime_config: GatewayRuntimeConfig):
self.adapters = adapters
self.config = runtime_config
async def chat(self, model_id, messages, **params) -> dict:
# 1. 路由到 Adapter
# 2. 主模型失败时 Fallback
# 3. 流式中途失败不再 Fallback
```
**Fallback 规则**
- 配置驱动,`runtime_config.ark_fallback_chain`
- 流式中途失败 → 立即抛异常,不降级(避免内容混合)
- 同步调用失败 → 按链降级,对用户透明
#### 3.2.2 Task Gateway
```python
class TaskGateway:
def __init__(self, adapters, storage, runtime_config):
self.adapters = adapters
self.storage = storage # Redis
self.config = runtime_config
self.circuit = CircuitBreaker()
async def submit(self, platform_id, task_type, payload, callback_url=None) -> str:
# 1. 限流检查
# 2. 熔断检查
# 3. Adapter.submit() → 获取 platform_task_id
# 4. 生成 internal_task_id (UUID)
# 5. Redis 存储映射
# 6. 返回 internal_task_id
async def query(self, internal_task_id) -> TaskStatus:
# 1. 查 Redis 映射
# 2. 非终态时穿透供应商查询(可选)
# 3. 更新 Redis
async def handle_webhook(self, platform, headers, body, query):
# 1. nonce 防重放检查
# 2. Adapter.verify_signature()
# 3. Adapter.parse_callback()
# 4. 更新任务状态
```
**内部 ID 隔离**
```python
# Redis 存储结构
task:{internal_task_id} -> {
"platform_id": "vidu",
"platform_task_id": "vidu_abc123",
"task_type": "lip_sync",
"state": "processing",
"submitted_at": "2026-05-02T12:00:00Z"
}
TTL: 3600 1 小时
```
**轮询调度器**(火山字幕示例):
```python
async def poll_until_complete(self, internal_task_id, max_wait=120):
intervals = [0, 1, 2, 4, 8, 8, 10] # 非阻塞阶段
for interval in intervals:
await asyncio.sleep(interval)
status = await self.query(internal_task_id)
if status.state == "completed":
return status
if status.state == "failed":
raise TaskError(status.error_message)
# 切换 blocking 阶段
while elapsed < max_wait:
status = await self._query_with_blocking(internal_task_id)
if status.state in ("completed", "failed"):
return status
raise TaskError("任务超时")
```
#### 3.2.3 Shared Infra
**Token Bucket**(内存级,`aiolimiter`):
```python
vidu_limiter = AsyncLimiter(max_rate=20, time_period=1.0) # 20/s
caption_limiter = AsyncLimiter(max_rate=2, time_period=1.0) # 2/s
ark_limiter = AsyncLimiter(max_rate=50, time_period=1.0) # 50/s
```
**CircuitBreaker**
```python
class CircuitBreaker:
failure_threshold: int = 5 # 连续失败 5 次熔断
recovery_timeout: float = 60.0 # 60 秒后探测恢复
```
**Retry Policy**`stamina`):
```python
with stamina.retry_context(
on=(httpx.NetworkError, httpx.TimeoutException),
attempts=3,
timeout=30.0,
wait_initial=1.0,
wait_max=10.0,
):
await adapter.submit(...)
```
---
### 3.3 Application Service 层
**职责**:编排业务流程,不感知平台差异。
```python
class ScriptService:
async def generate_script(self, category, subcategory, duration):
# 调用 LLM Gateway,不关心底层是火山方舟还是 OpenAI
result = await llm_gateway.chat(
model_id="doubao-seed-2-0-pro",
messages=[...],
temperature=0.7,
)
return self._parse_shots(result.data["content"])
class VideoService:
async def submit_lip_sync(self, video_url, audio_url):
# 调用 Task Gateway,不关心底层是 Vidu 还是 HeyGen
task_id = await task_gateway.submit(
platform_id="vidu",
task_type="lip_sync",
payload={"video_url": video_url, "audio_url": audio_url},
callback_url=f"{settings.app_base_url}/webhooks/vidu",
)
return task_id
```
---
### 3.4 Router 层
**职责**:HTTP 语义转换,参数校验,统一返回格式。
**统一错误中间件**
```python
@app.exception_handler(PlatformError)
async def platform_error_handler(request, exc: PlatformError):
status = 502 if exc.retryable else 400
return JSONResponse(
status_code=status,
content={
"code": exc.status_code or 500,
"message": str(exc),
"data": None,
"detail": {
"platform": exc.platform,
"retryable": exc.retryable,
} if settings.DEBUG else None,
},
)
```
**回调入口**
```python
@router.post("/webhooks/{platform}")
async def universal_webhook(platform: str, request: Request):
raw_headers = dict(request.headers)
raw_body = await request.body()
query_params = dict(request.query_params)
await task_gateway.handle_webhook(
platform=platform,
headers=raw_headers,
body=raw_body,
query=query_params,
original_path=request.url.path,
)
return {"received": True}
```
---
## 四、核心数据流
### 4.1 TTS 语音合成(同步调用)
```
用户点击"生成配音"
POST /voice/synthesize
Router 校验参数
ViduService.synthesize(text, voice_id...)
LLM Gateway.call_sync(platform="vidu", method="tts", ...)
Token Bucket 取令牌(rate=20/s
stamina 重试网络错误(最多3次)
ViduAdapter.call(method="tts", ...)
httpx.AsyncClient → Vidu API
返回音频 URL
```
**异常路径**
- 网络错误 → stamina 重试 → 3 次失败后抛 PlatformError(retryable=True) → 502
- Vidu 返回 400 → PlatformError(retryable=False) → 400
- Vidu 返回 500 → PlatformError(retryable=True) → 502
### 4.2 脚本生成 SSE(流式调用)
```
用户点击"生成脚本"
POST /script/generate/stream
ScriptService.generate_script_stream(...)
LLM Gateway.chat_stream(model_id="doubao-seed-2-0-pro", ...)
VolcengineArkAdapter.chat_stream(...)
SSE 流式输出
```
**关键约束**:流式中途失败**不降级**。已输出内容保持不变,前端收到 error 事件后自行处理。
### 4.3 视频生成任务提交(异步任务)
```
用户点击"生成视频"
POST /vidu/lip-sync
VideoService.submit_lip_sync(...)
Task Gateway.submit(platform="vidu", task_type="lip_sync", ...)
Token Bucket 取令牌(rate=5/s
CircuitBreaker 检查
ViduAdapter.submit(method="lip_sync", ...)
返回 platform_task_id
生成 internal_task_id (UUID)
Redis 存储映射
返回 {task_id: internal_task_id}
```
### 4.4 字幕打轴(同步阻塞 + 后端内部轮询)
当前前端调用 `POST /caption/ata/align`,后端同步阻塞等待结果(内部轮询最多 120 秒),直接返回打轴结果。
```
用户点击"生成字幕"
POST /caption/ata/align
CaptionService.auto_align_caption(...) 内部轮询
直接返回 {utterances, duration}
```
> 注:`/caption/generate`、`/caption/submit` 等异步字幕接口已删除,当前仅保留 `/caption/ata/align` 同步打轴。
### 4.5 回调处理(Vidu 视频生成完成)
```
Vidu 服务器 POST /webhooks/vidu
Router 提取 headers / body / query
Task Gateway.handle_webhook(...)
1. ViduAdapter.extract_nonce(headers) → nonce
→ Redis 查 nonce 是否已用
→ 已用 → 401
2. ViduAdapter.verify_signature(headers, body, secret)
→ 失败 → 401
3. Redis 标记 nonce 已用(TTL 300s
4. ViduAdapter.parse_callback(body) → TaskStatus
5. Redis 更新任务状态
```
---
## 五、并发控制
### 5.1 三层隔离模型
```
┌─────────────────────────────────────────┐
│ 第一层:任务层(Slot Scheduler
│ 控制"同时有多少个异步任务在执行" │
│ - 火山字幕:max 5 │
│ - 视频生成:按 Vidu 配额配置 │
│ - 脚本生成:max 10 │
├─────────────────────────────────────────┤
│ 第二层:请求层(Gateway Token Bucket
│ 控制"每秒向某平台发多少请求" │
│ - Vidu TTS20/s │
│ - Vidu 视频生成提交:5/s │
│ - 火山方舟:50/s │
│ - 火山字幕提交:2/s │
├─────────────────────────────────────────┤
│ 第三层:连接层(HTTP Client Pool
│ 控制"同时保持多少条 TCP 连接" │
│ - Vidumax 20 │
│ - 火山字幕:max 10 │
│ - 火山方舟:SDK 内部管理 │
└─────────────────────────────────────────┘
```
### 5.2 流式连接单独计数
```python
# LLM Gateway 内
active_streams: dict[str, int] = {} # {platform: count}
# 流式上限
MAX_STREAMS = {
"volcengine_ark": 30,
"openai": 30,
}
```
---
## 六、错误处理
### 6.1 异常类
```python
class PlatformError(Exception):
"""第三方平台调用失败"""
def __init__(self, message, *, platform: str, retryable: bool = False, status_code: int | None = None):
super().__init__(message)
self.platform = platform
self.retryable = retryable
self.status_code = status_code
class TaskError(Exception):
"""任务生命周期错误"""
pass
class LLMError(Exception):
"""LLM 调用失败(含 Fallback 耗尽)"""
pass
```
### 6.2 HTTP 状态码映射
| 场景 | PlatformError 属性 | HTTP 状态码 |
|------|-------------------|------------|
| 网络超时、DNS 失败、5xx | `retryable=True` | 502 Bad Gateway |
| 供应商限流 429 | `retryable=True` | 429 Too Many Requests |
| 认证失败 401/403 | `retryable=False` | 401 Unauthorized |
| 参数错误 400 | `retryable=False` | 400 Bad Request |
| 业务逻辑错误(state=failed | `retryable=False` | 400 Bad Request |
### 6.3 全局响应格式
```json
{
"code": 0,
"message": "成功",
"data": {}
}
```
错误时:
```json
{
"code": 500,
"message": "Vidu TTS 服务暂不可用",
"data": null,
"detail": {
"platform": "vidu",
"retryable": true
}
}
```
---
## 七、配置规范
### 7.1 嵌套配置模型
```python
class ViduConfig(BaseModel):
api_key: str = ""
base_url: str = "https://api.vidu.com"
max_connections: int = 20
timeout: float = 30.0
class RuntimeConfig(BaseModel):
"""运行时配置,支持热重载"""
vidu_qps: float = 20.0
vidu_burst: int = 30
ark_fallback_chain: list[str] = ["doubao-seed-2-0-lite"]
caption_poll_intervals: list[float] = [1.0, 1.0, 2.0, 2.0, 4.0, 4.0, 8.0, 8.0, 10.0]
circuit_failure_threshold: int = 5
circuit_recovery_timeout: float = 60.0
class Settings(BaseSettings):
vidu: ViduConfig = Field(default_factory=ViduConfig)
volcengine_ark: VolcengineArkConfig = Field(default_factory=VolcengineArkConfig)
volcengine_caption: VolcengineCaptionConfig = Field(default_factory=VolcengineCaptionConfig)
openai: OpenAIConfig = Field(default_factory=OpenAIConfig)
runtime: RuntimeConfig = Field(default_factory=RuntimeConfig)
model_config = SettingsConfigDict(
env_nested_delimiter="__",
)
```
### 7.2 `.env` 示例
```bash
# === 启动配置(改后需重启)===
VIDU__API_KEY=sk-xxx
VIDU__BASE_URL=https://api.vidu.com
VIDU__MAX_CONNECTIONS=20
VOLCENGINE_ARK__API_KEY=ak-xxx
VOLCENGINE_CAPTION__APPID=app-xxx
VOLCENGINE_CAPTION__TOKEN=tk-xxx
OPENAI__API_KEY=sk-xxx
# === 运行时配置(改后可热载)===
RUNTIME__VIDU__QPS=20
RUNTIME__VIDU__BURST=30
RUNTIME__ARK__FALLBACK_CHAIN=doubao-seed-2-0-lite,doubao-lite-32k
RUNTIME__CAPTION__POLL_INTERVALS=1,1,2,2,4,4,8,8,10
RUNTIME__CIRCUIT__FAILURE_THRESHOLD=5
```
### 7.3 热重载 API
```python
@router.post("/admin/runtime-config")
async def reload_runtime_config(updates: dict):
gateway_registry.update_runtime_config(**updates)
return {"updated": list(updates.keys())}
```
---
## 八、日志与可观测性
### 8.1 结构化日志字段
```python
{
"event": "platform_call",
"platform": "vidu",
"method": "tts_sync",
"task_type": "tts",
"duration_ms": 1250,
"success": true,
"http_status": 200,
"retry_count": 0,
}
```
### 8.2 脱敏规则
| 级别 | 字段 | 生产环境处理 |
|------|------|------------|
| P1 | `api_key`, `authorization`, `x-hmac-signature` | `[REDACTED]` |
| P2 | `audio_url`, `video_url`, `text` | URL 去签名参数 / 文案截断前 30 字 |
| P3 | `platform_task_id`, `internal_task_id` | 前缀保留 8 字符 |
| P4 | `duration_ms`, `http_status`, `retry_count` | 完整保留 |
### 8.3 健康检查端点
```python
@router.get("/system/platform-health")
async def platform_health():
results = {}
for pid, adapter in registry.adapters.items():
resp = await adapter.health()
results[pid] = {
"available": resp.success,
"error": resp.error_message,
}
return results
```
---
## 九、迁移策略
### 9.1 迁移原则
- **新旧代码并行**:通过 flag 切换,可随时回滚
- **逐个平台迁移**:Vidu → 火山字幕 → LLM
- **前端无感知**:Router URL、请求体、响应体不变
### 9.2 Flag 切换机制
```python
# Router 层
USE_NEW_VIDU = settings.FEATURE_FLAGS.get("new_vidu_adapter", False)
@router.post("/voice/synthesize")
async def synthesize(request: TTSSynthesizeRequest):
if USE_NEW_VIDU:
service = get_vidu_service_v2() # 新架构
else:
service = get_vidu_service() # 旧代码
...
```
### 9.3 迁移 Checklist
| 步骤 | 动作 | 验证 |
|------|------|------|
| 1 | 新建 `ViduAdapterV2`,实现 Protocol | 单元测试通过 |
| 2 | 注册到 Gatewayflag 关闭 | 不影响线上 |
| 3 | 测试环境开启 flag,全量回归 | 所有 Vidu 接口正常 |
| 4 | 生产灰度 10% → 50% → 100% | 监控 error rate |
| 5 | 旧代码保留 1 周后删除 | 无回滚需求 |
---
## 十、附录:最终决策清单
| # | 决策项 | 结论 |
|---|--------|------|
| 1 | 回调验签位置 | **C**Router 提取纯数据 → Gateway 调度 → Adapter 验签 |
| 2 | 任务结果保留 | 实时反映,Redis 映射 TTL = 1h |
| 3 | 七牛云 | 不纳入新架构 |
| 4 | SSE 断线 | 不支持续传 |
| 5 | MockAdapter | 仅 `DEBUG=True` 时注册 |
| 6 | 配置热重载 | **B**:限流参数 + Fallback 链可热载,Adapter 需重启 |
| 7 | 日志脱敏 | 四级分级(P1/P2/P3/P4+ 四档环境 + URL 智能剥离 |
| 8 | 火山字幕轮询 | **B**:前 3 次非阻塞(0→1→3s)+ 后切换 `blocking=1` |
| 9 | 迁移策略 | **C**:适配器层先行,flag 切换 |
| 10 | API Key | 手动维护 |
| 11 | 任务状态持久化 | **C**Redis 开启 AOF 持久化 |