Files

49 lines
1.2 KiB
Python

"""
Scheduler 内部数据模型
"""
import json
from dataclasses import dataclass, field
from typing import Any
from app.schemas.enums import TaskStatus
@dataclass
class TaskRecord:
"""调度器内部使用的作业记录"""
task_id: str
task_type: str
user_id: str
project_id: str = ""
status: TaskStatus = field(default=TaskStatus.PENDING)
progress: int = 0
message: str = "等待执行..."
completed: int = 0
total: int = 0
result: dict[str, Any] = field(default_factory=dict)
error: str | None = None
error_code: str | None = None
params: dict[str, Any] | Any = field(default_factory=dict)
created_at: str = ""
@dataclass(frozen=True)
class StateChange:
"""状态变更记录,供 Registry 批量写入"""
task_id: str
field_path: str # 如 "shots.0.status" 或 "status"
value: Any
def to_redis_command(self) -> tuple[str, str, str]:
key = f"task:{self.task_id}"
if isinstance(self.value, dict | list):
value = json.dumps(self.value, ensure_ascii=False)
elif self.value is None:
value = ""
else:
value = str(self.value)
return key, self.field_path, value