import base64
import os
import io
import json

from tqdm import tqdm
from PIL import Image
from volcenginesdkarkruntime import Ark

from utils.logger_config import setup_logger
from utils.llm_outparser import extract_json
from utils.common import read_json_file
from config.image_qa import prompt, double_prompt, show_prompt

logger = setup_logger(__name__)
client = Ark(
    base_url="https://ark.cn-beijing.volces.com/api/v3",
    # Read the API key from the ARK_API_KEY environment variable instead of hardcoding it in source.
    api_key=os.environ.get("ARK_API_KEY"),
)
# def encode_image(image_path):
#     with open(image_path, "rb") as image_file:
#         return base64.b64encode(image_file.read()).decode('utf-8')
def encode_image(image_path, crop_margin=200):
    """Crop `crop_margin` pixels from the left and right edges of the image and
    return the remaining center region as base64-encoded JPEG data."""
    with Image.open(image_path) as img:
        # Get the image dimensions
        width, height = img.size

        # Compute the crop box (only the left/right margins are trimmed)
        left = crop_margin
        upper = 0
        right = width - crop_margin
        lower = height

        # Crop the image
        cropped_img = img.crop((left, upper, right, lower))

        # Encode the cropped image as base64 (the format can be changed if needed)
        buffered = io.BytesIO()
        cropped_img.save(buffered, format="JPEG")
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
def analyze_single_image_content(image_path):
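    """Send one cropped frame to the Doubao vision model with `prompt()` and return the raw reply text."""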
    base64_image = encode_image(image_path)
    response = client.chat.completions.create(
        model="doubao-1-5-vision-pro-32k-250115",
        temperature=1,
        max_tokens=200,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": prompt(),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        },
                    },
                ],
            }
        ],
    )

    return response.choices[0].message.content
def detect_show(image_path):
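    """Run the `show_prompt()` single-frame check on the Doubao vision model and return the raw reply text."""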
    base64_image = encode_image(image_path)
    response = client.chat.completions.create(
        model="doubao-1-5-vision-pro-32k-250115",
        temperature=1,
        max_tokens=200,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": show_prompt(),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        },
                    },
                ],
            }
        ],
    )

    return response.choices[0].message.content
def analyze_double_image_content(image_path_1, image_path_2):
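    """Send two consecutive frames to the Doubao vision model with `double_prompt()` and return the raw reply text."""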
    base64_image_1 = encode_image(image_path_1)
    base64_image_2 = encode_image(image_path_2)
    response = client.chat.completions.create(
        model="doubao-1-5-vision-pro-32k-250115",
        temperature=1,
        max_tokens=200,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": double_prompt(),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image_1}"
                        },
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image_2}"
                        },
                    },
                ],
            }
        ],
    )

    return response.choices[0].message.content
def image_caption_doubao(image_list):
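    """Caption every frame in `image_list` and write one JSON file per frame to ./data/img_caption/."""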
    # Run single-frame image understanding
    logger.info("first_cut: running single-frame image understanding")
    for image in tqdm(image_list):
        clip_name = os.path.splitext(os.path.basename(image))[0]
        response = analyze_single_image_content(image)
        response_json = json.loads(extract_json(str(response)))
        response_json["视频片段编号"] = clip_name  # "video clip id"
        with open(f'./data/img_caption/{clip_name}.json', 'w', encoding='utf-8') as f:
            json.dump(response_json, f, ensure_ascii=False)
def show_detect_doubao(image_list):
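    """Run the show-detection prompt on every frame in `image_list` and write one JSON file per frame to ./data/img_caption/for_show/."""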
    # Run single-frame image understanding for the "show" check
    logger.info("show_cut: running single-frame image understanding")
    for image in tqdm(image_list):
        clip_name = os.path.splitext(os.path.basename(image))[0]
        response = detect_show(image)
        response_json = json.loads(extract_json(str(response)))
        response_json["视频片段编号"] = clip_name  # "video clip id"
        with open(f'./data/img_caption/for_show/{clip_name}.json', 'w', encoding='utf-8') as f:
            json.dump(response_json, f, ensure_ascii=False)
def image_compare_doubao(image_list):
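    """Compare each pair of consecutive frames in `image_list` and write one JSON file per pair to ./data/img_caption/for_cut/."""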
    logger.info("first_cut: running two-frame comparison")
    for i in tqdm(range(len(image_list) - 1)):
        image1 = image_list[i]
        image2 = image_list[i + 1]
        clip1_name = os.path.splitext(os.path.basename(image1))[0]
        clip2_name = os.path.splitext(os.path.basename(image2))[0]
        clip_name = clip1_name + '-' + clip2_name
        similarity = analyze_double_image_content(image1, image2)
        similarity_json = json.loads(extract_json(str(similarity)))
        similarity_json["对比图像"] = clip_name  # "compared images"
        with open(f'./data/img_caption/for_cut/{clip_name}.json', 'w', encoding='utf-8') as f:
            json.dump(similarity_json, f, ensure_ascii=False)
# Usage example
if __name__ == "__main__":
    image_path = "/data/data/luosy/project/oral/data/key_frame/frame_00000000.jpg"
    result = analyze_single_image_content(image_path)
    print(result)
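
    # Batch usage sketch (illustrative paths; `glob` import assumed):
    # import glob
    # frames = sorted(glob.glob("/data/data/luosy/project/oral/data/key_frame/*.jpg"))
    # image_caption_doubao(frames)    # per-frame captions -> ./data/img_caption/
    # image_compare_doubao(frames)    # consecutive-frame comparisons -> ./data/img_caption/for_cut/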