123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- from moviepy import *
- import time
- import numpy as np
- import os
- import imageio
- from concurrent.futures import ThreadPoolExecutor
- import threading
def parallel_fast_extractor(video_path, interval_sec, output_dir, max_workers=4):
    """Extract one frame every ``interval_sec`` seconds and save them as JPEGs.

    The sampling timestamps ``0, interval_sec, 2 * interval_sec, ...``
    (strictly below the clip duration) are split into contiguous,
    non-overlapping slices.  Each slice is decoded by its own worker thread
    through a private ``VideoFileClip`` handle.  Frames are first written
    under temporary names keyed by their timestamp index and, once every
    worker has finished, renamed to ``frame_001.jpg``, ``frame_002.jpg``, ...
    in chronological order.

    A frame that cannot be decoded or written is reported on stdout and
    skipped, so the final numbering stays contiguous.

    Args:
        video_path: Path of the video file to read.
        interval_sec: Sampling interval in seconds; must be positive.
        output_dir: Directory that receives the frames (created if missing).
            Existing ``frame_NNN.jpg`` files with the same numbers are
            overwritten; any other file in the directory is left untouched.
        max_workers: Upper bound on the number of worker threads; must be
            at least 1.

    Raises:
        ValueError: If ``interval_sec`` is not positive or ``max_workers``
            is smaller than 1.
    """
    # Validate before touching the filesystem or the video.
    if interval_sec <= 0:
        raise ValueError(f"interval_sec must be positive, got {interval_sec!r}")
    if max_workers < 1:
        raise ValueError(f"max_workers must be at least 1, got {max_workers!r}")

    os.makedirs(output_dir, exist_ok=True)

    # Open the clip once up front only to learn its duration.
    with VideoFileClip(video_path) as clip:
        duration = clip.duration
    timestamps = np.arange(0, duration, interval_sec).tolist()
    if not timestamps:
        return

    # os.cpu_count() may return None; never start more workers than frames.
    num_workers = min(max_workers, os.cpu_count() or 1, len(timestamps))
    chunk_size = -(-len(timestamps) // num_workers)  # ceiling division

    # Pair every timestamp with its global index so output names do not
    # depend on which thread happens to process which slice.
    indexed = list(enumerate(timestamps))
    chunks = [
        indexed[start:start + chunk_size]
        for start in range(0, len(indexed), chunk_size)
    ]

    def process_chunk(chunk, worker_id):
        """Decode and save each (index, timestamp) pair of one slice.

        Returns the list of (index, temporary path) pairs that were
        written successfully, in the order of the slice.
        """
        saved = []
        # Each worker opens its own reader instead of sharing one handle.
        with VideoFileClip(video_path) as clip:
            for frame_idx, t in chunk:
                temp_path = os.path.join(
                    output_dir, f"_partial_{frame_idx:08d}.jpg"
                )
                try:
                    imageio.imwrite(temp_path, clip.get_frame(t))
                except Exception as e:
                    # Best effort: report the failure and keep going.
                    print(f"Worker {worker_id} error at {t:.2f}s: {str(e)}")
                    continue
                saved.append((frame_idx, temp_path))
        return saved

    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(process_chunk, chunk, worker_id)
            for worker_id, chunk in enumerate(chunks)
        ]
        # Slices are contiguous and futures are read in submission order,
        # so the merged list is already chronological.
        saved_frames = [item for future in futures for item in future.result()]

    # Give this run's frames their final sequential names.
    for number, (_, temp_path) in enumerate(saved_frames, start=1):
        final_path = os.path.join(output_dir, f"frame_{number:03d}.jpg")
        os.replace(temp_path, final_path)
if __name__ == "__main__":
    # Script entry point: extract one frame every 600 seconds from a fixed
    # sample video and report the elapsed wall-clock time.
    video_path = "/data/data/luosy/project/oral/data/raw_video/test_video.flv"
    output_dir = "/data/data/luosy/project/oral/data/key_frame"

    began_at = time.time()
    print("start!")
    parallel_fast_extractor(video_path, 600, output_dir)
    elapsed = time.time() - began_at
    print(f"cost {elapsed} secs")
|