Files
meijiaka-zy/python-api/scripts/seed_materials.py
T
小鱼开发 91e5cdefbb feat: 空镜素材分类&数据入库
- 重命名素材表 mjk_* -> broll_*,与模型命名保持一致
- 新增 182 个三级场景分类 seed 数据
- 新增 2495 条素材 INSERT SQL(含 ffprobe 时长探测)
- 新增 Alembic 迁移: rename mjk_categories/materials/tags to broll_*
2026-05-15 15:41:23 +08:00

236 lines
7.7 KiB
Python

#!/usr/bin/env python3
"""
空镜素材批量入库脚本
====================
扫描本地素材目录,用 ffprobe 探测时长,生成 INSERT SQL。
用法:
cd python-api
python scripts/seed_materials.py
输出:
scripts/seed_materials.sql
"""
import json
import os
import subprocess
from collections import OrderedDict
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from pypinyin import lazy_pinyin
# ========== Configuration ==========
# Root directory of the local footage library.
# The default is the original author's machine; set the SEED_MATERIALS_DIR
# environment variable to run the script against a different location.
MATERIALS_DIR = os.environ.get("SEED_MATERIALS_DIR") or "/Users/0fun/Downloads/装修素材空镜库"
# Qiniu Cloud CDN URL prefix
CDN_PREFIX = "https://media.liche.cn/meijiaka-zy/materials"
# Output SQL file (written next to this script)
OUTPUT_SQL = Path(__file__).parent / "seed_materials.sql"
# Intermediate result cache (avoids probing the same file twice)
CACHE_FILE = Path(__file__).parent / ".seed_materials_cache.json"
# Number of worker processes used for probing
MAX_WORKERS = 8
# Category name -> pinyin slug mappings (kept consistent with seed_categories.sql)
LV1_PINYIN = {
    '前期准备类': 'zhunbei',
    '拆改改造类': 'chaigai',
    '水电隐蔽类': 'shuidian',
    '泥瓦工艺类': 'niwa',
    '木工定制类': 'mugong',
    '油漆墙面类': 'youqi',
    '安装收尾类': 'anzhuang',
    '软装完工&验收类': 'ruanzhuang',
    '网红开篇': 'wanghong',
}
LV2_PINYIN = {
    '合同签署镜': 'hetong',
    '毛坯基础镜': 'maopi',
    '现场交底镜': 'jiaodi',
    '翻新基础镜': 'fanxin',
    '量房勘测镜': 'liangfang',
    '墙体拆除镜': 'chaiqiang',
    '工地清运镜': 'qingyun',
    '新建砌筑镜': 'zhuqi',
    '吊顶造型镜': 'diaoding',
    '柜体木作镜': 'muti',
    '隔音防潮镜': 'gechao',
    '水电验收镜': 'yanshou',
    '水路施工镜': 'shuilu',
    '电路施工镜': 'dianlu',
    '墙面基层镜': 'jiceng',
    '成品保护镜': 'baohu',
    '面漆涂刷镜': 'mianqi',
    '包管找平镜': 'baoguan',
    '瓷砖铺贴镜': 'cizhuan',
    '防水施工镜': 'fangshui',
    '主材安装镜': 'zhucai',
    '收尾细节镜': 'shouwei',
    '美缝开荒镜': 'meifeng',
    '全屋验收镜': 'quanyan',
    '软装进场镜': 'ruanchang',
    '恶搞开篇': 'egao',
    '施工翻车镜': 'fanche',
}
def initial_slug(text: str) -> str:
    """Return the lowercase pinyin-initial abbreviation of a scene description.

    Only the part of *text* before the first ``-`` is considered. Each
    non-empty syllable produced by ``lazy_pinyin`` contributes its first
    character to the result.
    """
    # partition() leaves the text untouched when there is no '-' separator.
    head, _, _ = text.partition('-')
    syllables = lazy_pinyin(head)
    abbreviation = ''.join(syllable[0] for syllable in syllables if syllable)
    return abbreviation.lower()
def get_category_slug(lv1: str, lv2: str, lv3: str) -> str:
    """Build the level-3 category slug from the three category names.

    The slug has the form ``<lv1 pinyin>-<lv2 pinyin>-<lv3 initials>``.

    Raises:
        KeyError: if *lv1* or *lv2* has no entry in LV1_PINYIN / LV2_PINYIN.
    """
    segments = (
        LV1_PINYIN[lv1],
        LV2_PINYIN[lv2],
        initial_slug(lv3),
    )
    return "-".join(segments)
def probe_duration(filepath: str) -> float:
    """Probe a video's duration in seconds with ffprobe.

    The function first asks ffprobe for ``format=duration`` in CSV form. If
    that yields nothing usable, it falls back to parsing the
    ``Duration: HH:MM:SS.xx`` line that ffprobe prints on stderr.

    Args:
        filepath: Path of the video file to probe.

    Returns:
        The duration rounded to two decimals, or ``0.0`` when it cannot be
        determined (ffprobe unavailable, timeout, or unparsable output).
    """
    # Only failures that genuinely mean "could not probe" are swallowed:
    # ffprobe missing / not executable (OSError), timeout (SubprocessError),
    # unparsable or undecodable output (ValueError). Anything else is a
    # programming error and is allowed to surface.
    probe_errors = (OSError, subprocess.SubprocessError, ValueError)

    try:
        result = subprocess.run(
            ["ffprobe", "-i", filepath, "-show_entries", "format=duration", "-v", "quiet", "-of", "csv=p=0"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        raw = result.stdout.strip()
        if result.returncode == 0 and raw:
            return round(float(raw), 2)
    except probe_errors:
        pass  # best effort: try the stderr-based fallback below

    # Fallback: parse the human-readable banner ffprobe writes to stderr.
    try:
        result = subprocess.run(
            ["ffprobe", "-i", filepath],
            capture_output=True,
            text=True,
            timeout=10,
        )
        for line in result.stderr.splitlines():
            if "Duration:" not in line:
                continue
            # Example line: "Duration: 00:00:08.03, start: ..."
            stamp = line.split("Duration:")[1].split(",")[0].strip()
            hours, minutes, seconds = stamp.split(":")
            return round(float(hours) * 3600 + float(minutes) * 60 + float(seconds), 2)
    except probe_errors:
        pass  # fall through to the "unknown duration" result

    return 0.0
def _visible_subdirs(parent: Path) -> list[Path]:
    """Return the non-hidden subdirectories of *parent*, sorted by name."""
    return [
        parent / name
        for name in sorted(os.listdir(parent))
        if not name.startswith(".") and (parent / name).is_dir()
    ]


def scan_files() -> list[dict]:
    """Walk the footage library and collect one record per MP4 file.

    The library is read as ``MATERIALS_DIR/<lv1>/<lv2>/<lv3>/<name>.mp4``.
    Hidden entries (names starting with ``.``) are skipped at every level,
    including the file level, so dotfiles such as ``._clip.mp4`` are not
    seeded as materials.

    Returns:
        A list of dicts with the keys ``lv1``, ``lv2``, ``lv3``, ``filename``,
        ``filepath``, ``category_slug`` and ``url``, in sorted directory and
        file-name order.

    Raises:
        KeyError: if a level-1 or level-2 directory that contains MP4 files
            has no pinyin mapping (raised by ``get_category_slug``).
    """
    records = []
    for lv1_dir in _visible_subdirs(Path(MATERIALS_DIR)):
        for lv2_dir in _visible_subdirs(lv1_dir):
            for lv3_dir in _visible_subdirs(lv2_dir):
                lv1, lv2, lv3 = lv1_dir.name, lv2_dir.name, lv3_dir.name
                for mp4 in sorted(os.listdir(lv3_dir)):
                    # Apply the same hidden-entry rule used for directories.
                    if mp4.startswith(".") or not mp4.endswith(".mp4"):
                        continue
                    records.append({
                        "lv1": lv1,
                        "lv2": lv2,
                        "lv3": lv3,
                        "filename": mp4,
                        "filepath": str(lv3_dir / mp4),
                        "category_slug": get_category_slug(lv1, lv2, lv3),
                        "url": f"{CDN_PREFIX}/{mp4}",
                    })
    return records
def _sql_literal(value: str) -> str:
    """Quote *value* as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def _load_cache() -> dict:
    """Load the duration cache; return an empty dict if it is missing or invalid."""
    if not CACHE_FILE.exists():
        return {}
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as fh:
            cache = json.load(fh)
    except (OSError, ValueError):  # json.JSONDecodeError is a ValueError
        cache = None
    if not isinstance(cache, dict):
        # A damaged cache only costs a re-probe; it must not abort the run.
        print(f" 缓存无效,已忽略: {CACHE_FILE}")
        return {}
    print(f" 加载缓存: {len(cache)}")
    return cache


def _probe_into_cache(to_probe: list, cache: dict) -> None:
    """Probe every record in *to_probe* in parallel and store durations in *cache*."""
    completed = 0
    try:
        with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = {executor.submit(probe_duration, item["filepath"]): item for item in to_probe}
            for future in as_completed(futures):
                item = futures[future]
                try:
                    duration = future.result()
                except Exception:
                    # Worker-level failure (e.g. a broken pool): record the
                    # file as unknown so the remaining files still get processed.
                    duration = 0.0
                cache[item["filepath"]] = duration
                completed += 1
                if completed % 100 == 0:
                    print(f" 进度: {completed}/{len(to_probe)}")
    finally:
        # Persist whatever was probed, even if the run is interrupted.
        with open(CACHE_FILE, "w", encoding="utf-8") as fh:
            json.dump(cache, fh, ensure_ascii=False, indent=2)
        print(f" 缓存已保存: {CACHE_FILE}")


def _build_sql_lines(files: list) -> list:
    """Build the seed script: header, one INSERT per record, wrapped in a transaction."""
    from datetime import date  # local import: only needed for the header stamp

    sql_lines = [
        "-- ========================================================",
        "-- 空镜素材 Seed 数据",
        f"-- 生成时间: {date.today().isoformat()}",
        f"-- 素材数: {len(files)}",
        "-- ========================================================",
        "",
        "BEGIN;",
        "",
    ]
    for item in files:
        # INSERT ... SELECT resolves category_id by slug; a slug that is not
        # present in broll_categories inserts zero rows rather than failing.
        sql_lines.append(
            "INSERT INTO broll_materials (category_id, title, url, duration, usage_count, status, created_at, updated_at)"
            f"\nSELECT id, {_sql_literal(item['filename'])}, {_sql_literal(item['url'])}, {item['duration']}, 0, 'active', NOW(), NOW()"
            f"\nFROM broll_categories WHERE slug = {_sql_literal(item['category_slug'])} AND level = 3;"
        )
    sql_lines.extend([
        "",
        "COMMIT;",
        "",
    ])
    return sql_lines


def main():
    """Scan the footage library, probe durations and write the seed SQL file.

    Steps: scan the material directory, load the duration cache, probe the
    files that have no usable cached duration, then generate and write
    ``OUTPUT_SQL``. Files whose cached duration is 0 (a failed probe) are
    probed again on every run so that a transient failure does not stick.
    """
    print("=" * 60)
    print("空镜素材批量入库脚本")
    print("=" * 60)

    # 1. Scan files
    print("\n[1/4] 扫描素材目录...")
    files = scan_files()
    print(f" 发现 {len(files)} 个 MP4 文件")

    # 2. Load the cache and probe what is still missing (multi-process)
    cache = _load_cache()
    print(f"\n[2/4] 探测视频时长({MAX_WORKERS} 进程)...")
    # A cached 0 means the earlier probe failed, so it is retried.
    to_probe = [item for item in files if cache.get(item["filepath"], 0.0) <= 0]
    print(f" 待探测: {len(to_probe)} 个,已缓存: {len(files) - len(to_probe)}")
    if to_probe:
        _probe_into_cache(to_probe, cache)

    # Attach the (cached or freshly probed) duration to every record.
    for item in files:
        item["duration"] = cache.get(item["filepath"], 0.0)

    # 3. Generate SQL
    print("\n[3/4] 生成 INSERT SQL...")
    zero_duration = sum(1 for item in files if item["duration"] <= 0)
    print(f" 时长为 0 的文件: {zero_duration}")
    sql_text = "\n".join(_build_sql_lines(files))

    # 4. Write the output file
    print("\n[4/4] 写入文件...")
    with open(OUTPUT_SQL, "w", encoding="utf-8") as out:
        out.write(sql_text)
    print(f" 输出: {OUTPUT_SQL}")
    # Count physical lines; each INSERT statement spans several of them.
    print(f" 总行数: {len(sql_text.splitlines())}")
    print(f" INSERT 语句: {len(files)}")
    print(f"\n{'=' * 60}")
    print("完成!")
    print(f"{'=' * 60}")
# Script entry point: run the pipeline only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()