# frame_extractor.py (4.0 KB)

import math
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import imageio
import numpy as np
from moviepy import *
  8. def parallel_fast_extractor(video_path, interval_sec, output_dir, max_workers=4):
  9. """
  10. 多线程时间片处理算法
  11. 特点:
  12. 1. 每个线程处理独立时间区间
  13. 2. 智能时间片重叠补偿(解决关键帧对齐问题)
  14. 3. 线程本地缓存机制
  15. """
  16. os.makedirs(output_dir, exist_ok=True)
  17. # 预计算总时长和关键时间点
  18. with VideoFileClip(video_path) as clip:
  19. duration = clip.duration
  20. timestamps = np.arange(0, duration, interval_sec)
  21. # 根据CPU核心数分片(增加10%重叠区)
  22. num_workers = min(max_workers, os.cpu_count())
  23. chunk_size = len(timestamps) // num_workers + 1
  24. overlap = int(chunk_size * 0.1)
  25. chunks = []
  26. for i in range(0, len(timestamps), chunk_size - overlap):
  27. chunk = timestamps[i:i + chunk_size]
  28. if len(chunk) > overlap:
  29. chunks.append(chunk)
  30. else:
  31. break
  32. # 线程安全的文件名生成器
  33. counter = threading.local()
  34. def get_filename(base_idx):
  35. if not hasattr(counter, 'value'):
  36. counter.value = base_idx * 10000 # 每个线程分配独立区间
  37. filename = os.path.join(output_dir, f"frame_{counter.value:08d}.jpg")
  38. counter.value += 1
  39. return filename
  40. # 视频帧重命名
  41. def rename_jpg_files(output_dir):
  42. # 获取所有的 jpg 文件
  43. jpg_files = [f for f in os.listdir(output_dir) if f.endswith('.jpg')]
  44. # 按照文件名排序
  45. jpg_files.sort()
  46. for index, filename in enumerate(jpg_files, start=1):
  47. # 生成新的文件名
  48. new_filename = f"frame_{index:03d}.jpg"
  49. # 构造完整的文件路径
  50. old_file_path = os.path.join(output_dir, filename)
  51. new_file_path = os.path.join(output_dir, new_filename)
  52. # 重命名文件
  53. os.rename(old_file_path, new_file_path)
  54. def process_chunk(chunk, worker_id):
  55. # 每个线程独立打开视频流
  56. with VideoFileClip(video_path) as clip:
  57. for idx, t in enumerate(chunk):
  58. try:
  59. frame = clip.get_frame(t)
  60. # 使用线程本地内存缓存
  61. if not hasattr(threading.current_thread(), 'frame_buffer'):
  62. threading.current_thread().frame_buffer = []
  63. threading.current_thread().frame_buffer.append(frame)
  64. # 批量保存(每10帧)
  65. if len(threading.current_thread().frame_buffer) >= 10:
  66. for f in threading.current_thread().frame_buffer:
  67. imageio.imwrite(get_filename(worker_id), f)
  68. threading.current_thread().frame_buffer.clear()
  69. except Exception as e:
  70. print(f"Worker {worker_id} error at {t:.2f}s: {str(e)}")
  71. # 保存剩余帧
  72. if hasattr(threading.current_thread(), 'frame_buffer'):
  73. for f in threading.current_thread().frame_buffer:
  74. imageio.imwrite(get_filename(worker_id), f)
  75. threading.current_thread().frame_buffer.clear()
  76. # 启动线程池
  77. with ThreadPoolExecutor(max_workers=num_workers) as executor:
  78. futures = []
  79. for i, chunk in enumerate(chunks):
  80. futures.append(executor.submit(process_chunk, chunk.tolist(), i))
  81. # 等待所有任务完成
  82. for future in futures:
  83. future.result()
  84. # 重命名视频帧
  85. rename_jpg_files(output_dir)
  86. if __name__ == "__main__":
  87. start = time.time()
  88. print("start!")
  89. video_path = "/data/data/luosy/project/oral/data/raw_video/test_video.flv"
  90. output_dir = "/data/data/luosy/project/oral/data/key_frame"
  91. parallel_fast_extractor(video_path, 600, output_dir)
  92. print(f"cost {time.time() - start} secs")