from moviepy import *

try:
    # MoviePy < 2.0 exposes VideoFileClip through ``moviepy.editor``;
    # MoviePy 2.0 removed that module and exports it from the package root.
    from moviepy.editor import VideoFileClip
except ImportError:
    from moviepy import VideoFileClip
import time
import numpy as np
import os
import imageio
from concurrent.futures import ThreadPoolExecutor
import threading
from tqdm import tqdm
from .logger_config import setup_logger

logger = setup_logger(__name__)


def _frame_path(output_dir, frame_index, interval_sec):
    """Return the output path of the ``frame_index``-th sampled frame.

    The number in the file name is the frame's sampling offset in seconds
    (``frame_index * interval_sec``), zero-padded to five digits, e.g.
    ``frame_00600.jpg`` for index 1 at a 600 s interval. When
    ``interval_sec`` is not a whole number, the offset keeps three decimal
    places so that names stay unique.
    """
    offset = frame_index * interval_sec
    if float(interval_sec).is_integer():
        label = f"{int(round(offset)):05d}"
    else:
        label = f"{offset:09.3f}"
    return os.path.join(output_dir, f"frame_{label}.jpg")


def _extract_frames(video_path, jobs, output_dir, interval_sec, worker_id):
    """Save one JPEG for every ``(frame_index, t)`` pair in ``jobs``.

    Each call opens its own ``VideoFileClip`` instead of sharing one reader
    across threads. A failure on one timestamp is logged and skipped. The
    remaining frames keep their index-derived names, so a gap never shifts
    them.
    """
    with VideoFileClip(video_path) as clip:
        for frame_index, t in jobs:
            try:
                frame = clip.get_frame(t)
                imageio.imwrite(_frame_path(output_dir, frame_index, interval_sec), frame)
            except Exception as e:  # best effort: one bad frame must not abort the chunk
                logger.error(f"Worker {worker_id} error at {t:.2f}s: {e}")


def frame_extractor(video_path, interval_sec=60, output_dir="./data/key_frame", max_workers=4):
    """Save one frame every ``interval_sec`` seconds of a video as JPEG files.

    Sampling times are ``0, interval_sec, 2 * interval_sec, ...`` up to (but
    not including) the clip duration. They are split into contiguous,
    non-overlapping ranges, and each range is decoded by its own worker
    thread. Every frame is written directly to
    ``output_dir/frame_<offset>.jpg``, where ``<offset>`` is its sampling
    time in seconds (see ``_frame_path``). No renaming pass runs afterwards,
    so other files already in ``output_dir`` are left untouched.

    Args:
        video_path: Path (str or path-like) of the input video.
        interval_sec: Sampling interval in seconds; must be positive.
        output_dir: Directory for the JPEG files; created if missing.
        max_workers: Upper bound on worker threads. The actual count is also
            capped by the CPU count and by the number of frames, and is
            always at least 1.

    Raises:
        ValueError: If ``interval_sec`` is not positive.
    """
    if interval_sec <= 0:
        raise ValueError(f"interval_sec must be positive, got {interval_sec!r}")

    logger.info("开始抽帧")
    os.makedirs(output_dir, exist_ok=True)
    video_path = str(video_path)

    # Only the duration is needed up front to build the sampling grid.
    with VideoFileClip(video_path) as clip:
        duration = clip.duration
    timestamps = np.arange(0, duration, interval_sec)
    if len(timestamps) == 0:
        return

    # os.cpu_count() may return None; never use fewer than one worker or
    # more workers than there are frames.
    num_workers = max(1, min(max_workers, os.cpu_count() or 1, len(timestamps)))

    # Contiguous, non-overlapping index ranges: every timestamp is extracted
    # exactly once (overlapping ranges would write the same frame twice).
    index_chunks = [
        chunk
        for chunk in np.array_split(np.arange(len(timestamps)), num_workers)
        if len(chunk)
    ]

    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(
                _extract_frames,
                video_path,
                list(zip(chunk.tolist(), timestamps[chunk].tolist())),
                output_dir,
                interval_sec,
                worker_id,
            )
            for worker_id, chunk in enumerate(index_chunks)
        ]
        # result() re-raises errors not handled per frame, e.g. a failure to
        # open the video inside a worker.
        for future in tqdm(futures):
            future.result()


if __name__ == "__main__":
    # NOTE: because of the relative import of ``logger_config`` above, this
    # module has to be run inside its package (``python -m <pkg>.<module>``).
    start = time.time()
    print("start!")
    video_path = "/data/data/luosy/project/oral/data/raw_video/test_video.flv"
    output_dir = "/data/data/luosy/project/oral/data/key_frame"
    frame_extractor(video_path, 600, output_dir)
    print(f"cost {time.time() - start} secs")