#!/usr/bin/env python3
"""Batch-generate the seed SQL that inserts material video files into the database.

The script walks ``MATERIALS_DIR``, measures every ``.mp4`` file with
``ffprobe`` (results are cached in ``CACHE_FILE``), and writes one
``INSERT ... SELECT`` statement per file to ``OUTPUT_SQL``.
"""

import os
import json
import subprocess
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed

from pypinyin import lazy_pinyin

# Root directory that is scanned for material files.
MATERIALS_DIR = "/Users/0fun/Downloads/装修素材空镜库"
# Generated SQL file and the duration cache (both relative to the working directory).
OUTPUT_SQL = Path("scripts/seed_materials.sql")
CACHE_FILE = Path("scripts/.seed_materials_cache.json")
# Every material URL is built as "<CDN_PREFIX>/<file name>".
CDN_PREFIX = "https://media.liche.cn/meijiaka-zy/materials"

# First-level directory name -> first segment of the category slug.
LV1_PINYIN = {
    "前期准备类": "zhunbei",
    "拆改改造类": "chaigai",
    "水电隐蔽类": "shuidian",
    "泥瓦工艺类": "niwa",
    "木工定制类": "mugong",
    "油漆墙面类": "youqi",
    "安装收尾类": "anzhuang",
    "软装完工&验收类": "ruanzhuang",
    "网红开篇": "wanghong",
}

# Second-level directory name -> second segment of the category slug.
LV2_PINYIN = {
    "合同签署镜": "hetong",
    "毛坯基础镜": "maopi",
    "现场交底镜": "jiaodi",
    "翻新基础镜": "fanxin",
    "量房勘测镜": "liangfang",
    "墙体拆除镜": "chaiqiang",
    "工地清运镜": "qingyun",
    "新建砌筑镜": "zhuqi",
    "吊顶造型镜": "diaoding",
    "柜体木作镜": "muti",
    "隔音防潮镜": "gechao",
    "水电验收镜": "yanshou",
    "水路施工镜": "shuilu",
    "电路施工镜": "dianlu",
    "墙面基层镜": "jiceng",
    "成品保护镜": "baohu",
    "面漆涂刷镜": "mianqi",
    "包管找平镜": "baoguan",
    "瓷砖铺贴镜": "cizhuan",
    "防水施工镜": "fangshui",
    "主材安装镜": "zhucai",
    "收尾细节镜": "shouwei",
    "美缝开荒镜": "meifeng",
    "全屋验收镜": "quanyan",
    "软装进场镜": "ruanchang",
    "恶搞开篇": "egao",
    "施工翻车镜": "fanche",
    "木作阶段验收镜": "mujiashouyan",
}

# Errors that launching ffprobe or parsing its output is expected to raise:
# missing binary / OS failure, timeout, and malformed numeric output.
_PROBE_ERRORS = (OSError, subprocess.SubprocessError, ValueError)


def initial_slug(text: str) -> str:
    """Return the lowercased first characters of the pinyin items of *text*.

    Only the part of *text* before the first "-" is considered. Each item
    returned by ``lazy_pinyin`` contributes its first character.
    """
    head = text.partition("-")[0]
    return "".join(item[0] for item in lazy_pinyin(head) if item).lower()


def get_category_slug(lv1: str, lv2: str, lv3: str) -> str:
    """Build the three-part category slug ``<lv1>-<lv2>-<lv3 initials>``.

    Raises:
        KeyError: if *lv1* or *lv2* has no entry in the lookup tables.
    """
    return f"{LV1_PINYIN[lv1]}-{LV2_PINYIN[lv2]}-{initial_slug(lv3)}"


def probe_duration(filepath: str) -> float:
    """Return the duration of *filepath* in seconds, rounded to 2 decimals.

    Two strategies are tried in order: the machine-readable
    ``format=duration`` entry, then the ``Duration:`` line of ffprobe's
    regular stderr output. Returns ``0.0`` when neither yields a value.
    """
    try:
        result = subprocess.run(
            [
                "ffprobe", "-i", filepath,
                "-show_entries", "format=duration",
                "-v", "quiet", "-of", "csv=p=0",
            ],
            capture_output=True,
            text=True,
            timeout=10,
        )
        if result.returncode == 0 and result.stdout.strip():
            return round(float(result.stdout.strip()), 2)
    except _PROBE_ERRORS:
        # Best effort: fall through to the stderr-based strategy below.
        pass

    try:
        result = subprocess.run(
            ["ffprobe", "-i", filepath],
            capture_output=True,
            text=True,
            timeout=10,
        )
        for line in result.stderr.splitlines():
            if "Duration:" in line:
                # Take the "HH:MM:SS.xx" text between "Duration:" and the next comma.
                parts = line.split("Duration:")[1].split(",")[0].strip()
                h, m, s = parts.split(":")
                return round(float(h) * 3600 + float(m) * 60 + float(s), 2)
    except _PROBE_ERRORS:
        # Best effort: an unreadable file is reported as duration 0.0.
        pass

    return 0.0


def _visible_subdirs(parent: str) -> list:
    """Return the sorted names of non-hidden sub-directories of *parent*."""
    return [
        name
        for name in sorted(os.listdir(parent))
        if os.path.isdir(os.path.join(parent, name)) and not name.startswith(".")
    ]


def _collect_mp4s(directory: str, lv1: str, lv2: str, lv3: str) -> list:
    """Return one record per ``.mp4`` file directly inside *directory*."""
    names = [name for name in sorted(os.listdir(directory)) if name.endswith(".mp4")]
    if not names:
        # Resolve the slug only when there is a file to record, so a folder
        # without videos never triggers a lookup failure.
        return []
    slug = get_category_slug(lv1, lv2, lv3)
    return [
        {
            "lv1": lv1,
            "lv2": lv2,
            "lv3": lv3,
            "filename": name,
            "filepath": os.path.join(directory, name),
            "category_slug": slug,
            "url": f"{CDN_PREFIX}/{name}",
        }
        for name in names
    ]


def scan_files():
    """Walk ``MATERIALS_DIR`` and return a record for every ``.mp4`` file.

    The expected layout is ``lv1/lv2/lv3/*.mp4``. When a second-level
    directory has no visible sub-directory, its own ``.mp4`` files are
    recorded with the second-level name reused as the third level.
    """
    files = []
    for lv1 in _visible_subdirs(MATERIALS_DIR):
        lv1_path = os.path.join(MATERIALS_DIR, lv1)
        for lv2 in _visible_subdirs(lv1_path):
            lv2_path = os.path.join(lv1_path, lv2)
            lv3_names = _visible_subdirs(lv2_path)
            for lv3 in lv3_names:
                files.extend(_collect_mp4s(os.path.join(lv2_path, lv3), lv1, lv2, lv3))
            if not lv3_names:
                files.extend(_collect_mp4s(lv2_path, lv1, lv2, lv2))
    return files


def _load_cache() -> dict:
    """Load the duration cache; a missing or unreadable cache yields ``{}``."""
    if not CACHE_FILE.exists():
        return {}
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as fh:
            cache = json.load(fh)
    except json.JSONDecodeError:
        # A damaged cache only costs a re-probe, so it must not abort the run.
        print(f"缓存文件损坏,已忽略: {CACHE_FILE}")
        return {}
    return cache if isinstance(cache, dict) else {}


def _needs_probe(cached) -> bool:
    """Return True when *cached* is not a usable (positive, numeric) duration."""
    # Non-positive values are what a failed probe leaves behind; retrying them
    # keeps a transient failure from being pinned in the cache.
    return not isinstance(cached, (int, float)) or cached <= 0


def _probe_into_cache(to_probe: list, cache: dict) -> None:
    """Probe every record of *to_probe* in parallel and store results in *cache*."""
    completed = 0
    with ProcessPoolExecutor(max_workers=8) as executor:
        futures = {
            executor.submit(probe_duration, item["filepath"]): item
            for item in to_probe
        }
        for future in as_completed(futures):
            item = futures[future]
            try:
                duration = future.result()
            except Exception:
                # A crashed worker is treated like an unreadable file.
                duration = 0.0
            cache[item["filepath"]] = duration
            completed += 1
            if completed % 100 == 0:
                print(f" 进度: {completed}/{len(to_probe)}")


def _sql_quote(value: str) -> str:
    """Escape single quotes so *value* can sit inside a SQL string literal."""
    return value.replace("'", "''")


def _build_sql_lines(files: list) -> list:
    """Return the lines of the seed SQL file for *files*."""
    lines = [
        "-- ========================================================",
        "-- 空镜素材 Seed 数据",
        "-- 表名: mjk_broll_materials",
        f"-- 素材数: {len(files)} 个",
        "-- ========================================================",
        "",
        "BEGIN;",
        "",
    ]
    for item in files:
        escaped_filename = _sql_quote(item["filename"])
        escaped_url = _sql_quote(item["url"])
        escaped_slug = _sql_quote(item["category_slug"])
        lines.append(
            f"INSERT INTO mjk_broll_materials (category_id, title, url, duration, usage_count, status, created_at, updated_at)\n"
            f"SELECT id, '{escaped_filename}', '{escaped_url}', {item['duration']}, 0, 'active', NOW(), NOW()\n"
            f"FROM mjk_broll_categories WHERE slug = '{escaped_slug}' AND level = 3;"
        )
    lines.extend(["", "COMMIT;", ""])
    return lines


def generate_sql():
    """Scan the material library, probe durations, and write the seed SQL file.

    Durations are read from ``CACHE_FILE`` when a usable value is present;
    everything else is probed and the cache is rewritten. The SQL is written
    to ``OUTPUT_SQL`` as a single transaction.
    """
    files = scan_files()
    print(f"扫描到 {len(files)} 个 MP4 文件")

    cache = _load_cache()
    to_probe = [item for item in files if _needs_probe(cache.get(item["filepath"]))]
    cached_count = len(files) - len(to_probe)
    print(f"待探测: {len(to_probe)} 个,已缓存: {cached_count} 个")

    if to_probe:
        _probe_into_cache(to_probe, cache)
        # The cache lives next to the output; make sure the folder exists.
        CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(CACHE_FILE, "w", encoding="utf-8") as fh:
            json.dump(cache, fh, ensure_ascii=False, indent=2)
        print("缓存已保存")

    for item in files:
        item["duration"] = cache.get(item["filepath"], 0.0)

    zero_count = sum(1 for item in files if item["duration"] <= 0)
    print(f"时长为0的文件: {zero_count} 个")

    OUTPUT_SQL.parent.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT_SQL, "w", encoding="utf-8") as fh:
        fh.write("\n".join(_build_sql_lines(files)))

    print(f"\n已生成: {OUTPUT_SQL}")
    print(f"INSERT: {len(files)} 条")


if __name__ == "__main__":
    generate_sql()