f01f2c366a
后端: - 新增 POST /upload/image 图片上传(七牛云 image bucket) - 新增 POST /image/remove-background AI 抠图(火山引擎 MediaKit) - 提取 file_validation.py 共享模块 Rust: - 新增 cover_avatar.rs 存储层(cover_avatars.json + 图片本地存储) - 新增 4 个 IPC 命令:load/save/delete/save_image 前端: - 新增 CoverAvatarLibrary 页面(内容管理 → 封面形象) - 新增 coverAvatar API 模块和 coverAvatarStore - 封面设计集成:背景图/封面形象弹窗选择 + Fabric.js 叠加 - 优化左侧布局:视觉素材横向卡片(9:16)+ 文案配置分组
106 lines
3.8 KiB
Python
106 lines
3.8 KiB
Python
"""
|
|
文件校验工具
|
|
==========
|
|
|
|
提供文件头魔数校验和上传文件统一校验功能,
|
|
防止 MIME 伪造攻击和危险文件上传。
|
|
"""
|
|
|
|
from fastapi import HTTPException
|
|
|
|
|
|
def validate_file_magic(content: bytes, expected_content_type: str) -> bool:
|
|
"""通过文件头魔数校验文件真实类型,防止 MIME 伪造攻击。"""
|
|
if len(content) < 12:
|
|
return False
|
|
|
|
# 拒绝常见危险文件头
|
|
dangerous_signatures = [
|
|
(b"MZ", "Windows 可执行文件"), # .exe, .dll
|
|
(b"#!", "Shell 脚本"), # bash, python, etc
|
|
(b"PK\x03\x04", "ZIP 压缩包"), # .zip, .jar, .docx
|
|
(b"<?xml", "XML 文件"),
|
|
(b"<html", "HTML 文件"),
|
|
(b"<!DO", "HTML 文档"),
|
|
(b"%PDF", "PDF 文件"),
|
|
]
|
|
for sig, _ in dangerous_signatures:
|
|
if content.startswith(sig):
|
|
return False
|
|
if b"<script" in content[:512].lower():
|
|
return False
|
|
|
|
main_type = expected_content_type.split("/")[0]
|
|
|
|
# 图片校验
|
|
if main_type == "image":
|
|
if content.startswith(b"\xff\xd8\xff"):
|
|
return expected_content_type in ("image/jpeg", "image/jpg")
|
|
if content.startswith(b"\x89PNG\r\n\x1a\n"):
|
|
return expected_content_type == "image/png"
|
|
if content.startswith(b"GIF89a") or content.startswith(b"GIF87a"):
|
|
return expected_content_type == "image/gif"
|
|
if content.startswith(b"RIFF") and content[8:12] == b"WEBP":
|
|
return expected_content_type == "image/webp"
|
|
return False
|
|
|
|
# 视频校验
|
|
if main_type == "video":
|
|
# MP4 / MOV / M4V 等 ISO Base Media File Format
|
|
if content[4:8] == b"ftyp":
|
|
brand = content[8:12]
|
|
if brand in (b"qt ", b"qtw "):
|
|
return expected_content_type in ("video/quicktime",)
|
|
# mp4, isom, avc1, mp41, mp42 等
|
|
return expected_content_type in (
|
|
"video/mp4",
|
|
"video/quicktime",
|
|
)
|
|
if content.startswith(b"RIFF") and content[8:12] == b"AVI ":
|
|
return expected_content_type == "video/x-msvideo"
|
|
if content.startswith(b"\x1aE\xdf\xa3"):
|
|
return expected_content_type == "video/webm"
|
|
return False
|
|
|
|
# 音频校验
|
|
if main_type == "audio":
|
|
if content[:3] == b"ID3" or content[:2] in (
|
|
b"\xff\xfb",
|
|
b"\xff\xf3",
|
|
b"\xff\xf2",
|
|
):
|
|
return expected_content_type in ("audio/mpeg", "audio/mp3")
|
|
if content.startswith(b"RIFF") and content[8:12] == b"WAVE":
|
|
return expected_content_type in ("audio/wav", "audio/x-wav")
|
|
if content.startswith(b"fLaC"):
|
|
return expected_content_type == "audio/flac"
|
|
if content.startswith(b"OggS"):
|
|
return expected_content_type == "audio/ogg"
|
|
# AAC / M4A(也是 ftyp 格式)
|
|
if content[4:8] == b"ftyp":
|
|
brand = content[8:12]
|
|
if brand in (b"M4A ", b"m4a ", b"mp42", b"isom", b"M4P "):
|
|
return expected_content_type in (
|
|
"audio/mp4",
|
|
"audio/aac",
|
|
"audio/m4a",
|
|
)
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
def check_upload_file(content: bytes, max_size: int, content_type: str, type_label: str) -> None:
|
|
"""统一校验文件大小和魔数,失败时直接抛 HTTPException。"""
|
|
if len(content) > max_size:
|
|
max_mb = max_size // 1024 // 1024
|
|
raise HTTPException(
|
|
status_code=413,
|
|
detail=f"{type_label}文件大小不能超过 {max_mb}MB",
|
|
)
|
|
if not validate_file_magic(content, content_type):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"{type_label}文件内容与实际格式不符,可能存在安全风险",
|
|
)
|