7330fdd401
1. upload.py: /video /audio 端点添加 get_current_user 鉴权 2. caption.py: /ata/align 端点添加 get_current_user 鉴权 3. points.py: allow_negative 硬编码 False,禁止客户端控制欠费 4. slot_manager.py: TTL 1800s → 1200s,减少异常崩溃后的槽位泄漏时间 5. points.py: 顺手修复 ruff UP017(timezone.utc → UTC)
316 lines
10 KiB
Python
316 lines
10 KiB
Python
"""
|
|
文件上传 API
|
|
============
|
|
|
|
提供通用文件上传功能,直接上传到七牛云对象存储。
|
|
"""
|
|
|
|
import io
|
|
import logging
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.api.deps import get_current_user
|
|
from app.config import get_settings
|
|
from app.models.user import User
|
|
from app.schemas.common import ApiResponse, success_response
|
|
from app.services.qiniu_service import get_qiniu_service
|
|
|
|
router = APIRouter(prefix="/upload", tags=["Upload"])
|
|
|
|
logger = logging.getLogger(__name__)
|
|
settings = get_settings()
|
|
|
|
|
|
def _validate_file_magic(content: bytes, expected_content_type: str) -> bool:
|
|
"""通过文件头魔数校验文件真实类型,防止 MIME 伪造攻击。"""
|
|
if len(content) < 12:
|
|
return False
|
|
|
|
# 拒绝常见危险文件头
|
|
dangerous_signatures = [
|
|
(b"MZ", "Windows 可执行文件"), # .exe, .dll
|
|
(b"#!", "Shell 脚本"), # bash, python, etc
|
|
(b"PK\x03\x04", "ZIP 压缩包"), # .zip, .jar, .docx
|
|
(b"<?xml", "XML 文件"),
|
|
(b"<html", "HTML 文件"),
|
|
(b"<!DO", "HTML 文档"),
|
|
(b"%PDF", "PDF 文件"),
|
|
]
|
|
for sig, _ in dangerous_signatures:
|
|
if content.startswith(sig):
|
|
return False
|
|
if b"<script" in content[:512].lower():
|
|
return False
|
|
|
|
main_type = expected_content_type.split("/")[0]
|
|
|
|
# 图片校验
|
|
if main_type == "image":
|
|
if content.startswith(b"\xff\xd8\xff"):
|
|
return expected_content_type in ("image/jpeg", "image/jpg")
|
|
if content.startswith(b"\x89PNG\r\n\x1a\n"):
|
|
return expected_content_type == "image/png"
|
|
if content.startswith(b"GIF89a") or content.startswith(b"GIF87a"):
|
|
return expected_content_type == "image/gif"
|
|
if content.startswith(b"RIFF") and content[8:12] == b"WEBP":
|
|
return expected_content_type == "image/webp"
|
|
return False
|
|
|
|
# 视频校验
|
|
if main_type == "video":
|
|
# MP4 / MOV / M4V 等 ISO Base Media File Format
|
|
if content[4:8] == b"ftyp":
|
|
brand = content[8:12]
|
|
if brand in (b"qt ", b"qtw "):
|
|
return expected_content_type in ("video/quicktime",)
|
|
# mp4, isom, avc1, mp41, mp42 等
|
|
return expected_content_type in (
|
|
"video/mp4",
|
|
"video/quicktime",
|
|
)
|
|
if content.startswith(b"RIFF") and content[8:12] == b"AVI ":
|
|
return expected_content_type == "video/x-msvideo"
|
|
if content.startswith(b"\x1aE\xdf\xa3"):
|
|
return expected_content_type == "video/webm"
|
|
return False
|
|
|
|
# 音频校验
|
|
if main_type == "audio":
|
|
if content[:3] == b"ID3" or content[:2] in (
|
|
b"\xff\xfb",
|
|
b"\xff\xf3",
|
|
b"\xff\xf2",
|
|
):
|
|
return expected_content_type in ("audio/mpeg", "audio/mp3")
|
|
if content.startswith(b"RIFF") and content[8:12] == b"WAVE":
|
|
return expected_content_type in ("audio/wav", "audio/x-wav")
|
|
if content.startswith(b"fLaC"):
|
|
return expected_content_type == "audio/flac"
|
|
if content.startswith(b"OggS"):
|
|
return expected_content_type == "audio/ogg"
|
|
# AAC / M4A(也是 ftyp 格式)
|
|
if content[4:8] == b"ftyp":
|
|
brand = content[8:12]
|
|
if brand in (b"M4A ", b"m4a ", b"mp42", b"isom", b"M4P "):
|
|
return expected_content_type in (
|
|
"audio/mp4",
|
|
"audio/aac",
|
|
"audio/m4a",
|
|
)
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
def _check_upload_file(content: bytes, max_size: int, content_type: str, type_label: str) -> None:
|
|
"""统一校验文件大小和魔数,失败时直接抛 HTTPException。"""
|
|
if len(content) > max_size:
|
|
max_mb = max_size // 1024 // 1024
|
|
raise HTTPException(
|
|
status_code=413,
|
|
detail=f"{type_label}文件大小不能超过 {max_mb}MB",
|
|
)
|
|
if not _validate_file_magic(content, content_type):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"{type_label}文件内容与实际格式不符,可能存在安全风险",
|
|
)
|
|
|
|
|
|
class UploadResponse(BaseModel):
|
|
"""上传响应"""
|
|
|
|
url: str = Field(..., description="七牛云文件 URL")
|
|
key: str = Field(..., description="七牛云文件 key")
|
|
size: int = Field(..., description="文件大小(字节)")
|
|
|
|
|
|
@router.post("/video", response_model=ApiResponse[UploadResponse])
|
|
async def upload_video(
|
|
file: UploadFile = File(..., description="视频文件"),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""
|
|
上传视频到七牛云
|
|
|
|
支持格式:mp4, mov, avi, webm
|
|
返回七牛云永久访问 URL。
|
|
"""
|
|
try:
|
|
# 验证文件格式
|
|
allowed_types = {
|
|
"video/mp4",
|
|
"video/quicktime",
|
|
"video/x-msvideo",
|
|
"video/webm",
|
|
}
|
|
content_type = file.content_type or ""
|
|
|
|
# 如果 content_type 为空,尝试从文件名推断
|
|
if not content_type:
|
|
ext = Path(file.filename or "").suffix.lower()
|
|
ext_to_mime = {
|
|
".mp4": "video/mp4",
|
|
".mov": "video/quicktime",
|
|
".avi": "video/x-msvideo",
|
|
".webm": "video/webm",
|
|
}
|
|
content_type = ext_to_mime.get(ext, "")
|
|
|
|
if content_type not in allowed_types:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"不支持的文件格式: {content_type},请上传 mp4/mov/avi/webm 视频",
|
|
)
|
|
|
|
# 读取文件内容
|
|
content = await file.read()
|
|
if not content:
|
|
raise HTTPException(status_code=400, detail="文件内容为空")
|
|
|
|
# 校验大小和魔数
|
|
_check_upload_file(
|
|
content,
|
|
settings.UPLOAD_MAX_VIDEO_SIZE,
|
|
content_type,
|
|
"视频",
|
|
)
|
|
|
|
# 生成唯一文件名
|
|
ext = Path(file.filename or "video.mp4").suffix or ".mp4"
|
|
unique_name = f"{uuid.uuid4().hex[:16]}{ext}"
|
|
|
|
# 上传到七牛云
|
|
qiniu = get_qiniu_service()
|
|
bucket, domain = qiniu._get_bucket_and_domain("video")
|
|
key = qiniu.generate_key("video", unique_name)
|
|
stream = io.BytesIO(content)
|
|
result = await qiniu.upload_stream_async(
|
|
stream=stream,
|
|
key=key,
|
|
mime_type=content_type or "video/mp4",
|
|
bucket=bucket,
|
|
domain=domain,
|
|
)
|
|
|
|
url = result.get("url")
|
|
key = result.get("key")
|
|
|
|
if not url:
|
|
raise HTTPException(status_code=500, detail="上传到七牛云失败:未返回 URL")
|
|
|
|
logger.info(f"[Upload] 视频上传成功: {url[:80]}..., size={len(content)}")
|
|
|
|
return success_response(
|
|
data=UploadResponse(
|
|
url=url,
|
|
key=key or unique_name,
|
|
size=len(content),
|
|
)
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"[Upload] 视频上传失败: {e}")
|
|
raise HTTPException(status_code=500, detail=f"上传失败: {e}")
|
|
|
|
|
|
|
|
|
|
@router.post("/audio", response_model=ApiResponse[UploadResponse])
|
|
async def upload_audio(
|
|
file: UploadFile = File(..., description="音频文件"),
|
|
current_user: User = Depends(get_current_user),
|
|
):
|
|
"""
|
|
上传音频到七牛云
|
|
|
|
支持格式:mp3, wav, aac, m4a, ogg, flac
|
|
"""
|
|
try:
|
|
allowed_types = {
|
|
"audio/mpeg",
|
|
"audio/mp3",
|
|
"audio/wav",
|
|
"audio/x-wav",
|
|
"audio/aac",
|
|
"audio/mp4",
|
|
"audio/ogg",
|
|
"audio/flac",
|
|
"audio/x-flac",
|
|
}
|
|
content_type = file.content_type or ""
|
|
|
|
if not content_type:
|
|
ext = Path(file.filename or "").suffix.lower()
|
|
ext_to_mime = {
|
|
".mp3": "audio/mpeg",
|
|
".wav": "audio/wav",
|
|
".aac": "audio/aac",
|
|
".m4a": "audio/mp4",
|
|
".ogg": "audio/ogg",
|
|
".flac": "audio/flac",
|
|
}
|
|
content_type = ext_to_mime.get(ext, "")
|
|
|
|
if content_type not in allowed_types:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"不支持的音频格式: {content_type},请上传 mp3/wav/aac/m4a/ogg/flac",
|
|
)
|
|
|
|
content = await file.read()
|
|
if not content:
|
|
raise HTTPException(status_code=400, detail="文件内容为空")
|
|
|
|
# 校验大小和魔数
|
|
_check_upload_file(
|
|
content,
|
|
settings.UPLOAD_MAX_AUDIO_SIZE,
|
|
content_type,
|
|
"音频",
|
|
)
|
|
|
|
ext = Path(file.filename or "audio.mp3").suffix or ".mp3"
|
|
unique_name = f"{uuid.uuid4().hex[:16]}{ext}"
|
|
|
|
qiniu = get_qiniu_service()
|
|
# 复用视频 bucket(或根据配置使用音频 bucket)
|
|
bucket, domain = qiniu._get_bucket_and_domain("video")
|
|
key = qiniu.generate_key("audio", unique_name)
|
|
stream = io.BytesIO(content)
|
|
result = await qiniu.upload_stream_async(
|
|
stream=stream,
|
|
key=key,
|
|
mime_type=content_type or "audio/mpeg",
|
|
bucket=bucket,
|
|
domain=domain,
|
|
)
|
|
|
|
url = result.get("url")
|
|
key = result.get("key")
|
|
|
|
if not url:
|
|
raise HTTPException(status_code=500, detail="上传到七牛云失败:未返回 URL")
|
|
|
|
logger.info(f"[Upload] 音频上传成功: {url[:80]}..., size={len(content)}")
|
|
|
|
return success_response(
|
|
data=UploadResponse(
|
|
url=url,
|
|
key=key or unique_name,
|
|
size=len(content),
|
|
)
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"[Upload] 音频上传失败: {e}")
|
|
raise HTTPException(status_code=500, detail=f"上传失败: {e}")
|