Files
meijiaka-zy/python-api/app/api/v1/system.py
T
小鱼开发 0c921aca11 chore: 清理废弃代码和文档
- 删除 anytocopy 相关文件(service、handler、文档)
- 删除 KlingAI / MiniMax 开发文档
- 删除 database-design、mvp-lip-sync-replacement 等过时文档
- 删除旧的 docker-compose.yml(已拆分为 dev/test/prod)
- 删除 config/ai_models.yaml(已合并到 platform-config.yaml)
- 从 .env.example 移除 anytocopy 配置
- 从 tasks.py、schemas 移除 copy 任务类型
2026-05-04 16:06:25 +08:00

98 lines
3.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
系统模块 API
============
"""
from fastapi import APIRouter, status
from fastapi.responses import JSONResponse
from app.core.health_checker import check_database, check_redis
from app.core.runtime_config import get_runtime_config
from app.schemas.common import ApiResponse, success_response
# Router on which the system endpoints defined in this module are registered.
router = APIRouter()
@router.get("/health", response_model=ApiResponse[dict])
async def system_health():
    """Report the detailed health of the API and its backing services.

    The database and Redis checks are awaited one after the other. Each
    returns an ``(ok, message)`` pair; the message is surfaced in the
    ``services`` map when the corresponding check fails.

    Returns:
        * A 503 ``JSONResponse`` with status ``"unhealthy"`` when the
          database check fails (regardless of the Redis result).
        * A success response with status ``"degraded"`` when only the
          Redis check fails.
        * A success response with status ``"healthy"`` otherwise.
    """
    database_up, database_detail = await check_database()
    cache_up, cache_detail = await check_redis()

    # Per-service view: "connected" on success, otherwise the checker's message.
    service_states = {"api": "up"}
    service_states["database"] = "connected" if database_up else database_detail
    service_states["redis"] = "connected" if cache_up else cache_detail

    if database_up:
        # Redis being down is treated as a degradation, not an outage.
        if cache_up:
            overall, note = "healthy", "系统运行正常"
        else:
            overall, note = "degraded", "Redis 连接异常,服务降级"
        return success_response(
            message=note,
            data={"status": overall, "services": service_states},
        )

    # Database failure is the only condition reported with a non-2xx status.
    unavailable = status.HTTP_503_SERVICE_UNAVAILABLE
    error_body = {
        "code": unavailable,
        "message": "数据库连接异常",
        "data": {"status": "unhealthy", "services": service_states},
    }
    return JSONResponse(status_code=unavailable, content=error_body)
@router.get("/version", response_model=ApiResponse[dict])
async def system_version():
    """Return the application's name, version and environment.

    The values are read from the settings object (``APP_NAME``,
    ``APP_VERSION`` and ``ENV``) and wrapped in a success response.
    """
    # Kept as a function-local import, matching the original placement.
    from app.config import get_settings

    cfg = get_settings()
    version_info = {
        "name": cfg.APP_NAME,
        "version": cfg.APP_VERSION,
        "environment": cfg.ENV,
    }
    return success_response(data=version_info, message="获取版本成功")
# ═══════════════════════════════════════════════════════════════
# Runtime configuration management (Admin)
# ═══════════════════════════════════════════════════════════════
@router.get("/admin/runtime-config", response_model=ApiResponse[dict])
async def get_runtime_config_api():
    """Return the current runtime configuration.

    The response data contains the raw configuration, its version and the
    path of the configuration file (converted to a string).
    """
    # NOTE(review): no auth dependency is visible on this admin route and the
    # raw config is returned as-is; confirm access control is applied where
    # this router is mounted.
    cfg = get_runtime_config()
    payload = {
        "config": cfg.get_raw(),
        "version": cfg.version,
        "config_path": str(cfg.config_path),
    }
    return success_response(data=payload, message="获取运行时配置成功")
@router.post("/admin/runtime-config/reload", response_model=ApiResponse[dict])
async def reload_runtime_config_api():
    """Manually trigger a reload of the runtime configuration.

    The response data carries the ``reloaded`` flag returned by
    ``force_reload()`` and the configuration version read after the reload
    attempt. The message reflects whether the reload succeeded.
    """
    cfg = get_runtime_config()
    reloaded = await cfg.force_reload()

    result = {"reloaded": reloaded, "version": cfg.version}

    # NOTE(review): a failed reload is still returned through
    # success_response; callers must inspect data["reloaded"] — confirm this
    # is the intended contract.
    if reloaded:
        outcome = "配置重载成功"
    else:
        outcome = "配置重载失败"
    return success_response(data=result, message=outcome)