import os
from typing import Any, List, Optional, Tuple

import cv2
from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector
from moviepy.editor import VideoFileClip, concatenate_videoclips
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip

from utils.logger_config import setup_logger

logger = setup_logger(__name__)


class VideoAudioProcessor:
    """Utilities for cutting, sampling, splitting and joining video files.

    Every public method is best-effort: failures are logged and reported
    through the return value (``None`` or an empty list) instead of being
    raised to the caller.
    """

    def __init__(self, output_dir: str = "./output"):
        """
        Initialize VideoAudioProcessor

        Args:
            output_dir: Directory to save processed files
        """
        self.output_dir = output_dir
        # No speech-to-text backend is wired in yet; transcription is skipped
        # for as long as this stays None.
        self.stt_model = None  # SenseVoiceTranscriber()

        # Create output directory if not exists
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        logger.info(f"Initialized VideoAudioProcessor with output directory: {output_dir}")

    def extract_audio(self, video_path: str) -> Optional[str]:
        """
        Extract the audio track of a video into a WAV file in ``output_dir``

        Args:
            video_path: Path to video file

        Returns:
            str: Path to extracted audio file, or None if the file is missing,
                has no audio track, or extraction failed
        """
        try:
            if not os.path.exists(video_path):
                logger.error(f"Video file not found: {video_path}")
                return None

            # Generate output audio path: <output_dir>/<video stem>.wav
            audio_filename = os.path.splitext(os.path.basename(video_path))[0] + ".wav"
            audio_path = os.path.join(self.output_dir, audio_filename)

            # Extract audio using moviepy
            logger.info(f"Extracting audio from video: {video_path}")
            video = VideoFileClip(video_path)
            audio = None
            try:
                audio = video.audio
                if audio is None:
                    # Silent videos expose audio=None; report it explicitly
                    # instead of failing with an AttributeError.
                    logger.error(f"Video has no audio track: {video_path}")
                    return None
                audio.write_audiofile(audio_path)
            finally:
                # Release the readers on every path, including failures.
                if audio is not None:
                    audio.close()
                video.close()

            logger.info(f"Audio extracted successfully: {audio_path}")
            return audio_path

        except Exception as e:
            logger.error(f"Failed to extract audio: {e}")
            return None

    def detect_scenes(self, video_path: str, threshold: float = 25.0) -> List[Tuple[Any, Any]]:
        """
        Detect scenes in video using content-based detection

        Args:
            video_path: Path to video file
            threshold: Threshold passed to ContentDetector

        Returns:
            List of (start, end) timecode pairs exactly as returned by
            ``SceneManager.get_scene_list()``; empty list on failure
        """
        try:
            if not os.path.exists(video_path):
                logger.error(f"Video file not found: {video_path}")
                return []

            # Detect scenes
            video = open_video(video_path)
            scene_manager = SceneManager()
            scene_manager.add_detector(ContentDetector(threshold=threshold))
            scene_manager.detect_scenes(video)
            scene_list = scene_manager.get_scene_list()

            logger.info(f"Detected {len(scene_list)} scenes")
            return scene_list

        except Exception as e:
            logger.error(f"Failed to detect scenes: {e}")
            return []

    def extract_frames(self, video_path: str, interval: float = 1.0) -> List[str]:
        """
        Extract frames from video at specified interval

        Frames are written as ``frame_NNNN.jpg`` into
        ``<output_dir>/<video stem>_frames``.

        Args:
            video_path: Path to video file
            interval: Time interval between frames in seconds (must be > 0)

        Returns:
            List[str]: Paths of the frame images that were written; empty
                list on failure
        """
        try:
            if not os.path.exists(video_path):
                logger.error(f"Video file not found: {video_path}")
                return []

            if interval <= 0:
                logger.error(f"Invalid frame interval: {interval}")
                return []

            # Create frames directory
            video_name = os.path.splitext(os.path.basename(video_path))[0]
            frames_dir = os.path.join(self.output_dir, f"{video_name}_frames")
            os.makedirs(frames_dir, exist_ok=True)

            # Open video file
            cap = cv2.VideoCapture(video_path)
            try:
                if not cap.isOpened():
                    logger.error("Failed to open video file")
                    return []

                # Get video properties
                fps = cap.get(cv2.CAP_PROP_FPS)
                # Clamp to 1 so an unknown FPS (reported as 0) or an interval
                # shorter than one frame cannot cause a modulo-by-zero below.
                frame_interval = max(1, int(fps * interval))

                frame_paths = []
                frame_count = 0
                frame_saved = 0

                logger.info(f"Extracting frames from video: {video_path}")
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break

                    # Save frame at specified interval
                    if frame_count % frame_interval == 0:
                        frame_path = os.path.join(frames_dir, f"frame_{frame_saved:04d}.jpg")
                        # imwrite signals failure through its return value.
                        if cv2.imwrite(frame_path, frame):
                            frame_paths.append(frame_path)
                        else:
                            logger.warning(f"Failed to write frame: {frame_path}")
                        frame_saved += 1

                    frame_count += 1
            finally:
                # Clean up on every path, including early returns and errors
                cap.release()

            logger.info(f"Extracted {len(frame_paths)} frames")
            return frame_paths

        except Exception as e:
            logger.error(f"Failed to extract frames: {e}")
            return []

    def cut_video(self, input_path: str, start_time: float, end_time: float,
                  output_name: Optional[str] = None,
                  output_path: Optional[str] = None) -> Optional[str]:
        """
        Cut video file to specified time range

        Args:
            input_path: Path to input video file
            start_time: Start time in milliseconds (converted to seconds
                before being handed to ffmpeg)
            end_time: End time in milliseconds; must be greater than start_time
            output_name: File name to use inside ``<output_dir>/clip_files``
                when output_path is not given
            output_path: Path to save output video file. If None, one is
                built from output_name, or from the input file name and the
                time range when output_name is also None

        Returns:
            str: Path to output video file or None if failed
        """
        try:
            # Validate input file
            if not os.path.exists(input_path):
                logger.error(f"Input video file not found: {input_path}")
                return None

            # Validate time range
            if start_time < 0 or end_time <= start_time:
                logger.error(f"Invalid time range: start={start_time}, end={end_time}")
                return None

            # Generate output path if not provided
            if output_path is None:
                if output_name is None:
                    filename = os.path.splitext(os.path.basename(input_path))[0]
                    # NOTE: the numbers embedded here are the raw millisecond
                    # arguments; the "s" suffix is kept so generated file
                    # names stay the same as before.
                    output_name = f"{filename}_cut_{int(start_time)}s_{int(end_time)}s.mp4"
                output_path = os.path.join(self.output_dir, "clip_files", output_name)

            # Ensure output directory exists. A bare file name has an empty
            # dirname, which os.makedirs would reject.
            target_dir = os.path.dirname(output_path)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)

            # Convert milliseconds to seconds
            start_seconds = start_time / 1000
            end_seconds = end_time / 1000

            # Cut video using ffmpeg
            logger.info(f"Cutting video from {start_seconds}s to {end_seconds}s: {output_path}")
            ffmpeg_extract_subclip(input_path, start_seconds, end_seconds, targetname=output_path)

            if os.path.exists(output_path):
                logger.info(f"Video cut successfully: {output_path}")
                return output_path

            logger.error("Failed to create output video file")
            return None

        except Exception as e:
            logger.error(f"Failed to cut video: {e}")
            return None

    def _transcribe(self, audio_path: str) -> Optional[str]:
        """Run the configured STT model on audio_path; None when unavailable or failing."""
        if self.stt_model is None:
            logger.warning("No STT model configured, skipping transcription")
            return None
        try:
            return self.stt_model.transcribe(audio_path)
        except Exception as e:
            logger.error(f"Failed to transcribe audio: {e}")
            return None

    def process_video(self, video_path: str, extract_audio: bool = True,
                      extract_frames: bool = True, frame_interval: float = 1.0,
                      cut_video: bool = False, start_time: Optional[float] = None,
                      end_time: Optional[float] = None
                      ) -> Tuple[Optional[str], Optional[str], List[str]]:
        """
        Process video file: cut video, extract audio, perform STT, and extract frames

        Args:
            video_path: Path to video file
            extract_audio: Whether to extract audio
            extract_frames: Whether to extract frames
            frame_interval: Time interval between frames in seconds
            cut_video: Whether to cut video (requires start_time and end_time)
            start_time: Start time for video cutting, forwarded unchanged to
                ``cut_video`` (which interprets it as milliseconds)
            end_time: End time for video cutting, forwarded unchanged to
                ``cut_video`` (which interprets it as milliseconds)

        Returns:
            Tuple containing:
            - Path to extracted audio file (or None)
            - Transcribed text (or None, e.g. when no STT model is configured)
            - List of paths to extracted frames
        """
        audio_path = None
        transcript = None
        frame_paths = []

        try:
            # Cut video if requested
            processing_path = video_path
            if cut_video and start_time is not None and end_time is not None:
                cut_path = self.cut_video(video_path, start_time, end_time)
                if cut_path:
                    processing_path = cut_path
                else:
                    logger.warning("Video cutting failed, proceeding with original video")

            # Extract audio if requested
            if extract_audio:
                audio_path = self.extract_audio(processing_path)
                if audio_path:
                    # Perform STT on extracted audio. Isolated in a helper so
                    # that a missing or failing model does not abort the
                    # frame extraction below.
                    transcript = self._transcribe(audio_path)

            # Extract frames if requested
            if extract_frames:
                frame_paths = self.extract_frames(processing_path, frame_interval)

            return audio_path, transcript, frame_paths

        except Exception as e:
            logger.error(f"Failed to process video: {e}")
            # Return whatever was produced before the failure.
            return audio_path, transcript, frame_paths

    def process_audio(self, audio_path: str) -> Optional[str]:
        """
        Process audio file using STT

        Args:
            audio_path: Path to audio file

        Returns:
            str: Transcribed text, or None if the file is missing, no STT
                model is configured, or transcription failed
        """
        try:
            if not os.path.exists(audio_path):
                logger.error(f"Audio file not found: {audio_path}")
                return None

            return self._transcribe(audio_path)

        except Exception as e:
            logger.error(f"Failed to process audio: {e}")
            return None

    def concat_videos(self, video_paths: List[str],
                      output_path: Optional[str] = None) -> Optional[str]:
        """
        Concatenate multiple video files into a single video file

        Args:
            video_paths: List of paths to video files to concatenate, in
                playback order
            output_path: Destination file for the concatenated video (a str
                or path-like object). Required: passing None is logged as an
                error and the method returns None

        Returns:
            str: Path to output concatenated video file or None if failed
        """
        try:
            # Validate input
            if not video_paths:
                logger.error("Empty video paths list provided")
                return None

            if output_path is None:
                logger.error("Output path is required")
                return None
            # Convert output_path to string if it's a Path object
            output_path = str(output_path)

            # Check if all input files exist
            for video_path in video_paths:
                if not os.path.exists(video_path):
                    logger.error(f"Video file not found: {video_path}")
                    return None

            # Ensure output directory exists (a bare file name has no dirname)
            target_dir = os.path.dirname(output_path)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)

            # Load all video clips
            logger.info(f"Loading {len(video_paths)} video clips")
            video_clips = []
            final_clip = None
            try:
                for video_path in video_paths:
                    video_clips.append(VideoFileClip(video_path))

                # Concatenate video clips
                logger.info("Concatenating video clips")
                final_clip = concatenate_videoclips(video_clips, method="compose")

                # Write output video
                logger.info(f"Writing concatenated video to: {output_path}")
                final_clip.write_videofile(output_path)

                logger.info("Video concatenation completed successfully")
                return output_path

            finally:
                # Clean up resources, including clips opened before a failure
                if final_clip is not None:
                    final_clip.close()
                for clip in video_clips:
                    clip.close()

        except Exception as e:
            logger.error(f"Failed to concatenate videos: {e}")
            return None


# Shared module-level instance. Note that constructing it creates the default
# "./output" directory as a side effect of importing this module.
media_processor = VideoAudioProcessor()


if __name__ == "__main__":
    # Initialize processor
    processor = VideoAudioProcessor("./output/room/")

    # Test video concatenation
    print("\nTesting video concatenation:")
    video_segments = [
        "./test_data/sample_video1.mp4",
        "./test_data/sample_video2.mp4",
        "./test_data/sample_video3.mp4"
    ]
    concatenated_video = processor.concat_videos(video_segments, "final_video.mp4")
    print(f"Concatenated video path: {concatenated_video}")