feat(cover-avatar): 封面形象功能

后端:
- 新增 POST /upload/image 图片上传(七牛云 image bucket)
- 新增 POST /image/remove-background AI 抠图(火山引擎 MediaKit)
- 提取 file_validation.py 共享模块

Rust:
- 新增 cover_avatar.rs 存储层(cover_avatars.json + 图片本地存储)
- 新增 4 个 IPC 命令:load/save/delete/save_image

前端:
- 新增 CoverAvatarLibrary 页面(内容管理 → 封面形象)
- 新增 coverAvatar API 模块和 coverAvatarStore
- 封面设计集成:背景图/封面形象弹窗选择 + Fabric.js 叠加
- 优化左侧布局:视觉素材横向卡片(9:16)+ 文案配置分组
This commit is contained in:
小鱼开发
2026-05-22 18:38:18 +08:00
parent c55c256dc7
commit f01f2c366a
29 changed files with 1925 additions and 248 deletions
+193
View File
@@ -0,0 +1,193 @@
"""
图片处理 API
============
提供图片上传(七牛云)和 AI 抠图(火山引擎 MediaKit)功能。
"""
import io
import logging
import uuid
from pathlib import Path
from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile
from pydantic import BaseModel, Field
from app.api.deps import get_current_user
from app.config import get_settings
from app.models.user import User
from app.schemas.common import ApiResponse, success_response
from app.services.qiniu_service import get_qiniu_service
from app.services.volcengine_mediakit_service import VolcengineMediakitService
from app.utils.file_validation import check_upload_file
router = APIRouter(tags=["Image"])
logger = logging.getLogger(__name__)
settings = get_settings()
# ── Dependencies ──
async def get_mediakit_service(request: Request) -> VolcengineMediakitService:
"""FastAPI Depends:从 app.state 获取全局 VolcengineMediakitService 实例。"""
service = getattr(request.app.state, "volcengine_mediakit_service", None)
if service is None:
raise HTTPException(
status_code=503,
detail="MediaKit 服务未初始化,请检查配置",
)
return service
# ── Schemas ──
class ImageUploadResponse(BaseModel):
"""图片上传响应"""
url: str = Field(..., description="七牛云图片 URL")
key: str = Field(..., description="七牛云文件 key")
size: int = Field(..., description="文件大小(字节)")
class RemoveBackgroundResponse(BaseModel):
"""抠图响应"""
url: str = Field(..., description="抠图结果图片 URL")
class RemoveBackgroundRequest(BaseModel):
"""抠图请求"""
image_url: str = Field(..., description="原始图片 URL")
scene: str = Field(default="general", description="场景类型:general(通用)或 product(商品)")
# ── Endpoints ──
@router.post("/upload/image", response_model=ApiResponse[ImageUploadResponse])
async def upload_image(
file: UploadFile = File(..., description="图片文件"),
current_user: User = Depends(get_current_user),
) -> ApiResponse[ImageUploadResponse]:
"""
上传图片到七牛云
支持格式:jpg, jpeg, png, gif, webp
返回七牛云永久访问 URL。
"""
try:
allowed_types = {
"image/jpeg",
"image/jpg",
"image/png",
"image/gif",
"image/webp",
}
content_type = file.content_type or ""
# 如果 content_type 为空,尝试从文件名推断
if not content_type:
ext = Path(file.filename or "").suffix.lower()
ext_to_mime = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp",
}
content_type = ext_to_mime.get(ext, "")
if content_type not in allowed_types:
raise HTTPException(
status_code=400,
detail=f"不支持的图片格式: {content_type},请上传 jpg/png/gif/webp 图片",
)
# 读取文件内容
content = await file.read()
if not content:
raise HTTPException(status_code=400, detail="文件内容为空")
# 校验大小和魔数
check_upload_file(
content,
settings.UPLOAD_MAX_IMAGE_SIZE,
content_type,
"图片",
)
# 生成唯一文件名
ext = Path(file.filename or "image.jpg").suffix or ".jpg"
unique_name = f"{uuid.uuid4().hex[:16]}{ext}"
# 上传到七牛云
qiniu = get_qiniu_service()
bucket, domain = qiniu._get_bucket_and_domain("image")
file_key = qiniu.generate_key("image", unique_name)
stream = io.BytesIO(content)
result = await qiniu.upload_stream_async(
stream=stream,
key=file_key,
mime_type=content_type or "image/jpeg",
bucket=bucket,
domain=domain,
)
url = result.get("url")
returned_key = result.get("key")
if not url:
raise HTTPException(status_code=500, detail="上传到七牛云失败:未返回 URL")
logger.info(f"[Upload] 图片上传成功: {url[:80]}..., size={len(content)}")
return success_response(
data=ImageUploadResponse(
url=url,
key=returned_key or file_key,
size=len(content),
)
)
except HTTPException:
raise
except Exception as e:
logger.error(f"[Upload] 图片上传失败: {e}")
raise HTTPException(status_code=500, detail=f"上传失败: {e}")
@router.post("/image/remove-background", response_model=ApiResponse[RemoveBackgroundResponse])
async def remove_background(
req: RemoveBackgroundRequest,
current_user: User = Depends(get_current_user),
mediakit_service: VolcengineMediakitService = Depends(get_mediakit_service),
) -> ApiResponse[RemoveBackgroundResponse]:
"""
AI 抠图(火山引擎 MediaKit
移除图片背景,返回透明背景图片 URL。
"""
try:
result = await mediakit_service.remove_background(
image_url=req.image_url,
scene=req.scene,
)
if not result.image_url:
raise HTTPException(status_code=500, detail="抠图失败:未返回结果图片 URL")
logger.info(f"[RemoveBackground] 抠图成功: {result.image_url[:80]}...")
return success_response(
data=RemoveBackgroundResponse(url=result.image_url),
message="抠图成功",
)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"[RemoveBackground] 抠图失败: {e}")
raise HTTPException(status_code=500, detail=f"抠图失败: {e}")
+4
View File
@@ -10,6 +10,7 @@ from app.api.v1 import (
caption,
cover_background,
events,
image,
materials,
points,
script,
@@ -59,5 +60,8 @@ api_router.include_router(cover_background.router, tags=["Cover Background"])
# 积分系统模块
api_router.include_router(points.router, tags=["Points"])
# 图片处理模块(上传 + 抠图)
api_router.include_router(image.router, tags=["Image"])
# 应用更新模块
api_router.include_router(update.router, prefix="/update", tags=["Update"])
+3 -97
View File
@@ -18,6 +18,7 @@ from app.config import get_settings
from app.models.user import User
from app.schemas.common import ApiResponse, success_response
from app.services.qiniu_service import get_qiniu_service
from app.utils.file_validation import check_upload_file
router = APIRouter(prefix="/upload", tags=["Upload"])
@@ -25,101 +26,6 @@ logger = logging.getLogger(__name__)
settings = get_settings()
def _validate_file_magic(content: bytes, expected_content_type: str) -> bool:
"""通过文件头魔数校验文件真实类型,防止 MIME 伪造攻击。"""
if len(content) < 12:
return False
# 拒绝常见危险文件头
dangerous_signatures = [
(b"MZ", "Windows 可执行文件"), # .exe, .dll
(b"#!", "Shell 脚本"), # bash, python, etc
(b"PK\x03\x04", "ZIP 压缩包"), # .zip, .jar, .docx
(b"<?xml", "XML 文件"),
(b"<html", "HTML 文件"),
(b"<!DO", "HTML 文档"),
(b"%PDF", "PDF 文件"),
]
for sig, _ in dangerous_signatures:
if content.startswith(sig):
return False
if b"<script" in content[:512].lower():
return False
main_type = expected_content_type.split("/")[0]
# 图片校验
if main_type == "image":
if content.startswith(b"\xff\xd8\xff"):
return expected_content_type in ("image/jpeg", "image/jpg")
if content.startswith(b"\x89PNG\r\n\x1a\n"):
return expected_content_type == "image/png"
if content.startswith(b"GIF89a") or content.startswith(b"GIF87a"):
return expected_content_type == "image/gif"
if content.startswith(b"RIFF") and content[8:12] == b"WEBP":
return expected_content_type == "image/webp"
return False
# 视频校验
if main_type == "video":
# MP4 / MOV / M4V 等 ISO Base Media File Format
if content[4:8] == b"ftyp":
brand = content[8:12]
if brand in (b"qt ", b"qtw "):
return expected_content_type in ("video/quicktime",)
# mp4, isom, avc1, mp41, mp42 等
return expected_content_type in (
"video/mp4",
"video/quicktime",
)
if content.startswith(b"RIFF") and content[8:12] == b"AVI ":
return expected_content_type == "video/x-msvideo"
if content.startswith(b"\x1aE\xdf\xa3"):
return expected_content_type == "video/webm"
return False
# 音频校验
if main_type == "audio":
if content[:3] == b"ID3" or content[:2] in (
b"\xff\xfb",
b"\xff\xf3",
b"\xff\xf2",
):
return expected_content_type in ("audio/mpeg", "audio/mp3")
if content.startswith(b"RIFF") and content[8:12] == b"WAVE":
return expected_content_type in ("audio/wav", "audio/x-wav")
if content.startswith(b"fLaC"):
return expected_content_type == "audio/flac"
if content.startswith(b"OggS"):
return expected_content_type == "audio/ogg"
# AAC / M4A(也是 ftyp 格式)
if content[4:8] == b"ftyp":
brand = content[8:12]
if brand in (b"M4A ", b"m4a ", b"mp42", b"isom", b"M4P "):
return expected_content_type in (
"audio/mp4",
"audio/aac",
"audio/m4a",
)
return False
return False
def _check_upload_file(content: bytes, max_size: int, content_type: str, type_label: str) -> None:
"""统一校验文件大小和魔数,失败时直接抛 HTTPException。"""
if len(content) > max_size:
max_mb = max_size // 1024 // 1024
raise HTTPException(
status_code=413,
detail=f"{type_label}文件大小不能超过 {max_mb}MB",
)
if not _validate_file_magic(content, content_type):
raise HTTPException(
status_code=400,
detail=f"{type_label}文件内容与实际格式不符,可能存在安全风险",
)
class UploadResponse(BaseModel):
"""上传响应"""
@@ -173,7 +79,7 @@ async def upload_video(
raise HTTPException(status_code=400, detail="文件内容为空")
# 校验大小和魔数
_check_upload_file(
check_upload_file(
content,
settings.UPLOAD_MAX_VIDEO_SIZE,
content_type,
@@ -269,7 +175,7 @@ async def upload_audio(
raise HTTPException(status_code=400, detail="文件内容为空")
# 校验大小和魔数
_check_upload_file(
check_upload_file(
content,
settings.UPLOAD_MAX_AUDIO_SIZE,
content_type,
+105
View File
@@ -0,0 +1,105 @@
"""
文件校验工具
==========
提供文件头魔数校验和上传文件统一校验功能,
防止 MIME 伪造攻击和危险文件上传。
"""
from fastapi import HTTPException
def validate_file_magic(content: bytes, expected_content_type: str) -> bool:
"""通过文件头魔数校验文件真实类型,防止 MIME 伪造攻击。"""
if len(content) < 12:
return False
# 拒绝常见危险文件头
dangerous_signatures = [
(b"MZ", "Windows 可执行文件"), # .exe, .dll
(b"#!", "Shell 脚本"), # bash, python, etc
(b"PK\x03\x04", "ZIP 压缩包"), # .zip, .jar, .docx
(b"<?xml", "XML 文件"),
(b"<html", "HTML 文件"),
(b"<!DO", "HTML 文档"),
(b"%PDF", "PDF 文件"),
]
for sig, _ in dangerous_signatures:
if content.startswith(sig):
return False
if b"<script" in content[:512].lower():
return False
main_type = expected_content_type.split("/")[0]
# 图片校验
if main_type == "image":
if content.startswith(b"\xff\xd8\xff"):
return expected_content_type in ("image/jpeg", "image/jpg")
if content.startswith(b"\x89PNG\r\n\x1a\n"):
return expected_content_type == "image/png"
if content.startswith(b"GIF89a") or content.startswith(b"GIF87a"):
return expected_content_type == "image/gif"
if content.startswith(b"RIFF") and content[8:12] == b"WEBP":
return expected_content_type == "image/webp"
return False
# 视频校验
if main_type == "video":
# MP4 / MOV / M4V 等 ISO Base Media File Format
if content[4:8] == b"ftyp":
brand = content[8:12]
if brand in (b"qt ", b"qtw "):
return expected_content_type in ("video/quicktime",)
# mp4, isom, avc1, mp41, mp42 等
return expected_content_type in (
"video/mp4",
"video/quicktime",
)
if content.startswith(b"RIFF") and content[8:12] == b"AVI ":
return expected_content_type == "video/x-msvideo"
if content.startswith(b"\x1aE\xdf\xa3"):
return expected_content_type == "video/webm"
return False
# 音频校验
if main_type == "audio":
if content[:3] == b"ID3" or content[:2] in (
b"\xff\xfb",
b"\xff\xf3",
b"\xff\xf2",
):
return expected_content_type in ("audio/mpeg", "audio/mp3")
if content.startswith(b"RIFF") and content[8:12] == b"WAVE":
return expected_content_type in ("audio/wav", "audio/x-wav")
if content.startswith(b"fLaC"):
return expected_content_type == "audio/flac"
if content.startswith(b"OggS"):
return expected_content_type == "audio/ogg"
# AAC / M4A(也是 ftyp 格式)
if content[4:8] == b"ftyp":
brand = content[8:12]
if brand in (b"M4A ", b"m4a ", b"mp42", b"isom", b"M4P "):
return expected_content_type in (
"audio/mp4",
"audio/aac",
"audio/m4a",
)
return False
return False
def check_upload_file(content: bytes, max_size: int, content_type: str, type_label: str) -> None:
"""统一校验文件大小和魔数,失败时直接抛 HTTPException。"""
if len(content) > max_size:
max_mb = max_size // 1024 // 1024
raise HTTPException(
status_code=413,
detail=f"{type_label}文件大小不能超过 {max_mb}MB",
)
if not validate_file_magic(content, content_type):
raise HTTPException(
status_code=400,
detail=f"{type_label}文件内容与实际格式不符,可能存在安全风险",
)