50e8b7cda3
- 基于最新本地素材目录重新扫描(2771个MP4) - 新增二级分类:木作阶段验收镜(2个文件) - 全部 ffprobe 探测时长,无0值异常 - 生成 scripts/seed_materials.sql 入库脚本 - 保留 generate_seed_materials.py 供后续复用
225 lines
7.4 KiB
Python
225 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
|
"""批量生成素材入库 SQL 的脚本"""
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
from pathlib import Path
|
|
from concurrent.futures import ProcessPoolExecutor, as_completed
|
|
from pypinyin import lazy_pinyin
|
|
|
|
# Root directory of the local B-roll library that gets scanned.  The default is
# the original machine-specific location; set the SEED_MATERIALS_DIR
# environment variable to run the script elsewhere without editing the source.
# An empty variable is treated as unset.
MATERIALS_DIR = (
    os.environ.get("SEED_MATERIALS_DIR") or "/Users/0fun/Downloads/装修素材空镜库"
)
# Destination of the generated SQL seed script (relative to the working directory).
OUTPUT_SQL = Path("scripts/seed_materials.sql")
# JSON file caching probed durations, keyed by file path, so reruns skip ffprobe.
CACHE_FILE = Path("scripts/.seed_materials_cache.json")
# URL prefix of every material; the bare file name is appended to it.
CDN_PREFIX = "https://media.liche.cn/meijiaka-zy/materials"
|
|
|
|
# Level-1 category directory name -> first segment of the category slug.
# A directory name missing from this table makes the slug lookup raise KeyError.
# NOTE(review): the values presumably have to match slugs already stored in the
# categories table, so they must not be "corrected" here — confirm before editing.
LV1_PINYIN = {
    "前期准备类": "zhunbei",
    "拆改改造类": "chaigai",
    "水电隐蔽类": "shuidian",
    "泥瓦工艺类": "niwa",
    "木工定制类": "mugong",
    "油漆墙面类": "youqi",
    "安装收尾类": "anzhuang",
    "软装完工&验收类": "ruanzhuang",
    "网红开篇": "wanghong",
}
|
|
|
|
# Level-2 category directory name -> second segment of the category slug.
# A directory name missing from this table makes the slug lookup raise KeyError,
# so every new level-2 directory needs an entry here.
# NOTE(review): several values are not literal pinyin of their keys; they are
# presumably fixed identifiers shared with the categories table — confirm
# before changing any of them.
LV2_PINYIN = {
    "合同签署镜": "hetong",
    "毛坯基础镜": "maopi",
    "现场交底镜": "jiaodi",
    "翻新基础镜": "fanxin",
    "量房勘测镜": "liangfang",
    "墙体拆除镜": "chaiqiang",
    "工地清运镜": "qingyun",
    "新建砌筑镜": "zhuqi",
    "吊顶造型镜": "diaoding",
    "柜体木作镜": "muti",
    "隔音防潮镜": "gechao",
    "水电验收镜": "yanshou",
    "水路施工镜": "shuilu",
    "电路施工镜": "dianlu",
    "墙面基层镜": "jiceng",
    "成品保护镜": "baohu",
    "面漆涂刷镜": "mianqi",
    "包管找平镜": "baoguan",
    "瓷砖铺贴镜": "cizhuan",
    "防水施工镜": "fangshui",
    "主材安装镜": "zhucai",
    "收尾细节镜": "shouwei",
    "美缝开荒镜": "meifeng",
    "全屋验收镜": "quanyan",
    "软装进场镜": "ruanchang",
    "恶搞开篇": "egao",
    "施工翻车镜": "fanche",
    "木作阶段验收镜": "mujiashouyan",
}
|
|
|
|
|
|
def initial_slug(text: str) -> str:
    """Return the lowercase initials of *text* as converted by ``lazy_pinyin``.

    Only the part of *text* before the first ``-`` is considered.  The first
    character of every non-empty item returned by ``lazy_pinyin`` is taken,
    and the characters are concatenated and lower-cased.
    """
    # partition() yields the whole string when there is no "-" at all.
    head, _, _ = text.partition("-")
    return "".join(
        syllable[0] for syllable in lazy_pinyin(head) if syllable
    ).lower()
|
|
|
|
|
|
def get_category_slug(lv1: str, lv2: str, lv3: str) -> str:
    """Build the three-part category slug ``<lv1>-<lv2>-<lv3 initials>``.

    Args:
        lv1: Level-1 directory name; must be a key of ``LV1_PINYIN``.
        lv2: Level-2 directory name; must be a key of ``LV2_PINYIN``.
        lv3: Level-3 directory name; reduced to initials by ``initial_slug``.

    Raises:
        KeyError: If *lv1* or *lv2* has no entry in its lookup table.
    """
    segments = (LV1_PINYIN[lv1], LV2_PINYIN[lv2], initial_slug(lv3))
    return "-".join(segments)
|
|
|
|
|
|
# Upper bound, in seconds, for a single ffprobe invocation.
_PROBE_TIMEOUT_SECONDS = 10

# Failures that simply mean "this attempt produced no usable duration":
# ffprobe missing or not executable (OSError), ffprobe timed out
# (SubprocessError), or its output did not parse as expected (ValueError).
# Anything else is a programming error and is allowed to propagate.
_PROBE_ERRORS = (OSError, subprocess.SubprocessError, ValueError)


def _probe_format_duration(filepath):
    """Ask ffprobe for the container duration as a bare number.

    Returns the duration rounded to two decimals, or None when ffprobe exits
    non-zero, prints nothing, or prints a value that is negative or not finite.
    """
    result = subprocess.run(
        [
            "ffprobe",
            "-i", filepath,
            "-show_entries", "format=duration",
            "-v", "quiet",
            "-of", "csv=p=0",
        ],
        capture_output=True,
        text=True,
        timeout=_PROBE_TIMEOUT_SECONDS,
    )
    output = result.stdout.strip()
    if result.returncode != 0 or not output:
        return None
    value = float(output)
    # The chained comparison is False for NaN, infinity and negative numbers,
    # none of which is a meaningful duration to write into the seed SQL.
    if not 0 <= value < float("inf"):
        return None
    return round(value, 2)


def _probe_banner_duration(filepath):
    """Parse the first ``Duration: HH:MM:SS.ss`` line ffprobe writes to stderr.

    Returns the duration in seconds rounded to two decimals, or None when no
    such line is present.
    """
    result = subprocess.run(
        ["ffprobe", "-i", filepath],
        capture_output=True,
        text=True,
        timeout=_PROBE_TIMEOUT_SECONDS,
    )
    for line in result.stderr.splitlines():
        if "Duration:" in line:
            clock = line.split("Duration:")[1].split(",")[0].strip()
            # Unpacking raises ValueError for anything that is not H:M:S
            # (for example "N/A"); the caller treats that as "no duration".
            hours, minutes, seconds = clock.split(":")
            return round(
                float(hours) * 3600 + float(minutes) * 60 + float(seconds), 2
            )
    return None


def probe_duration(filepath: str) -> float:
    """Return the duration of a media file in seconds, or 0.0 if unknown.

    Two strategies are tried in order: a machine-readable ffprobe query, then
    parsing the human-readable ``Duration:`` line of ffprobe's banner.  The
    function is best-effort and never raises for expected failures (ffprobe
    missing, timeout, unparsable output); those yield 0.0.

    Args:
        filepath: Path of the media file handed to ffprobe.

    Returns:
        The duration rounded to two decimals, or 0.0 when neither strategy
        produced a usable value.
    """
    for attempt in (_probe_format_duration, _probe_banner_duration):
        try:
            duration = attempt(filepath)
        except _PROBE_ERRORS:
            continue
        if duration is not None:
            return duration
    return 0.0
|
|
|
|
|
|
def scan_files():
    """Walk ``MATERIALS_DIR`` and describe every ``.mp4`` file found.

    The tree is expected to be ``lv1/lv2/lv3/*.mp4``.  When a level-2
    directory contains no (non-hidden) sub-directory, the ``.mp4`` files lying
    directly inside it are collected instead and the level-2 name doubles as
    the level-3 name.  Files directly inside a level-2 directory that does
    have sub-directories are not collected.  Directories whose name starts
    with ``.`` are skipped; the ``.mp4`` suffix match is case-sensitive.

    Returns:
        A list of dicts with the keys ``lv1``, ``lv2``, ``lv3``, ``filename``,
        ``filepath``, ``category_slug`` and ``url``, ordered by sorted
        directory and file names.

    Raises:
        KeyError: If a level-1 or level-2 directory containing ``.mp4`` files
            has no entry in the slug lookup tables.
    """
    records = []

    def visible_subdirs(parent):
        """Yield ``(name, path)`` for each non-hidden sub-directory, by name."""
        for name in sorted(os.listdir(parent)):
            path = os.path.join(parent, name)
            if os.path.isdir(path) and not name.startswith("."):
                yield name, path

    def collect(directory, lv1, lv2, lv3):
        """Append one record per ``.mp4`` file located directly in *directory*."""
        for name in sorted(os.listdir(directory)):
            if name.endswith(".mp4"):
                records.append({
                    "lv1": lv1,
                    "lv2": lv2,
                    "lv3": lv3,
                    "filename": name,
                    "filepath": os.path.join(directory, name),
                    "category_slug": get_category_slug(lv1, lv2, lv3),
                    "url": f"{CDN_PREFIX}/{name}",
                })

    for lv1, lv1_path in visible_subdirs(MATERIALS_DIR):
        for lv2, lv2_path in visible_subdirs(lv1_path):
            found_lv3 = False
            for lv3, lv3_path in visible_subdirs(lv2_path):
                found_lv3 = True
                collect(lv3_path, lv1, lv2, lv3)
            if not found_lv3:
                # Flat level-2 directory: reuse its name as the level-3 name.
                collect(lv2_path, lv1, lv2, lv2)

    return records
|
|
|
|
|
|
def _load_duration_cache():
    """Return the persisted ``{filepath: duration}`` cache, or ``{}`` if unusable.

    A missing file is the normal first-run case.  An unreadable or corrupt
    file is reported and ignored, so the run falls back to re-probing instead
    of crashing.
    """
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as fh:
            data = json.load(fh)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
        print(f"缓存读取失败,将重新探测: {exc}")
        return {}
    return data if isinstance(data, dict) else {}


def _save_duration_cache(cache):
    """Write *cache* to ``CACHE_FILE`` as JSON, creating its directory if needed."""
    CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(CACHE_FILE, "w", encoding="utf-8") as fh:
        json.dump(cache, fh, ensure_ascii=False, indent=2)


def _has_cached_duration(cache, filepath):
    """Return True when *cache* holds a positive numeric duration for *filepath*."""
    value = cache.get(filepath)
    return isinstance(value, (int, float)) and value > 0


def _sql_quote(value):
    """Return *value* as a single-quoted SQL literal with embedded quotes doubled."""
    return "'" + str(value).replace("'", "''") + "'"


def generate_sql():
    """Scan the material library, probe durations and write the seed SQL file.

    Steps:
      1. Collect all files via ``scan_files``.
      2. Probe the duration of every file without a usable cached duration,
         in up to 8 worker processes, and persist the results to
         ``CACHE_FILE``.
      3. Write one ``INSERT ... SELECT`` statement per file to ``OUTPUT_SQL``,
         wrapped in a single ``BEGIN``/``COMMIT``.

    Progress and summary lines are printed to stdout.  Returns None.
    """
    files = scan_files()
    print(f"扫描到 {len(files)} 个 MP4 文件")

    cache = _load_duration_cache()

    # A cached 0 (or otherwise unusable value) records a probe that failed,
    # possibly only because of a timeout, so such files are probed again
    # instead of being frozen at duration 0 forever.
    to_probe = [
        record for record in files
        if not _has_cached_duration(cache, record["filepath"])
    ]
    cached_count = len(files) - len(to_probe)
    print(f"待探测: {len(to_probe)} 个,已缓存: {cached_count} 个")

    if to_probe:
        completed = 0
        try:
            with ProcessPoolExecutor(max_workers=8) as executor:
                futures = {
                    executor.submit(probe_duration, record["filepath"]): record
                    for record in to_probe
                }
                for future in as_completed(futures):
                    record = futures[future]
                    try:
                        duration = future.result()
                    except Exception:
                        # Best-effort per file: a crashed worker must not abort
                        # the whole run.  The 0 is retried on the next run.
                        duration = 0.0
                    cache[record["filepath"]] = duration
                    completed += 1
                    if completed % 100 == 0:
                        print(f"  进度: {completed}/{len(to_probe)}")
        finally:
            # Persist whatever was probed even if the run is interrupted, so
            # finished probes are not repeated.
            _save_duration_cache(cache)
            print("缓存已保存")

    for record in files:
        record["duration"] = cache.get(record["filepath"], 0.0)

    zero_count = sum(1 for record in files if record["duration"] <= 0)
    print(f"时长为0的文件: {zero_count} 个")

    lines = [
        "-- ========================================================",
        "-- 空镜素材 Seed 数据",
        "-- 表名: mjk_broll_materials",
        f"-- 素材数: {len(files)} 个",
        "-- ========================================================",
        "",
        "BEGIN;",
        "",
    ]

    for record in files:
        # Every interpolated string, including the slug, is quoted the same
        # way so an unexpected apostrophe cannot break the statement.
        lines.append(
            f"INSERT INTO mjk_broll_materials (category_id, title, url, duration, usage_count, status, created_at, updated_at)\n"
            f"SELECT id, {_sql_quote(record['filename'])}, {_sql_quote(record['url'])}, {record['duration']}, 0, 'active', NOW(), NOW()\n"
            f"FROM mjk_broll_categories WHERE slug = {_sql_quote(record['category_slug'])} AND level = 3;"
        )

    lines.extend(["", "COMMIT;", ""])

    # Create the target directory so the write cannot fail after all the
    # probing work just because the script was started from another directory.
    OUTPUT_SQL.parent.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT_SQL, "w", encoding="utf-8") as out:
        out.write("\n".join(lines))

    print(f"\n已生成: {OUTPUT_SQL}")
    print(f"INSERT: {len(files)} 条")
|
|
|
|
|
|
# Script entry point: regenerate the seed SQL only when executed directly,
# not when the module is imported.
if __name__ == "__main__":
    generate_sql()
|