"""Image-understanding helpers built on the Volcengine Ark (Doubao) vision model.

The module offers three kinds of requests (single-frame caption, "show"
detection, and two-frame comparison) plus batch drivers that parse the model's
JSON answer and write one ``.json`` file per request under ``./data/img_caption``.
"""
import base64
import io
import json
import os

from PIL import Image
from tqdm import tqdm
from volcenginesdkarkruntime import Ark

from config.image_qa import prompt, double_prompt, show_prompt
from utils.common import read_json_file
from utils.llm_outparser import extract_json
from utils.logger_config import setup_logger

logger = setup_logger(__name__)

# Request parameters shared by every call to the vision model.
_VISION_MODEL = "doubao-1-5-vision-pro-32k-250115"
_TEMPERATURE = 1
_MAX_TOKENS = 200

# Modes Pillow's JPEG encoder can write directly; anything else (RGBA, P, LA,
# ...) must be converted first or ``Image.save(format="JPEG")`` raises.
_JPEG_SAFE_MODES = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")

# Output locations of the three batch drivers.
_CAPTION_DIR = "./data/img_caption"
_SHOW_DIR = "./data/img_caption/for_show"
_COMPARE_DIR = "./data/img_caption/for_cut"

# SECURITY(review): an API key is committed in source control. The environment
# variable ARK_API_KEY now takes precedence; the literal is kept only as a
# fallback so existing deployments keep working. Rotate this key and remove it.
client = Ark(
    base_url="https://ark.cn-beijing.volces.com/api/v3",
    api_key=os.environ.get("ARK_API_KEY", "817dff39-5586-4f9b-acba-55004167c0b1"),
)


def encode_image(image_path, crop_margin=200):
    """Return the image as base64-encoded JPEG with its left/right margins removed.

    Only the left and right edges are cropped (``crop_margin`` pixels each);
    the full height is kept.

    Args:
        image_path: Path of the image file to read.
        crop_margin: Number of pixels to remove from the left and from the
            right edge. If the image is not wider than ``2 * crop_margin`` the
            crop would be empty, so the image is encoded uncropped instead.

    Returns:
        The JPEG bytes of the (cropped) image, base64-encoded as a ``str``.
    """
    with Image.open(image_path) as img:
        width, height = img.size

        if width > 2 * crop_margin:
            frame = img.crop((crop_margin, 0, width - crop_margin, height))
        else:
            # Cropping would leave nothing (or an inverted box); keep the frame.
            logger.warning(
                "Image %s is only %d px wide; skipping the %d px side crop",
                image_path, width, crop_margin,
            )
            frame = img

        # JPEG has no alpha/palette support, so normalise such images to RGB.
        if frame.mode not in _JPEG_SAFE_MODES:
            frame = frame.convert("RGB")

        buffered = io.BytesIO()
        frame.save(buffered, format="JPEG")

    return base64.b64encode(buffered.getvalue()).decode('utf-8')


def _image_part(image_path):
    """Build the ``image_url`` message part carrying *image_path* as a data URL."""
    return {
        "type": "image_url",
        "image_url": {
            # encode_image always emits JPEG, whose MIME type is image/jpeg.
            "url": f"data:image/jpeg;base64,{encode_image(image_path)}"
        },
    }


def _ask_vision_model(prompt_text, *image_paths):
    """Send *prompt_text* followed by the given images and return the reply text."""
    content = [{"type": "text", "text": prompt_text}]
    content.extend(_image_part(path) for path in image_paths)
    response = client.chat.completions.create(
        model=_VISION_MODEL,
        temperature=_TEMPERATURE,
        max_tokens=_MAX_TOKENS,
        messages=[{"role": "user", "content": content}],
    )
    return response.choices[0].message.content


def _clip_name(image_path):
    """Return the file name of *image_path* without directory or extension."""
    return os.path.splitext(os.path.basename(image_path))[0]


def _save_result(raw_response, label_key, label_value, output_dir):
    """Parse the model reply as JSON, tag it, and write ``<label_value>.json``.

    The reply is passed through ``extract_json`` and ``json.loads``; a reply
    that does not yield valid JSON raises and is propagated to the caller.
    """
    result = json.loads(extract_json(str(raw_response)))
    result[label_key] = label_value
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{label_value}.json")
    with open(output_path, 'w', encoding='utf-8') as f:
        # json.dump (not str(dict) with quote replacement) keeps the file valid
        # JSON even when values contain quotes, booleans or nulls.
        json.dump(result, f, ensure_ascii=False)


def analyze_single_image_content(image_path):
    """Ask the vision model the single-frame ``prompt()`` about one image.

    Returns:
        The text content of the model's first choice.
    """
    return _ask_vision_model(prompt(), image_path)


def detect_show(image_path):
    """Ask the vision model the ``show_prompt()`` question about one image.

    Returns:
        The text content of the model's first choice.
    """
    return _ask_vision_model(show_prompt(), image_path)


def analyze_double_image_content(image_path_1, image_path_2):
    """Ask the vision model the ``double_prompt()`` question about two images.

    The images are sent in the order given.

    Returns:
        The text content of the model's first choice.
    """
    return _ask_vision_model(double_prompt(), image_path_1, image_path_2)


def image_caption_doubao(image_list):
    """Caption every image and write ``./data/img_caption/<clip>.json`` for each.

    Each result is tagged with the clip name (image file name without
    extension) under the key ``"视频片段编号"``.
    """
    logger.info("first_cut: 执行单帧图像理解")
    for image in tqdm(image_list):
        response = analyze_single_image_content(image)
        _save_result(response, "视频片段编号", _clip_name(image), _CAPTION_DIR)


def show_detect_doubao(image_list):
    """Run show detection on every image, writing results under ``for_show``.

    Each result is tagged with the clip name under the key ``"视频片段编号"``.
    """
    logger.info("show_cut: 执行单帧图像理解")
    for image in tqdm(image_list):
        response = detect_show(image)
        _save_result(response, "视频片段编号", _clip_name(image), _SHOW_DIR)


def image_compare_doubao(image_list):
    """Compare each pair of consecutive images, writing results under ``for_cut``.

    For neighbours ``a`` and ``b`` the result is tagged ``"<a>-<b>"`` under the
    key ``"对比图像"`` and stored as ``<a>-<b>.json``. Fewer than two images
    produce no output.
    """
    logger.info("first_cut: 执行两帧对比理解")
    # Materialise the pairs so tqdm knows the total and can show progress.
    consecutive_pairs = list(zip(image_list, image_list[1:]))
    for image1, image2 in tqdm(consecutive_pairs):
        pair_name = _clip_name(image1) + '-' + _clip_name(image2)
        similarity = analyze_double_image_content(image1, image2)
        _save_result(similarity, "对比图像", pair_name, _COMPARE_DIR)


# Usage example
if __name__ == "__main__":
    image_path = "/data/data/luosy/project/oral/data/key_frame/frame_00000000.jpg"
    result = analyze_single_image_content(image_path)
    print(result)