Files
小鱼开发 b597d715c8 fix: 认证流程修复 + alembic 迁移补全 + 前端僵尸代码清理
后端:
- 修复 get_current_user 未校验 is_active,被封禁用户仍可用旧 Token
- auth.py 捕获 ValueError 转 HTTPException(验证码错误、账号被封、Token 无效等不再返回 500)
- 修正 SMS 每日上限注释(3次 → 10次)
- 修复迁移脚本外键引用错误:users.id → mjk_users.id
- 新建积分系统 4 张表的迁移(mjk_user_points/batches/transactions/recharge_orders)
- pyproject.toml 补充 alembic + psycopg2-binary 依赖
- ruff 格式修复(import 排序等)

前端:
- 修复 doRefreshToken 成功后不持久化新 Token 的严重 bug
- 修复应用重启后 SSE 不自动重连(收不到踢人通知)
- 修复 App.tsx handleLogout 未 await
- client.ts 统一从 utils/env 导入 isTauri,默认 base URL 兜底 localhost:8000
- 清理 ~20 个未使用的 hooks/utils/api 模块/组件导出
- 修复所有 ESLint 警告(206 → 0)和 TSC 错误
- 测试通过(5/5)

其他:
- 更新 requirements.lock 和 uv.lock
2026-05-08 11:10:48 +08:00

103 lines
2.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
SSE 事件推送 API
================
用于单设备登录的实时踢人通知。
前端通过 EventSource 建立长连接,服务端通过该连接主动推送消息。
"""
from __future__ import annotations
import asyncio
from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import verify_access_token
from app.crud.user import user as user_crud
from app.db.session import get_db
from app.services.auth_service import register_sse_connection, unregister_sse_connection
router = APIRouter()
# SSE 心跳间隔(秒)
HEARTBEAT_INTERVAL = 30
@router.get("")
async def sse_events(
token: str = Query(..., description="Access Token"),
db: AsyncSession = Depends(get_db),
):
"""
SSE 事件流端点
前端通过 EventSource 连接此端点,建立后:
- 每 30 秒收到一次心跳({"type": "ping"}
- 账号在其他设备登录时收到踢人消息({"type": "kick"}
连接断开时自动清理。
"""
# 手动验证 Access TokenSSE 通过 query 传 token,无法使用 Authorization Header
payload = verify_access_token(token)
if not payload or not payload.get("sub"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 无效或已过期",
)
user_id = payload["sub"]
# 验证用户存在且状态正常
user = await user_crud.get(db, id=user_id)
if user is None or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户不存在或已被封禁",
)
queue: asyncio.Queue[str] = asyncio.Queue(maxsize=10)
# 注册 SSE 连接
register_sse_connection(user_id, queue)
async def event_generator():
try:
# 发送连接成功事件
yield 'data: {"type": "connected"}\n\n'
while True:
# 等待消息或心跳超时
try:
message = await asyncio.wait_for(
queue.get(),
timeout=HEARTBEAT_INTERVAL,
)
yield f"data: {message}\n\n"
# 如果是 kick 消息,发送后关闭连接
if '"type": "kick"' in message:
break
except TimeoutError:
# 心跳
yield 'data: {"type": "ping"}\n\n'
except asyncio.CancelledError:
# 客户端断开连接
pass
finally:
unregister_sse_connection(user_id)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
},
)