Files
meijiaka-zy/python-api/scripts/generate_seed_materials.py
小鱼开发 50e8b7cda3 feat(seed): 更新素材 seed 数据(2771条,含木作阶段验收镜)
- 基于最新本地素材目录重新扫描(2771个MP4)
- 新增二级分类:木作阶段验收镜(2个文件)
- 全部 ffprobe 探测时长,无0值异常
- 生成 scripts/seed_materials.sql 入库脚本
- 保留 generate_seed_materials.py 供后续复用
2026-05-15 16:55:08 +08:00

225 lines
7.4 KiB
Python

#!/usr/bin/env python3
"""批量生成素材入库 SQL 的脚本"""
import os
import json
import subprocess
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
from pypinyin import lazy_pinyin
# Root directory that scan_files() walks to discover the source clips.
MATERIALS_DIR = "/Users/0fun/Downloads/装修素材空镜库"

# Generated SQL script and the probe-duration cache. Both are relative paths,
# so they resolve against the current working directory at run time.
OUTPUT_SQL = Path("scripts") / "seed_materials.sql"
CACHE_FILE = Path("scripts") / ".seed_materials_cache.json"

# Prefix joined with each file name to form the URL stored for a clip.
CDN_PREFIX = "https://media.liche.cn/meijiaka-zy/materials"
# First-level directory name -> slug segment used as the first part of a
# category slug (see get_category_slug). A directory name missing from this
# table makes get_category_slug raise KeyError.
LV1_PINYIN = {
    "前期准备类": "zhunbei",
    "拆改改造类": "chaigai",
    "水电隐蔽类": "shuidian",
    "泥瓦工艺类": "niwa",
    "木工定制类": "mugong",
    "油漆墙面类": "youqi",
    "安装收尾类": "anzhuang",
    "软装完工&验收类": "ruanzhuang",
    "网红开篇": "wanghong",
}
# Second-level directory name -> slug segment used as the middle part of a
# category slug (see get_category_slug). A directory name missing from this
# table makes get_category_slug raise KeyError.
# NOTE(review): the segments are hand-written abbreviations, not mechanical
# transliterations — presumably they must match slugs that already exist in
# the category table, so do not "correct" them without checking the database.
LV2_PINYIN = {
    "合同签署镜": "hetong",
    "毛坯基础镜": "maopi",
    "现场交底镜": "jiaodi",
    "翻新基础镜": "fanxin",
    "量房勘测镜": "liangfang",
    "墙体拆除镜": "chaiqiang",
    "工地清运镜": "qingyun",
    "新建砌筑镜": "zhuqi",
    "吊顶造型镜": "diaoding",
    "柜体木作镜": "muti",
    "隔音防潮镜": "gechao",
    "水电验收镜": "yanshou",
    "水路施工镜": "shuilu",
    "电路施工镜": "dianlu",
    "墙面基层镜": "jiceng",
    "成品保护镜": "baohu",
    "面漆涂刷镜": "mianqi",
    "包管找平镜": "baoguan",
    "瓷砖铺贴镜": "cizhuan",
    "防水施工镜": "fangshui",
    "主材安装镜": "zhucai",
    "收尾细节镜": "shouwei",
    "美缝开荒镜": "meifeng",
    "全屋验收镜": "quanyan",
    "软装进场镜": "ruanchang",
    "恶搞开篇": "egao",
    "施工翻车镜": "fanche",
    "木作阶段验收镜": "mujiashouyan",
}
def initial_slug(text: str) -> str:
    """Return the lowercase pinyin initials of *text*.

    Only the part of *text* before the first ``-`` is considered. The result
    is the first character of every non-empty item returned by
    ``lazy_pinyin``, concatenated and lowercased.
    """
    # partition() hands back the whole string when no "-" is present, so no
    # separate membership test is required.
    head = text.partition("-")[0]
    return "".join(
        syllable[0] for syllable in lazy_pinyin(head) if syllable
    ).lower()
def get_category_slug(lv1: str, lv2: str, lv3: str) -> str:
    """Build the ``<lv1 segment>-<lv2 segment>-<lv3 initials>`` category slug.

    Args:
        lv1: First-level directory name; looked up in ``LV1_PINYIN``.
        lv2: Second-level directory name; looked up in ``LV2_PINYIN``.
        lv3: Third-level directory name; reduced with ``initial_slug``.

    Raises:
        KeyError: If *lv1* or *lv2* has no entry in its lookup table.
    """
    slug_parts = (LV1_PINYIN[lv1], LV2_PINYIN[lv2], initial_slug(lv3))
    return "-".join(slug_parts)
def _parse_banner_duration(stderr_text: str) -> float:
    """Extract seconds from a ``Duration: HH:MM:SS.xx, ...`` line, else 0.0."""
    for line in stderr_text.splitlines():
        if "Duration:" not in line:
            continue
        clock = line.split("Duration:")[1].split(",")[0].strip()
        try:
            hours, minutes, seconds = clock.split(":")
            total = float(hours) * 3600 + float(minutes) * 60 + float(seconds)
        except ValueError:
            # Covers values such as "N/A" (wrong field count or non-numeric
            # text); keep scanning in case a later line is usable.
            continue
        return round(total, 2)
    return 0.0


def probe_duration(filepath: str) -> float:
    """Return the duration of a media file in seconds, rounded to 2 decimals.

    Two strategies are tried in order:

    1. ``ffprobe -show_entries format=duration`` in CSV mode, reading the
       number from stdout.
    2. A plain ``ffprobe -i`` run, parsing the ``Duration:`` line that ffprobe
       writes to stderr.

    This is deliberately best-effort: every expected failure (ffprobe missing,
    timeout after 10 s, undecodable or unparsable output) yields ``0.0``
    instead of raising, so one bad file cannot abort a batch.

    Args:
        filepath: Path of the file handed to ffprobe.

    Returns:
        The duration in seconds, or ``0.0`` when it could not be determined.
    """
    # Only the failures a probe can legitimately produce are swallowed:
    #   OSError                    - ffprobe not installed / not executable
    #   subprocess.SubprocessError - includes TimeoutExpired
    #   ValueError                 - float() on junk, or undecodable output
    expected_failures = (OSError, subprocess.SubprocessError, ValueError)

    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-i", filepath,
                "-show_entries", "format=duration",
                "-v", "quiet",
                "-of", "csv=p=0",
            ],
            capture_output=True,
            text=True,
            timeout=10,
        )
        raw = result.stdout.strip()
        if result.returncode == 0 and raw:
            return round(float(raw), 2)
    except expected_failures:
        pass  # fall through to the stderr-based strategy

    try:
        result = subprocess.run(
            ["ffprobe", "-i", filepath],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except expected_failures:
        return 0.0
    return _parse_banner_duration(result.stderr)
def _visible_subdirs(parent: str) -> list:
    """Return the sorted names of the non-hidden subdirectories of *parent*."""
    return [
        name
        for name in sorted(os.listdir(parent))
        if not name.startswith(".") and os.path.isdir(os.path.join(parent, name))
    ]


def _mp4_records(lv1: str, lv2: str, lv3: str, directory: str) -> list:
    """Build one record per visible ``.mp4`` file directly inside *directory*."""
    names = [
        name
        for name in sorted(os.listdir(directory))
        # Hidden entries are skipped for files as well as directories, so
        # dot-prefixed side files (e.g. "._clip.mp4") are not seeded.
        if name.endswith(".mp4") and not name.startswith(".")
    ]
    if not names:
        # Resolve the slug only when there is something to emit, so a
        # directory without clips never triggers a category lookup.
        return []
    category_slug = get_category_slug(lv1, lv2, lv3)
    return [
        {
            "lv1": lv1,
            "lv2": lv2,
            "lv3": lv3,
            "filename": name,
            "filepath": os.path.join(directory, name),
            "category_slug": category_slug,
            "url": f"{CDN_PREFIX}/{name}",
        }
        for name in names
    ]


def scan_files():
    """Walk ``MATERIALS_DIR`` and return one record per ``.mp4`` clip.

    The expected layout is ``<lv1>/<lv2>/<lv3>/<clip>.mp4``. When an lv2
    directory contains no visible subdirectory, the clips directly inside it
    are used instead and the lv2 name doubles as the lv3 name. Clips lying
    directly in an lv2 directory that *does* have subdirectories are ignored.
    Names starting with ``.`` are skipped at every level, and the ``.mp4``
    suffix match is case-sensitive.

    Returns:
        A list of dicts with the keys ``lv1``, ``lv2``, ``lv3``, ``filename``,
        ``filepath``, ``category_slug`` and ``url``, ordered by sorted
        directory and file names.

    Raises:
        OSError: If ``MATERIALS_DIR`` or a directory below it cannot be listed.
        KeyError: If a directory holding clips has no entry in the slug tables
            (propagated from ``get_category_slug``).
    """
    files = []
    for lv1 in _visible_subdirs(MATERIALS_DIR):
        lv1_path = os.path.join(MATERIALS_DIR, lv1)
        for lv2 in _visible_subdirs(lv1_path):
            lv2_path = os.path.join(lv1_path, lv2)
            lv3_names = _visible_subdirs(lv2_path)
            for lv3 in lv3_names:
                files.extend(
                    _mp4_records(lv1, lv2, lv3, os.path.join(lv2_path, lv3))
                )
            if not lv3_names:
                # Flat category: the lv2 directory itself holds the clips.
                files.extend(_mp4_records(lv1, lv2, lv2, lv2_path))
    return files
def _load_duration_cache() -> dict:
    """Load the filepath -> duration cache; return {} if absent or corrupt."""
    if not CACHE_FILE.exists():
        return {}
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as cache_fp:
            loaded = json.load(cache_fp)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        # A damaged cache only costs a re-probe, so it must not abort the run.
        print(f"缓存文件损坏,已忽略: {CACHE_FILE}")
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _cached_duration(cache: dict, filepath: str) -> float:
    """Return the cached duration for *filepath* if positive, else 0.0."""
    value = cache.get(filepath)
    if isinstance(value, (int, float)) and value > 0:
        return value
    return 0.0


def _probe_into_cache(pending: list, cache: dict) -> None:
    """Probe *pending* files in parallel, storing positive durations in *cache*."""
    completed = 0
    with ProcessPoolExecutor(max_workers=8) as executor:
        futures = {
            executor.submit(probe_duration, item["filepath"]): item
            for item in pending
        }
        for future in as_completed(futures):
            filepath = futures[future]["filepath"]
            try:
                duration = future.result()
            except Exception:
                # Boundary with the worker pool: a crashed worker is treated
                # like a failed probe rather than aborting the whole batch.
                duration = 0.0
            if duration > 0:
                cache[filepath] = duration
            else:
                # Never persist a failure, otherwise the file would be
                # considered "cached" and not retried on later runs.
                cache.pop(filepath, None)
            completed += 1
            if completed % 100 == 0:
                print(f" 进度: {completed}/{len(pending)}")


def _sql_literal(value: str) -> str:
    """Quote *value* as a SQL string literal, doubling single quotes."""
    return "'" + value.replace("'", "''") + "'"


def _render_seed_sql(files: list) -> str:
    """Render the full seed script: header, one INSERT per file, COMMIT."""
    lines = [
        "-- ========================================================",
        "-- 空镜素材 Seed 数据",
        "-- 表名: mjk_broll_materials",
        f"-- 素材数: {len(files)}",
        "-- ========================================================",
        "",
        "BEGIN;",
        "",
    ]
    for item in files:
        # INSERT ... SELECT resolves category_id through the slug; a slug
        # with no matching level-3 category row therefore inserts nothing.
        lines.append(
            "INSERT INTO mjk_broll_materials (category_id, title, url, duration, usage_count, status, created_at, updated_at)\n"
            f"SELECT id, {_sql_literal(item['filename'])}, {_sql_literal(item['url'])}, {item['duration']}, 0, 'active', NOW(), NOW()\n"
            f"FROM mjk_broll_categories WHERE slug = {_sql_literal(item['category_slug'])} AND level = 3;"
        )
    lines.extend(["", "COMMIT;", ""])
    return "\n".join(lines)


def generate_sql():
    """Scan the material directory, probe durations and write the seed SQL.

    Steps:
        1. Collect clip records with ``scan_files``.
        2. Load the duration cache from ``CACHE_FILE`` and probe every file
           without a positive cached duration using ``probe_duration`` in a
           pool of 8 worker processes. Files whose earlier probe failed are
           retried instead of staying at 0 forever.
        3. Save the cache (only when something was probed).
        4. Write one ``INSERT ... SELECT`` per clip, wrapped in
           ``BEGIN``/``COMMIT``, to ``OUTPUT_SQL``.

    Progress and summary figures are printed to stdout; note that the final
    ``INSERT`` figure counts generated statements, not rows actually inserted.
    The function returns ``None``.
    """
    files = scan_files()
    print(f"扫描到 {len(files)} 个 MP4 文件")

    cache = _load_duration_cache()
    to_probe = [
        item for item in files if _cached_duration(cache, item["filepath"]) <= 0
    ]
    cached_count = len(files) - len(to_probe)
    print(f"待探测: {len(to_probe)} 个,已缓存: {cached_count}")

    if to_probe:
        _probe_into_cache(to_probe, cache)
        with open(CACHE_FILE, "w", encoding="utf-8") as cache_fp:
            json.dump(cache, cache_fp, ensure_ascii=False, indent=2)
        print("缓存已保存")

    for item in files:
        item["duration"] = _cached_duration(cache, item["filepath"])
    zero_count = sum(1 for item in files if item["duration"] <= 0)
    print(f"时长为0的文件: {zero_count}")

    with open(OUTPUT_SQL, "w", encoding="utf-8") as sql_fp:
        sql_fp.write(_render_seed_sql(files))
    print(f"\n已生成: {OUTPUT_SQL}")
    print(f"INSERT: {len(files)}")
if __name__ == "__main__":
    # Generate the seed SQL only when run as a script, never on import.
    generate_sql()