frame_extract.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. from moviepy import *
  2. from moviepy.editor import VideoFileClip
  3. import time
  4. import numpy as np
  5. import os
  6. import imageio
  7. from concurrent.futures import ThreadPoolExecutor
  8. import threading
  9. from tqdm import tqdm
  10. from .logger_config import setup_logger
# Module-level logger for this file, created by the package helper
# `setup_logger` (imported from .logger_config; its configuration is not
# visible here).
logger = setup_logger(__name__)
  12. def frame_extractor(video_path, interval_sec=60, output_dir="./data/key_frame", max_workers=4):
  13. """
  14. 多线程时间片处理算法
  15. 特点:
  16. 1. 每个线程处理独立时间区间
  17. 2. 智能时间片重叠补偿(解决关键帧对齐问题)
  18. 3. 线程本地缓存机制
  19. """
  20. logger.info(f"开始抽帧")
  21. os.makedirs(output_dir, exist_ok=True)
  22. video_path = str(video_path)
  23. # 预计算总时长和关键时间点
  24. with VideoFileClip(video_path) as clip:
  25. duration = clip.duration
  26. timestamps = np.arange(0, duration, interval_sec)
  27. # 根据CPU核心数分片(增加10%重叠区)
  28. num_workers = min(max_workers, os.cpu_count())
  29. chunk_size = len(timestamps) // num_workers + 1
  30. overlap = int(chunk_size * 0.1)
  31. chunks = []
  32. for i in range(0, len(timestamps), chunk_size - overlap):
  33. chunk = timestamps[i:i + chunk_size]
  34. if len(chunk) > overlap:
  35. chunks.append(chunk)
  36. else:
  37. break
  38. # 线程安全的文件名生成器
  39. counter = threading.local()
  40. def get_filename(base_idx):
  41. if not hasattr(counter, 'value'):
  42. counter.value = base_idx * 10000 # 每个线程分配独立区间
  43. filename = os.path.join(output_dir, f"frame_{counter.value:08d}.jpg")
  44. counter.value += 1
  45. return filename
  46. # 视频帧重命名
  47. def rename_jpg_files(output_dir):
  48. # 获取所有的 jpg 文件
  49. print(f'重命名:{output_dir}')
  50. jpg_files = [f for f in os.listdir(output_dir) if f.endswith('.jpg')]
  51. # 按照文件名排序
  52. jpg_files.sort()
  53. for index, filename in enumerate(jpg_files, start=1):
  54. # 生成新的文件名
  55. new_index = (index - 1) * interval_sec
  56. new_filename = f"frame_{new_index:05d}.jpg"
  57. # 构造完整的文件路径
  58. old_file_path = os.path.join(output_dir, filename)
  59. new_file_path = os.path.join(output_dir, new_filename)
  60. # 重命名文件
  61. os.rename(old_file_path, new_file_path)
  62. def process_chunk(chunk, worker_id):
  63. # 每个线程独立打开视频流
  64. with VideoFileClip(video_path) as clip:
  65. for idx, t in enumerate(chunk):
  66. try:
  67. frame = clip.get_frame(t)
  68. # 使用线程本地内存缓存
  69. if not hasattr(threading.current_thread(), 'frame_buffer'):
  70. threading.current_thread().frame_buffer = []
  71. threading.current_thread().frame_buffer.append(frame)
  72. # 批量保存(每10帧)
  73. if len(threading.current_thread().frame_buffer) >= 10:
  74. for f in threading.current_thread().frame_buffer:
  75. imageio.imwrite(get_filename(worker_id), f)
  76. threading.current_thread().frame_buffer.clear()
  77. except Exception as e:
  78. print(f"Worker {worker_id} error at {t:.2f}s: {str(e)}")
  79. # 保存剩余帧
  80. if hasattr(threading.current_thread(), 'frame_buffer'):
  81. for f in threading.current_thread().frame_buffer:
  82. imageio.imwrite(get_filename(worker_id), f)
  83. threading.current_thread().frame_buffer.clear()
  84. # 启动线程池
  85. with ThreadPoolExecutor(max_workers=num_workers) as executor:
  86. futures = []
  87. for i, chunk in enumerate(chunks):
  88. futures.append(executor.submit(process_chunk, chunk.tolist(), i))
  89. # 等待所有任务完成
  90. for future in tqdm(futures):
  91. future.result()
  92. # 重命名视频帧
  93. rename_jpg_files(output_dir)
  94. if __name__ == "__main__":
  95. start = time.time()
  96. print("start!")
  97. video_path = "/data/data/luosy/project/oral/data/raw_video/test_video.flv"
  98. output_dir = "/data/data/luosy/project/oral/data/key_frame"
  99. frame_extractor(video_path, 600, output_dir)
  100. print(f"cost {time.time() - start} secs")