# media_processor.py
import os
from typing import List, Optional, Tuple

import cv2
from moviepy.editor import VideoFileClip, concatenate_videoclips
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
from scenedetect import FrameTimecode, SceneManager, open_video
from scenedetect.detectors import ContentDetector

from utils.logger_config import setup_logger
# Module-level logger used by VideoAudioProcessor for all status and error reporting.
logger = setup_logger(__name__)
  10. class VideoAudioProcessor:
  11. def __init__(self, output_dir: str = "./output"):
  12. """
  13. Initialize VideoAudioProcessor
  14. Args:
  15. output_dir: Directory to save processed files
  16. """
  17. self.output_dir = output_dir
  18. self.stt_model = None # SenseVoiceTranscriber()
  19. # Create output directory if not exists
  20. if output_dir:
  21. os.makedirs(output_dir, exist_ok=True)
  22. logger.info(f"Initialized VideoAudioProcessor with output directory: {output_dir}")
  23. def extract_audio(self, video_path: str) -> Optional[str]:
  24. """
  25. Extract audio from video file
  26. Args:
  27. video_path: Path to video file
  28. Returns:
  29. str: Path to extracted audio file or None if failed
  30. """
  31. try:
  32. if not os.path.exists(video_path):
  33. logger.error(f"Video file not found: {video_path}")
  34. return None
  35. # Generate output audio path
  36. audio_filename = os.path.splitext(os.path.basename(video_path))[0] + ".wav"
  37. audio_path = os.path.join(self.output_dir, audio_filename)
  38. # Extract audio using moviepy
  39. logger.info(f"Extracting audio from video: {video_path}")
  40. video = VideoFileClip(video_path)
  41. audio = video.audio
  42. audio.write_audiofile(audio_path)
  43. # Clean up
  44. video.close()
  45. audio.close()
  46. logger.info(f"Audio extracted successfully: {audio_path}")
  47. return audio_path
  48. except Exception as e:
  49. logger.error(f"Failed to extract audio: {str(e)}")
  50. return None
  51. def detect_scenes(self, video_path: str, threshold: float = 25.0) -> List[str]:
  52. """
  53. Detect scenes in video
  54. Args:
  55. video_path: Path to video file
  56. threshold: Threshold for scene detection
  57. Returns:
  58. List[str]: List of scene start and end timecode
  59. """
  60. try:
  61. if not os.path.exists(video_path):
  62. logger.error(f"Video file not found: {video_path}")
  63. return []
  64. # Detect scenes
  65. video = open_video(video_path)
  66. scene_manager = SceneManager()
  67. scene_manager.add_detector(ContentDetector(threshold=threshold))
  68. scene_manager.detect_scenes(video)
  69. scene_list = scene_manager.get_scene_list()
  70. logger.info(f"Detected {len(scene_list)} scenes")
  71. return scene_list
  72. except Exception as e:
  73. logger.error(f"Failed to detect scenes: {str(e)}")
  74. return []
  75. def extract_frames(self, video_path: str, interval: float = 1.0) -> List[str]:
  76. """
  77. Extract frames from video at specified interval
  78. Args:
  79. video_path: Path to video file
  80. interval: Time interval between frames in seconds
  81. Returns:
  82. List[str]: List of paths to extracted frame images
  83. """
  84. try:
  85. if not os.path.exists(video_path):
  86. logger.error(f"Video file not found: {video_path}")
  87. return []
  88. # Create frames directory
  89. video_name = os.path.splitext(os.path.basename(video_path))[0]
  90. frames_dir = os.path.join(self.output_dir, f"{video_name}_frames")
  91. os.makedirs(frames_dir, exist_ok=True)
  92. # Open video file
  93. cap = cv2.VideoCapture(video_path)
  94. if not cap.isOpened():
  95. logger.error("Failed to open video file")
  96. return []
  97. # Get video properties
  98. fps = cap.get(cv2.CAP_PROP_FPS)
  99. frame_interval = int(fps * interval)
  100. frame_paths = []
  101. frame_count = 0
  102. frame_saved = 0
  103. logger.info(f"Extracting frames from video: {video_path}")
  104. while cap.isOpened():
  105. ret, frame = cap.read()
  106. if not ret:
  107. break
  108. # Save frame at specified interval
  109. if frame_count % frame_interval == 0:
  110. frame_path = os.path.join(frames_dir, f"frame_{frame_saved:04d}.jpg")
  111. cv2.imwrite(frame_path, frame)
  112. frame_paths.append(frame_path)
  113. frame_saved += 1
  114. frame_count += 1
  115. # Clean up
  116. cap.release()
  117. logger.info(f"Extracted {len(frame_paths)} frames")
  118. return frame_paths
  119. except Exception as e:
  120. logger.error(f"Failed to extract frames: {str(e)}")
  121. return []
  122. def cut_video(self, input_path: str, start_time: float, end_time: float, output_name: Optional[str] = None,
  123. output_path: Optional[str] = None) -> Optional[str]:
  124. """
  125. Cut video file to specified time range
  126. Args:
  127. input_path: Path to input video file
  128. start_time: Start time in seconds
  129. end_time: End time in seconds
  130. output_path: Path to save output video file. If None, will generate one based on input path
  131. Returns:
  132. str: Path to output video file or None if failed
  133. """
  134. try:
  135. # Validate input file
  136. if not os.path.exists(input_path):
  137. logger.error(f"Input video file not found: {input_path}")
  138. return None
  139. # Validate time range
  140. if start_time < 0 or end_time <= start_time:
  141. logger.error(f"Invalid time range: start={start_time}, end={end_time}")
  142. return None
  143. # Generate output path if not provided
  144. if output_path is None:
  145. if output_name is None:
  146. filename = os.path.splitext(os.path.basename(input_path))[0]
  147. output_path = os.path.join(
  148. self.output_dir + "/clip_files/",
  149. f"{filename}_cut_{int(start_time)}s_{int(end_time)}s.mp4"
  150. )
  151. else:
  152. output_path = os.path.join(
  153. self.output_dir + "/clip_files/",
  154. output_name
  155. )
  156. # Ensure output directory exists
  157. os.makedirs(os.path.dirname(output_path), exist_ok=True)
  158. # 将毫秒转换为秒
  159. start_time = start_time / 1000
  160. end_time = end_time / 1000
  161. # Cut video using ffmpeg
  162. logger.info(f"Cutting video from {start_time}s to {end_time}s: {output_path}")
  163. ffmpeg_extract_subclip(input_path, start_time, end_time, targetname=output_path)
  164. if os.path.exists(output_path):
  165. logger.info(f"Video cut successfully: {output_path}")
  166. return output_path
  167. else:
  168. logger.error("Failed to create output video file")
  169. return None
  170. except Exception as e:
  171. logger.error(f"Failed to cut video: {str(e)}")
  172. return None
  173. def process_video(self, video_path: str, extract_audio: bool = True, extract_frames: bool = True,
  174. frame_interval: float = 1.0, cut_video: bool = False,
  175. start_time: Optional[float] = None, end_time: Optional[float] = None) -> Tuple[Optional[str], Optional[str], List[str]]:
  176. """
  177. Process video file: cut video, extract audio, perform STT, and extract frames
  178. Args:
  179. video_path: Path to video file
  180. extract_audio: Whether to extract audio
  181. extract_frames: Whether to extract frames
  182. frame_interval: Time interval between frames in seconds
  183. cut_video: Whether to cut video
  184. start_time: Start time for video cutting in seconds
  185. end_time: End time for video cutting in seconds
  186. Returns:
  187. Tuple containing:
  188. - Path to extracted audio file (or None)
  189. - Transcribed text (or None)
  190. - List of paths to extracted frames
  191. """
  192. audio_path = None
  193. transcript = None
  194. frame_paths = []
  195. try:
  196. # Cut video if requested
  197. processing_path = video_path
  198. if cut_video and start_time is not None and end_time is not None:
  199. cut_path = self.cut_video(video_path, start_time, end_time)
  200. if cut_path:
  201. processing_path = cut_path
  202. else:
  203. logger.warning("Video cutting failed, proceeding with original video")
  204. # Extract audio if requested
  205. if extract_audio:
  206. audio_path = self.extract_audio(processing_path)
  207. if audio_path:
  208. # Perform STT on extracted audio
  209. transcript = self.stt_model.transcribe(audio_path)
  210. # Extract frames if requested
  211. if extract_frames:
  212. frame_paths = self.extract_frames(processing_path, frame_interval)
  213. return audio_path, transcript, frame_paths
  214. except Exception as e:
  215. logger.error(f"Failed to process video: {str(e)}")
  216. return audio_path, transcript, frame_paths
  217. def process_audio(self, audio_path: str) -> Optional[str]:
  218. """
  219. Process audio file using STT
  220. Args:
  221. audio_path: Path to audio file
  222. Returns:
  223. str: Transcribed text or None if failed
  224. """
  225. try:
  226. if not os.path.exists(audio_path):
  227. logger.error(f"Audio file not found: {audio_path}")
  228. return None
  229. return self.stt_model.transcribe(audio_path)
  230. except Exception as e:
  231. logger.error(f"Failed to process audio: {str(e)}")
  232. return None
  233. def concat_videos(self, video_paths: List[str], output_path: str = None) -> Optional[str]:
  234. """
  235. Concatenate multiple video files into a single video file
  236. Args:
  237. video_paths: List of paths to video files to concatenate
  238. output_filename: Name of the output video file. If None, will generate one
  239. Returns:
  240. str: Path to output concatenated video file or None if failed
  241. """
  242. try:
  243. # Validate input
  244. if not video_paths:
  245. logger.error("Empty video paths list provided")
  246. return None
  247. # Convert output_path to string if it's a Path object
  248. if output_path is not None:
  249. output_path = str(output_path)
  250. else:
  251. logger.error("Output path is required")
  252. return None
  253. # Check if all input files exist
  254. for video_path in video_paths:
  255. if not os.path.exists(video_path):
  256. logger.error(f"Video file not found: {video_path}")
  257. return None
  258. # # Generate output filename if not provided
  259. # if output_filename is None:
  260. # import time
  261. # timestamp = int(time.time())
  262. # output_filename = f"concatenated_video_{timestamp}.mp4"
  263. # # Generate full output path
  264. # output_path = os.path.join(self.output_dir, output_filename)
  265. # Load all video clips
  266. logger.info(f"Loading {len(video_paths)} video clips")
  267. video_clips = []
  268. try:
  269. for video_path in video_paths:
  270. clip = VideoFileClip(video_path)
  271. video_clips.append(clip)
  272. # Concatenate video clips
  273. logger.info("Concatenating video clips")
  274. final_clip = concatenate_videoclips(video_clips, method="compose")
  275. # Write output video
  276. logger.info(f"Writing concatenated video to: {output_path}")
  277. final_clip.write_videofile(output_path)
  278. logger.info("Video concatenation completed successfully")
  279. return output_path
  280. finally:
  281. # Clean up resources
  282. for clip in video_clips:
  283. clip.close()
  284. except Exception as e:
  285. logger.error(f"Failed to concatenate videos: {str(e)}")
  286. return None
  287. media_processor = VideoAudioProcessor()
  288. if __name__ == "__main__":
  289. # Initialize processor
  290. processor = VideoAudioProcessor("./output/room/")
  291. # Test video concatenation
  292. print("\nTesting video concatenation:")
  293. video_segments = [
  294. "./test_data/sample_video1.mp4",
  295. "./test_data/sample_video2.mp4",
  296. "./test_data/sample_video3.mp4"
  297. ]
  298. concatenated_video = processor.concat_videos(video_segments, "final_video.mp4")
  299. print(f"Concatenated video path: {concatenated_video}")