# 第三方平台接入架构标准化实施计划 > 版本:v1.0 > 依据:third-party-integration-architecture.md(架构设计最终版) > 目标:统一异常体系、Adapter 契约、HTTP Client 生命周期、异步任务状态机、配置分层 > 生效日期:2026-05-03 --- ## 一、总则 ### 1.1 实施原则 - **标准优先**:以行业主流做法为准,不因"改动小"而妥协 - **文档先行**:所有变更必须在此文档中登记,实施完成后逐项核对 - **标准优先**:存量代码直接按标准改造,不做兼容包装层 - **渐进验证**:每阶段完成后运行测试,确认无回归再进入下一阶段 ### 1.2 不适用本标准的例外 - 七牛云存储(纯上传下载,不纳入 Adapter 体系) --- ## 二、五条铁律规范(实施标准) ### 铁律 1:异常出口唯一 **规范内容**:所有 `app/services/`、`app/ai/` 下的代码,对外抛出的异常必须是 `PlatformError`。Router 只 `except PlatformError` 和 `AppException`(业务错误)。 **具体标准**: ```python # app/core/exceptions.py —— 唯一的第三方异常类 class PlatformErrorType: RATE_LIMIT = "rate_limit" AUTH_FAILED = "auth_failed" TIMEOUT = "timeout" SERVER_ERROR = "server_error" BAD_REQUEST = "bad_request" QUOTA_EXHAUSTED = "quota_exhausted" NOT_FOUND = "not_found" UNKNOWN = "unknown" class PlatformError(Exception): def __init__( self, message: str, *, platform: str, retryable: bool = False, error_type: str = PlatformErrorType.UNKNOWN, status_code: int | None = None, ): super().__init__(message) self.platform = platform self.retryable = retryable self.error_type = error_type self.status_code = status_code ``` **HTTP 状态码映射**(全局中间件): | error_type | retryable | HTTP 状态码 | |-----------|-----------|------------| | rate_limit | True | 429 | | timeout | True | 504 | | server_error | True | 502 | | auth_failed | False | 401 | | bad_request | False | 400 | | quota_exhausted | False | 429(带 Retry-After) | | unknown | False | 400 | **禁止事项**: - [ ] `app/services/` 和 `app/ai/` 中禁止 `raise HTTPException` - [ ] `app/services/` 和 `app/ai/` 中禁止裸 `raise Exception(...)` - [ ] 各 Router 中禁止 `except Exception: raise HTTPException(500, ...)` 处理第三方错误 --- ### 铁律 2:Adapter 最小契约 **规范内容**:每个新平台必须实现 `PlatformAdapter`(`platform_id` + `health()` + `close()`)。按需实现 `SyncCapable`(同步调用)或 `TaskCapable`(异步任务)。 **Protocol 定义**: ```python # app/ai/adapters/base.py @runtime_checkable class PlatformAdapter(Protocol): """所有 Adapter 的准入门槛""" platform_id: str async def health(self) -> AdapterResponse: ... async def close(self) -> None: ... @runtime_checkable class SyncCapable(Protocol): """同步调用能力(TTS、Chat、图片生成等)""" async def call(self, method: str, payload: dict) -> AdapterResponse: ... @runtime_checkable class TaskCapable(Protocol): """异步任务能力(视频生成、字幕、TTS 等)""" async def submit(self, task_type: str, payload: dict, callback_url: str | None) -> AdapterResponse: ... async def query(self, platform_job_id: str) -> TaskStatus: ... @runtime_checkable class CallbackCapable(Protocol): """回调验签能力(可选)""" async def verify_signature(self, headers: dict, body: bytes, secret: str) -> bool: ... async def parse_callback(self, body: bytes) -> TaskStatus: ... ``` **统一返回值**: ```python @dataclass(frozen=True) class AdapterResponse: success: bool data: dict | None = None error_code: str | None = None error_message: str | None = None retryable: bool = False @dataclass(frozen=True) class TaskStatus: state: str # "pending" | "processing" | "completed" | "failed" result: dict | None = None error_message: str | None = None ``` **方法标识常量**: ```python # app/ai/adapters/constants.py class Method: TTS = "tts" CHAT = "chat" CHAT_STREAM = "chat_stream" IMAGE_GENERATE = "image_generate" LIP_SYNC = "lip_sync" CAPTION = "caption" AUTO_ALIGN = "auto_align" ``` **各平台对号入座**: | 平台 | 必须实现 | 当前状态 | |-----|---------|---------| | 火山方舟 | `PlatformAdapter + SyncCapable` | ❌ 缺失,需新建 `VolcengineArkAdapter` | | Vidu | `PlatformAdapter + SyncCapable + TaskCapable + CallbackCapable` | ❌ 缺失,需新建 `ViduAdapter` | | 火山字幕 | `PlatformAdapter + TaskCapable` | ❌ 缺失,需新建 `VolcengineCaptionAdapter` | **禁止事项**: - [ ] 新增平台不实现 `PlatformAdapter` 直接接入 - [ ] Adapter 内部抛出的异常不是 `PlatformError` - [ ] `call()` 方法返回裸 `dict` 而不是 `AdapterResponse` --- ### 铁律 3:HTTP Client 统一关闭 **规范内容**:所有对外 HTTP 连接(`httpx.AsyncClient` 或 SDK 内部)必须在 `lifespan` 中创建和销毁。禁止在方法内临时创建 `httpx.AsyncClient()`。 **具体标准**: ```python # lifespan 中的标准写法 @asynccontextmanager async def lifespan(app: FastAPI): # 各平台独立 Client(故障隔离) app.state.http_clients = { "vidu": httpx.AsyncClient(timeout=30, limits=httpx.Limits(max_connections=20)), "volcengine_caption": httpx.AsyncClient(timeout=60, limits=httpx.Limits(max_connections=10)), "default": httpx.AsyncClient(timeout=30, limits=httpx.Limits(max_connections=50)), } # SDK 客户端 app.state.ark_client = AsyncArk(api_key=..., timeout=1800) yield # 统一关闭 for client in app.state.http_clients.values(): await client.aclose() if hasattr(app.state, "ark_client") and not app.state.ark_client.is_closed(): await app.state.ark_client.close() ``` **迁移清单**: | 文件 | 当前 Client | 整改方式 | |-----|------------|---------| | `app/ai/providers/vidu_provider.py` | `aiohttp.ClientSession` | 迁移为 `httpx.AsyncClient`,由 lifespan 注入 | | `app/services/volcengine_caption_service.py` | `httpx.AsyncClient`(懒加载,永不关闭) | 改为 lifespan 注入,删除 `_get_client()` 懒加载 | | `app/api/v1/voice.py` | 临时 `httpx.AsyncClient()` | 改为 `app.state.http_clients["default"]` | **禁止事项**: - [ ] 禁止 import `aiohttp`(项目统一使用 `httpx`) - [ ] 禁止在方法/路由内 `httpx.AsyncClient()` 临时创建(下载大文件例外,需注释说明) - [ ] 禁止 Provider `__init__` 中创建 Client 而不在 lifespan 中关闭 --- ### 铁律 4:异步任务状态唯一 **规范内容**:所有"提交后等待"的任务,状态必须写入统一的状态机。第三方回调走统一入口 `/webhooks/{platform}`。 > **注意**:本铁律涉及数据库设计,当前文档中 SQLAlchemy `Job` model 相关设计已挂起,待数据库方案确定后补充。本章只规定接口和状态机标准。 **统一状态枚举**: ```python class JobStatus(str, Enum): PENDING = "pending" # 已提交,等待调度 QUEUED = "queued" # 已进入队列,等待槽位 RUNNING = "running" # 正在执行 SUCCEEDED = "succeeded" # 成功完成 FAILED = "failed" # 失败 CANCELLED = "cancelled" # 用户取消或超时取消 ``` **统一任务 API**: ```python # 提交任务 POST /jobs Request: {platform: str, task_type: str, payload: dict, idempotency_key: str | None} Response: {job_id: UUID, status: "pending"} # 查询任务 GET /jobs/{job_id} Response: {job_id, status, progress, message, result, error, created_at, updated_at} # 统一回调入口 POST /webhooks/{platform} ``` **第三方状态映射**(Adapter 层负责): | 第三方状态 | 内部状态 | |-----------|---------| | Vidu: `pending` / `processing` | `RUNNING` | | Vidu: `success` | `SUCCEEDED` | | Vidu: `failed` | `FAILED` | | 火山字幕: `code=2000` | `RUNNING` | | 火山字幕: `code=0` | `SUCCEEDED` | | 火山字幕: `code=1001/1002/1012` | `FAILED`(不可重试) | | 火山字幕: `code=1003`(超频) | `FAILED`(可重试) | **禁止事项**: - [ ] 禁止 Router/Service 私设 Redis key(如 `vidu:lipsync:xxx`) - [ ] 禁止在 Router 中直接处理回调验签(必须由 Adapter 处理) - [ ] 禁止各平台使用自己的状态字符串返回给前端 --- ### 铁律 5:配置与密钥分离 **规范内容**:非敏感配置走 `config/platform-config.yaml`,支持热重载。密钥走 `.env`,修改需重启。 **文件结构**: ```yaml # config/platform-config.yaml platforms: : provider: base_url: models: # 原 ai_models.yaml 内容合并至此 - id: model_name: <实际模型ID> capabilities: [] default_params: rate_limit: qps: burst: methods: : timeout: max_connections: rate_limit: qps: burst: runtime: fallback_chains: : - - task_timeouts: : task_ttl: : ``` **热重载实现**: ```python class RuntimeConfig: """运行时配置,轮询检查 mtime(10秒间隔)+ Admin API 手动触发""" async def get(self, key: str, default=None): await self._reload_if_changed() return self._config.get(key, default) async def force_reload(self) -> bool: """Admin API 调用""" ... ``` **Admin API**: ```python @router.post("/admin/runtime-config/reload") async def reload_runtime_config(): success = await runtime_config.force_reload() return {"reloaded": success, "version": runtime_config.version} @router.get("/admin/runtime-config") async def get_runtime_config(): return runtime_config.get_raw() ``` **迁移清单**: | `.env` 中的配置项 | 迁移目标 | 状态 | |------------------|---------|------| | `VIDU_BASE_URL` | `platforms.vidu.base_url` | 待迁移 | | `VOLCENGINE_BASE_URL` | `platforms.volcengine_ark.base_url` | 待迁移 | | `VOLC_SUBTITLE_MAX_CONCURRENT` | `platforms.volcengine_caption.methods.caption.max_connections` | 待迁移 | | `VOLC_SUBTITLE_TIMEOUT` | `runtime.task_timeouts.caption` | 待迁移 | **禁止事项**: - [ ] 禁止在 `.env` 中存放非敏感配置(URL、超时、限流) - [ ] 禁止代码中硬编码配置(如 `timeout=30`、`max_rate=20`) - [ ] 禁止 `Settings` 类超过 150 行(逐步瘦身) --- ## 三、分阶段实施计划 ### Phase 0:准备(0.5 天) | # | 任务 | 输出文件 | 检查方式 | |---|------|---------|---------| | 0.1 | 新建 `app/ai/adapters/` 目录结构 | `app/ai/adapters/__init__.py` | 目录存在 | | 0.2 | 新建 `app/platform_gateway.py` 骨架 | `app/platform_gateway.py` | 文件存在,类定义完整 | | 0.3 | 安装/确认 `importlinter` 可用 | `pyproject.toml` 依赖 | `pip show importlinter` | | 0.4 | 备份现有 `exceptions.py` | git stash / 分支 | 可回滚 | ### Phase 1:异常体系(0.5 天) | # | 任务 | 输出文件 | 检查方式 | |---|------|---------|---------| | 1.1 | 重构 `PlatformError` + `PlatformErrorType` | `app/core/exceptions.py` | 类型定义完整,含所有字段 | | 1.2 | 保留 `AppException` 体系(业务错误) | `app/core/exceptions.py` | 原有类不删除 | | 1.3 | `main.py` 注册 `PlatformError` 全局中间件 | `app/main.py` | 启动无报错,异常测试返回正确 HTTP 码 | | 1.4 | `VolcengineArkAdapter._wrap_error()` 实现异常映射 | `app/ai/adapters/volcengine_ark.py` | 单元测试覆盖 | | 1.5 | `ViduAdapter._wrap_error()` 实现异常映射 | `app/ai/adapters/vidu.py` | 单元测试覆盖 | | 1.6 | `make lint-semantic` 增加异常规则 | `Makefile` | 提交时自动检查 | **验收标准**: - [ ] 任意第三方调用失败,Router 返回的 JSON 中 `detail.retryable` 正确 - [ ] 网络超时返回 504,限流返回 429,认证失败返回 401 - [ ] 业务错误(如参数校验失败)仍走 `AppException` → 400/422 ### Phase 2:Adapter Protocol + 配置合并(1 天) | # | 任务 | 输出文件 | 检查方式 | |---|------|---------|---------| | 2.1 | `PlatformAdapter` / `SyncCapable` / `TaskCapable` / `CallbackCapable` Protocol | `app/ai/adapters/base.py` | `isinstance` 校验通过 | | 2.2 | `AdapterResponse` / `TaskStatus` dataclass | `app/ai/adapters/base.py` | frozen=True,字段完整 | | 2.3 | `Method` 常量定义 | `app/ai/adapters/constants.py` | 覆盖所有现有方法 | | 2.4 | 合并 `ai_models.yaml` → `platform-config.yaml` | `config/platform-config.yaml` | 原有模型列表完整迁移 | | 2.5 | `RuntimeConfig` 热重载实现 | `app/core/runtime_config.py` | mtime 轮询 + force_reload 均工作 | | 2.6 | Admin API `/admin/runtime-config/*` | `app/api/v1/system.py` | GET/POST 返回正确 | | 2.7 | `Settings` 类清理非敏感配置 | `app/config.py` | 只保留密钥,行数 < 150 | **验收标准**: - [ ] 新增一个 MockAdapter 实现 Protocol,IDE 自动提示缺失方法 - [ ] 修改 `runtime.yaml` 中的 qps,10 秒内新请求生效 - [ ] Admin API 手动触发 reload,返回最新配置 ### Phase 3:HTTP Client 统一(1 天) | # | 任务 | 输出文件 | 检查方式 | |---|------|---------|---------| | 3.1 | `ViduProvider` 从 `aiohttp` 迁移到 `httpx` | `app/ai/providers/vidu_provider.py` | 功能测试通过 | | 3.2 | `VolcengineCaptionService` 删除懒加载,改为注入 Client | `app/services/volcengine_caption_service.py` | 功能测试通过 | | 3.3 | `voice.py` 中临时 `httpx.AsyncClient()` 改为共享 Client | `app/api/v1/voice.py` | 代码审查 | | 3.4 | lifespan 统一管理所有 Client 生命周期 | `app/main.py` | 启动/关闭无泄漏日志 | | 3.5 | `ViduAdapter.close()` / `VolcengineCaptionAdapter.close()` 实现 | 对应 Adapter 文件 | lifespan shutdown 时调用 | | 3.6 | `make lint` 增加 `aiohttp` import 禁止规则 | `pyproject.toml` 或 pre-commit | import aiohttp 报 error | **验收标准**: - [ ] `pip list | grep aiohttp` 无输出(或确认仅作为间接依赖) - [ ] `python -m app.main` 启动后,关闭时无 `unclosed client session` 警告 - [ ] 所有 `AsyncClient` 创建都在 lifespan 中 ### Phase 4:Gateway 骨架 + Adapter 包装层(1 天) | # | 任务 | 输出文件 | 检查方式 | |---|------|---------|---------| | 4.1 | `PlatformGateway` 骨架(`call_sync` / `submit_task` / `query_task` / `handle_webhook`) | `app/platform_gateway.py` | 类方法签名完整 | | 4.2 | `VolcengineArkAdapter` 改造现有 Provider 实现 Protocol | `app/ai/adapters/volcengine_ark.py` | 单元测试通过 | | 4.3 | `ViduAdapter` 改造现有 Provider 实现 Protocol | `app/ai/adapters/vidu.py` | 单元测试通过 | | 4.4 | `VolcengineCaptionAdapter` 改造现有 Service 实现 Protocol | `app/ai/adapters/volcengine_caption.py` | 单元测试通过 | | 4.5 | `LLMGateway` 实现(模型选择、Fallback、流式路由) | `app/ai/gateways/llm_gateway.py` | 脚本生成功能测试通过 | | 4.6 | lifespan 中初始化所有 Adapter 并注册到 Gateway | `app/main.py` | 启动日志显示各平台初始化成功 | **验收标准**: - [ ] 新增一个 `MockAdapter` 实现 Protocol,5 分钟内完成注册并可用 - [ ] `LLMGateway.chat()` 主模型失败时自动 Fallback 到备用模型 - [ ] 健康检查 `/system/platform-health` 返回所有平台状态 ### Phase 5:异步任务统一(2 天,数据库方案确定后实施) | # | 任务 | 输出文件 | 检查方式 | |---|------|---------|---------| | 5.1 | SQLAlchemy `Job` model(独立设计) | `app/models/job.py` | Alembic 迁移成功 | | 5.2 | Pydantic `JobResponse` Schema | `app/schemas/job.py` | 覆盖所有字段 | | 5.3 | `JobRegistry` 改为先写数据库、再写 Redis | `app/scheduler/registry.py` | 数据库有数据 | | 5.4 | `JobStatus` 扩展为 6 种状态 | `app/schemas/enums.py` | 覆盖所有场景 | | 5.5 | `ViduHandler` 接入 Async Engine | `app/scheduler/handlers/vidu_handler.py` | 视频生成任务走 Engine | | 5.6 | `SubtitleHandler` 改为通过 Gateway 调用 | `app/scheduler/handlers/subtitle_handler.py` | 字幕任务走 Gateway | | 5.7 | 统一回调入口 `/webhooks/{platform}` | `app/api/v1/webhooks.py` | Vidu 回调正常 | | 5.8 | 删除 Router 中私设 Redis key 的代码 | `app/api/v1/vidu.py` | 无 `vidu:lipsync:` 字样 | | 5.9 | 统一任务 API `/jobs/{job_id}` | `app/api/v1/jobs.py` | GET 返回标准格式 | | 5.10 | 脚本生成从 SSE 改为异步任务 | `app/api/v1/script.py` / `app/services/script_service.py` | POST /jobs 提交,轮询 /jobs/{id} | | 5.11 | 删除 `/script/generate/stream` SSE 端点 | `app/api/v1/script.py` | 端点不存在 | **验收标准**: - [ ] Vidu 视频生成任务提交后,Redis 中只有 `job:{uuid}` 格式的 key - [ ] 应用重启后,从数据库恢复 running 任务继续执行 - [ ] 前端轮询 `/jobs/{id}` 获取所有异步任务状态 ### Phase 6:清理与验证(0.5 天) | # | 任务 | 输出文件 | 检查方式 | |---|------|---------|---------| | 6.1 | `importlinter` 配置(禁止 Router 直接 import Provider) | `.importlinter` | CI 中运行通过 | | 6.2 | 删除废弃的 `ai_models.yaml`(确认合并完成后) | — | 文件不存在 | | 6.3 | 删除 `ViduService` / `VolcengineCaptionService` 中的重复异常处理 | 对应文件 | 代码审查 | | 6.4 | 全量回归测试(所有现有 API 调用一遍) | — | 测试脚本通过 | | 6.5 | 更新本文档,标记各阶段完成状态 | 本文档 | 所有 checkbox 打勾 | --- ## 四、检查清单汇总 ### 4.1 新增文件清单 | 文件路径 | 说明 | 所属阶段 | |---------|------|---------| | `app/ai/adapters/__init__.py` | Adapter 包 | Phase 0 | | `app/ai/adapters/base.py` | Protocol + dataclass | Phase 2 | | `app/ai/adapters/constants.py` | Method 常量 | Phase 2 | | `app/ai/adapters/volcengine_ark.py` | 火山方舟 Adapter | Phase 4 | | `app/ai/adapters/vidu.py` | Vidu Adapter | Phase 4 | | `app/ai/adapters/volcengine_caption.py` | 火山字幕 Adapter | Phase 4 | | `app/ai/gateways/llm_gateway.py` | LLM 网关 | Phase 4 | | `app/platform_gateway.py` | 统一平台网关 | Phase 0/4 | | `app/core/runtime_config.py` | 运行时配置 + 热重载 | Phase 2 | | `config/platform-config.yaml` | 合并后的平台配置 | Phase 2 | | `app/models/job.py` | 异步任务数据库模型 | Phase 5 | | `app/api/v1/jobs.py` | 统一任务 API | Phase 5 | | `app/api/v1/webhooks.py` | 统一回调入口 | Phase 5 | | `app/scheduler/handlers/vidu_handler.py` | Vidu 任务处理器 | Phase 5 | | `.importlinter` | 架构约束配置 | Phase 6 | ### 4.2 修改文件清单 | 文件路径 | 修改内容 | 所属阶段 | |---------|---------|---------| | `app/core/exceptions.py` | 新增 `PlatformError` / `PlatformErrorType` | Phase 1 | | `app/main.py` | 注册异常中间件、lifespan Client 管理 | Phase 1/3/4 | | `app/config.py` | 清理非敏感配置,只保留密钥 | Phase 2 | | `app/ai/providers/vidu_provider.py` | aiohttp → httpx | Phase 3 | | `app/services/volcengine_caption_service.py` | 删除懒加载,改为注入 Client | Phase 3 | | `app/api/v1/voice.py` | 临时 Client → 共享 Client | Phase 3 | | `app/api/v1/script.py` | SSE → 异步任务 + 删除 stream 端点 | Phase 5 | | `app/services/script_service.py` | 删除 generate_script_stream | Phase 5 | | `app/api/v1/system.py` | 新增 Admin API | Phase 2 | | `app/scheduler/registry.py` | 先写数据库再写 Redis | Phase 5 | | `app/scheduler/handlers/subtitle_handler.py` | 通过 Gateway 调用 | Phase 5 | | `app/api/v1/vidu.py` | 删除私设 Redis key | Phase 5 | | `app/schemas/enums.py` | 扩展 `JobStatus` | Phase 5 | | `Makefile` / `pyproject.toml` | lint 规则 | Phase 1/3/6 | ### 4.3 废弃文件清单 | 文件路径 | 废弃原因 | 处理时间 | |---------|---------|---------| | `config/ai_models.yaml` | 合并到 `platform-config.yaml` | Phase 6 | --- ## 五、风险项与应对 | 风险 | 影响 | 概率 | 应对 | |-----|------|------|------| | `aiohttp` 迁移到 `httpx` 导致 Vidu 某些边缘场景行为不一致 | 功能回归 | 中 | 迁移后全量测试 Vidu TTS/视频生成/声音复刻 | | `PlatformError` 未覆盖所有异常路径,仍有裸 Exception 漏出 | 前端收到 500 无法处理 | 低 | `make lint-semantic` 强制检查 + Code Review | | 配置热重载导致运行时行为突变 | 线上限流突然变更 | 低 | Admin API 加操作日志,变更前确认 | | Phase 5 数据库改造影响现有 Async Engine | 字幕/脚本任务异常 | 中 | 数据库方案评审后再实施,分步迁移 | | 前端轮询改造工作量超预期 | 延期 | 中 | 提前与前端同步接口变更,预留 2 天 | --- ## 六、验收标准(最终 Checklist) 实施全部完成后,按以下清单逐项核对: - [ ] `PlatformError` 是 `app/services/` 和 `app/ai/` 中唯一的第三方异常类型 - [ ] Router 中不存在 `except Exception: raise HTTPException(500)` 处理第三方错误 - [ ] 新增 MockAdapter 实现 Protocol,30 分钟内完成注册并可用 - [ ] `aiohttp` 不在项目直接依赖中(`pip show aiohttp` 不显示或仅为间接依赖) - [ ] 所有 `AsyncClient` 在 lifespan 中创建和销毁 - [ ] 关闭应用时无 `unclosed client session` 警告 - [ ] `config/platform-config.yaml` 存在且包含所有平台配置 - [ ] 修改 `platform-config.yaml` 中的限流参数,10 秒内新请求生效 - [ ] Admin API `/admin/runtime-config/reload` 手动触发重载成功 - [ ] 健康检查 `/system/platform-health` 返回所有平台状态 - [ ] `importlinter` CI 检查通过(Router 不直接 import Provider) - [ ] 全量 API 回归测试通过 --- > 本文档为实施的唯一依据。任何偏离文档的变更必须在此文档中登记并说明理由。