import os
import base64
import io
import asyncio
import time
import functools
from typing import Optional, Dict, Any, Union, List, Callable

import aiohttp
from PIL import Image
from dotenv import load_dotenv
from volcenginesdkarkruntime import Ark

from utils.logger_config import setup_logger
from utils.config_manager import ConfigManager

# Load environment variables from a local .env file (supplies VOLC_API_KEY).
load_dotenv()

logger = setup_logger(__name__)


def async_performance_monitor(func: Callable) -> Callable:
    """Decorator that logs the wall-clock duration of an async callable.

    Logs an info line on success and an error line (with the exception
    message) on failure; the exception is always re-raised.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = await func(*args, **kwargs)
            elapsed = time.time() - start_time
            logger.info(f"{func.__name__} completed in {elapsed:.2f} seconds")
            return result
        except Exception as e:
            elapsed = time.time() - start_time
            logger.error(f"{func.__name__} failed after {elapsed:.2f} seconds: {str(e)}")
            raise
    return wrapper


def sync_performance_monitor(func: Callable) -> Callable:
    """Decorator that logs the wall-clock duration of a sync callable.

    Same logging contract as async_performance_monitor, for plain functions.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            elapsed = time.time() - start_time
            logger.info(f"{func.__name__} completed in {elapsed:.2f} seconds")
            return result
        except Exception as e:
            elapsed = time.time() - start_time
            logger.error(f"{func.__name__} failed after {elapsed:.2f} seconds: {str(e)}")
            raise
    return wrapper


class MediaCaptioner:
    """Media caption generator backed by the Volcano Engine (Ark) API.

    Provides video, image and text understanding, each with a synchronous
    wrapper and an async implementation, plus concurrency-limited batch
    processing.
    """

    def __init__(self,
                 api_key: Optional[str] = None,
                 base_url: str = "https://ark.cn-beijing.volces.com/api/v3",
                 model: str = "doubao-seed-1-6-250615",
                 config_path: Optional[str] = None):
        """Initialize the media captioner.

        Args:
            api_key: Volcano Engine API key; when None, read from the
                VOLC_API_KEY environment variable.
            base_url: API base URL.
            model: Model ID used for all requests.
            config_path: Optional path to the prompt configuration file.

        Raises:
            ValueError: If no API key can be resolved.
        """
        try:
            self.api_key = api_key or os.getenv("VOLC_API_KEY")
            if not self.api_key:
                raise ValueError(
                    "API key must be provided either through constructor "
                    "or environment variable VOLC_API_KEY"
                )
            self.client = Ark(api_key=self.api_key, base_url=base_url)
            self.base_url = base_url
            self.model = model
            self.config_manager = ConfigManager(config_path)
            logger.info(f"Initialized MediaCaptioner with model: {model}")
        except Exception as e:
            logger.error(f"Failed to initialize MediaCaptioner: {str(e)}")
            raise

    @staticmethod
    def _get_event_loop() -> asyncio.AbstractEventLoop:
        """Return a usable event loop, creating and installing one if needed.

        Shared by all synchronous wrappers (previously duplicated inline).
        """
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return loop

    @sync_performance_monitor
    def _encode_video(self, video_path: str) -> str:
        """Read a video file and return its contents base64-encoded.

        Args:
            video_path: Path to the video file.

        Returns:
            str: Base64-encoded video bytes.

        Raises:
            FileNotFoundError: If the video file does not exist.
            IOError: If reading the file fails.
        """
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")
        with open(video_path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")

    @sync_performance_monitor
    def _encode_image(self, image_path: str) -> str:
        """Re-encode an image as JPEG and return it base64-encoded.

        Args:
            image_path: Path to the image file.

        Returns:
            str: Base64-encoded JPEG bytes.

        Raises:
            FileNotFoundError: If the image file does not exist.
            IOError: If reading or re-encoding the image fails.
        """
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image file not found: {image_path}")
        with Image.open(image_path) as img:
            buffered = io.BytesIO()
            # JPEG cannot encode alpha/palette modes (RGBA, P, LA);
            # normalize to RGB first so arbitrary inputs don't raise.
            img.convert("RGB").save(buffered, format="JPEG")
            return base64.b64encode(buffered.getvalue()).decode("utf-8")

    def generate_video_caption(self,
                               video_path: str,
                               prompt_type: str = "caption",
                               scenario: Optional[str] = None,
                               fps: int = 2,
                               context_info: Optional[str] = None) -> Optional[str]:
        """Synchronous wrapper that generates a video caption.

        Args:
            video_path: Path to the video file.
            prompt_type: Prompt type key.
            scenario: Scenario key.
            fps: Video sampling frame rate.
            context_info: Optional extra context passed to the model.

        Returns:
            str: The caption, or None on failure.
        """
        return self._get_event_loop().run_until_complete(
            self._process_video_async(
                file_path=video_path,
                prompt_type=prompt_type,
                scenario=scenario,
                fps=fps,
                context_info=context_info,
            )
        )

    async def generate_video_caption_async(self,
                                           video_path: str,
                                           prompt_type: str = "caption",
                                           scenario: Optional[str] = None,
                                           fps: int = 2,
                                           context_info: Optional[str] = None) -> Optional[str]:
        """Asynchronously generate a video caption.

        Args/Returns: same as generate_video_caption.
        """
        return await self._process_video_async(
            file_path=video_path,
            prompt_type=prompt_type,
            scenario=scenario,
            fps=fps,
            context_info=context_info,
        )

    def generate_image_caption(self,
                               image_path: str,
                               prompt_type: str = "caption",
                               scenario: Optional[str] = None,
                               context_info: Optional[str] = None) -> Optional[str]:
        """Synchronous wrapper that generates an image caption.

        Args:
            image_path: Path to the image file.
            prompt_type: Prompt type key.
            scenario: Scenario key.
            context_info: Optional extra context passed to the model.

        Returns:
            str: The caption, or None on failure.
        """
        return self._get_event_loop().run_until_complete(
            self._process_image_async(
                file_path=image_path,
                prompt_type=prompt_type,
                scenario=scenario,
                context_info=context_info,
            )
        )

    async def generate_image_caption_async(self,
                                           image_path: str,
                                           prompt_type: str = "caption",
                                           scenario: Optional[str] = None,
                                           context_info: Optional[str] = None) -> Optional[str]:
        """Asynchronously generate an image caption.

        Args/Returns: same as generate_image_caption.
        """
        return await self._process_image_async(
            file_path=image_path,
            prompt_type=prompt_type,
            scenario=scenario,
            context_info=context_info,
        )

    def generate_text_understanding(self,
                                    user_prompt: str,
                                    system_prompt: str,
                                    max_length: Optional[int] = None,
                                    context_info: Optional[str] = None) -> Optional[str]:
        """Synchronous wrapper that runs text understanding.

        Args:
            user_prompt: Text content to analyze.
            system_prompt: System prompt to steer the model.
            max_length: Optional cap on output tokens.
            context_info: Optional extra context passed to the model.

        Returns:
            str: The understanding result, or None on failure.
        """
        return self._get_event_loop().run_until_complete(
            self._process_text_async(
                user_prompt=user_prompt,
                system_prompt=system_prompt,
                max_length=max_length,
                context_info=context_info,
            )
        )

    async def generate_text_understanding_async(self,
                                                text: str,
                                                prompt_type: str = "summary",
                                                scenario: Optional[str] = None,
                                                max_length: Optional[int] = None,
                                                context_info: Optional[str] = None) -> Optional[str]:
        """Asynchronously run text understanding.

        BUG FIX: the original forwarded prompt_type/scenario straight into
        _process_text_async, whose signature is (user_prompt, system_prompt,
        ...) — a guaranteed TypeError. We now resolve the system prompt via
        the config manager, mirroring the video/image paths.
        NOTE(review): assumes "text" is a valid media type for
        ConfigManager.get_prompt — confirm against the prompt config.

        Args:
            text: Text content to analyze.
            prompt_type: Prompt type key.
            scenario: Scenario key.
            max_length: Optional cap on output tokens.
            context_info: Optional extra context passed to the model.

        Returns:
            str: The understanding result, or None on failure.
        """
        system_prompt = self.config_manager.get_prompt("text", prompt_type, scenario)
        return await self._process_text_async(
            user_prompt=text,
            system_prompt=system_prompt,
            max_length=max_length,
            context_info=context_info,
        )

    def generate_multi_aspect_understanding(self,
                                            text: str,
                                            prompt_types: List[str],
                                            scenario: Optional[str] = None) -> Dict[str, Optional[str]]:
        """Synchronous wrapper for multi-aspect text understanding.

        Args:
            text: Text content to analyze.
            prompt_types: List of prompt type keys, one per aspect.
            scenario: Scenario key.

        Returns:
            Dict[str, Optional[str]]: Mapping from prompt type to result.
        """
        return self._get_event_loop().run_until_complete(
            self.generate_multi_aspect_understanding_async(
                text=text,
                prompt_types=prompt_types,
                scenario=scenario,
            )
        )

    async def generate_multi_aspect_understanding_async(self,
                                                        text: str,
                                                        prompt_types: List[str],
                                                        scenario: Optional[str] = None,
                                                        context_info: Optional[str] = None) -> Dict[str, Optional[str]]:
        """Asynchronously analyze the same text under several prompt types.

        BUG FIX: the original called _process_text_async with keyword
        arguments it does not accept (text/prompt_type/scenario); routed
        through generate_text_understanding_async instead.

        Args:
            text: Text content to analyze.
            prompt_types: List of prompt type keys, one per aspect.
            scenario: Scenario key.
            context_info: Optional extra context passed to the model.

        Returns:
            Dict[str, Optional[str]]: Mapping from prompt type to result.
        """
        tasks = [
            self.generate_text_understanding_async(
                text=text,
                prompt_type=prompt_type,
                scenario=scenario,
                context_info=context_info,
            )
            for prompt_type in prompt_types
        ]
        results = await asyncio.gather(*tasks)
        return dict(zip(prompt_types, results))

    async def _make_api_request(self,
                                endpoint: str,
                                payload: Dict[str, Any],
                                timeout: int = 180) -> Dict[str, Any]:
        """POST a JSON payload to the API and return the decoded response.

        Args:
            endpoint: API endpoint path (relative to base_url).
            payload: JSON-serializable request body.
            timeout: Total request timeout in seconds.

        Returns:
            Dict[str, Any]: Decoded JSON response.

        Raises:
            aiohttp.ClientError: On non-200 responses or transport errors.
            asyncio.TimeoutError: When the request times out.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.base_url}/{endpoint}",
                    json=payload,
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    # Plain-int timeouts are deprecated in aiohttp; use
                    # ClientTimeout explicitly.
                    timeout=aiohttp.ClientTimeout(total=timeout),
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        raise aiohttp.ClientError(
                            f"API request failed with status {response.status}: {error_text}"
                        )
                    return await response.json()
        except asyncio.TimeoutError:
            logger.error(f"API request timed out after {timeout} seconds")
            raise
        except Exception as e:
            logger.error(f"API request failed: {str(e)}")
            raise

    def _build_media_payload(self,
                             media_part: Dict[str, Any],
                             prompt: str,
                             context_info: Optional[str]) -> Dict[str, Any]:
        """Build a chat-completions payload for a media (video/image) request.

        Shared by _process_video_async and _process_image_async, which
        previously duplicated this structure inline.
        """
        return {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": [
                        media_part,
                        {"type": "text", "text": prompt},
                    ],
                },
                {
                    "role": "user",
                    "content": f"上下文信息:{context_info}",
                },
            ],
        }

    @async_performance_monitor
    async def _process_video_async(self,
                                   file_path: str,
                                   prompt_type: str,
                                   scenario: Optional[str] = None,
                                   fps: int = 2,
                                   context_info: Optional[str] = None) -> Optional[str]:
        """Encode a video, query the model, and return the caption (or None)."""
        try:
            # File IO runs in the loop's default executor so it never
            # blocks the event loop (no per-call thread pool needed).
            loop = asyncio.get_running_loop()
            base64_video = await loop.run_in_executor(None, self._encode_video, file_path)

            prompt = self.config_manager.get_prompt("video", prompt_type, scenario)
            payload = self._build_media_payload(
                {
                    "type": "video_url",
                    "video_url": {
                        "url": f"data:video/mp4;base64,{base64_video}",
                        "fps": fps,
                    },
                },
                prompt,
                context_info,
            )

            response = await self._make_api_request("chat/completions", payload)
            return response["choices"][0]["message"]["content"]
        except Exception as e:
            logger.error(f"Failed to process video async: {str(e)}")
            return None

    @async_performance_monitor
    async def _process_image_async(self,
                                   file_path: str,
                                   prompt_type: str,
                                   scenario: Optional[str] = None,
                                   context_info: Optional[str] = None) -> Optional[str]:
        """Encode an image, query the model, and return the caption (or None)."""
        try:
            # File IO runs in the loop's default executor (see video path).
            loop = asyncio.get_running_loop()
            base64_image = await loop.run_in_executor(None, self._encode_image, file_path)

            prompt = self.config_manager.get_prompt("image", prompt_type, scenario)
            payload = self._build_media_payload(
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{base64_image}",
                    },
                },
                prompt,
                context_info,
            )

            response = await self._make_api_request("chat/completions", payload)
            return response["choices"][0]["message"]["content"]
        except Exception as e:
            logger.error(f"Failed to process image async: {str(e)}")
            return None

    @async_performance_monitor
    async def _process_text_async(self,
                                  user_prompt: str,
                                  system_prompt: str,
                                  max_length: Optional[int] = None,
                                  context_info: Optional[str] = None) -> Optional[str]:
        """Run text understanding and return the result (or None on failure).

        Error handling restored (it was commented out in the original) so
        this method matches the return-None-on-failure contract of the
        sibling _process_* methods.
        """
        try:
            if not user_prompt.strip():
                logger.error("Empty text provided")
                return None

            payload = {
                "model": self.model,
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                    {"role": "user", "content": f"上下文信息:{context_info}"},
                ],
            }
            # Only send max_tokens when explicitly requested; the original
            # sent a literal null, which the API may reject.
            if max_length:
                payload["max_tokens"] = max_length

            response = await self._make_api_request("chat/completions", payload)
            return response["choices"][0]["message"]["content"]
        except Exception as e:
            logger.error(f"Failed to process text async: {str(e)}")
            return None

    async def generate_batch_captions_async(self,
                                            files: Dict[str, Dict[str, Union[str, int]]],
                                            scenario: Optional[str] = None,
                                            max_concurrent: int = 5) -> Dict[str, Optional[str]]:
        """Asynchronously generate captions for a batch of media entries.

        Args:
            files: Mapping from file path (or text identifier) to its config
                dict; see generate_batch_captions for the schema.
            scenario: Scenario key applied to every entry.
            max_concurrent: Maximum number of in-flight requests.

        Returns:
            Dict[str, Optional[str]]: Mapping from path/identifier to the
            generated description (None for failed entries).
        """
        # Semaphore caps how many entries are processed at once.
        semaphore = asyncio.Semaphore(max_concurrent)

        async def process_single_file(file_path: str,
                                      config: Dict[str, Any]) -> tuple[str, Optional[str]]:
            """Process one entry; always returns (identifier, result-or-None)."""
            async with semaphore:
                try:
                    media_type = config["type"]
                    prompt_type = config.get(
                        "prompt_type",
                        "caption" if media_type != "text" else "summary",
                    )
                    if media_type == "video":
                        result = await self._process_video_async(
                            file_path=file_path,
                            prompt_type=prompt_type,
                            scenario=scenario,
                            fps=config.get("fps", 2),
                            context_info=config.get("context_info"),
                        )
                    elif media_type == "image":
                        result = await self._process_image_async(
                            file_path=file_path,
                            prompt_type=prompt_type,
                            scenario=scenario,
                            context_info=config.get("context_info"),
                        )
                    elif media_type == "text":
                        if "content" not in config:
                            logger.error(f"Text content not provided for {file_path}")
                            return file_path, None
                        # BUG FIX: original called _process_text_async with
                        # keyword args it does not accept; route through the
                        # public async text API instead.
                        result = await self.generate_text_understanding_async(
                            text=config["content"],
                            prompt_type=prompt_type,
                            scenario=scenario,
                            max_length=config.get("max_length"),
                            context_info=config.get("context_info"),
                        )
                    else:
                        logger.warning(f"Unsupported media type: {media_type}")
                        return file_path, None
                    return file_path, result
                except Exception as e:
                    logger.error(f"Failed to process file {file_path}: {str(e)}")
                    return file_path, None

        completed_tasks = await asyncio.gather(
            *(process_single_file(file_path, config)
              for file_path, config in files.items())
        )
        return dict(completed_tasks)

    def generate_batch_captions(self,
                                files: Dict[str, Dict[str, Union[str, int]]],
                                scenario: Optional[str] = None) -> Dict[str, Optional[str]]:
        """Synchronous wrapper for batch caption generation.

        Args:
            files: Mapping from file path (or text identifier) to a config:
                {
                    "file_path": {
                        "type": "video"|"image"|"text",
                        "prompt_type": str,   # optional
                        "fps": int,           # video only
                        "content": str,       # required for text entries
                        "max_length": int     # optional, text only
                    }
                }
            scenario: Scenario key applied to every entry.

        Returns:
            Dict[str, Optional[str]]: Mapping from path/identifier to result.
        """
        return self._get_event_loop().run_until_complete(
            self.generate_batch_captions_async(files, scenario)
        )


# NOTE(review): module-level singleton instantiates at import time and will
# raise if VOLC_API_KEY is not configured — consider lazy construction.
media_captioner: MediaCaptioner = MediaCaptioner()


if __name__ == "__main__":
    async def main():
        captioner = MediaCaptioner()

        # Sample text for the text-understanding entry.
        text_content = """
        近日,研究人员在深海发现了一种新的海洋生物物种。
        这种生物具有独特的生物发光能力,可以在完全黑暗的环境中发出蓝绿色的光。
        科学家们认为,这一发现对于了解深海生态系统具有重要意义。
        """

        # Batch example covering all three media types.
        files = {
            "./test_data/sample_video.mp4": {
                "type": "video",
                "prompt_type": "caption",
                "fps": 2,
            },
            "./test_data/sample_image.jpg": {
                "type": "image",
                "prompt_type": "caption",
            },
            "text_sample": {
                "type": "text",
                "content": text_content,
                "prompt_type": "summary",
                "max_length": 200,
            },
        }

        results = await captioner.generate_batch_captions_async(
            files,
            scenario="academic",
            max_concurrent=5,
        )
        print("批量处理结果:", results)

    asyncio.run(main())