import os
import re
import json
from pathlib import Path

from moviepy.editor import VideoFileClip
from PIL import Image
import numpy as np
from tqdm import tqdm

from .logger_config import setup_logger
from .llm_label import text_classifer

logger = setup_logger(__name__)


def zoom_and_crop_video(input_path, output_path, zoom_factor):
    """Zoom into a video and crop the center back to the original resolution.

    Args:
        input_path (str): path of the source video.
        output_path (str): path of the output video.
        zoom_factor (float): zoom multiplier; must be >= 1 so that a window
            of the original size still fits inside the enlarged frame.

    Raises:
        ValueError: if zoom_factor < 1.
    """
    clip = VideoFileClip(input_path)
    try:
        original_width, original_height = clip.size
        if zoom_factor < 1:
            raise ValueError("缩放倍数必须≥1,否则无法裁剪出原始分辨率")

        # Upscale first, then cut a window of the original size around the center.
        zoomed_clip = clip.resize(zoom_factor)
        try:
            zoomed_width, zoomed_height = zoomed_clip.size

            x_center = zoomed_width // 2
            y_center = zoomed_height // 2
            half_w = original_width // 2
            half_h = original_height // 2

            # Clamp the crop window so it never leaves the enlarged frame.
            x1 = max(0, x_center - half_w)
            y1 = max(0, y_center - half_h)
            x2 = min(zoomed_width, x_center + half_w)
            y2 = min(zoomed_height, y_center + half_h)

            cropped_clip = zoomed_clip.crop(x1=x1, y1=y1, x2=x2, y2=y2)
            try:
                cropped_clip.write_videofile(
                    output_path,
                    codec='libx264',
                    audio_codec='aac',
                    bitrate='8000k',  # raised bitrate to preserve sharpness after zoom
                )
            finally:
                cropped_clip.close()
        finally:
            zoomed_clip.close()
    finally:
        # Release moviepy resources even when an intermediate step fails.
        clip.close()


def find_vidoe2cut(json_path, video_list):
    """Keep only oral entries whose clip name appears under 面料/版型/工艺.

    Writes the filtered JSON to output/for_cut/<stem>.json.

    NOTE(review): the misspelled name ("vidoe") is kept — external callers
    depend on it.

    Args:
        json_path (str): path of the source oral-dict JSON.
        video_list (dict): mapping with keys "面料", "版型", "工艺", each a
            list of clip names.
    """
    videos = video_list["面料"] + video_list["版型"] + video_list["工艺"]
    print("find_vidoe2cut videos:", videos)
    with open(json_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    data["oral_dict_list"] = [
        oral for oral in data["oral_dict_list"] if oral["clip_name"] in videos
    ]
    new_json_path = f"output/for_cut/{os.path.splitext(os.path.basename(json_path))[0]}.json"
    with open(new_json_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)
    print("find_vidoe2cut data:", data)


def read_video_list(file_path):
    """Return the first entry of the "脚本" list from a JSON file."""
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    return data["脚本"][0]


def count_none_in_string(input_string, text):
    """Count case-insensitive occurrences of *text* in *input_string*.

    NOTE: only *input_string* is lower-cased, so *text* must already be
    lower case (callers pass "none" / "否").

    Args:
        input_string (str): string to search in.
        text (str): substring to count.

    Returns:
        int: number of occurrences.
    """
    return input_string.lower().count(text)


def filter_oral_data(file_path):
    """Drop entries whose "attribute" mentions "none" four or more times.

    Rewrites the JSON file in place.

    Args:
        file_path (str): path of the oral-dict JSON file.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    data["oral_dict_list"] = [
        oral for oral in data["oral_dict_list"]
        if count_none_in_string(oral["attribute"], "none") < 4
    ]

    logger.info(f"筛选打标语句:{len(data['oral_dict_list'])} 条")
    with open(file_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)


def label_data(file_path):
    """Run the LLM text classifier over each sentence ("attribute_add").

    Writes the result to the sibling path with "filter_3" replaced by
    "filter_4".

    Args:
        file_path (str | Path): path of the oral-dict JSON file.
    """
    logger.info(f"语句二次打标")
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    for sentence in tqdm(data["oral_dict_list"]):
        sentence["attribute_add"] = text_classifer(sentence["text"])

    output_path = str(file_path).replace("filter_3", "filter_4")
    with open(output_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)


def filter_label_data(file_path):
    """Keep entries whose "attribute_add" never contains "否".

    Also strips helper fields and assigns sequential clip names, then
    rewrites the file in place.

    Args:
        file_path (str): path of the oral-dict JSON file.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    data["oral_dict_list"] = [
        oral for oral in data["oral_dict_list"]
        if count_none_in_string(oral["attribute_add"], "否") == 0
    ]

    # Drop fields only needed during labeling and name the target clips.
    for i, oral in enumerate(data["oral_dict_list"], start=1):
        oral.pop("spk", None)
        oral.pop("attribute", None)
        oral.pop("attribute_add", None)
        oral["clip_name"] = f"{Path(file_path).stem}_{i:02d}.mp4"

    logger.info(f"二次筛选打标语句:{len(data['oral_dict_list'])} 条")
    with open(file_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)


def del_key(json_file):
    """Remove bulky keys from an ASR result file, in place.

    Drops top-level "text" and "timestamp" plus every per-sentence
    "timestamp" in "sentence_info".

    Args:
        json_file (str): path of the JSON file to slim down.
    """
    with open(json_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    data.pop("text", None)
    data.pop("timestamp", None)
    for sentence in data["sentence_info"]:
        sentence.pop("timestamp", None)

    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False)


def str2timestamp(time_text):
    """Return the midpoint of the two frame numbers in "..._A-..._B".

    Args:
        time_text (str): e.g. "frame_100-frame_200".

    Returns:
        int: integer midpoint of the two trailing numbers.
    """
    parts = [int(piece.split('_')[-1]) for piece in time_text.split('-')]
    return int((parts[0] + parts[1]) / 2)


def get_video_duration(video_path):
    """Return the duration of a video in whole seconds."""
    with VideoFileClip(video_path) as video:
        return int(video.duration)


def read_json_file(file_path):
    """Load a JSON file, printing a specific message per failure mode.

    Args:
        file_path (str): path of the JSON file.

    Returns:
        dict: parsed JSON content.

    Raises:
        FileNotFoundError, json.JSONDecodeError, Exception: re-raised after
        logging so callers can decide how to recover.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except FileNotFoundError:
        print(f"错误: 文件 '{file_path}' 不存在")
        raise
    except json.JSONDecodeError as e:
        print(f"错误: JSON格式不正确 - {e}")
        raise
    except Exception as e:
        print(f"读取JSON文件时发生错误: {e}")
        raise


def get_frame_number(filename):
    """Extract the integer after "frame_" in *filename*; 0 when absent."""
    match = re.search(r'frame_(\d+)', filename)
    return int(match.group(1)) if match else 0


def read_files_in_order(directory):
    """Read every JSON file in *directory*, ordered by frame number.

    Unreadable files are skipped with a printed warning.

    Args:
        directory (str): directory containing frame_*.json files.

    Returns:
        list[tuple[str, dict]]: (filename, parsed content) pairs.
    """
    results = []
    for name in sorted(os.listdir(directory), key=get_frame_number):
        file_path = os.path.join(directory, name)
        if os.path.isfile(file_path):
            try:
                results.append((name, read_json_file(file_path)))
            except Exception as e:
                print(f"读取文件 {name} 时出错: {e}")
    return results


def filter_json_files(directory):
    """Collect key-frame image paths showing exactly one centered person.

    Reads caption JSONs in frame order and keeps frames where the caption
    says 是否有人=是, 人物数量=1 and 人物位置=中间.

    Args:
        directory (str): directory of caption JSON files.

    Returns:
        list[str]: absolute paths of the matching key-frame .jpg images.
    """
    filtered_results = []
    files = read_files_in_order(directory)

    # Captions for the "for_show" pipeline live in a sub-folder of key_frame.
    sub_folder = "for_show/" if "for_show" in directory else ""

    for filename, content in tqdm(files):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    data = json.load(file)
                if (data.get("是否有人") == "是" and
                        data.get("人物数量") == 1 and
                        data.get("人物位置") == "中间"):
                    image_path = (
                        f"/data/data/luosy/project/oral_local/data/key_frame/{sub_folder}"
                        + filename.split('.')[0] + '.jpg'
                    )
                    filtered_results.append(image_path)
            except (FileNotFoundError, json.JSONDecodeError) as e:
                print(f"读取文件 {filename} 时出错: {e}")

    return filtered_results


def find_arithmetic_sequences(input_list):
    """Find arithmetic runs of at least 4 elements in *input_list*.

    Args:
        input_list (list): list of numbers.

    Returns:
        list[tuple[int, int]]: (start_index, end_index) pairs of the runs
        found; may contain overlapping/nested spans.
    """
    sequences = []
    n = len(input_list)
    if n < 4:
        return sequences  # too short to contain a 4-element run

    for i in range(n - 3):
        for j in range(i + 1, n - 2):
            diff = input_list[j] - input_list[i]  # candidate common difference
            count = 2                              # i and j already match
            last_index = j
            # Extend the run while consecutive differences equal diff.
            for k in range(j + 1, n):
                if input_list[k] - input_list[k - 1] == diff:
                    count += 1
                    last_index = k
                else:
                    break
            if count >= 4:
                sequences.append((i, last_index))
    return sequences


def find_longest_arithmetic_sequences(input_list):
    """Find the longest arithmetic run(s) of at least 4 elements.

    Args:
        input_list (list): list of numbers.

    Returns:
        list[tuple[int, int]]: (start_index, end_index) pairs of all runs of
        maximal length (ties included); empty when none reaches 4 elements.
    """
    n = len(input_list)
    if n < 3:
        return []

    longest_sequences = []
    max_length = 0

    for i in range(n - 3):
        for j in range(i + 1, n - 2):
            diff = input_list[j] - input_list[i]  # candidate common difference
            count = 2
            last_index = j
            for k in range(j + 1, n):
                if input_list[k] - input_list[k - 1] == diff:
                    count += 1
                    last_index = k
                else:
                    break
            # Record only runs with at least 4 elements.
            if count >= 4:
                if count > max_length:
                    max_length = count
                    longest_sequences = [(i, last_index)]
                elif count == max_length:
                    longest_sequences.append((i, last_index))
    return longest_sequences


def extract_numbers(input_list):
    """Extract every digit group from each element of *input_list*.

    Args:
        input_list (list): elements of any type; each is str()-ified first.

    Returns:
        list[int]: all numbers found, in order of appearance.
    """
    numbers = []
    for item in input_list:
        found_numbers = re.findall(r'\d+', str(item))
        numbers.extend(int(num) for num in found_numbers)
    return numbers


def filter_json_files_for_show(directory):
    """Select the best contiguous run of "show" key frames.

    Keeps frames whose caption marks a single centered person with empty
    hands (no clothes/tablet/size-card), then returns the longest run whose
    frame numbers form an arithmetic sequence (i.e. evenly spaced frames).

    Args:
        directory (str): directory of caption JSON files.

    Returns:
        list[str]: image paths of the selected run; empty when no run of at
        least 4 evenly spaced frames exists.
    """
    filtered_results = []
    files = read_files_in_order(directory)

    sub_folder = "for_show/" if "for_show" in directory else ""

    for filename, content in tqdm(files):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    data = json.load(file)
                if (data.get("是否有人") == "是" and
                        data.get("人物数量") == 1 and
                        data.get("人物位置") == "中间" and
                        data.get("手拿衣服") == "否" and
                        data.get("手拿平板") == "否" and
                        data.get("卡码拍大") == "否"):
                    image_path = (
                        f"/data/data/luosy/project/oral_local/data/key_frame/{sub_folder}"
                        + filename.split('.')[0] + '.jpg'
                    )
                    filtered_results.append(image_path)
            except (FileNotFoundError, json.JSONDecodeError) as e:
                print(f"读取文件 {filename} 时出错: {e}")

    for_filter_list = extract_numbers(filtered_results)
    print(f"过滤列表:{for_filter_list}")
    filter_index = find_longest_arithmetic_sequences(for_filter_list)
    logger.info(f"筛选最长等差列表:{filter_index}")
    if not filter_index:
        # No evenly spaced run found — avoid IndexError on filter_index[0].
        return []
    return filtered_results[filter_index[0][0]:filter_index[0][1] + 1]


def cut_timestamp(directory):
    """Collect cut positions where the person changes outfit.

    A cut is recorded when a comparison caption says 同一个人=是 and
    穿同套衣服=否; the position is the midpoint of the compared frames.

    Args:
        directory (str): directory of comparison JSON files.

    Returns:
        list[int]: cut timestamps in frame-number units.
    """
    results = []
    files = read_files_in_order(directory)

    for filename, content in files:
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    data = json.load(file)
                if (data.get("同一个人") == "是" and
                        data.get("穿同套衣服") == "否"):
                    results.append(str2timestamp(data["对比图像"]))
            except (FileNotFoundError, json.JSONDecodeError) as e:
                print(f"读取文件 {filename} 时出错: {e}")

    logger.info(f"裁切位置:{results}")
    return results


def convert_timestamp(timestamps, total_duration):
    """Convert cut points into consecutive (start, end) segments.

    Args:
        timestamps (list[int]): sorted cut positions.
        total_duration (int): total video duration.

    Returns:
        list[tuple[int, int]]: segments covering [0, total_duration]; empty
        when *timestamps* is empty.
    """
    cut_ranges = []
    if timestamps:
        cut_ranges.append((0, timestamps[0]))            # leading segment
    for start_time, end_time in zip(timestamps, timestamps[1:]):
        cut_ranges.append((start_time, end_time))        # interior segments
    if timestamps:
        cut_ranges.append((timestamps[-1], total_duration))  # trailing segment
    return cut_ranges


def calculate_depth_mean(depth_image_path):
    """Compute the mean of a single-channel depth image, ignoring zeros.

    Args:
        depth_image_path (str): path of the depth image.

    Returns:
        float: mean of all strictly positive depth values.
    """
    depth_image = Image.open(depth_image_path)
    depth_array = np.array(depth_image)
    # BUG FIX: PIL images have no .shape — log the array's shape instead.
    logger.info(f"深度图尺寸:{depth_array.shape}")
    # Zeros mark invalid depth samples; exclude them from the mean.
    depth_mean = np.mean(depth_array[depth_array > 0])
    return depth_mean


def calculate_depth_mean_pil(depth_image):
    """Compute the mean depth of an in-memory PIL image, ignoring zeros.

    The left and right 200-pixel margins are excluded so the statistic
    focuses on the central subject.

    Args:
        depth_image (PIL.Image.Image): single-channel depth image.

    Returns:
        float: mean of all strictly positive depth values in the crop.
    """
    depth_array = np.array(depth_image)[:, 200:-200]
    logger.info(f"深度图均值计算范围:{depth_array.shape}")
    depth_mean = np.mean(depth_array[depth_array > 0])
    return depth_mean


def find_show_cut(data, n=40):
    """Find the longest rise-then-fall or fall-then-rise span in *data*.

    The span only counts when its max-min spread exceeds *n* (filters out
    flat noise).

    Args:
        data (list): numeric sequence (e.g. per-frame depth means).
        n (int): minimum required difference between span max and min.

    Returns:
        tuple[int, int] | None: (start, end) indices of the longest
        qualifying span, or None when none qualifies.
    """
    if not data:
        return None

    longest_start = longest_end = -1
    longest_length = 0

    def check_difference(start, end):
        """Return True when max-min over data[start:end] exceeds n."""
        subarray = data[start:end + 1]
        return max(subarray) - min(subarray) > n

    # Pass 1: spans that rise (non-strictly) then fall.
    # NOTE: the inner `i += 1` does not affect the `for` counter — each outer
    # iteration restarts scanning from its own position, exactly as intended
    # by the original implementation.
    for i in range(1, len(data)):
        if data[i] >= data[i - 1]:
            start = i - 1
            while i < len(data) and data[i] >= data[i - 1]:   # ascending part
                i += 1
            while i < len(data) and data[i] <= data[i - 1]:   # descending part
                i += 1
            end = i - 1
            if (end - start + 1) > longest_length and check_difference(start, end):
                longest_length = end - start + 1
                longest_start = start
                longest_end = end

    # Pass 2: spans that fall (non-strictly) then rise.
    for i in range(1, len(data)):
        if data[i] <= data[i - 1]:
            start = i - 1
            while i < len(data) and data[i] <= data[i - 1]:   # descending part
                i += 1
            while i < len(data) and data[i] >= data[i - 1]:   # ascending part
                i += 1
            end = i - 1
            if (end - start + 1) > longest_length and check_difference(start, end):
                longest_length = end - start + 1
                longest_start = start
                longest_end = end

    if longest_length > 0:
        return (longest_start, longest_end)
    return None


# Usage example
if __name__ == "__main__":
    json_directory = "/data/data/luosy/project/oral/data/img_caption"  # replace with your JSON directory
    results = filter_json_files(json_directory)

    for result in results:
        print(result)