Files
meijiaka-zy/python-api/app/scheduler/engine.py
T
小鱼开发 95e55293c6 security: 全面生产安全加固与部署修复
后端安全:
- DEBUG 默认 True → False
- 彻底移除 AUTH_BYPASS 认证绕过
- 验证码不再明文打印到日志
- 上传接口增加大小限制(500MB/20MB/100MB)与魔数校验
- python-jose → PyJWT, 更新 requirements.lock/uv.lock
- Bandit 恢复关键规则(B104/B301/B305/B314/B324/B603/B607)
- 修复 5 处 try_except_pass, 15 处加 nosec 注释
- 启用 Bandit pre-commit 钩子

前端安全:
- 配置完整 CSP 策略
- 收紧 Capabilities(fs:allow-read-file → $RESOURCE/**)
- 移除硬编码 devToken
- 清理前端 TODO(美家卡智影命名统一)

部署修复:
- docker-compose.prod 增加 alembic 迁移步骤
- api + scheduler 增加 Redis 心跳健康检查
- Nginx 添加安全响应头
- Nginx client_max_body_size 100M → 500M
- .env.example 补充 UPLOAD_MAX_* 配置与安全注释

其他:
- /voice/upload 合并到 /upload/audio
- Rust 上传增加文件大小检查
- 清理 Rust 19 处 println! + 前端 21 处 console.info
- 修复 VideoCompose.tsx toast 未导入(已有bug)
2026-05-10 23:31:34 +08:00

124 lines
4.5 KiB
Python

"""
Async Engine 核心调度器
=======================
驱动所有 Handler 的 Tick 循环,批量查询、批量更新。
"""
import asyncio
import logging
from typing import Any
from app.core.redis_client import get_redis_client
from app.scheduler.handlers.base import AsyncHandler
from app.scheduler.models import StateChange
from app.scheduler.registry import TaskRegistry
from app.scheduler.slot_manager import SlotManager
logger = logging.getLogger(__name__)


class AsyncEngine:
    """Unified asynchronous job scheduling engine.

    Keeps a mapping of named handlers. On every tick each handler receives the
    running task records whose ``task_type`` equals the handler's name, and the
    state changes the handlers return are written back to Redis.
    """

    # Redis key refreshed at the end of every tick so a healthcheck can read it.
    HEARTBEAT_KEY = "scheduler:heartbeat"
    # Expiry (seconds) applied to the heartbeat key on each write.
    HEARTBEAT_TTL = 60

    def __init__(self, handlers: list[AsyncHandler] | None = None):
        """Create the engine together with its Redis-backed helpers.

        Args:
            handlers: Optional handlers to register immediately, keyed by
                their ``name`` attribute. If two handlers share a name the
                later one wins.
        """
        self.redis = get_redis_client()
        self.registry = TaskRegistry(self.redis)
        self.slots = SlotManager(self.redis)
        self.handlers: dict[str, AsyncHandler] = {h.name: h for h in handlers or ()}

    def register(self, handler: AsyncHandler) -> None:
        """Register a handler under ``handler.name``, replacing any existing one."""
        self.handlers[handler.name] = handler
        logger.info("Registered handler: %s", handler.name)

    async def tick(self) -> None:
        """Run one complete scheduling tick.

        Any ``Exception`` raised while processing is logged instead of being
        propagated, and the heartbeat is refreshed at the end of every tick,
        including ticks that found no running tasks or that failed.
        """
        loop = asyncio.get_running_loop()
        tick_start = loop.time()
        try:
            # 1. Load the IDs of all running tasks.
            running_ids = await self.registry.get_running_task_ids()
            if not running_ids:
                logger.debug("Tick: no running tasks")
                return

            # 2. Group the task records by task_type.
            tasks_by_type = await self._group_running_tasks(running_ids)

            # 3. Run every handler's tick concurrently. Each handler only
            #    sees the records whose task_type matches its own name.
            results = await asyncio.gather(
                *[
                    self._safe_tick(handler_name, handler, tasks_by_type.get(handler_name, []))
                    for handler_name, handler in self.handlers.items()
                ]
            )

            # 4. Collect and apply the state changes, one batch per handler.
            for changes in results:
                if changes:
                    await self._apply_changes(changes)

            # 5. Drop finished tasks from the running set.
            await self._cleanup_finished()
        except Exception:
            logger.exception("Scheduler tick failed")
        finally:
            logger.debug("Tick completed in %.2fs", loop.time() - tick_start)
            await self._write_heartbeat()

    async def _group_running_tasks(self, running_ids: list[Any]) -> dict[str, list[Any]]:
        """Fetch each running task record and bucket it by ``task_type``.

        IDs for which the registry returns no record are removed from the
        running set and skipped.
        """
        tasks_by_type: dict[str, list[Any]] = {}
        for task_id in running_ids:
            record = await self.registry.get(task_id)
            if not record:
                await self.registry.remove_running(task_id)
                continue
            tasks_by_type.setdefault(record.task_type, []).append(record)
        return tasks_by_type

    async def _write_heartbeat(self) -> None:
        """Refresh the heartbeat key; failures are logged, never raised.

        The write is best-effort on purpose: if Redis is unreachable the key
        simply expires after ``HEARTBEAT_TTL`` seconds, whereas letting the
        error escape ``tick()`` would terminate ``run_forever``.
        """
        # The stored value is the event loop's monotonic clock reading, not
        # wall-clock time.
        # NOTE(review): confirm the healthcheck only relies on the key's
        # presence/TTL and not on the value being an epoch timestamp.
        heartbeat = str(asyncio.get_running_loop().time())
        try:
            await self.redis.set(self.HEARTBEAT_KEY, heartbeat, ex=self.HEARTBEAT_TTL)
        except Exception:
            logger.exception("Failed to write scheduler heartbeat")

    async def _safe_tick(
        self, name: str, handler: AsyncHandler, tasks: list[Any]
    ) -> list[StateChange]:
        """Run one handler's tick, converting any ``Exception`` into ``[]``.

        Args:
            name: Handler name, used only for the error log.
            handler: Handler whose ``tick`` is awaited.
            tasks: Task records assigned to this handler for this tick.

        Returns:
            The handler's state changes, or an empty list if it raised.
        """
        try:
            return await handler.tick(tasks, self.registry, self.slots)
        except Exception:
            logger.exception("Handler tick failed: %s", name)
            return []

    async def _apply_changes(self, changes: list[StateChange]) -> None:
        """Write a batch of state changes to Redis through a single pipeline.

        Each change supplies a ``(key, field, value)`` triple that is queued
        as an ``HSET``. Nothing is sent when ``changes`` is empty.
        """
        if not changes:
            return
        pipe = self.redis.pipeline()
        for change in changes:
            key, field, value = change.to_redis_command()
            pipe.hset(key, field, value)
        await pipe.execute()

    async def _cleanup_finished(self) -> None:
        """Remove finished or vanished tasks from the running set.

        A task is removed when the registry has no record for it, or when its
        status is ``"completed"`` or ``"failed"``.
        """
        running_ids = await self.registry.get_running_task_ids()
        for task_id in running_ids:
            record = await self.registry.get(task_id)
            if not record:
                await self.registry.remove_running(task_id)
                continue
            if record.status in ("completed", "failed"):
                await self.registry.remove_running(task_id)
                logger.info("Task moved to finished: %s (%s)", task_id, record.status)

    async def run_forever(self, interval: float = 10.0, min_interval: float = 2.0) -> None:
        """Run ticks in an endless loop.

        Args:
            interval: Target number of seconds between the start of
                consecutive ticks.
            min_interval: Minimum pause after a tick, applied even when the
                tick itself took longer than ``interval``.
        """
        logger.info("Async Engine started")
        loop = asyncio.get_running_loop()
        while True:
            tick_start = loop.time()
            await self.tick()
            elapsed = loop.time() - tick_start
            await asyncio.sleep(max(interval - elapsed, min_interval))