Files
meijiaka-zy/python-api/app/core/security.py
T
小鱼开发 51521fc0dd feat(payment): 微信支付 APIv2 + 积分充值 + SMS 短信 + 双 Token 认证
- 微信支付从 APIv3 降级为 APIv2(MD5/XML)
- 积分系统:充值下单、微信回调、消费冻结/结算/退款
- SMS B2M 短信验证码服务
- 双 Token 认证(Access 30min + Refresh 30days)
- SSE 单设备踢人
- 用户设备管理、积分账户模型
- Alembic 迁移脚本
2026-05-07 18:43:02 +08:00

117 lines
2.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
安全工具 - JWT Token 生成与验证
===============================
支持双 Token 体系:
- Access Token:短效(30 分钟),用于 API 请求认证
- Refresh Token:长效(30 天),用于换取新的 Access Token
"""
from __future__ import annotations
import uuid
from datetime import UTC, datetime, timedelta
from typing import Any
from jose import JWTError, jwt
from app.config import get_settings
# Module-level settings object, resolved when this module is imported.
# The functions in this module read SECRET_KEY, ALGORITHM,
# ACCESS_TOKEN_EXPIRE_MINUTES and REFRESH_TOKEN_EXPIRE_DAYS from it.
settings = get_settings()
def create_access_token(data: dict[str, Any], expires_delta: timedelta | None = None) -> str:
    """Create a short-lived access token used to authenticate API requests.

    The payload is a copy of ``data`` with ``"type": "access"`` and an
    ``"exp"`` claim added; any ``"type"`` or ``"exp"`` key already present in
    ``data`` is overwritten. The caller's dict is not mutated. Callers are
    expected to supply the subject themselves (presumably ``"sub"`` = user id,
    per the original documentation of this function).

    Args:
        data: Claims to embed in the token.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so a truthiness
    # check would silently replace an explicit zero lifetime with the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    claims = {
        **data,
        "type": "access",
        "exp": datetime.now(UTC) + expires_delta,
    }
    return jwt.encode(
        claims,
        settings.SECRET_KEY,
        algorithm=settings.ALGORITHM,
    )
def create_refresh_token(data: dict[str, Any], expires_delta: timedelta | None = None) -> str:
    """Create a long-lived refresh token used to obtain new access tokens.

    The payload is a copy of ``data`` with ``"type": "refresh"``, a freshly
    generated ``"jti"`` (a UUID4 string) and an ``"exp"`` claim added; any of
    those keys already present in ``data`` is overwritten. The caller's dict
    is not mutated. The ``jti`` gives each refresh token a unique identifier
    (intended to support revocation, which is not handled in this function).
    Callers are expected to supply the subject themselves (presumably
    ``"sub"`` = user id, per the original documentation of this function).

    Args:
        data: Claims to embed in the token.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.REFRESH_TOKEN_EXPIRE_DAYS`` days.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so a truthiness
    # check would silently replace an explicit zero lifetime with the default.
    if expires_delta is None:
        expires_delta = timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)

    claims = {
        **data,
        "type": "refresh",
        "jti": str(uuid.uuid4()),
        "exp": datetime.now(UTC) + expires_delta,
    }
    return jwt.encode(
        claims,
        settings.SECRET_KEY,
        algorithm=settings.ALGORITHM,
    )
def verify_token(token: str) -> dict[str, Any] | None:
    """Decode a JWT without checking which kind of token it is.

    Decoding is delegated to ``jwt.decode`` with ``settings.SECRET_KEY`` and
    ``settings.ALGORITHM`` as the only accepted algorithm.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload, or ``None`` if decoding raises ``JWTError``.
    """
    try:
        return jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError:
        # Any decode failure reported by the library is surfaced as "no payload".
        return None
def verify_access_token(token: str) -> dict[str, Any] | None:
    """Validate an access token.

    Decodes the token with ``verify_token`` and additionally requires the
    payload's ``"type"`` claim to equal ``"access"``.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload, or ``None`` if decoding failed, the payload is
        empty, or the token is not an access token.
    """
    claims = verify_token(token)
    if claims and claims.get("type") == "access":
        return claims
    return None
def verify_refresh_token(token: str) -> dict[str, Any] | None:
    """Validate a refresh token.

    Decodes the token with ``verify_token`` and additionally requires the
    payload's ``"type"`` claim to equal ``"refresh"``.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload, or ``None`` if decoding failed, the payload is
        empty, or the token is not a refresh token.
    """
    claims = verify_token(token)
    if claims and claims.get("type") == "refresh":
        return claims
    return None