Files
meijiaka-zy/python-api/app/core/security.py
T
小鱼开发 95e55293c6 security: 全面生产安全加固与部署修复
后端安全:
- DEBUG 默认 True → False
- 彻底移除 AUTH_BYPASS 认证绕过
- 验证码不再明文打印到日志
- 上传接口增加大小限制(500MB/20MB/100MB)与魔数校验
- python-jose → PyJWT, 更新 requirements.lock/uv.lock
- Bandit 恢复关键规则(B104/B301/B305/B314/B324/B603/B607)
- 修复 5 处 try_except_pass, 15 处加 nosec 注释
- 启用 Bandit pre-commit 钩子

前端安全:
- 配置完整 CSP 策略
- 收紧 Capabilities(fs:allow-read-file → $RESOURCE/**)
- 移除硬编码 devToken
- 清理前端 TODO(美家卡智影命名统一)

部署修复:
- docker-compose.prod 增加 alembic 迁移步骤
- api + scheduler 增加 Redis 心跳健康检查
- Nginx 添加安全响应头
- Nginx client_max_body_size 100M → 500M
- .env.example 补充 UPLOAD_MAX_* 配置与安全注释

其他:
- /voice/upload 合并到 /upload/audio
- Rust 上传增加文件大小检查
- 清理 Rust 19 处 println! + 前端 21 处 console.info
- 修复 VideoCompose.tsx toast 未导入(已有bug)
2026-05-10 23:31:34 +08:00

118 lines
2.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
安全工具 - JWT Token 生成与验证
===============================
支持双 Token 体系:
- Access Token:短效(30 分钟),用于 API 请求认证
- Refresh Token:长效(30 天),用于换取新的 Access Token
"""
from __future__ import annotations
import uuid
from datetime import UTC, datetime, timedelta
from typing import Any
import jwt
from jwt import PyJWTError
from app.config import get_settings
settings = get_settings()
def create_access_token(data: dict[str, Any], expires_delta: timedelta | None = None) -> str:
"""
创建 Access Token(短效,用于 API 请求)
Payload 包含 {"type": "access", "sub": user_id, "exp": ...}
"""
to_encode = data.copy()
to_encode.update({"type": "access"})
if expires_delta:
expire = datetime.now(UTC) + expires_delta
else:
expire = datetime.now(UTC) + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM,
)
return encoded_jwt
def create_refresh_token(data: dict[str, Any], expires_delta: timedelta | None = None) -> str:
"""
创建 Refresh Token(长效,用于换取 Access Token
Payload 包含 {"type": "refresh", "sub": user_id, "jti": uuid, "exp": ...}
jti 用于唯一标识该 Refresh Token,便于撤销。
"""
to_encode = data.copy()
to_encode.update({"type": "refresh", "jti": str(uuid.uuid4())})
if expires_delta:
expire = datetime.now(UTC) + expires_delta
else:
expire = datetime.now(UTC) + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM,
)
return encoded_jwt
def verify_token(token: str) -> dict[str, Any] | None:
"""
通用 Token 验证(不区分类型)
Args:
token: JWT Token 字符串
Returns:
解码后的 payload,如果验证失败返回 None
"""
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM],
)
return payload
except PyJWTError:
return None
def verify_access_token(token: str) -> dict[str, Any] | None:
"""
验证 Access Token
额外检查 payload 中 type 必须为 "access"
"""
payload = verify_token(token)
if not payload or payload.get("type") != "access":
return None
return payload
def verify_refresh_token(token: str) -> dict[str, Any] | None:
"""
验证 Refresh Token
额外检查 payload 中 type 必须为 "refresh"
"""
payload = verify_token(token)
if not payload or payload.get("type") != "refresh":
return None
return payload