9.9 KiB
9.9 KiB
TokenManager - 通用 API 认证 Token 管理器
概述
TokenManager 是一个通用的 Token 缓存与自动刷新管理器,用于解决第三方 API 认证 Token 的有效期管理问题。主要解决以下痛点:
- Token 频繁过期:如 KlingAI 的 JWT Token 只有 30 分钟有效期
- 重复请求:每次 API 调用都重新生成 Token,增加开销
- 并发安全:多个并发请求同时触发 Token 刷新时的竞态条件
- 预热机制:在 Token 过期前主动刷新,避免请求时等待
特性
- ✅ Token 缓存:缓存 Token 直到接近过期时间
- ✅ 自动刷新:Token 即将过期时自动后台刷新
- ✅ 并发安全:使用锁机制确保并发请求只触发一次刷新
- ✅ 多策略支持:内置 JWT、OAuth2 策略,支持自定义策略
- ✅ 多实例隔离:不同 Provider 的 Token 相互隔离
- ✅ 预热机制:提前刷新 Token,确保请求时始终有效
架构设计
┌─────────────────────────────────────────────────────────────┐
│ TokenManager (单例) │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Token Cache │ │ Refresh Lock│ │ Background Tasks │ │
│ │ {key: info} │ │ {key: Lock} │ │ {preemptive refresh}│ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│JWT Strategy │ │OAuth2 Str. │ │ Custom Str. │
└─────────────┘ └─────────────┘ └─────────────┘
快速开始
1. 基础用法 - 便捷函数
from app.core.token_manager import get_jwt_token, get_oauth2_token
# JWT Token (KlingAI 模式)
token_info = await get_jwt_token(
access_key="your_access_key",
secret_key="your_secret_key",
)
headers = {"Authorization": f"Bearer {token_info.token}"}
# OAuth2 Token
token_info = await get_oauth2_token(
client_id="your_client_id",
client_secret="your_client_secret",
token_url="https://api.example.com/oauth2/token",
)
2. Provider 集成(推荐)
from app.core.token_manager import JWTTokenStrategy, TokenManager
class KlingAIProvider:
def __init__(self, access_key: str, secret_key: str):
self._token_strategy = JWTTokenStrategy(
access_key=access_key,
secret_key=secret_key,
expires_in=1800, # 30分钟
)
async def _get_headers(self) -> dict[str, str]:
"""获取带认证的请求头"""
token_info = await TokenManager.get_instance().get_token(self._token_strategy)
return {
"Authorization": f"Bearer {token_info.token}",
"Content-Type": "application/json",
}
async def api_call(self):
headers = await self._get_headers()
# 使用 headers 发起请求
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as resp:
return await resp.json()
3. 自定义 Token 策略
from app.core.token_manager import BaseTokenStrategy, TokenInfo, TokenManager
class MyCustomStrategy(BaseTokenStrategy):
async def generate(self) -> TokenInfo:
# 实现你的 token 获取逻辑
token = await fetch_token_from_somewhere()
return TokenInfo(
token=token,
expires_at=time.time() + 3600,
token_type="Bearer",
)
def get_cache_key(self) -> str:
# 返回唯一的缓存标识
return "my_custom_key"
# 使用
strategy = MyCustomStrategy()
token_info = await TokenManager.get_instance().get_token(strategy)
核心概念
TokenInfo
Token 信息数据类:
@dataclass
class TokenInfo:
token: str # Token 字符串
expires_at: float # 过期时间戳(秒)
token_type: str # Token 类型(默认 Bearer)
extra_data: dict # 额外数据(如 refresh_token)
# 属性
is_expired: bool # 是否已过期
expires_in: float # 剩余有效时间(秒)
# 方法
is_near_expiry(safety_margin=300) -> bool # 是否接近过期
安全边界(Safety Margin)
为了防止 Token 在请求过程中过期,TokenManager 使用安全边界机制:
- 默认安全边界:5分钟(300秒)
- 当 Token 剩余有效期小于安全边界时,视为"接近过期"
- 接近过期时会触发刷新
- 预热机制会在 2 * 安全边界(10分钟)前后台刷新
Token 生命周期:
生成 ──────────────────────────────────────────> 过期
│ │
│<─────────── 30分钟有效期 ───────────────────>│
│ │
│ │<── 5分钟安全边界 ──>│ │
│ │ │ │
│ │ 接近过期,开始刷新 │ │
│ │ │ │
│ │<── 10分钟 ──>│ │ │
│ │ │ │ │
│ │ 后台预热任务调度 │ │
│ │ │ │ │
│ ▼ ▼ ▼ ▼
生成 预热 刷新 过期(永不发生)
内置策略
JWTTokenStrategy
用于 JWT 认证(如 KlingAI):
strategy = JWTTokenStrategy(
access_key="your_access_key",
secret_key="your_secret_key",
expires_in=1800, # Token 有效期(秒)
algorithm="HS256", # JWT 算法
token_type="JWT", # Token 类型
)
OAuth2TokenStrategy
用于 OAuth2 认证:
strategy = OAuth2TokenStrategy(
client_id="your_client_id",
client_secret="your_client_secret",
token_url="https://api.example.com/oauth2/token",
scope="read write", # 可选
extra_params={}, # 额外参数
)
API 参考
TokenManager
class TokenManager:
@classmethod
def get_instance(cls) -> TokenManager:
"""获取单例实例"""
async def get_token(
self,
strategy: BaseTokenStrategy,
force_refresh: bool = False,
) -> TokenInfo:
"""获取有效的 token"""
async def get_token_string(
self,
strategy: BaseTokenStrategy,
force_refresh: bool = False,
) -> str:
"""获取 token 字符串(带 Bearer 前缀)"""
async def invalidate(self, strategy: BaseTokenStrategy) -> bool:
"""使缓存失效"""
def clear(self):
"""清除所有 token 缓存"""
def get_stats(self) -> dict:
"""获取缓存统计信息"""
并发安全机制
TokenManager 使用双重检查锁定(Double-Checked Locking)模式确保并发安全:
# 伪代码
async def get_token(strategy):
cache_key = strategy.get_cache_key()
# 第一次检查(无锁)
if cache_key in cache and not near_expiry:
return cache[cache_key]
# 获取刷新锁
async with refresh_locks[cache_key]:
# 第二次检查(有锁)
if cache_key in cache and not near_expiry:
return cache[cache_key]
# 执行刷新
new_token = await strategy.generate()
cache[cache_key] = new_token
return new_token
这样即使有 100 个并发请求同时触发 Token 刷新,也只会执行一次实际的刷新操作。
测试
运行测试:
cd python-api
pytest tests/test_token_manager.py -v
测试覆盖:
- TokenInfo 数据类行为
- JWT Token 生成
- Token 缓存机制
- 并发安全(10 个并发请求只生成 1 个 token)
- 强制刷新
- 缓存失效
- OAuth2 支持
- 多 Provider 隔离
最佳实践
- 在 Provider 初始化时创建 Strategy:避免每次请求都创建新的 Strategy 实例
- 使用相同的 access_key/secret_key:确保缓存命中
- 不要手动管理 Token 过期:TokenManager 会自动处理
- 定期查看统计信息:用于监控 Token 使用情况
# 调试:查看 Token 缓存统计
stats = TokenManager.get_instance().get_stats()
print(stats)
# {
# 'total_cached': 3,
# 'active_tasks': 3,
# 'tokens': {
# 'jwt:key1': {'expires_in': 1200, 'is_expired': False, 'is_near_expiry': False},
# ...
# }
# }