Files
meijiaka-zy/scripts/import_viral_opening.py
T

207 lines
6.9 KiB
Python

#!/usr/bin/env python3
"""
网红开篇素材批量入库脚本
============================
功能:
1. 遍历指定目录下的 .mp4 视频
2. 按规则 md5(父目录名_原文件名) 生成新文件名
3. 用 ffprobe 提取视频时长
4. 生成七牛云上传命令 + 数据库 INSERT SQL
用法:
cd /Users/0fun/work/meijiaka-zy
python scripts/import_viral_opening.py \
--src "/Users/0fun/Desktop/网红开篇" \
--bucket "meijiaka-zy" \
--prefix "materials" \
--domain "https://media.liche.cn"
输出:
- scripts/viral_opening_upload.sh # 七牛云批量上传命令
- scripts/viral_opening_insert.sql # 数据库 INSERT 语句
"""
import argparse
import hashlib
import json
import os
import re
import shlex
import subprocess
import sys
from pathlib import Path
# Source folder name -> level-3 category slug
# (expected to correspond to the slugs in seed_categories.sql).
FOLDER_TO_SLUG: dict[str, str] = {
    "暴力拆除-恶搞开篇": "wanghong-egao-blcc",
    "搞笑涂料施工-恶搞开篇": "wanghong-egao-gxtlsg",
    "工地恶搞-恶搞开篇": "wanghong-egao-gdeg",
    "贴砖恶搞-恶搞开篇": "wanghong-egao-tzeg",
    "吸睛画面-恶搞开篇": "wanghong-egao-xjhm",
    "炫技-恶搞开篇": "wanghong-egao-xj",
    "防水翻车漏水-施工翻车镜": "wanghong-fanche-fsfcls",
}
def get_video_duration(filepath: str) -> float | None:
"""用 ffmpeg -i 提取视频时长(秒),保留 2 位小数"""
try:
result = subprocess.run(
["ffmpeg", "-i", filepath],
capture_output=True,
text=True,
timeout=30,
)
# ffmpeg 把信息输出到 stderr,解析 Duration: 00:00:04.25
import re
match = re.search(r"Duration:\s+(\d+):(\d+):(\d+\.\d+)", result.stderr)
if match:
hours, minutes, seconds = match.groups()
total = float(hours) * 3600 + float(minutes) * 60 + float(seconds)
return round(total, 2)
except Exception as e:
print(f" ⚠️ 读取时长失败: {e}")
return None
def md5_filename(parent_name: str, original_name: str) -> str:
    """Build the storage file name ``md5(<parent_name>_<original_name>).mp4``.

    Example:
        parent_name:   暴力拆除-恶搞开篇
        original_name: 5月16日(13).mp4
        hashed text:   暴力拆除-恶搞开篇_5月16日(13).mp4
        result:        <32 hex digits>.mp4

    Args:
        parent_name: Name of the directory that contains the file.
        original_name: Original file name, including its extension.

    Returns:
        The hex MD5 digest of the UTF-8 encoded joined text, followed by
        the ``.mp4`` extension.
    """
    joined = "_".join((parent_name, original_name))
    digest = hashlib.md5(joined.encode("utf-8"))
    return digest.hexdigest() + ".mp4"
def scan_videos(src_dir: str) -> list[dict]:
    """Recursively scan ``src_dir`` and collect metadata for each usable video.

    A ``*.mp4`` entry is skipped when it is hidden (name starts with "."),
    is not a regular file, sits in a folder that has no entry in
    ``FOLDER_TO_SLUG``, or has no readable duration.

    Args:
        src_dir: Root directory to search.

    Returns:
        One dict per accepted video, in sorted path order, with the keys
        ``original_path``, ``parent_folder``, ``original_name``,
        ``new_filename``, ``slug`` and ``duration``.
    """
    videos = []
    for mp4_file in sorted(Path(src_dir).rglob("*.mp4")):
        # Skip hidden entries (e.g. macOS system files).
        if mp4_file.name.startswith("."):
            continue
        # rglob matches on the name only, so a directory called "x.mp4"
        # would otherwise be handed to ffmpeg.
        if not mp4_file.is_file():
            continue

        parent_folder = mp4_file.parent.name
        original_name = mp4_file.name

        # The immediate parent folder decides the category.
        slug = FOLDER_TO_SLUG.get(parent_folder)
        if not slug:
            print(f"⚠️ 未找到分类映射: {parent_folder}/{original_name},跳过")
            continue

        new_filename = md5_filename(parent_folder, original_name)
        # Separate the two names; they were previously printed fused together.
        print(f"📹 处理: {parent_folder}/{original_name} → {new_filename}")

        duration = get_video_duration(str(mp4_file))
        if duration is None:
            print(" ❌ 无法读取时长,跳过")
            continue

        videos.append({
            "original_path": str(mp4_file),
            "parent_folder": parent_folder,
            "original_name": original_name,
            "new_filename": new_filename,
            "slug": slug,
            "duration": duration,
        })
    return videos
def _sql_literal(value: object) -> str:
    """Render ``value`` as a single-quoted SQL string literal, doubling any embedded quote."""
    return "'" + str(value).replace("'", "''") + "'"


def generate_outputs(videos: list[dict], bucket: str, prefix: str, domain: str) -> None:
    """Write the upload script, the INSERT SQL and the mapping JSON.

    All three files are created next to this script:
    ``viral_opening_upload.sh``, ``viral_opening_insert.sql`` and
    ``viral_opening_mapping.json``.

    Args:
        videos: Video records as produced by ``scan_videos``.
        bucket: Qiniu bucket name.
        prefix: Key prefix inside the bucket.
        domain: CDN base URL used to build each material's ``url`` value.
    """
    script_dir = Path(__file__).parent

    # 1. Upload script. The qshell commands are emitted as commented-out
    #    lines, so the script does nothing until they are uncommented.
    upload_script = script_dir / "viral_opening_upload.sh"
    with open(upload_script, "w", encoding="utf-8") as f:
        f.write("#!/bin/bash\n# 网红开篇素材批量上传脚本\n\n")
        for v in videos:
            # Quote every argument so that paths containing quotes or
            # spaces still form a valid command once uncommented.
            quoted_bucket = shlex.quote(bucket)
            quoted_key = shlex.quote(f"{prefix}/{v['new_filename']}")
            quoted_path = shlex.quote(v["original_path"])
            f.write(
                f"# {v['parent_folder']}/{v['original_name']} ({v['duration']}s)\n"
                f"# qshell put {quoted_bucket} {quoted_key} {quoted_path}\n"
                f"# 或: qshell fput {quoted_bucket} {quoted_key} {quoted_path}\n\n"
            )
    os.chmod(upload_script, 0o755)

    # 2. INSERT SQL, wrapped in a single transaction.
    sql_file = script_dir / "viral_opening_insert.sql"
    with open(sql_file, "w", encoding="utf-8") as f:
        f.write("-- 网红开篇素材入库 SQL\n")
        f.write(f"-- 共 {len(videos)} 个视频\n\n")
        f.write("BEGIN;\n\n")
        for v in videos:
            cdn_url = f"{domain}/{bucket}/{prefix}/{v['new_filename']}"
            # The category id is resolved by slug at execution time; string
            # values are escaped because they are interpolated into SQL text.
            f.write(
                "INSERT INTO mjk_broll_materials "
                "(category_id, title, url, duration, usage_count, status, created_at, updated_at)\n"
                f"SELECT id, {_sql_literal(v['new_filename'])}, {_sql_literal(cdn_url)}, "
                f"{v['duration']}, 0, 'active', NOW(), NOW()\n"
                f"FROM mjk_broll_categories WHERE slug = {_sql_literal(v['slug'])} AND level = 3;\n"
            )
            f.write(
                f"-- 来源: {v['parent_folder']} | 时长: {v['duration']}s | 分类: {v['slug']}\n\n"
            )
        f.write("COMMIT;\n")

    # 3. Mapping JSON, for manual cross-checking.
    mapping_file = script_dir / "viral_opening_mapping.json"
    with open(mapping_file, "w", encoding="utf-8") as f:
        json.dump(videos, f, ensure_ascii=False, indent=2)

    print("\n✅ 生成完成:")
    print(f" - 上传脚本: {upload_script}")
    print(f" - 入库 SQL: {sql_file}")
    print(f" - 映射 JSON: {mapping_file}")
def main():
parser = argparse.ArgumentParser(description="网红开篇素材批量入库")
parser.add_argument("--src", default="/Users/0fun/Desktop/网红开篇", help="素材源目录")
parser.add_argument("--bucket", default="meijiaka-zy", help="七牛云 bucket")
parser.add_argument("--prefix", default="materials", help="七牛云路径前缀")
parser.add_argument("--domain", default="https://media.liche.cn", help="CDN 域名")
args = parser.parse_args()
if not Path(args.src).exists():
print(f"❌ 目录不存在: {args.src}")
sys.exit(1)
print(f"🔍 扫描目录: {args.src}\n")
videos = scan_videos(args.src)
if not videos:
print("❌ 未找到可处理的视频")
sys.exit(1)
print(f"\n📊 共找到 {len(videos)} 个视频")
generate_outputs(videos, args.bucket, args.prefix, args.domain)
if __name__ == "__main__":
main()