""" 安全工具 - JWT Token 生成与验证 + 密码哈希 ========================================== 支持双 Token 体系: - Access Token:短效(30 分钟),用于 API 请求认证 - Refresh Token:长效(30 天),用于换取新的 Access Token 密码哈希: - 使用 bcrypt 进行密码哈希和校验 """ from __future__ import annotations import uuid from datetime import UTC, datetime, timedelta from typing import Any import jwt from jwt import PyJWTError from passlib.context import CryptContext from app.config import get_settings settings = get_settings() # bcrypt 密码哈希上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def hash_password(password: str) -> str: """对明文密码进行 bcrypt 哈希""" return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """校验明文密码与哈希密码是否匹配""" return pwd_context.verify(plain_password, hashed_password) def create_access_token(data: dict[str, Any], expires_delta: timedelta | None = None) -> str: """ 创建 Access Token(短效,用于 API 请求) Payload 包含 {"type": "access", "sub": user_id, "exp": ...} """ to_encode = data.copy() to_encode.update({"type": "access"}) if expires_delta: expire = datetime.now(UTC) + expires_delta else: expire = datetime.now(UTC) + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode( to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM, ) return encoded_jwt def create_refresh_token(data: dict[str, Any], expires_delta: timedelta | None = None) -> str: """ 创建 Refresh Token(长效,用于换取 Access Token) Payload 包含 {"type": "refresh", "sub": user_id, "jti": uuid, "exp": ...} jti 用于唯一标识该 Refresh Token,便于撤销。 """ to_encode = data.copy() to_encode.update({"type": "refresh", "jti": str(uuid.uuid4())}) if expires_delta: expire = datetime.now(UTC) + expires_delta else: expire = datetime.now(UTC) + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode( to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM, ) return encoded_jwt def verify_token(token: str) -> dict[str, Any] | None: """ 通用 Token 验证(不区分类型) Args: token: JWT Token 字符串 Returns: 解码后的 payload,如果验证失败返回 None """ try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM], ) return payload except PyJWTError: return None def verify_access_token(token: str) -> dict[str, Any] | None: """ 验证 Access Token 额外检查 payload 中 type 必须为 "access"。 """ payload = verify_token(token) if not payload or payload.get("type") != "access": return None return payload def verify_refresh_token(token: str) -> dict[str, Any] | None: """ 验证 Refresh Token 额外检查 payload 中 type 必须为 "refresh"。 """ payload = verify_token(token) if not payload or payload.get("type") != "refresh": return None return payload