Files
meijiaka-zy/python-api/app/utils/file_validation.py
T

106 lines
3.8 KiB
Python

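"""
File validation utilities
=========================
Provides magic-number (file header) checks and a unified upload validation helper
to guard against MIME spoofing and dangerous file uploads.
"""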
from fastapi import HTTPException
def validate_file_magic(content: bytes, expected_content_type: str) -> bool:
    """Check that the leading bytes of *content* match the declared MIME type.

    The declared type is normalised before comparison: parameters after ``;``
    are dropped and the value is lowercased, so ``"IMAGE/JPEG"`` and
    ``"video/webm; codecs=vp8"`` are treated like their bare lowercase forms.

    Args:
        content: Raw file bytes (at least the first 12 bytes are inspected,
            plus the first 512 bytes for the ``<script`` scan).
        expected_content_type: MIME type declared by the client.

    Returns:
        ``True`` only when the header is a recognised image / video / audio
        signature AND that signature is compatible with the declared type.
        ``False`` for content shorter than 12 bytes, a missing/empty declared
        type, a deny-listed header, an embedded ``<script`` in the first
        512 bytes, or any unrecognised main type.
    """
    # The longest signatures checked below need bytes 0..11.
    if len(content) < 12:
        return False
    # A missing declared type can never be confirmed; reject instead of crashing.
    if not expected_content_type:
        return False

    # Headers that are rejected outright regardless of the declared type.
    rejected_prefixes = (
        b"MZ",            # Windows executables (.exe, .dll)
        b"#!",            # scripts with a shebang line
        b"PK\x03\x04",    # ZIP archives (.zip, .jar, .docx)
        b"<?xml",         # XML
        b"<html",         # HTML
        b"<!DO",          # HTML doctype
        b"%PDF",          # PDF
    )
    if content.startswith(rejected_prefixes):
        return False
    # Case-insensitive scan of the first 512 bytes for an embedded script tag.
    if b"<script" in content[:512].lower():
        return False

    # Drop parameters ("; codecs=...") and normalise case before comparing.
    mime = expected_content_type.split(";", 1)[0].strip().lower()
    main_type = mime.split("/")[0]

    is_riff = content.startswith(b"RIFF")
    has_ftyp = content[4:8] == b"ftyp"
    fourcc = content[8:12]  # RIFF form type, or the ftyp major brand

    if main_type == "image":
        if content.startswith(b"\xff\xd8\xff"):
            return mime in ("image/jpeg", "image/jpg")
        if content.startswith(b"\x89PNG\r\n\x1a\n"):
            return mime == "image/png"
        if content.startswith((b"GIF89a", b"GIF87a")):
            return mime == "image/gif"
        if is_riff and fourcc == b"WEBP":
            return mime == "image/webp"
        return False

    if main_type == "video":
        # ftyp box: MP4 / MOV / M4V family.
        if has_ftyp:
            # Brands are exactly four bytes, space-padded: the QuickTime brand
            # is "qt" followed by TWO spaces. A three-byte literal would never
            # equal the four-byte slice and QuickTime files would slip through
            # as video/mp4.
            if fourcc in (b"qt  ", b"qtw "):
                return mime == "video/quicktime"
            # Any other brand (mp4, isom, avc1, mp41, mp42, ...).
            return mime in ("video/mp4", "video/quicktime")
        if is_riff and fourcc == b"AVI ":
            return mime == "video/x-msvideo"
        if content.startswith(b"\x1aE\xdf\xa3"):
            return mime == "video/webm"
        return False

    if main_type == "audio":
        # MP3: either an ID3 tag or one of the accepted frame-sync byte pairs.
        if content[:3] == b"ID3" or content[:2] in (
            b"\xff\xfb",
            b"\xff\xf3",
            b"\xff\xf2",
        ):
            return mime in ("audio/mpeg", "audio/mp3")
        if is_riff and fourcc == b"WAVE":
            return mime in ("audio/wav", "audio/x-wav")
        if content.startswith(b"fLaC"):
            return mime == "audio/flac"
        if content.startswith(b"OggS"):
            return mime == "audio/ogg"
        # AAC / M4A are also carried in an ftyp container.
        if has_ftyp and fourcc in (b"M4A ", b"m4a ", b"mp42", b"isom", b"M4P "):
            return mime in ("audio/mp4", "audio/aac", "audio/m4a")
        return False

    # Any other main type (text, application, ...) is not accepted.
    return False
def check_upload_file(content: bytes, max_size: int, content_type: str, type_label: str) -> None:
    """Validate an upload's size and magic number, raising on the first failure.

    Args:
        content: Raw bytes of the uploaded file.
        max_size: Maximum allowed length of ``content`` in bytes.
        content_type: MIME type declared for the upload; forwarded to
            ``validate_file_magic``.
        type_label: Label prepended to the error message.

    Raises:
        HTTPException: 413 when ``content`` is longer than ``max_size``;
            400 when ``validate_file_magic`` rejects the content.
    """
    too_large = len(content) > max_size
    if too_large:
        # The limit is reported in whole MiB (floor division).
        limit_mb = max_size // 1024 // 1024
        size_message = f"{type_label}文件大小不能超过 {limit_mb}MB"
        raise HTTPException(status_code=413, detail=size_message)

    magic_ok = validate_file_magic(content, content_type)
    if magic_ok:
        return

    mismatch_message = f"{type_label}文件内容与实际格式不符,可能存在安全风险"
    raise HTTPException(status_code=400, detail=mismatch_message)