Files
meijiaka-zy/docs/third-party-integration-implementation-plan.md
小鱼开发 04e467e433 feat(points): 积分系统收尾 + 充值弹窗改造 + 命名统一
后端:
- 微信回调 db.commit 失败仍返回 SUCCESS,避免无限重试
- recharge() 加 order_id 幂等保护,防重复充值
- time_expire 使用北京时间(UTC+8),修复时区 bug
- 充值档位后端配置化(points-config.yaml + /recharge-options API)
- 代码审查 20 项修复(认证加固、扣费顺序、错误响应、状态同步等)

前端:
- 充值弹窗:自动轮询 + 【我已支付】手动兜底
- 二维码倒计时显示,过期后遮罩 + 刷新按钮
- 充值档位从后端动态加载
- 去掉 select/qrcode 弹窗标题,金额红色突出显示
- 全项目命名统一(视频生成/压制成片/配音合成/声音复刻等)
- Modal 关闭按钮独立于 title 显示
2026-05-09 21:29:35 +08:00

22 KiB
Raw Permalink Blame History

第三方平台接入架构标准化实施计划

版本:v1.0 依据:third-party-integration-architecture.md(架构设计最终版) 目标:统一异常体系、Adapter 契约、HTTP Client 生命周期、异步任务状态机、配置分层 生效日期:2026-05-03


一、总则

1.1 实施原则

  • 标准优先:以行业主流做法为准,不因"改动小"而妥协
  • 文档先行:所有变更必须在此文档中登记,实施完成后逐项核对
  • 标准优先:存量代码直接按标准改造,不做兼容包装层
  • 渐进验证:每阶段完成后运行测试,确认无回归再进入下一阶段

1.2 不适用本标准的例外

  • 七牛云存储(纯上传下载,不纳入 Adapter 体系)

二、五条铁律规范(实施标准)

铁律 1:异常出口唯一

规范内容:所有 app/services/app/ai/ 下的代码,对外抛出的异常必须是 PlatformError。Router 只 except PlatformErrorAppException(业务错误)。

具体标准

# app/core/exceptions.py —— 唯一的第三方异常类
class PlatformErrorType:
    RATE_LIMIT = "rate_limit"
    AUTH_FAILED = "auth_failed"
    TIMEOUT = "timeout"
    SERVER_ERROR = "server_error"
    BAD_REQUEST = "bad_request"
    QUOTA_EXHAUSTED = "quota_exhausted"
    NOT_FOUND = "not_found"
    UNKNOWN = "unknown"

class PlatformError(Exception):
    def __init__(
        self,
        message: str,
        *,
        platform: str,
        retryable: bool = False,
        error_type: str = PlatformErrorType.UNKNOWN,
        status_code: int | None = None,
    ):
        super().__init__(message)
        self.platform = platform
        self.retryable = retryable
        self.error_type = error_type
        self.status_code = status_code

HTTP 状态码映射(全局中间件):

error_type retryable HTTP 状态码
rate_limit True 429
timeout True 504
server_error True 502
auth_failed False 401
bad_request False 400
quota_exhausted False 429(带 Retry-After
unknown False 400

禁止事项

  • app/services/app/ai/ 中禁止 raise HTTPException
  • app/services/app/ai/ 中禁止裸 raise Exception(...)
  • 各 Router 中禁止 except Exception: raise HTTPException(500, ...) 处理第三方错误

铁律 2Adapter 最小契约

规范内容:每个新平台必须实现 PlatformAdapterplatform_id + health() + close())。按需实现 SyncCapable(同步调用)或 TaskCapable(异步任务)。

Protocol 定义

# app/ai/adapters/base.py

@runtime_checkable
class PlatformAdapter(Protocol):
    """所有 Adapter 的准入门槛"""
    platform_id: str
    async def health(self) -> AdapterResponse: ...
    async def close(self) -> None: ...

@runtime_checkable
class SyncCapable(Protocol):
    """同步调用能力(TTS、Chat、图片生成等)"""
    async def call(self, method: str, payload: dict) -> AdapterResponse: ...

@runtime_checkable
class TaskCapable(Protocol):
    """异步任务能力(视频生成、字幕、TTS 等)"""
    async def submit(self, task_type: str, payload: dict, callback_url: str | None) -> AdapterResponse: ...
    async def query(self, platform_job_id: str) -> TaskStatus: ...

@runtime_checkable
class CallbackCapable(Protocol):
    """回调验签能力(可选)"""
    async def verify_signature(self, headers: dict, body: bytes, secret: str) -> bool: ...
    async def parse_callback(self, body: bytes) -> TaskStatus: ...

统一返回值

@dataclass(frozen=True)
class AdapterResponse:
    success: bool
    data: dict | None = None
    error_code: str | None = None
    error_message: str | None = None
    retryable: bool = False

@dataclass(frozen=True)
class TaskStatus:
    state: str  # "pending" | "processing" | "completed" | "failed"
    result: dict | None = None
    error_message: str | None = None

方法标识常量

# app/ai/adapters/constants.py
class Method:
    TTS = "tts"
    CHAT = "chat"
    CHAT_STREAM = "chat_stream"
    IMAGE_GENERATE = "image_generate"
    LIP_SYNC = "lip_sync"
    CAPTION = "caption"
    AUTO_ALIGN = "auto_align"

各平台对号入座

平台 必须实现 当前状态
火山方舟 PlatformAdapter + SyncCapable 缺失,需新建 VolcengineArkAdapter
Vidu PlatformAdapter + SyncCapable + TaskCapable + CallbackCapable 缺失,需新建 ViduAdapter
火山字幕 PlatformAdapter + TaskCapable 缺失,需新建 VolcengineCaptionAdapter

禁止事项

  • 新增平台不实现 PlatformAdapter 直接接入
  • Adapter 内部抛出的异常不是 PlatformError
  • call() 方法返回裸 dict 而不是 AdapterResponse

铁律 3HTTP Client 统一关闭

规范内容:所有对外 HTTP 连接(httpx.AsyncClient 或 SDK 内部)必须在 lifespan 中创建和销毁。禁止在方法内临时创建 httpx.AsyncClient()

具体标准

# lifespan 中的标准写法
@asynccontextmanager
async def lifespan(app: FastAPI):
    # 各平台独立 Client(故障隔离)
    app.state.http_clients = {
        "vidu": httpx.AsyncClient(timeout=30, limits=httpx.Limits(max_connections=20)),
        "volcengine_caption": httpx.AsyncClient(timeout=60, limits=httpx.Limits(max_connections=10)),
        "default": httpx.AsyncClient(timeout=30, limits=httpx.Limits(max_connections=50)),
    }
    
    # SDK 客户端
    app.state.ark_client = AsyncArk(api_key=..., timeout=1800)
    
    yield
    
    # 统一关闭
    for client in app.state.http_clients.values():
        await client.aclose()
    
    if hasattr(app.state, "ark_client") and not app.state.ark_client.is_closed():
        await app.state.ark_client.close()

迁移清单

文件 当前 Client 整改方式
app/ai/providers/vidu_provider.py aiohttp.ClientSession 迁移为 httpx.AsyncClient,由 lifespan 注入
app/services/volcengine_caption_service.py httpx.AsyncClient(懒加载,永不关闭) 改为 lifespan 注入,删除 _get_client() 懒加载
app/api/v1/voice.py 临时 httpx.AsyncClient() 改为 app.state.http_clients["default"]

禁止事项

  • 禁止 import aiohttp(项目统一使用 httpx
  • 禁止在方法/路由内 httpx.AsyncClient() 临时创建(下载大文件例外,需注释说明)
  • 禁止 Provider __init__ 中创建 Client 而不在 lifespan 中关闭

铁律 4:异步任务状态唯一

规范内容:所有"提交后等待"的任务,状态必须写入统一的状态机。第三方回调走统一入口 /webhooks/{platform}

注意:本铁律涉及数据库设计,当前文档中 SQLAlchemy Job model 相关设计已挂起,待数据库方案确定后补充。本章只规定接口和状态机标准。

统一状态枚举

class JobStatus(str, Enum):
    PENDING = "pending"       # 已提交,等待调度
    QUEUED = "queued"         # 已进入队列,等待槽位
    RUNNING = "running"       # 正在执行
    SUCCEEDED = "succeeded"   # 成功完成
    FAILED = "failed"         # 失败
    CANCELLED = "cancelled"   # 用户取消或超时取消

统一任务 API

# 提交任务
POST /jobs
Request: {platform: str, task_type: str, payload: dict, idempotency_key: str | None}
Response: {job_id: UUID, status: "pending"}

# 查询任务
GET /jobs/{job_id}
Response: {job_id, status, progress, message, result, error, created_at, updated_at}

# 统一回调入口
POST /webhooks/{platform}

第三方状态映射Adapter 层负责):

第三方状态 内部状态
Vidu: pending / processing RUNNING
Vidu: success SUCCEEDED
Vidu: failed FAILED
火山字幕: code=2000 RUNNING
火山字幕: code=0 SUCCEEDED
火山字幕: code=1001/1002/1012 FAILED(不可重试)
火山字幕: code=1003(超频) FAILED(可重试)

禁止事项

  • 禁止 Router/Service 私设 Redis key(如 vidu:lipsync:xxx
  • 禁止在 Router 中直接处理回调验签(必须由 Adapter 处理)
  • 禁止各平台使用自己的状态字符串返回给前端

铁律 5:配置与密钥分离

规范内容:非敏感配置走 config/platform-config.yaml,支持热重载。密钥走 .env,修改需重启。

文件结构

# config/platform-config.yaml
platforms:
  <platform_id>:
    provider: <provider_type>
    base_url: <url>
    models:  # 原 ai_models.yaml 内容合并至此
      - id: <model_alias>
        model_name: <实际模型ID>
        capabilities: [<capability>]
        default_params: <dict>
    rate_limit:
      qps: <float>
      burst: <int>
    methods:
      <method>:
        timeout: <int>
        max_connections: <int>
        rate_limit:
          qps: <float>
          burst: <int>

runtime:
  fallback_chains:
    <capability>:
      - <model_alias_1>
      - <model_alias_2>
  task_timeouts:
    <task_type>: <seconds>
  task_ttl:
    <task_type>: <seconds>

热重载实现

class RuntimeConfig:
    """运行时配置,轮询检查 mtime10秒间隔)+ Admin API 手动触发"""
    
    async def get(self, key: str, default=None):
        await self._reload_if_changed()
        return self._config.get(key, default)
    
    async def force_reload(self) -> bool:
        """Admin API 调用"""
        ...

Admin API

@router.post("/admin/runtime-config/reload")
async def reload_runtime_config():
    success = await runtime_config.force_reload()
    return {"reloaded": success, "version": runtime_config.version}

@router.get("/admin/runtime-config")
async def get_runtime_config():
    return runtime_config.get_raw()

迁移清单

.env 中的配置项 迁移目标 状态
VIDU_BASE_URL platforms.vidu.base_url 待迁移
VOLCENGINE_BASE_URL platforms.volcengine_ark.base_url 待迁移
VOLC_SUBTITLE_MAX_CONCURRENT platforms.volcengine_caption.methods.caption.max_connections 待迁移
VOLC_SUBTITLE_TIMEOUT runtime.task_timeouts.caption 待迁移

禁止事项

  • 禁止在 .env 中存放非敏感配置(URL、超时、限流)
  • 禁止代码中硬编码配置(如 timeout=30max_rate=20
  • 禁止 Settings 类超过 150 行(逐步瘦身)

三、分阶段实施计划

Phase 0:准备(0.5 天)

# 任务 输出文件 检查方式
0.1 新建 app/ai/adapters/ 目录结构 app/ai/adapters/__init__.py 目录存在
0.2 新建 app/platform_gateway.py 骨架 app/platform_gateway.py 文件存在,类定义完整
0.3 安装/确认 importlinter 可用 pyproject.toml 依赖 pip show importlinter
0.4 备份现有 exceptions.py git stash / 分支 可回滚

Phase 1:异常体系(0.5 天)

# 任务 输出文件 检查方式
1.1 重构 PlatformError + PlatformErrorType app/core/exceptions.py 类型定义完整,含所有字段
1.2 保留 AppException 体系(业务错误) app/core/exceptions.py 原有类不删除
1.3 main.py 注册 PlatformError 全局中间件 app/main.py 启动无报错,异常测试返回正确 HTTP 码
1.4 VolcengineArkAdapter._wrap_error() 实现异常映射 app/ai/adapters/volcengine_ark.py 单元测试覆盖
1.5 ViduAdapter._wrap_error() 实现异常映射 app/ai/adapters/vidu.py 单元测试覆盖
1.6 make lint-semantic 增加异常规则 Makefile 提交时自动检查

验收标准

  • 任意第三方调用失败,Router 返回的 JSON 中 detail.retryable 正确
  • 网络超时返回 504,限流返回 429,认证失败返回 401
  • 业务错误(如参数校验失败)仍走 AppException → 400/422

Phase 2Adapter Protocol + 配置合并(1 天)

# 任务 输出文件 检查方式
2.1 PlatformAdapter / SyncCapable / TaskCapable / CallbackCapable Protocol app/ai/adapters/base.py isinstance 校验通过
2.2 AdapterResponse / TaskStatus dataclass app/ai/adapters/base.py frozen=True,字段完整
2.3 Method 常量定义 app/ai/adapters/constants.py 覆盖所有现有方法
2.4 合并 ai_models.yamlplatform-config.yaml config/platform-config.yaml 原有模型列表完整迁移
2.5 RuntimeConfig 热重载实现 app/core/runtime_config.py mtime 轮询 + force_reload 均工作
2.6 Admin API /admin/runtime-config/* app/api/v1/system.py GET/POST 返回正确
2.7 Settings 类清理非敏感配置 app/config.py 只保留密钥,行数 < 150

验收标准

  • 新增一个 MockAdapter 实现 ProtocolIDE 自动提示缺失方法
  • 修改 runtime.yaml 中的 qps10 秒内新请求生效
  • Admin API 手动触发 reload,返回最新配置

Phase 3HTTP Client 统一(1 天)

# 任务 输出文件 检查方式
3.1 ViduProvideraiohttp 迁移到 httpx app/ai/providers/vidu_provider.py 功能测试通过
3.2 VolcengineCaptionService 删除懒加载,改为注入 Client app/services/volcengine_caption_service.py 功能测试通过
3.3 voice.py 中临时 httpx.AsyncClient() 改为共享 Client app/api/v1/voice.py 代码审查
3.4 lifespan 统一管理所有 Client 生命周期 app/main.py 启动/关闭无泄漏日志
3.5 ViduAdapter.close() / VolcengineCaptionAdapter.close() 实现 对应 Adapter 文件 lifespan shutdown 时调用
3.6 make lint 增加 aiohttp import 禁止规则 pyproject.toml 或 pre-commit import aiohttp 报 error

验收标准

  • pip list | grep aiohttp 无输出(或确认仅作为间接依赖)
  • python -m app.main 启动后,关闭时无 unclosed client session 警告
  • 所有 AsyncClient 创建都在 lifespan 中

Phase 4Gateway 骨架 + Adapter 包装层(1 天)

# 任务 输出文件 检查方式
4.1 PlatformGateway 骨架(call_sync / submit_task / query_task / handle_webhook app/platform_gateway.py 类方法签名完整
4.2 VolcengineArkAdapter 改造现有 Provider 实现 Protocol app/ai/adapters/volcengine_ark.py 单元测试通过
4.3 ViduAdapter 改造现有 Provider 实现 Protocol app/ai/adapters/vidu.py 单元测试通过
4.4 VolcengineCaptionAdapter 改造现有 Service 实现 Protocol app/ai/adapters/volcengine_caption.py 单元测试通过
4.5 LLMGateway 实现(模型选择、Fallback、流式路由) app/ai/gateways/llm_gateway.py 脚本生成功能测试通过
4.6 lifespan 中初始化所有 Adapter 并注册到 Gateway app/main.py 启动日志显示各平台初始化成功

验收标准

  • 新增一个 MockAdapter 实现 Protocol,5 分钟内完成注册并可用
  • LLMGateway.chat() 主模型失败时自动 Fallback 到备用模型
  • 健康检查 /system/platform-health 返回所有平台状态

Phase 5:异步任务统一(2 天,数据库方案确定后实施)

# 任务 输出文件 检查方式
5.1 SQLAlchemy Job model(独立设计) app/models/job.py Alembic 迁移成功
5.2 Pydantic JobResponse Schema app/schemas/job.py 覆盖所有字段
5.3 JobRegistry 改为先写数据库、再写 Redis app/scheduler/registry.py 数据库有数据
5.4 JobStatus 扩展为 6 种状态 app/schemas/enums.py 覆盖所有场景
5.5 ViduHandler 接入 Async Engine app/scheduler/handlers/vidu_handler.py 视频生成任务走 Engine
5.6 SubtitleHandler 改为通过 Gateway 调用 app/scheduler/handlers/subtitle_handler.py 字幕任务走 Gateway
5.7 统一回调入口 /webhooks/{platform} app/api/v1/webhooks.py Vidu 回调正常
5.8 删除 Router 中私设 Redis key 的代码 app/api/v1/vidu.py vidu:lipsync: 字样
5.9 统一任务 API /jobs/{job_id} app/api/v1/jobs.py GET 返回标准格式
5.10 脚本生成从 SSE 改为异步任务 app/api/v1/script.py / app/services/script_service.py POST /jobs 提交,轮询 /jobs/{id}
5.11 删除 /script/generate/stream SSE 端点 app/api/v1/script.py 端点不存在

验收标准

  • Vidu 视频生成任务提交后,Redis 中只有 job:{uuid} 格式的 key
  • 应用重启后,从数据库恢复 running 任务继续执行
  • 前端轮询 /jobs/{id} 获取所有异步任务状态

Phase 6:清理与验证(0.5 天)

# 任务 输出文件 检查方式
6.1 importlinter 配置(禁止 Router 直接 import Provider .importlinter CI 中运行通过
6.2 删除废弃的 ai_models.yaml(确认合并完成后) 文件不存在
6.3 删除 ViduService / VolcengineCaptionService 中的重复异常处理 对应文件 代码审查
6.4 全量回归测试(所有现有 API 调用一遍) 测试脚本通过
6.5 更新本文档,标记各阶段完成状态 本文档 所有 checkbox 打勾

四、检查清单汇总

4.1 新增文件清单

文件路径 说明 所属阶段
app/ai/adapters/__init__.py Adapter 包 Phase 0
app/ai/adapters/base.py Protocol + dataclass Phase 2
app/ai/adapters/constants.py Method 常量 Phase 2
app/ai/adapters/volcengine_ark.py 火山方舟 Adapter Phase 4
app/ai/adapters/vidu.py Vidu Adapter Phase 4
app/ai/adapters/volcengine_caption.py 火山字幕 Adapter Phase 4
app/ai/gateways/llm_gateway.py LLM 网关 Phase 4
app/platform_gateway.py 统一平台网关 Phase 0/4
app/core/runtime_config.py 运行时配置 + 热重载 Phase 2
config/platform-config.yaml 合并后的平台配置 Phase 2
app/models/job.py 异步任务数据库模型 Phase 5
app/api/v1/jobs.py 统一任务 API Phase 5
app/api/v1/webhooks.py 统一回调入口 Phase 5
app/scheduler/handlers/vidu_handler.py Vidu 任务处理器 Phase 5
.importlinter 架构约束配置 Phase 6

4.2 修改文件清单

文件路径 修改内容 所属阶段
app/core/exceptions.py 新增 PlatformError / PlatformErrorType Phase 1
app/main.py 注册异常中间件、lifespan Client 管理 Phase 1/3/4
app/config.py 清理非敏感配置,只保留密钥 Phase 2
app/ai/providers/vidu_provider.py aiohttp → httpx Phase 3
app/services/volcengine_caption_service.py 删除懒加载,改为注入 Client Phase 3
app/api/v1/voice.py 临时 Client → 共享 Client Phase 3
app/api/v1/script.py SSE → 异步任务 + 删除 stream 端点 Phase 5
app/services/script_service.py 删除 generate_script_stream Phase 5
app/api/v1/system.py 新增 Admin API Phase 2
app/scheduler/registry.py 先写数据库再写 Redis Phase 5
app/scheduler/handlers/subtitle_handler.py 通过 Gateway 调用 Phase 5
app/api/v1/vidu.py 删除私设 Redis key Phase 5
app/schemas/enums.py 扩展 JobStatus Phase 5
Makefile / pyproject.toml lint 规则 Phase 1/3/6

4.3 废弃文件清单

文件路径 废弃原因 处理时间
config/ai_models.yaml 合并到 platform-config.yaml Phase 6

五、风险项与应对

风险 影响 概率 应对
aiohttp 迁移到 httpx 导致 Vidu 某些边缘场景行为不一致 功能回归 迁移后全量测试 Vidu TTS/视频生成/声音复刻
PlatformError 未覆盖所有异常路径,仍有裸 Exception 漏出 前端收到 500 无法处理 make lint-semantic 强制检查 + Code Review
配置热重载导致运行时行为突变 线上限流突然变更 Admin API 加操作日志,变更前确认
Phase 5 数据库改造影响现有 Async Engine 字幕/脚本任务异常 数据库方案评审后再实施,分步迁移
前端轮询改造工作量超预期 延期 提前与前端同步接口变更,预留 2 天

六、验收标准(最终 Checklist)

实施全部完成后,按以下清单逐项核对:

  • PlatformErrorapp/services/app/ai/ 中唯一的第三方异常类型
  • Router 中不存在 except Exception: raise HTTPException(500) 处理第三方错误
  • 新增 MockAdapter 实现 Protocol30 分钟内完成注册并可用
  • aiohttp 不在项目直接依赖中(pip show aiohttp 不显示或仅为间接依赖)
  • 所有 AsyncClient 在 lifespan 中创建和销毁
  • 关闭应用时无 unclosed client session 警告
  • config/platform-config.yaml 存在且包含所有平台配置
  • 修改 platform-config.yaml 中的限流参数,10 秒内新请求生效
  • Admin API /admin/runtime-config/reload 手动触发重载成功
  • 健康检查 /system/platform-health 返回所有平台状态
  • importlinter CI 检查通过(Router 不直接 import Provider
  • 全量 API 回归测试通过

本文档为实施的唯一依据。任何偏离文档的变更必须在此文档中登记并说明理由。