| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685 |
# Standard library
import asyncio
import base64
import functools
import io
import os
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Literal, Optional, Union

# Third-party
import aiohttp
from dotenv import load_dotenv
from PIL import Image
from volcenginesdkarkruntime import Ark

# Local
from utils.config_manager import ConfigManager
from utils.logger_config import setup_logger

# Load environment variables (e.g. VOLC_API_KEY) from a .env file.
load_dotenv()

logger = setup_logger(__name__)
def async_performance_monitor(func: Callable):
    """Decorator that logs the wall-clock duration of an async callable.

    Successful awaits are logged at INFO level with the elapsed time;
    failures are logged at ERROR level (also with the elapsed time) and the
    original exception is re-raised unchanged.
    """
    @functools.wraps(func)
    async def timed(*args, **kwargs):
        started = time.time()
        try:
            outcome = await func(*args, **kwargs)
        except Exception as e:
            elapsed = time.time() - started
            logger.error(f"{func.__name__} failed after {elapsed:.2f} seconds: {str(e)}")
            raise
        elapsed = time.time() - started
        logger.info(f"{func.__name__} completed in {elapsed:.2f} seconds")
        return outcome
    return timed
def sync_performance_monitor(func: Callable):
    """Decorator that logs the wall-clock duration of a synchronous callable.

    Successful calls are logged at INFO level with the elapsed time; failures
    are logged at ERROR level (also with the elapsed time) and the original
    exception is re-raised unchanged.
    """
    @functools.wraps(func)
    def timed(*args, **kwargs):
        started = time.time()
        try:
            outcome = func(*args, **kwargs)
        except Exception as e:
            elapsed = time.time() - started
            logger.error(f"{func.__name__} failed after {elapsed:.2f} seconds: {str(e)}")
            raise
        elapsed = time.time() - started
        logger.info(f"{func.__name__} completed in {elapsed:.2f} seconds")
        return outcome
    return timed
class MediaCaptioner:
    """Media caption generator backed by the Volcengine Ark API.

    Produces captions / understanding results for videos, images and plain
    text via the chat-completions endpoint. Every public operation has an
    async implementation plus a synchronous wrapper that drives an event loop.
    """

    def __init__(self, api_key: Optional[str] = None,
                 base_url: str = "https://ark.cn-beijing.volces.com/api/v3",
                 model: str = "doubao-seed-1-6-250615",
                 config_path: Optional[str] = None):
        """
        Initialize the media captioner.

        Args:
            api_key: Volcengine API key; read from the VOLC_API_KEY
                environment variable when not provided.
            base_url: API base URL.
            model: Model ID to use.
            config_path: Path to the prompt configuration file.

        Raises:
            ValueError: If no API key could be resolved.
        """
        try:
            self.api_key = api_key or os.getenv("VOLC_API_KEY")
            if not self.api_key:
                raise ValueError("API key must be provided either through constructor or environment variable VOLC_API_KEY")

            # NOTE(review): the Ark SDK client is constructed for
            # compatibility, but requests below go through aiohttp directly.
            self.client = Ark(
                api_key=self.api_key,
                base_url=base_url
            )
            self.base_url = base_url
            self.model = model
            self.config_manager = ConfigManager(config_path)
            logger.info(f"Initialized MediaCaptioner with model: {model}")

        except Exception as e:
            logger.error(f"Failed to initialize MediaCaptioner: {str(e)}")
            raise

    @staticmethod
    def _run_sync(coro):
        """Drive *coro* to completion on the current (or a fresh) event loop.

        Shared helper for all synchronous wrappers; replaces four duplicated
        get_event_loop/new_event_loop dances. Must not be called from inside
        a running event loop — use the *_async variants there.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)

    @sync_performance_monitor
    def _encode_video(self, video_path: str) -> str:
        """
        Base64-encode a video file.

        Args:
            video_path: Path to the video file.

        Returns:
            str: Base64-encoded video data.

        Raises:
            FileNotFoundError: The video file does not exist.
            IOError: Reading the file failed.
        """
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        with open(video_path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")

    @sync_performance_monitor
    def _encode_image(self, image_path: str) -> str:
        """
        Base64-encode an image file as JPEG.

        Args:
            image_path: Path to the image file.

        Returns:
            str: Base64-encoded JPEG data.

        Raises:
            FileNotFoundError: The image file does not exist.
            IOError: Reading or re-encoding the image failed.
        """
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image file not found: {image_path}")

        with Image.open(image_path) as img:
            # JPEG cannot store alpha channels or palette images; converting
            # first avoids Pillow's "cannot write mode RGBA as JPEG" error.
            if img.mode not in ("RGB", "L"):
                img = img.convert("RGB")
            buffered = io.BytesIO()
            img.save(buffered, format="JPEG")
            return base64.b64encode(buffered.getvalue()).decode("utf-8")

    def generate_video_caption(self,
                               video_path: str,
                               prompt_type: str = "caption",
                               scenario: Optional[str] = None,
                               fps: int = 2,
                               context_info: Optional[str] = None) -> Optional[str]:
        """
        Synchronous wrapper for video caption generation.

        Args:
            video_path: Path to the video file.
            prompt_type: Prompt type.
            scenario: Scenario type.
            fps: Video sampling frame rate.
            context_info: Optional extra context passed to the model.

        Returns:
            str: Video caption, or None if processing failed.
        """
        return self._run_sync(
            self._process_video_async(
                file_path=video_path,
                prompt_type=prompt_type,
                scenario=scenario,
                fps=fps,
                context_info=context_info
            )
        )

    async def generate_video_caption_async(self,
                                           video_path: str,
                                           prompt_type: str = "caption",
                                           scenario: Optional[str] = None,
                                           fps: int = 2,
                                           context_info: Optional[str] = None) -> Optional[str]:
        """
        Asynchronously generate a video caption.

        Args:
            video_path: Path to the video file.
            prompt_type: Prompt type.
            scenario: Scenario type.
            fps: Video sampling frame rate.
            context_info: Optional extra context passed to the model.

        Returns:
            str: Video caption, or None if processing failed.
        """
        return await self._process_video_async(
            file_path=video_path,
            prompt_type=prompt_type,
            scenario=scenario,
            fps=fps,
            context_info=context_info
        )

    def generate_image_caption(self,
                               image_path: str,
                               prompt_type: str = "caption",
                               scenario: Optional[str] = None,
                               context_info: Optional[str] = None) -> Optional[str]:
        """
        Synchronous wrapper for image caption generation.

        Args:
            image_path: Path to the image file.
            prompt_type: Prompt type.
            scenario: Scenario type.
            context_info: Optional extra context passed to the model.

        Returns:
            str: Image caption, or None if processing failed.
        """
        return self._run_sync(
            self._process_image_async(
                file_path=image_path,
                prompt_type=prompt_type,
                scenario=scenario,
                context_info=context_info
            )
        )

    async def generate_image_caption_async(self,
                                           image_path: str,
                                           prompt_type: str = "caption",
                                           scenario: Optional[str] = None,
                                           context_info: Optional[str] = None) -> Optional[str]:
        """
        Asynchronously generate an image caption.

        Args:
            image_path: Path to the image file.
            prompt_type: Prompt type.
            scenario: Scenario type.
            context_info: Optional extra context passed to the model.

        Returns:
            str: Image caption, or None if processing failed.
        """
        return await self._process_image_async(
            file_path=image_path,
            prompt_type=prompt_type,
            scenario=scenario,
            context_info=context_info
        )

    def generate_text_understanding(self,
                                    user_prompt: str,
                                    system_prompt: str,
                                    max_length: Optional[int] = None,
                                    context_info: Optional[str] = None) -> Optional[str]:
        """
        Synchronous wrapper for text understanding with an explicit system prompt.

        Args:
            user_prompt: Text content to analyze.
            system_prompt: System prompt sent verbatim to the model.
            max_length: Maximum output length (max_tokens).
            context_info: Optional extra context passed to the model.

        Returns:
            str: Understanding result, or None if processing failed.
        """
        return self._run_sync(
            self._process_text_async(
                user_prompt=user_prompt,
                system_prompt=system_prompt,
                max_length=max_length,
                context_info=context_info
            )
        )

    async def generate_text_understanding_async(self,
                                                text: str,
                                                prompt_type: str = "summary",
                                                scenario: Optional[str] = None,
                                                max_length: Optional[int] = None,
                                                context_info: Optional[str] = None) -> Optional[str]:
        """
        Asynchronously generate a text understanding result.

        Args:
            text: Text content to analyze.
            prompt_type: Prompt type used to look up the system prompt.
            scenario: Scenario type.
            max_length: Maximum output length (max_tokens).
            context_info: Optional extra context passed to the model.

        Returns:
            str: Understanding result, or None if processing failed.
        """
        # BUGFIX: the previous call passed text=/prompt_type=/scenario= to
        # _process_text_async, whose signature is (user_prompt, system_prompt,
        # max_length, context_info) — that always raised TypeError. Resolve
        # the system prompt from config first, then call with real params.
        # NOTE(review): assumes the prompt config has a "text" media section
        # mirroring "video"/"image" — confirm against ConfigManager.
        system_prompt = self.config_manager.get_prompt("text", prompt_type, scenario)
        return await self._process_text_async(
            user_prompt=text,
            system_prompt=system_prompt,
            max_length=max_length,
            context_info=context_info
        )

    def generate_multi_aspect_understanding(self,
                                            text: str,
                                            prompt_types: List[str],
                                            scenario: Optional[str] = None) -> Dict[str, Optional[str]]:
        """
        Synchronous wrapper for multi-aspect text understanding.

        Args:
            text: Text content to analyze.
            prompt_types: List of prompt types.
            scenario: Scenario type.

        Returns:
            Dict[str, Optional[str]]: Mapping from prompt type to result.
        """
        return self._run_sync(
            self.generate_multi_aspect_understanding_async(
                text=text,
                prompt_types=prompt_types,
                scenario=scenario
            )
        )

    async def generate_multi_aspect_understanding_async(self,
                                                        text: str,
                                                        prompt_types: List[str],
                                                        scenario: Optional[str] = None,
                                                        context_info: Optional[str] = None) -> Dict[str, Optional[str]]:
        """
        Asynchronously analyze the same text from multiple aspects in parallel.

        Args:
            text: Text content to analyze.
            prompt_types: List of prompt types.
            scenario: Scenario type.
            context_info: Optional extra context passed to the model.

        Returns:
            Dict[str, Optional[str]]: Mapping from prompt type to result
            (None for aspects that failed).
        """
        # BUGFIX: previously called _process_text_async with non-existent
        # keyword arguments; route through generate_text_understanding_async,
        # which performs the prompt lookup per aspect.
        tasks = [
            self.generate_text_understanding_async(
                text=text,
                prompt_type=prompt_type,
                scenario=scenario,
                context_info=context_info
            )
            for prompt_type in prompt_types
        ]

        results = await asyncio.gather(*tasks)
        return dict(zip(prompt_types, results))

    async def _make_api_request(self,
                                endpoint: str,
                                payload: Dict[str, Any],
                                timeout: int = 180) -> Dict[str, Any]:
        """
        Send an authenticated POST request to the API.

        Args:
            endpoint: API endpoint (relative to base_url).
            payload: JSON request payload.
            timeout: Total timeout in seconds.

        Returns:
            Dict[str, Any]: Parsed JSON response.

        Raises:
            aiohttp.ClientError: The request failed or returned non-200.
            asyncio.TimeoutError: The request timed out.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.base_url}/{endpoint}",
                    json=payload,
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    # Passing a bare int here is deprecated in aiohttp;
                    # ClientTimeout is the supported form.
                    timeout=aiohttp.ClientTimeout(total=timeout)
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        raise aiohttp.ClientError(f"API request failed with status {response.status}: {error_text}")
                    return await response.json()
        except asyncio.TimeoutError:
            logger.error(f"API request timed out after {timeout} seconds")
            raise
        except Exception as e:
            logger.error(f"API request failed: {str(e)}")
            raise

    @async_performance_monitor
    async def _process_video_async(self, file_path: str, prompt_type: str,
                                   scenario: Optional[str] = None, fps: int = 2,
                                   context_info: Optional[str] = None) -> Optional[str]:
        """Process a video file asynchronously; returns None on failure."""
        try:
            # Run the (blocking) file read + base64 encode off the event loop.
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor() as pool:
                base64_video = await loop.run_in_executor(
                    pool, self._encode_video, file_path
                )

            prompt = self.config_manager.get_prompt("video", prompt_type, scenario)
            # NOTE(review): the media + prompt go in the "system" role and the
            # context in a "user" message (stringifying None when absent) —
            # kept as-is for compatibility; confirm against the Ark API docs.
            payload = {
                "model": self.model,
                "messages": [{
                    "role": "system",
                    "content": [
                        {
                            "type": "video_url",
                            "video_url": {
                                "url": f"data:video/mp4;base64,{base64_video}",
                                "fps": fps
                            }
                        },
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ]
                },
                {
                    "role": "user",
                    "content": f"上下文信息:{context_info}"
                }
                ]
            }

            response = await self._make_api_request("chat/completions", payload)
            return response["choices"][0]["message"]["content"]

        except Exception as e:
            logger.error(f"Failed to process video async: {str(e)}")
            return None

    @async_performance_monitor
    async def _process_image_async(self, file_path: str, prompt_type: str,
                                   scenario: Optional[str] = None,
                                   context_info: Optional[str] = None) -> Optional[str]:
        """Process an image file asynchronously; returns None on failure."""
        try:
            # Run the (blocking) image read + JPEG re-encode off the event loop.
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor() as pool:
                base64_image = await loop.run_in_executor(
                    pool, self._encode_image, file_path
                )

            prompt = self.config_manager.get_prompt("image", prompt_type, scenario)

            # NOTE(review): same message-role layout as the video path — kept
            # as-is for compatibility; confirm against the Ark API docs.
            payload = {
                "model": self.model,
                "messages": [{
                    "role": "system",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}"
                            }
                        },
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ]
                },
                {
                    "role": "user",
                    "content": f"上下文信息:{context_info}"
                }
                ]
            }

            response = await self._make_api_request("chat/completions", payload)
            return response["choices"][0]["message"]["content"]

        except Exception as e:
            logger.error(f"Failed to process image async: {str(e)}")
            return None

    @async_performance_monitor
    async def _process_text_async(self, user_prompt: str, system_prompt: str,
                                  max_length: Optional[int] = None,
                                  context_info: Optional[str] = None) -> Optional[str]:
        """Process text content asynchronously; returns None on failure."""
        # BUGFIX: the try/except had been commented out, so failures raised
        # instead of returning None as the callers' docstrings promise —
        # restored to match the video/image siblings.
        try:
            if not user_prompt.strip():
                logger.error("Empty text provided")
                return None

            messages = [
                {
                    "role": "system",
                    "content": system_prompt
                },
                {
                    "role": "user",
                    "content": user_prompt
                }
            ]
            # Only attach the context message when context was actually given;
            # previously the literal string "None" was sent to the model.
            if context_info:
                messages.append({
                    "role": "user",
                    "content": f"上下文信息:{context_info}"
                })

            payload = {
                "model": self.model,
                "messages": messages
            }
            # Omit max_tokens entirely when no limit was requested — a JSON
            # null value may be rejected by the API.
            if max_length:
                payload["max_tokens"] = max_length

            response = await self._make_api_request("chat/completions", payload)
            return response["choices"][0]["message"]["content"]

        except Exception as e:
            logger.error(f"Failed to process text async: {str(e)}")
            return None

    async def generate_batch_captions_async(self,
                                            files: Dict[str, Dict[str, Union[str, int]]],
                                            scenario: Optional[str] = None,
                                            max_concurrent: int = 5) -> Dict[str, Optional[str]]:
        """
        Asynchronously generate captions for a batch of media items.

        Args:
            files: Mapping of file path (or identifier) to its config dict.
            scenario: Scenario type.
            max_concurrent: Maximum number of in-flight requests.

        Returns:
            Dict[str, Optional[str]]: Mapping of file path/identifier to
            caption (None for items that failed).
        """
        # Semaphore bounds how many items are processed concurrently.
        semaphore = asyncio.Semaphore(max_concurrent)

        async def process_single_file(file_path: str, config: Dict[str, Any]) -> tuple[str, Optional[str]]:
            """Process one batch entry, returning (key, result-or-None)."""
            async with semaphore:
                try:
                    media_type = config["type"]
                    prompt_type = config.get("prompt_type", "caption" if media_type != "text" else "summary")

                    if media_type == "video":
                        fps = config.get("fps", 2)
                        result = await self._process_video_async(
                            file_path=file_path,
                            prompt_type=prompt_type,
                            scenario=scenario,
                            fps=fps,
                            context_info=config.get("context_info")
                        )
                    elif media_type == "image":
                        result = await self._process_image_async(
                            file_path=file_path,
                            prompt_type=prompt_type,
                            scenario=scenario,
                            context_info=config.get("context_info")
                        )
                    elif media_type == "text":
                        if "content" not in config:
                            logger.error(f"Text content not provided for {file_path}")
                            return file_path, None

                        # BUGFIX: previously called _process_text_async with
                        # keyword args it does not accept (TypeError); route
                        # through the public async text API instead.
                        result = await self.generate_text_understanding_async(
                            text=config["content"],
                            prompt_type=prompt_type,
                            scenario=scenario,
                            max_length=config.get("max_length"),
                            context_info=config.get("context_info")
                        )
                    else:
                        logger.warning(f"Unsupported media type: {media_type}")
                        return file_path, None

                    return file_path, result

                except Exception as e:
                    logger.error(f"Failed to process file {file_path}: {str(e)}")
                    return file_path, None

        tasks = [
            process_single_file(file_path, config)
            for file_path, config in files.items()
        ]

        completed_tasks = await asyncio.gather(*tasks)
        return dict(completed_tasks)

    def generate_batch_captions(self,
                                files: Dict[str, Dict[str, Union[str, int]]],
                                scenario: Optional[str] = None,
                                max_concurrent: int = 5) -> Dict[str, Optional[str]]:
        """
        Synchronous wrapper for batch caption generation.

        Args:
            files: Mapping of file path (or identifier) to its config:
                {
                    "file_path": {
                        "type": "video"|"image"|"text",
                        "prompt_type": str,   # optional
                        "fps": int,           # video only
                        "content": str,       # required for text items
                        "max_length": int     # optional, text only
                    }
                }
            scenario: Scenario type.
            max_concurrent: Maximum number of in-flight requests
                (new, defaults to the previous hard-coded behavior).

        Returns:
            Dict[str, Optional[str]]: Mapping of file path/identifier to
            caption (None for items that failed).
        """
        return self._run_sync(
            self.generate_batch_captions_async(files, scenario, max_concurrent)
        )
# Shared module-level instance for convenience imports.
# NOTE(review): this constructs at import time and will raise ValueError if
# VOLC_API_KEY is not configured — consider lazy construction.
media_captioner: MediaCaptioner = MediaCaptioner()
if __name__ == "__main__":

    async def main():
        """Demo entry point: batch-caption a video, an image and a text blob."""
        captioner = MediaCaptioner()

        # Sample text to summarize.
        text_content = """
        近日,研究人员在深海发现了一种新的海洋生物物种。
        这种生物具有独特的生物发光能力,可以在完全黑暗的环境中发出蓝绿色的光。
        科学家们认为,这一发现对于了解深海生态系统具有重要意义。
        """

        # Each key is a file path (or an arbitrary identifier for inline text);
        # each value configures how that item is processed.
        batch = {
            "./test_data/sample_video.mp4": {
                "type": "video",
                "prompt_type": "caption",
                "fps": 2,
            },
            "./test_data/sample_image.jpg": {
                "type": "image",
                "prompt_type": "caption",
            },
            "text_sample": {
                "type": "text",
                "content": text_content,
                "prompt_type": "summary",
                "max_length": 200,
            },
        }

        # Fan the batch out asynchronously with bounded concurrency.
        outcome = await captioner.generate_batch_captions_async(
            batch,
            scenario="academic",
            max_concurrent=5,
        )
        print("批量处理结果:", outcome)

    # Run the async demo.
    asyncio.run(main())
|