import time
import asyncio
import logging
from pathlib import Path
from typing import List, Dict, Union

from PIL import Image
import torch
from transformers import AutoModelForCausalLM
from janus.models import MultiModalityCausalLM, VLChatProcessor
from janus.utils.io import load_pil_images

from utils.logger_config import setup_logger

# Configure the logging system
logger = setup_logger(__name__)


class JanusVisualAssistant:
    """A highly optimized visual assistant for multimodal interactions"""

    def __init__(
        self,
        model_path: str = "/data/data/luosy/models/Janus-Pro-7B",
        dtype: torch.dtype = torch.bfloat16,
        device: str = "cuda"
    ):
        """Initialize model components with efficient memory management"""
        self.dtype = dtype
        self.device = device
        self.image_cache = {}

        # Initialize components with memory optimization
        self.processor = VLChatProcessor.from_pretrained(model_path)
        self.tokenizer = self.processor.tokenizer
        self.model = self._load_model(model_path).eval()

    def _load_model(self, model_path: str) -> MultiModalityCausalLM:
        """Load model with optimized memory allocation"""
        return AutoModelForCausalLM.from_pretrained(
            model_path,
            trust_remote_code=True
        ).to(self.dtype).to(self.device)

    def _monitor_memory(self):
        """Monitor GPU memory usage"""
        allocated = torch.cuda.memory_allocated(self.device)
        reserved = torch.cuda.memory_reserved(self.device)
        logger.info(f"Memory allocated: {allocated / 1024 ** 2:.2f} MB")
        logger.info(f"Memory reserved: {reserved / 1024 ** 2:.2f} MB")

    def create_conversation(
        self,
        image_paths: List[str],
        questions: List[str],
        system_prompt: str = "你是一个专业的视频理解助手"  # "You are a professional video-understanding assistant"
    ) -> List[Dict]:
        """Build conversation structure with efficient image handling"""
        if len(image_paths) != len(questions):
            raise ValueError("The number of images must match the number of questions.")

        conversations = []
        for image_path, question in zip(image_paths, questions):
            conversations.append({
                "role": "<|User|>",
                # <image_placeholder> marks where the image embedding is spliced into the prompt
                "content": f"<image_placeholder>\n{question}",
                "images": [self._preprocess_image(image_path)],
            })
        conversations.append({"role": "<|Assistant|>", "content": system_prompt})
        return conversations

    def _preprocess_image(self, image_path: Union[str, Path]) -> str:
        """Validate and standardize image input format with caching"""
        if image_path in self.image_cache:
            return self.image_cache[image_path]

        # Open once to validate the file, then cache the normalized path string
        try:
            Image.open(image_path).convert('RGB')
            self.image_cache[image_path] = str(image_path)
            return str(image_path)
        except Exception as e:
            logger.error(f"Error processing image {image_path}: {e}")
            raise

    async def generate_response(
        self,
        conversation: List[Dict],
        generation_config: Dict = None
    ) -> str:
        """Optimized generation pipeline with batch processing"""
        # Monitor memory before processing
        self._monitor_memory()

        # Default generation parameters (temperature only takes effect when do_sample=True)
        default_config = {
            "max_new_tokens": 512,
            "do_sample": False,
            "use_cache": True,
            "temperature": 0.9,
            "pad_token_id": self.tokenizer.eos_token_id,
            "bos_token_id": self.tokenizer.bos_token_id,
            "eos_token_id": self.tokenizer.eos_token_id,
        }
        config = {**default_config, **(generation_config or {})}

        # torch.inference_mode() is entered inside the coroutine body; used as a
        # decorator on an async def, the context would exit before the awaited
        # body actually runs.
        with torch.inference_mode():
            # Batch processing pipeline
            pil_images = load_pil_images(conversation)
            inputs = self.processor(
                conversations=conversation,
                images=pil_images,
                force_batchify=True
            ).to(self.device)

            # Direct memory reuse for embeddings
            inputs_embeds = self.model.prepare_inputs_embeds(**inputs)

            # Accelerated generation
            outputs = self.model.language_model.generate(
                inputs_embeds=inputs_embeds,
                attention_mask=inputs.attention_mask,
                **config
            )

        return self.tokenizer.decode(outputs[0].cpu().tolist(), skip_special_tokens=True)

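
# The example in main() asks the model to answer in JSON with the fields
# 【人物数量】 (number of people), 【人物服装】 (clothing) and 【人物配饰】 (accessories).
# Below is a minimal, optional sketch for pulling that JSON object out of the raw
# reply; the function name and the fallback behaviour are assumptions, not part
# of the Janus API.
def parse_json_response(response: str) -> Dict:
    """Best-effort extraction of a JSON object from a model reply."""
    import json
    import re

    # Replies are often wrapped in markdown fences or extra prose, so grab the
    # first {...} span instead of parsing the whole string.
    match = re.search(r"\{.*\}", response, re.DOTALL)
    if not match:
        logger.warning("No JSON object found in model response")
        return {}
    try:
        return json.loads(match.group(0))
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON from response: {e}")
        return {}

# Example (hypothetical): parsed = parse_json_response(response)
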
async def main():
    # Benchmark and example usage
    assistant = JanusVisualAssistant()
    start_time = time.time()

    conversation = assistant.create_conversation(
        image_paths=["/data/data/luosy/project/oral/data/key_frame/frame_014.jpg"],
        # Ask for a JSON reply with the fields: number of people, clothing, accessories
        questions=["以JSON格式回复,包含字段【人物数量】【人物服装】【人物配饰】"]
    )
    response = await assistant.generate_response(conversation)

    print(f"Response: {response}")
    print(f"Total time: {time.time() - start_time:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
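
# Optional usage note: to enable sampling instead of greedy decoding, override the
# defaults through generation_config when calling generate_response, e.g.:
#
#     response = await assistant.generate_response(
#         conversation,
#         generation_config={"do_sample": True, "temperature": 0.7},
#     )
#
# do_sample=True is required for the temperature value to take effect; the values
# above are illustrative assumptions, not tuned settings.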