""" Redis 客户端 ============ 全局 Redis 连接,供 Scheduler 和 RateLimiter 使用 """ from typing import Any from redis.asyncio import Redis from app.config import get_settings # 全局客户端(懒加载) _redis_client: Redis | None = None def get_redis_client() -> Redis: """获取或创建 Redis 客户端""" global _redis_client if _redis_client is None: settings = get_settings() # 构建连接参数 client_kwargs: dict[str, Any] = { "host": settings.REDIS_HOST, "port": settings.REDIS_PORT, "db": settings.REDIS_DB, "decode_responses": True, } # 有密码时添加 if settings.REDIS_PASSWORD: client_kwargs["password"] = settings.REDIS_PASSWORD _redis_client = Redis(**client_kwargs) return _redis_client def init_redis_client(redis: Redis) -> None: """初始化全局客户端(用于测试)""" global _redis_client _redis_client = redis