后端: - 微信回调 db.commit 失败仍返回 SUCCESS,避免无限重试 - recharge() 加 order_id 幂等保护,防重复充值 - time_expire 使用北京时间(UTC+8),修复时区 bug - 充值档位后端配置化(points-config.yaml + /recharge-options API) - 代码审查 20 项修复(认证加固、扣费顺序、错误响应、状态同步等) 前端: - 充值弹窗:自动轮询 + 【我已支付】手动兜底 - 二维码倒计时显示,过期后遮罩 + 刷新按钮 - 充值档位从后端动态加载 - 去掉 select/qrcode 弹窗标题,金额红色突出显示 - 全项目命名统一(视频生成/压制成片/配音合成/声音复刻等) - Modal 关闭按钮独立于 title 显示
22 KiB
第三方平台接入架构标准化实施计划
版本:v1.0 依据:third-party-integration-architecture.md(架构设计最终版) 目标:统一异常体系、Adapter 契约、HTTP Client 生命周期、异步任务状态机、配置分层 生效日期:2026-05-03
一、总则
1.1 实施原则
- 标准优先:以行业主流做法为准,不因"改动小"而妥协
- 文档先行:所有变更必须在此文档中登记,实施完成后逐项核对
- 标准优先:存量代码直接按标准改造,不做兼容包装层
- 渐进验证:每阶段完成后运行测试,确认无回归再进入下一阶段
1.2 不适用本标准的例外
- 七牛云存储(纯上传下载,不纳入 Adapter 体系)
二、五条铁律规范(实施标准)
铁律 1:异常出口唯一
规范内容:所有 app/services/、app/ai/ 下的代码,对外抛出的异常必须是 PlatformError。Router 只 except PlatformError 和 AppException(业务错误)。
具体标准:
# app/core/exceptions.py —— 唯一的第三方异常类
class PlatformErrorType:
RATE_LIMIT = "rate_limit"
AUTH_FAILED = "auth_failed"
TIMEOUT = "timeout"
SERVER_ERROR = "server_error"
BAD_REQUEST = "bad_request"
QUOTA_EXHAUSTED = "quota_exhausted"
NOT_FOUND = "not_found"
UNKNOWN = "unknown"
class PlatformError(Exception):
def __init__(
self,
message: str,
*,
platform: str,
retryable: bool = False,
error_type: str = PlatformErrorType.UNKNOWN,
status_code: int | None = None,
):
super().__init__(message)
self.platform = platform
self.retryable = retryable
self.error_type = error_type
self.status_code = status_code
HTTP 状态码映射(全局中间件):
| error_type | retryable | HTTP 状态码 |
|---|---|---|
| rate_limit | True | 429 |
| timeout | True | 504 |
| server_error | True | 502 |
| auth_failed | False | 401 |
| bad_request | False | 400 |
| quota_exhausted | False | 429(带 Retry-After) |
| unknown | False | 400 |
禁止事项:
app/services/和app/ai/中禁止raise HTTPExceptionapp/services/和app/ai/中禁止裸raise Exception(...)- 各 Router 中禁止
except Exception: raise HTTPException(500, ...)处理第三方错误
铁律 2:Adapter 最小契约
规范内容:每个新平台必须实现 PlatformAdapter(platform_id + health() + close())。按需实现 SyncCapable(同步调用)或 TaskCapable(异步任务)。
Protocol 定义:
# app/ai/adapters/base.py
@runtime_checkable
class PlatformAdapter(Protocol):
"""所有 Adapter 的准入门槛"""
platform_id: str
async def health(self) -> AdapterResponse: ...
async def close(self) -> None: ...
@runtime_checkable
class SyncCapable(Protocol):
"""同步调用能力(TTS、Chat、图片生成等)"""
async def call(self, method: str, payload: dict) -> AdapterResponse: ...
@runtime_checkable
class TaskCapable(Protocol):
"""异步任务能力(视频生成、字幕、TTS 等)"""
async def submit(self, task_type: str, payload: dict, callback_url: str | None) -> AdapterResponse: ...
async def query(self, platform_job_id: str) -> TaskStatus: ...
@runtime_checkable
class CallbackCapable(Protocol):
"""回调验签能力(可选)"""
async def verify_signature(self, headers: dict, body: bytes, secret: str) -> bool: ...
async def parse_callback(self, body: bytes) -> TaskStatus: ...
统一返回值:
@dataclass(frozen=True)
class AdapterResponse:
success: bool
data: dict | None = None
error_code: str | None = None
error_message: str | None = None
retryable: bool = False
@dataclass(frozen=True)
class TaskStatus:
state: str # "pending" | "processing" | "completed" | "failed"
result: dict | None = None
error_message: str | None = None
方法标识常量:
# app/ai/adapters/constants.py
class Method:
TTS = "tts"
CHAT = "chat"
CHAT_STREAM = "chat_stream"
IMAGE_GENERATE = "image_generate"
LIP_SYNC = "lip_sync"
CAPTION = "caption"
AUTO_ALIGN = "auto_align"
各平台对号入座:
| 平台 | 必须实现 | 当前状态 |
|---|---|---|
| 火山方舟 | PlatformAdapter + SyncCapable |
❌ 缺失,需新建 VolcengineArkAdapter |
| Vidu | PlatformAdapter + SyncCapable + TaskCapable + CallbackCapable |
❌ 缺失,需新建 ViduAdapter |
| 火山字幕 | PlatformAdapter + TaskCapable |
❌ 缺失,需新建 VolcengineCaptionAdapter |
禁止事项:
- 新增平台不实现
PlatformAdapter直接接入 - Adapter 内部抛出的异常不是
PlatformError call()方法返回裸dict而不是AdapterResponse
铁律 3:HTTP Client 统一关闭
规范内容:所有对外 HTTP 连接(httpx.AsyncClient 或 SDK 内部)必须在 lifespan 中创建和销毁。禁止在方法内临时创建 httpx.AsyncClient()。
具体标准:
# lifespan 中的标准写法
@asynccontextmanager
async def lifespan(app: FastAPI):
# 各平台独立 Client(故障隔离)
app.state.http_clients = {
"vidu": httpx.AsyncClient(timeout=30, limits=httpx.Limits(max_connections=20)),
"volcengine_caption": httpx.AsyncClient(timeout=60, limits=httpx.Limits(max_connections=10)),
"default": httpx.AsyncClient(timeout=30, limits=httpx.Limits(max_connections=50)),
}
# SDK 客户端
app.state.ark_client = AsyncArk(api_key=..., timeout=1800)
yield
# 统一关闭
for client in app.state.http_clients.values():
await client.aclose()
if hasattr(app.state, "ark_client") and not app.state.ark_client.is_closed():
await app.state.ark_client.close()
迁移清单:
| 文件 | 当前 Client | 整改方式 |
|---|---|---|
app/ai/providers/vidu_provider.py |
aiohttp.ClientSession |
迁移为 httpx.AsyncClient,由 lifespan 注入 |
app/services/volcengine_caption_service.py |
httpx.AsyncClient(懒加载,永不关闭) |
改为 lifespan 注入,删除 _get_client() 懒加载 |
app/api/v1/voice.py |
临时 httpx.AsyncClient() |
改为 app.state.http_clients["default"] |
禁止事项:
- 禁止 import
aiohttp(项目统一使用httpx) - 禁止在方法/路由内
httpx.AsyncClient()临时创建(下载大文件例外,需注释说明) - 禁止 Provider
__init__中创建 Client 而不在 lifespan 中关闭
铁律 4:异步任务状态唯一
规范内容:所有"提交后等待"的任务,状态必须写入统一的状态机。第三方回调走统一入口 /webhooks/{platform}。
注意:本铁律涉及数据库设计,当前文档中 SQLAlchemy
Jobmodel 相关设计已挂起,待数据库方案确定后补充。本章只规定接口和状态机标准。
统一状态枚举:
class JobStatus(str, Enum):
PENDING = "pending" # 已提交,等待调度
QUEUED = "queued" # 已进入队列,等待槽位
RUNNING = "running" # 正在执行
SUCCEEDED = "succeeded" # 成功完成
FAILED = "failed" # 失败
CANCELLED = "cancelled" # 用户取消或超时取消
统一任务 API:
# 提交任务
POST /jobs
Request: {platform: str, task_type: str, payload: dict, idempotency_key: str | None}
Response: {job_id: UUID, status: "pending"}
# 查询任务
GET /jobs/{job_id}
Response: {job_id, status, progress, message, result, error, created_at, updated_at}
# 统一回调入口
POST /webhooks/{platform}
第三方状态映射(Adapter 层负责):
| 第三方状态 | 内部状态 |
|---|---|
Vidu: pending / processing |
RUNNING |
Vidu: success |
SUCCEEDED |
Vidu: failed |
FAILED |
火山字幕: code=2000 |
RUNNING |
火山字幕: code=0 |
SUCCEEDED |
火山字幕: code=1001/1002/1012 |
FAILED(不可重试) |
火山字幕: code=1003(超频) |
FAILED(可重试) |
禁止事项:
- 禁止 Router/Service 私设 Redis key(如
vidu:lipsync:xxx) - 禁止在 Router 中直接处理回调验签(必须由 Adapter 处理)
- 禁止各平台使用自己的状态字符串返回给前端
铁律 5:配置与密钥分离
规范内容:非敏感配置走 config/platform-config.yaml,支持热重载。密钥走 .env,修改需重启。
文件结构:
# config/platform-config.yaml
platforms:
<platform_id>:
provider: <provider_type>
base_url: <url>
models: # 原 ai_models.yaml 内容合并至此
- id: <model_alias>
model_name: <实际模型ID>
capabilities: [<capability>]
default_params: <dict>
rate_limit:
qps: <float>
burst: <int>
methods:
<method>:
timeout: <int>
max_connections: <int>
rate_limit:
qps: <float>
burst: <int>
runtime:
fallback_chains:
<capability>:
- <model_alias_1>
- <model_alias_2>
task_timeouts:
<task_type>: <seconds>
task_ttl:
<task_type>: <seconds>
热重载实现:
class RuntimeConfig:
"""运行时配置,轮询检查 mtime(10秒间隔)+ Admin API 手动触发"""
async def get(self, key: str, default=None):
await self._reload_if_changed()
return self._config.get(key, default)
async def force_reload(self) -> bool:
"""Admin API 调用"""
...
Admin API:
@router.post("/admin/runtime-config/reload")
async def reload_runtime_config():
success = await runtime_config.force_reload()
return {"reloaded": success, "version": runtime_config.version}
@router.get("/admin/runtime-config")
async def get_runtime_config():
return runtime_config.get_raw()
迁移清单:
.env 中的配置项 |
迁移目标 | 状态 |
|---|---|---|
VIDU_BASE_URL |
platforms.vidu.base_url |
待迁移 |
VOLCENGINE_BASE_URL |
platforms.volcengine_ark.base_url |
待迁移 |
VOLC_SUBTITLE_MAX_CONCURRENT |
platforms.volcengine_caption.methods.caption.max_connections |
待迁移 |
VOLC_SUBTITLE_TIMEOUT |
runtime.task_timeouts.caption |
待迁移 |
禁止事项:
- 禁止在
.env中存放非敏感配置(URL、超时、限流) - 禁止代码中硬编码配置(如
timeout=30、max_rate=20) - 禁止
Settings类超过 150 行(逐步瘦身)
三、分阶段实施计划
Phase 0:准备(0.5 天)
| # | 任务 | 输出文件 | 检查方式 |
|---|---|---|---|
| 0.1 | 新建 app/ai/adapters/ 目录结构 |
app/ai/adapters/__init__.py |
目录存在 |
| 0.2 | 新建 app/platform_gateway.py 骨架 |
app/platform_gateway.py |
文件存在,类定义完整 |
| 0.3 | 安装/确认 importlinter 可用 |
pyproject.toml 依赖 |
pip show importlinter |
| 0.4 | 备份现有 exceptions.py |
git stash / 分支 | 可回滚 |
Phase 1:异常体系(0.5 天)
| # | 任务 | 输出文件 | 检查方式 |
|---|---|---|---|
| 1.1 | 重构 PlatformError + PlatformErrorType |
app/core/exceptions.py |
类型定义完整,含所有字段 |
| 1.2 | 保留 AppException 体系(业务错误) |
app/core/exceptions.py |
原有类不删除 |
| 1.3 | main.py 注册 PlatformError 全局中间件 |
app/main.py |
启动无报错,异常测试返回正确 HTTP 码 |
| 1.4 | VolcengineArkAdapter._wrap_error() 实现异常映射 |
app/ai/adapters/volcengine_ark.py |
单元测试覆盖 |
| 1.5 | ViduAdapter._wrap_error() 实现异常映射 |
app/ai/adapters/vidu.py |
单元测试覆盖 |
| 1.6 | make lint-semantic 增加异常规则 |
Makefile |
提交时自动检查 |
验收标准:
- 任意第三方调用失败,Router 返回的 JSON 中
detail.retryable正确 - 网络超时返回 504,限流返回 429,认证失败返回 401
- 业务错误(如参数校验失败)仍走
AppException→ 400/422
Phase 2:Adapter Protocol + 配置合并(1 天)
| # | 任务 | 输出文件 | 检查方式 |
|---|---|---|---|
| 2.1 | PlatformAdapter / SyncCapable / TaskCapable / CallbackCapable Protocol |
app/ai/adapters/base.py |
isinstance 校验通过 |
| 2.2 | AdapterResponse / TaskStatus dataclass |
app/ai/adapters/base.py |
frozen=True,字段完整 |
| 2.3 | Method 常量定义 |
app/ai/adapters/constants.py |
覆盖所有现有方法 |
| 2.4 | 合并 ai_models.yaml → platform-config.yaml |
config/platform-config.yaml |
原有模型列表完整迁移 |
| 2.5 | RuntimeConfig 热重载实现 |
app/core/runtime_config.py |
mtime 轮询 + force_reload 均工作 |
| 2.6 | Admin API /admin/runtime-config/* |
app/api/v1/system.py |
GET/POST 返回正确 |
| 2.7 | Settings 类清理非敏感配置 |
app/config.py |
只保留密钥,行数 < 150 |
验收标准:
- 新增一个 MockAdapter 实现 Protocol,IDE 自动提示缺失方法
- 修改
runtime.yaml中的 qps,10 秒内新请求生效 - Admin API 手动触发 reload,返回最新配置
Phase 3:HTTP Client 统一(1 天)
| # | 任务 | 输出文件 | 检查方式 |
|---|---|---|---|
| 3.1 | ViduProvider 从 aiohttp 迁移到 httpx |
app/ai/providers/vidu_provider.py |
功能测试通过 |
| 3.2 | VolcengineCaptionService 删除懒加载,改为注入 Client |
app/services/volcengine_caption_service.py |
功能测试通过 |
| 3.3 | voice.py 中临时 httpx.AsyncClient() 改为共享 Client |
app/api/v1/voice.py |
代码审查 |
| 3.4 | lifespan 统一管理所有 Client 生命周期 | app/main.py |
启动/关闭无泄漏日志 |
| 3.5 | ViduAdapter.close() / VolcengineCaptionAdapter.close() 实现 |
对应 Adapter 文件 | lifespan shutdown 时调用 |
| 3.6 | make lint 增加 aiohttp import 禁止规则 |
pyproject.toml 或 pre-commit |
import aiohttp 报 error |
验收标准:
pip list | grep aiohttp无输出(或确认仅作为间接依赖)python -m app.main启动后,关闭时无unclosed client session警告- 所有
AsyncClient创建都在 lifespan 中
Phase 4:Gateway 骨架 + Adapter 包装层(1 天)
| # | 任务 | 输出文件 | 检查方式 |
|---|---|---|---|
| 4.1 | PlatformGateway 骨架(call_sync / submit_task / query_task / handle_webhook) |
app/platform_gateway.py |
类方法签名完整 |
| 4.2 | VolcengineArkAdapter 改造现有 Provider 实现 Protocol |
app/ai/adapters/volcengine_ark.py |
单元测试通过 |
| 4.3 | ViduAdapter 改造现有 Provider 实现 Protocol |
app/ai/adapters/vidu.py |
单元测试通过 |
| 4.4 | VolcengineCaptionAdapter 改造现有 Service 实现 Protocol |
app/ai/adapters/volcengine_caption.py |
单元测试通过 |
| 4.5 | LLMGateway 实现(模型选择、Fallback、流式路由) |
app/ai/gateways/llm_gateway.py |
脚本生成功能测试通过 |
| 4.6 | lifespan 中初始化所有 Adapter 并注册到 Gateway | app/main.py |
启动日志显示各平台初始化成功 |
验收标准:
- 新增一个
MockAdapter实现 Protocol,5 分钟内完成注册并可用 LLMGateway.chat()主模型失败时自动 Fallback 到备用模型- 健康检查
/system/platform-health返回所有平台状态
Phase 5:异步任务统一(2 天,数据库方案确定后实施)
| # | 任务 | 输出文件 | 检查方式 |
|---|---|---|---|
| 5.1 | SQLAlchemy Job model(独立设计) |
app/models/job.py |
Alembic 迁移成功 |
| 5.2 | Pydantic JobResponse Schema |
app/schemas/job.py |
覆盖所有字段 |
| 5.3 | JobRegistry 改为先写数据库、再写 Redis |
app/scheduler/registry.py |
数据库有数据 |
| 5.4 | JobStatus 扩展为 6 种状态 |
app/schemas/enums.py |
覆盖所有场景 |
| 5.5 | ViduHandler 接入 Async Engine |
app/scheduler/handlers/vidu_handler.py |
视频生成任务走 Engine |
| 5.6 | SubtitleHandler 改为通过 Gateway 调用 |
app/scheduler/handlers/subtitle_handler.py |
字幕任务走 Gateway |
| 5.7 | 统一回调入口 /webhooks/{platform} |
app/api/v1/webhooks.py |
Vidu 回调正常 |
| 5.8 | 删除 Router 中私设 Redis key 的代码 | app/api/v1/vidu.py |
无 vidu:lipsync: 字样 |
| 5.9 | 统一任务 API /jobs/{job_id} |
app/api/v1/jobs.py |
GET 返回标准格式 |
| 5.10 | 脚本生成从 SSE 改为异步任务 | app/api/v1/script.py / app/services/script_service.py |
POST /jobs 提交,轮询 /jobs/{id} |
| 5.11 | 删除 /script/generate/stream SSE 端点 |
app/api/v1/script.py |
端点不存在 |
验收标准:
- Vidu 视频生成任务提交后,Redis 中只有
job:{uuid}格式的 key - 应用重启后,从数据库恢复 running 任务继续执行
- 前端轮询
/jobs/{id}获取所有异步任务状态
Phase 6:清理与验证(0.5 天)
| # | 任务 | 输出文件 | 检查方式 |
|---|---|---|---|
| 6.1 | importlinter 配置(禁止 Router 直接 import Provider) |
.importlinter |
CI 中运行通过 |
| 6.2 | 删除废弃的 ai_models.yaml(确认合并完成后) |
— | 文件不存在 |
| 6.3 | 删除 ViduService / VolcengineCaptionService 中的重复异常处理 |
对应文件 | 代码审查 |
| 6.4 | 全量回归测试(所有现有 API 调用一遍) | — | 测试脚本通过 |
| 6.5 | 更新本文档,标记各阶段完成状态 | 本文档 | 所有 checkbox 打勾 |
四、检查清单汇总
4.1 新增文件清单
| 文件路径 | 说明 | 所属阶段 |
|---|---|---|
app/ai/adapters/__init__.py |
Adapter 包 | Phase 0 |
app/ai/adapters/base.py |
Protocol + dataclass | Phase 2 |
app/ai/adapters/constants.py |
Method 常量 | Phase 2 |
app/ai/adapters/volcengine_ark.py |
火山方舟 Adapter | Phase 4 |
app/ai/adapters/vidu.py |
Vidu Adapter | Phase 4 |
app/ai/adapters/volcengine_caption.py |
火山字幕 Adapter | Phase 4 |
app/ai/gateways/llm_gateway.py |
LLM 网关 | Phase 4 |
app/platform_gateway.py |
统一平台网关 | Phase 0/4 |
app/core/runtime_config.py |
运行时配置 + 热重载 | Phase 2 |
config/platform-config.yaml |
合并后的平台配置 | Phase 2 |
app/models/job.py |
异步任务数据库模型 | Phase 5 |
app/api/v1/jobs.py |
统一任务 API | Phase 5 |
app/api/v1/webhooks.py |
统一回调入口 | Phase 5 |
app/scheduler/handlers/vidu_handler.py |
Vidu 任务处理器 | Phase 5 |
.importlinter |
架构约束配置 | Phase 6 |
4.2 修改文件清单
| 文件路径 | 修改内容 | 所属阶段 |
|---|---|---|
app/core/exceptions.py |
新增 PlatformError / PlatformErrorType |
Phase 1 |
app/main.py |
注册异常中间件、lifespan Client 管理 | Phase 1/3/4 |
app/config.py |
清理非敏感配置,只保留密钥 | Phase 2 |
app/ai/providers/vidu_provider.py |
aiohttp → httpx | Phase 3 |
app/services/volcengine_caption_service.py |
删除懒加载,改为注入 Client | Phase 3 |
app/api/v1/voice.py |
临时 Client → 共享 Client | Phase 3 |
app/api/v1/script.py |
SSE → 异步任务 + 删除 stream 端点 | Phase 5 |
app/services/script_service.py |
删除 generate_script_stream | Phase 5 |
app/api/v1/system.py |
新增 Admin API | Phase 2 |
app/scheduler/registry.py |
先写数据库再写 Redis | Phase 5 |
app/scheduler/handlers/subtitle_handler.py |
通过 Gateway 调用 | Phase 5 |
app/api/v1/vidu.py |
删除私设 Redis key | Phase 5 |
app/schemas/enums.py |
扩展 JobStatus |
Phase 5 |
Makefile / pyproject.toml |
lint 规则 | Phase 1/3/6 |
4.3 废弃文件清单
| 文件路径 | 废弃原因 | 处理时间 |
|---|---|---|
config/ai_models.yaml |
合并到 platform-config.yaml |
Phase 6 |
五、风险项与应对
| 风险 | 影响 | 概率 | 应对 |
|---|---|---|---|
aiohttp 迁移到 httpx 导致 Vidu 某些边缘场景行为不一致 |
功能回归 | 中 | 迁移后全量测试 Vidu TTS/视频生成/声音复刻 |
PlatformError 未覆盖所有异常路径,仍有裸 Exception 漏出 |
前端收到 500 无法处理 | 低 | make lint-semantic 强制检查 + Code Review |
| 配置热重载导致运行时行为突变 | 线上限流突然变更 | 低 | Admin API 加操作日志,变更前确认 |
| Phase 5 数据库改造影响现有 Async Engine | 字幕/脚本任务异常 | 中 | 数据库方案评审后再实施,分步迁移 |
| 前端轮询改造工作量超预期 | 延期 | 中 | 提前与前端同步接口变更,预留 2 天 |
六、验收标准(最终 Checklist)
实施全部完成后,按以下清单逐项核对:
PlatformError是app/services/和app/ai/中唯一的第三方异常类型- Router 中不存在
except Exception: raise HTTPException(500)处理第三方错误 - 新增 MockAdapter 实现 Protocol,30 分钟内完成注册并可用
aiohttp不在项目直接依赖中(pip show aiohttp不显示或仅为间接依赖)- 所有
AsyncClient在 lifespan 中创建和销毁 - 关闭应用时无
unclosed client session警告 config/platform-config.yaml存在且包含所有平台配置- 修改
platform-config.yaml中的限流参数,10 秒内新请求生效 - Admin API
/admin/runtime-config/reload手动触发重载成功 - 健康检查
/system/platform-health返回所有平台状态 importlinterCI 检查通过(Router 不直接 import Provider)- 全量 API 回归测试通过
本文档为实施的唯一依据。任何偏离文档的变更必须在此文档中登记并说明理由。