Files
meijiaka-zy/scripts/video-replace-mvp.py
T
小鱼开发 bc724810a6 feat: 视频创作流程全链路优化
- 后端: Vidu Provider、System API、Upload API、素材服务更新
- 前端: 字幕压制、视频生成、配音、本地存储、类型定义优化
- Rust: FFmpeg 命令、视频合成、语音命令、库注册更新
- Store: 项目状态、语音状态管理优化
- 新增: 对口型替换文档、健康检查器、字幕 API 模块、音频对齐工具
- 删除: 废弃的 polish 提示词模板
2026-04-26 21:24:42 +08:00

276 lines
8.3 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
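Video segment replacement MVP
=============================
Based on the spoken text in the audio, replace a span of the B-roll video
with the corresponding span of the person (talking-head) video.

Prerequisites:
    pip install openai-whisper

Usage example:
    python scripts/video-replace-mvp.py \
        --person person.mp4 \
        --broll broll.mp4 \
        --query "水电改造要注意"

How it works:
    1. Whisper transcribes the person video's audio into timestamped text.
    2. Text matching finds the target time range [start, end].
    3. An FFmpeg overlay filter covers the B-roll picture with the person
       picture during [start, end].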
"""
from __future__ import annotations
import argparse
import json
import shutil
import subprocess
import sys
from difflib import SequenceMatcher
from pathlib import Path
def check_dep(name: str) -> str | None:
    """Look up executable *name* on PATH; return its path, or None if absent."""
    return shutil.which(name)
def ensure_whisper() -> bool:
    """Check that the ``whisper`` module (openai-whisper) can be imported.

    Returns:
        True when the import succeeds. False otherwise, after printing
        installation hints to stdout.
    """
    try:
        import whisper  # noqa: F401
    except ImportError as exc:
        print("❌ 未安装 openai-whisper")
        print(" 安装命令:pip install openai-whisper")
        print(" (首次会自动下载模型,base 模型约 150MB)")
        # ImportError is also raised when whisper itself is present but one of
        # the modules it imports is not; show the real cause in that case so
        # the "not installed" hint above does not mislead.
        missing = getattr(exc, "name", None)
        if missing not in (None, "whisper"):
            print(f" 导入失败原因:{exc}")
        return False
    return True
def run_whisper(video_path: str, model: str = "base") -> list[dict]:
    """Transcribe *video_path* with Whisper and return the ``segments`` list.

    The model named by *model* is loaded on every call. The value returned is
    the ``"segments"`` entry of the transcription result; the rest of this
    script reads the ``start``, ``end`` and ``text`` keys of each segment.
    """
    import whisper

    print(f" 加载模型:{model}")
    recognizer = whisper.load_model(model)

    print(f" 识别中...(模型:{model},视频:{Path(video_path).name}")
    options = {
        "language": "zh",
        "word_timestamps": False,  # segment-level timestamps are enough here
        "fp16": False,  # CPU friendly
    }
    transcription = recognizer.transcribe(video_path, **options)
    return transcription["segments"]
def find_time_range(
    segments: list[dict],
    query: str,
    threshold: float = 0.6,
) -> tuple[float, float, str] | None:
    """Find the time range of the segment whose text matches *query*.

    Matching strategy, in decreasing priority:
      1. Exact substring: the first segment whose stripped text contains
         the stripped query.
      2. Fuzzy: the segment with the highest ``SequenceMatcher`` ratio
         against the query, provided that ratio is at least *threshold*.

    Args:
        segments: Dicts carrying ``start``, ``end`` and ``text`` keys.
        query: Text to look for; surrounding whitespace is ignored.
        threshold: Minimum similarity ratio (0..1) for a fuzzy match.

    Returns:
        ``(start, end, matched_text)`` of the matching segment, or None when
        nothing matches or the query is empty.
    """
    needle = query.strip()
    if not needle:
        # "" is a substring of every string, so without this guard an empty
        # query would always "match" the first segment.
        return None

    # 1. Exact substring match: first hit wins.
    for seg in segments:
        text = seg["text"].strip()
        if needle in text:
            return seg["start"], seg["end"], text

    # 2. Fuzzy match: keep the first segment reaching the highest ratio.
    best = None
    best_score = 0.0
    for seg in segments:
        score = SequenceMatcher(None, needle, seg["text"].strip()).ratio()
        if score >= threshold and score > best_score:
            best, best_score = seg, score

    if best is None:
        return None
    return best["start"], best["end"], best["text"].strip()
def get_video_info(video_path: str) -> dict:
    """Read basic properties of the first video stream via ffprobe.

    Args:
        video_path: Path of the media file to probe.

    Returns:
        A dict with ``width`` and ``height`` (defaults 1920 and 1080 when
        ffprobe reports none), ``fps`` (float, default 25.0) and ``duration``
        in seconds (format-level value preferred, then stream-level, else 0.0).

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height,r_frame_rate,duration",
        "-show_entries", "format=duration",
        "-of", "json",
        video_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    data = json.loads(result.stdout)

    # "streams" is an empty list (not missing) when no stream was selected,
    # so a .get() default alone would still hit IndexError on [0].
    streams = data.get("streams") or [{}]
    stream = streams[0]
    fmt = data.get("format") or {}

    # Parse the frame rate, e.g. "25/1" -> 25.0. A value such as "0/0" or any
    # non-numeric text falls back to 25.0 instead of raising.
    numerator, _, denominator = str(stream.get("r_frame_rate", "25/1")).partition("/")
    try:
        fps = float(numerator) / float(denominator or 1)
    except (ValueError, ZeroDivisionError):
        fps = 25.0

    # Take the first duration that parses as a number; values such as "N/A"
    # or a missing key are skipped.
    duration = 0.0
    for raw in (fmt.get("duration"), stream.get("duration")):
        try:
            duration = float(raw)
        except (TypeError, ValueError):
            continue
        break

    return {
        "width": stream.get("width", 1920),
        "height": stream.get("height", 1080),
        "fps": fps,
        "duration": duration,
    }
def replace_with_overlay(
    person_video: str,
    broll_video: str,
    start: float,
    end: float,
    output: str,
    crf: int = 18,
):
    """Cover the B-roll picture with the person picture during [start, end].

    Filter graph:
      - Input 0 (B-roll) supplies the base picture and the audio.
      - Input 1 (person) supplies the replacement picture.
      - ``[1:v]`` is trimmed to [start, end], re-timed so its first frame sits
        at ``start`` on the B-roll timeline, then scaled and padded to the
        B-roll resolution.
      - ``overlay`` is enabled only while ``between(t,start,end)`` holds.
      - Output video is the composited picture; audio is copied from B-roll.

    Args:
        person_video: Path of the video providing the replacement picture.
        broll_video: Path of the video providing the base picture and audio.
        start: Start of the replaced range, in seconds.
        end: End of the replaced range, in seconds.
        output: Path of the file to write (overwritten if it exists).
        crf: libx264 CRF quality value.

    Raises:
        subprocess.CalledProcessError: If ffprobe or ffmpeg exits non-zero;
            ffmpeg's captured output is available on the exception.
    """
    duration = end - start
    broll_info = get_video_info(broll_video)
    w, h = broll_info["width"], broll_info["height"]
    print(f" 空镜分辨率:{w}x{h}, 帧率:{broll_info['fps']:.2f}fps")
    print(f" 截取人物片段:{start:.3f}s ~ {end:.3f}s{duration:.3f}s")
    print(f" 正在渲染...CRF={crf}")

    # Fixed-point formatting keeps exponent notation (e.g. 1e-05) out of the
    # filter expressions.
    t0 = f"{start:.6f}"
    t1 = f"{end:.6f}"

    # overlay pairs frames by timestamp, and the overlay is only enabled
    # during [start, end] of the B-roll timeline. The clip must therefore
    # carry timestamps in that same range: shift it by `start` after zeroing,
    # otherwise a stale or repeated clip frame is shown instead of the
    # matching one.
    # The commas inside between(...) are backslash-escaped for the filter
    # option parser.
    filter_graph = (
        f"[1:v]trim=start={t0}:end={t1},"
        f"setpts=PTS-STARTPTS+{t0}/TB,"
        f"scale={w}:{h}:force_original_aspect_ratio=decrease,"
        f"pad={w}:{h}:(ow-iw)/2:(oh-ih)/2:black[clip];"
        f"[0:v][clip]overlay="
        f"enable='between(t\\,{t0}\\,{t1})':"
        f"x=(W-w)/2:y=(H-h)/2[v]"
    )
    cmd = [
        "ffmpeg", "-y",
        "-i", broll_video,
        "-i", person_video,
        "-filter_complex", filter_graph,
        "-map", "[v]",
        # Trailing "?" makes the audio mapping optional, so a silent B-roll
        # does not abort the whole render.
        "-map", "0:a?",
        "-c:v", "libx264", "-crf", str(crf), "-preset", "fast",
        "-c:a", "copy",
        "-movflags", "+faststart",
        output,
    ]
    subprocess.run(cmd, check=True, capture_output=True)
    print(f"✅ 输出完成:{output}")
def save_srt(segments: list[dict], path: str):
    """Write *segments* to *path* as a UTF-8 SRT file for manual proofreading.

    Args:
        segments: Dicts carrying ``start``/``end`` (seconds) and ``text``.
        path: Destination file; overwritten if it exists.
    """

    def fmt(s: float) -> str:
        # Round to whole milliseconds once, then split with integer math.
        # Deriving the fields separately from the float truncates: 1.001
        # yields (1.001 % 1) * 1000 == 0.9999..., i.e. ",000".
        total_ms = max(0, int(round(float(s) * 1000)))
        total_sec, ms = divmod(total_ms, 1000)
        total_min, sec = divmod(total_sec, 60)
        h, m = divmod(total_min, 60)
        return f"{h:02d}:{m:02d}:{sec:02d},{ms:03d}"

    with open(path, "w", encoding="utf-8") as f:
        f.writelines(
            f"{i}\n{fmt(seg['start'])} --> {fmt(seg['end'])}\n{seg['text'].strip()}\n\n"
            for i, seg in enumerate(segments, 1)
        )
def main():
    """Command-line entry point: transcribe, locate the query, render.

    Steps:
      0. Verify ffmpeg, ffprobe and whisper are available and inputs exist.
      1. Transcribe the person video and save the transcript as an .srt file
         next to the output.
      2. Find the time range whose text matches ``--query``.
      3. Render the output with the person picture over that range.

    Exits with status 1 when a dependency or input file is missing, when no
    text matches, or when ffprobe/ffmpeg fails.
    """
    parser = argparse.ArgumentParser(description="基于音频文字的视频片段替换 MVP")
    parser.add_argument("--person", required=True, help="人物出镜视频路径(提供画面)")
    parser.add_argument("--broll", required=True, help="空镜视频路径(提供底图+音频)")
    parser.add_argument("--query", required=True, help="要替换的文案(如:水电改造要注意)")
    parser.add_argument("--output", default="output_replaced.mp4", help="输出文件路径")
    parser.add_argument("--model", default="base", choices=["tiny", "base", "small"],
                        help="Whisper 模型,tiny 最快,small 最准")
    parser.add_argument("--crf", type=int, default=18, help="视频质量(0=无损,23=默认,越大越小)")
    parser.add_argument("--threshold", type=float, default=0.6,
                        help="模糊匹配阈值(0~1),低于此值视为未匹配")
    args = parser.parse_args()

    # 0. Dependency checks
    for tool in ("ffmpeg", "ffprobe"):
        if not check_dep(tool):
            print(f"❌ 未找到 {tool}")
            sys.exit(1)
    if not ensure_whisper():
        sys.exit(1)
    for p in (args.person, args.broll):
        # is_file() rather than exists(): a directory is not a usable input.
        if not Path(p).is_file():
            print(f"❌ 文件不存在:{p}")
            sys.exit(1)

    # 1. Speech recognition on the person video
    print("\n🎙️ Step 1/3:识别人物视频音频")
    segments = run_whisper(args.person, args.model)
    print(f" 识别到 {len(segments)} 句话")

    # Keep the transcript so the user can look up the exact recognized text.
    srt_path = str(Path(args.output).with_suffix(".srt"))
    save_srt(segments, srt_path)
    print(f"📝 字幕已保存:{srt_path}")

    # 2. Text matching
    print(f"\n🔍 Step 2/3:查找文案「{args.query}")
    result = find_time_range(segments, args.query, threshold=args.threshold)
    if result is None:
        print(f"❌ 未找到匹配文案(阈值 {args.threshold}")
        print(f" 建议:查看 {srt_path} 里的实际文案,调整 --query 内容")
        sys.exit(1)
    start, end, matched_text = result
    print(f" 匹配文案:「{matched_text}")
    print(f" 时间段: {start:.3f}s ~ {end:.3f}s(时长 {end - start:.3f}s")

    # 3. FFmpeg replacement
    print("\n🎬 Step 3/3:替换片段")
    try:
        replace_with_overlay(
            args.person,
            args.broll,
            start,
            end,
            args.output,
            crf=args.crf,
        )
    except subprocess.CalledProcessError as exc:
        # The tools run with captured output, so without this the user would
        # only get a traceback and never see the tool's own error message.
        tool = exc.cmd[0] if isinstance(exc.cmd, (list, tuple)) else exc.cmd
        detail = exc.stderr or ""
        if isinstance(detail, bytes):
            detail = detail.decode("utf-8", errors="replace")
        print(f"❌ {tool} 执行失败(退出码 {exc.returncode})")
        # The tail of stderr carries the actual error; the head is the banner.
        tail = detail.strip().splitlines()[-20:]
        if tail:
            print("\n".join(tail))
        sys.exit(1)

    print("\n🎉 全部完成!")
    print(f" 输出文件:{args.output}")
    print(f" 字幕参考:{srt_path}")


if __name__ == "__main__":
    main()