| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- import os
- import cv2
- from scenedetect import open_video, SceneManager
- from scenedetect.detectors import ContentDetector
- from moviepy.editor import VideoFileClip, concatenate_videoclips
- from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
- from typing import List, Tuple, Optional
- from utils.logger_config import setup_logger
- logger = setup_logger(__name__)
class VideoAudioProcessor:
    """Video/audio processing utility.

    Supports audio extraction, scene detection, frame extraction,
    time-range cutting, concatenation, and (optionally) speech-to-text
    transcription when an STT backend is attached to ``stt_model``.
    """

    def __init__(self, output_dir: str = "./output"):
        """
        Initialize VideoAudioProcessor.

        Args:
            output_dir: Directory to save processed files. Created if missing.
        """
        self.output_dir = output_dir
        # STT backend is not wired in yet (e.g. SenseVoiceTranscriber).
        # Methods that transcribe must check for None before using it.
        self.stt_model = None

        # Create output directory if not exists
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        logger.info(f"Initialized VideoAudioProcessor with output directory: {output_dir}")

    def extract_audio(self, video_path: str) -> Optional[str]:
        """
        Extract the audio track of a video file to a WAV file.

        Args:
            video_path: Path to video file

        Returns:
            str: Path to extracted audio file or None if failed
                 (missing input, no audio track, or extraction error)
        """
        try:
            if not os.path.exists(video_path):
                logger.error(f"Video file not found: {video_path}")
                return None

            # Output WAV sits next to other artifacts in output_dir,
            # named after the source video.
            audio_filename = os.path.splitext(os.path.basename(video_path))[0] + ".wav"
            audio_path = os.path.join(self.output_dir, audio_filename)

            logger.info(f"Extracting audio from video: {video_path}")
            video = VideoFileClip(video_path)
            try:
                audio = video.audio
                # Videos without an audio track (e.g. silent screen
                # recordings) yield audio=None; calling write/close on it
                # would raise AttributeError.
                if audio is None:
                    logger.error(f"Failed to extract audio: no audio track in {video_path}")
                    return None
                audio.write_audiofile(audio_path)
                audio.close()
            finally:
                # Always release the clip, even when extraction fails.
                video.close()

            logger.info(f"Audio extracted successfully: {audio_path}")
            return audio_path

        except Exception as e:
            logger.error(f"Failed to extract audio: {str(e)}")
            return None

    def detect_scenes(self, video_path: str, threshold: float = 25.0) -> List[Tuple]:
        """
        Detect scene boundaries in a video using content-based detection.

        Args:
            video_path: Path to video file
            threshold: Content-change threshold for scene detection
                (higher = fewer, longer scenes)

        Returns:
            List[Tuple]: List of (start, end) scene timecode pairs as
                returned by PySceneDetect; empty list on failure.
        """
        try:
            if not os.path.exists(video_path):
                logger.error(f"Video file not found: {video_path}")
                return []

            # Run PySceneDetect's content detector over the whole video.
            video = open_video(video_path)
            scene_manager = SceneManager()
            scene_manager.add_detector(ContentDetector(threshold=threshold))
            scene_manager.detect_scenes(video)
            scene_list = scene_manager.get_scene_list()
            logger.info(f"Detected {len(scene_list)} scenes")
            return scene_list

        except Exception as e:
            logger.error(f"Failed to detect scenes: {str(e)}")
            return []

    def extract_frames(self, video_path: str, interval: float = 1.0) -> List[str]:
        """
        Extract frames from a video at a fixed time interval.

        Args:
            video_path: Path to video file
            interval: Time interval between saved frames in seconds

        Returns:
            List[str]: Paths to the extracted JPEG frames; empty on failure.
        """
        try:
            if not os.path.exists(video_path):
                logger.error(f"Video file not found: {video_path}")
                return []

            # Frames go into "<output_dir>/<video_name>_frames/".
            video_name = os.path.splitext(os.path.basename(video_path))[0]
            frames_dir = os.path.join(self.output_dir, f"{video_name}_frames")
            os.makedirs(frames_dir, exist_ok=True)

            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                logger.error("Failed to open video file")
                return []

            fps = cap.get(cv2.CAP_PROP_FPS)
            # Guard against fps=0 (corrupt metadata) or sub-frame
            # intervals, which would cause modulo-by-zero below.
            frame_interval = max(1, int(fps * interval))

            frame_paths = []
            frame_count = 0
            frame_saved = 0

            logger.info(f"Extracting frames from video: {video_path}")
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Save every frame_interval-th frame.
                if frame_count % frame_interval == 0:
                    frame_path = os.path.join(frames_dir, f"frame_{frame_saved:04d}.jpg")
                    cv2.imwrite(frame_path, frame)
                    frame_paths.append(frame_path)
                    frame_saved += 1

                frame_count += 1

            cap.release()

            logger.info(f"Extracted {len(frame_paths)} frames")
            return frame_paths

        except Exception as e:
            logger.error(f"Failed to extract frames: {str(e)}")
            return []

    def cut_video(self, input_path: str, start_time: float, end_time: float, output_name: Optional[str] = None,
                  output_path: Optional[str] = None) -> Optional[str]:
        """
        Cut a video file to the specified time range.

        Args:
            input_path: Path to input video file
            start_time: Start time in milliseconds
            end_time: End time in milliseconds
            output_name: Optional file name for the clip (placed under
                "<output_dir>/clip_files/")
            output_path: Full output path; if None, one is generated from
                output_name or the input file's basename

        Returns:
            str: Path to output video file or None if failed
        """
        try:
            # Validate input file
            if not os.path.exists(input_path):
                logger.error(f"Input video file not found: {input_path}")
                return None

            # Validate time range (still in milliseconds here)
            if start_time < 0 or end_time <= start_time:
                logger.error(f"Invalid time range: start={start_time}, end={end_time}")
                return None

            # Generate output path if not provided
            if output_path is None:
                clip_dir = os.path.join(self.output_dir, "clip_files")
                if output_name is None:
                    # Name the clip after the source video plus the range.
                    filename = os.path.splitext(os.path.basename(input_path))[0]
                    output_path = os.path.join(
                        clip_dir,
                        f"{filename}_cut_{int(start_time)}s_{int(end_time)}s.mp4"
                    )
                else:
                    output_path = os.path.join(clip_dir, output_name)

            # Ensure output directory exists
            os.makedirs(os.path.dirname(output_path), exist_ok=True)

            # Convert milliseconds to seconds for ffmpeg
            start_time = start_time / 1000
            end_time = end_time / 1000

            # Cut video using ffmpeg
            logger.info(f"Cutting video from {start_time}s to {end_time}s: {output_path}")
            ffmpeg_extract_subclip(input_path, start_time, end_time, targetname=output_path)

            if os.path.exists(output_path):
                logger.info(f"Video cut successfully: {output_path}")
                return output_path
            else:
                logger.error("Failed to create output video file")
                return None

        except Exception as e:
            logger.error(f"Failed to cut video: {str(e)}")
            return None

    def process_video(self, video_path: str, extract_audio: bool = True, extract_frames: bool = True,
                      frame_interval: float = 1.0, cut_video: bool = False,
                      start_time: Optional[float] = None, end_time: Optional[float] = None) -> Tuple[Optional[str], Optional[str], List[str]]:
        """
        Process a video file: optionally cut it, extract audio, run STT,
        and extract frames.

        Args:
            video_path: Path to video file
            extract_audio: Whether to extract audio
            extract_frames: Whether to extract frames
            frame_interval: Time interval between frames in seconds
            cut_video: Whether to cut video
            start_time: Start time for video cutting in milliseconds
            end_time: End time for video cutting in milliseconds

        Returns:
            Tuple containing:
                - Path to extracted audio file (or None)
                - Transcribed text (or None; also None when no STT model
                  is configured)
                - List of paths to extracted frames
        """
        audio_path = None
        transcript = None
        frame_paths = []

        try:
            # Cut video first if requested; fall back to the original on failure.
            processing_path = video_path
            if cut_video and start_time is not None and end_time is not None:
                cut_path = self.cut_video(video_path, start_time, end_time)
                if cut_path:
                    processing_path = cut_path
                else:
                    logger.warning("Video cutting failed, proceeding with original video")

            # Extract audio if requested
            if extract_audio:
                audio_path = self.extract_audio(processing_path)
                if audio_path:
                    # stt_model is None until a transcriber is attached;
                    # skip transcription instead of raising AttributeError.
                    if self.stt_model is not None:
                        transcript = self.stt_model.transcribe(audio_path)
                    else:
                        logger.warning("No STT model configured; skipping transcription")

            # Extract frames if requested
            if extract_frames:
                frame_paths = self.extract_frames(processing_path, frame_interval)

            return audio_path, transcript, frame_paths

        except Exception as e:
            logger.error(f"Failed to process video: {str(e)}")
            return audio_path, transcript, frame_paths

    def process_audio(self, audio_path: str) -> Optional[str]:
        """
        Transcribe an audio file using the configured STT model.

        Args:
            audio_path: Path to audio file

        Returns:
            str: Transcribed text or None if failed (missing file,
                 no STT model configured, or transcription error)
        """
        try:
            if not os.path.exists(audio_path):
                logger.error(f"Audio file not found: {audio_path}")
                return None

            # stt_model is None until a transcriber is attached.
            if self.stt_model is None:
                logger.error("No STT model configured")
                return None

            return self.stt_model.transcribe(audio_path)

        except Exception as e:
            logger.error(f"Failed to process audio: {str(e)}")
            return None

    def concat_videos(self, video_paths: List[str], output_path: str = None) -> Optional[str]:
        """
        Concatenate multiple video files into a single video file.

        Args:
            video_paths: List of paths to video files to concatenate
            output_path: Path of the output video file (required)

        Returns:
            str: Path to output concatenated video file or None if failed
        """
        try:
            # Validate input
            if not video_paths:
                logger.error("Empty video paths list provided")
                return None

            # Accept Path-like objects; moviepy expects a string path.
            if output_path is not None:
                output_path = str(output_path)
            else:
                logger.error("Output path is required")
                return None

            # Check if all input files exist before loading anything.
            for video_path in video_paths:
                if not os.path.exists(video_path):
                    logger.error(f"Video file not found: {video_path}")
                    return None

            # Load all video clips
            logger.info(f"Loading {len(video_paths)} video clips")
            video_clips = []
            try:
                for video_path in video_paths:
                    clip = VideoFileClip(video_path)
                    video_clips.append(clip)

                # "compose" handles clips with differing sizes/fps.
                logger.info("Concatenating video clips")
                final_clip = concatenate_videoclips(video_clips, method="compose")

                # Write output video
                logger.info(f"Writing concatenated video to: {output_path}")
                final_clip.write_videofile(output_path)

                logger.info("Video concatenation completed successfully")
                return output_path

            finally:
                # Release every clip that was successfully opened.
                for clip in video_clips:
                    clip.close()

        except Exception as e:
            logger.error(f"Failed to concatenate videos: {str(e)}")
            return None
# Module-level singleton for convenient importing elsewhere.
# NOTE(review): this runs at import time and creates the default
# "./output" directory as a side effect — confirm importers expect that.
media_processor = VideoAudioProcessor()
if __name__ == "__main__":
    # Manual smoke test: concatenate three sample clips into one file.
    demo_processor = VideoAudioProcessor("./output/room/")

    print("\nTesting video concatenation:")
    segments = [
        "./test_data/sample_video1.mp4",
        "./test_data/sample_video2.mp4",
        "./test_data/sample_video3.mp4",
    ]
    result_path = demo_processor.concat_videos(segments, "final_video.mp4")
    print(f"Concatenated video path: {result_path}")
|