import os
import re
import json
from pathlib import Path
from moviepy.editor import VideoFileClip
from PIL import Image
import numpy as np
from tqdm import tqdm
from .logger_config import setup_logger
from .llm_label import text_classifer

logger = setup_logger(__name__)

def zoom_and_crop_video(input_path, output_path, zoom_factor):
    """
    Zoom into a video and crop the center region, keeping the original resolution.

    :param input_path: path to the input video
    :param output_path: path to the output video
    :param zoom_factor: zoom factor (must be >= 1)
    """
    # Load the video and record its original size
    clip = VideoFileClip(input_path)
    original_width, original_height = clip.size

    if zoom_factor < 1:
        raise ValueError("zoom_factor must be >= 1, otherwise the center crop cannot reach the original resolution")

    # Upscale the video
    zoomed_clip = clip.resize(zoom_factor)
    zoomed_width, zoomed_height = zoomed_clip.size

    # Center of the zoomed frame
    x_center = zoomed_width // 2
    y_center = zoomed_height // 2
    half_w = original_width // 2
    half_h = original_height // 2

    # Crop window, clamped to the frame boundaries
    x1 = max(0, x_center - half_w)
    y1 = max(0, y_center - half_h)
    x2 = min(zoomed_width, x_center + half_w)
    y2 = min(zoomed_height, y_center + half_h)

    # Crop and write the result
    cropped_clip = zoomed_clip.crop(x1=x1, y1=y1, x2=x2, y2=y2)
    cropped_clip.write_videofile(
        output_path,
        codec='libx264',
        audio_codec='aac',
        bitrate='8000k'  # higher bitrate to preserve sharpness
    )

    # Release resources
    clip.close()
    zoomed_clip.close()
    cropped_clip.close()

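# Example (sketch, hypothetical paths): zoom in by 1.5x and crop back to the
# original resolution, so the output has the same width/height as the input.
# zoom_and_crop_video("input/raw_clip.mp4", "output/raw_clip_zoomed.mp4", zoom_factor=1.5)
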
def find_vidoe2cut(json_path, video_list):
    """Keep only the oral segments whose clip_name appears in the 面料/版型/工艺 video lists and write the reduced JSON to output/for_cut/."""
    videos = video_list["面料"] + video_list["版型"] + video_list["工艺"]
    print("find_vidoe2cut videos:", videos)
    with open(json_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    oral_list = data["oral_dict_list"]

    new_dict_list = []
    for oral in oral_list:
        if oral["clip_name"] in videos:
            new_dict_list.append(oral)
    data["oral_dict_list"] = new_dict_list
    new_json_path = f"output/for_cut/{os.path.splitext(os.path.basename(json_path))[0]}.json"
    with open(new_json_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)
    print("find_vidoe2cut data:", data)

def read_video_list(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    return data["脚本"][0]

def count_none_in_string(input_string, text):
    """
    Count case-insensitive occurrences of a substring in a string.

    Args:
        input_string (str): input string
        text (str): substring to count (pass it in lowercase)

    Returns:
        int: number of occurrences of `text`
    """
    return input_string.lower().count(text)

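# Example (pure function): the input is lower-cased before counting, so all
# three variants below match.
# count_none_in_string("None, none, NONE", "none")  # -> 3
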
def filter_oral_data(file_path):
    """
    Read the JSON file and keep only the entries whose attribute contains "none"
    fewer than 4 times; the filtered data is written back to the same file.

    Args:
        file_path (str): path to the JSON file
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)  # load the JSON data

    # Filter oral_dict_list
    data["oral_dict_list"] = [
        oral for oral in data["oral_dict_list"]
        if count_none_in_string(oral["attribute"], "none") < 4
    ]

    logger.info(f"Sentences kept after labeling filter: {len(data['oral_dict_list'])}")

    with open(file_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)

def label_data(file_path):
    logger.info("Second-pass labeling of sentences")
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)  # load the JSON data

    sentences = data["oral_dict_list"]
    for sentence in tqdm(sentences):
        attribute = text_classifer(sentence["text"])
        sentence["attribute_add"] = attribute

    output_path = str(file_path).replace("filter_3", "filter_4")
    with open(output_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)

def filter_label_data(file_path):
    """
    Read the JSON file and keep only the entries whose attribute_add contains no
    "否"; the filtered data is written back to the same file.

    Args:
        file_path (str): path to the JSON file
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)  # load the JSON data

    # Filter oral_dict_list
    data["oral_dict_list"] = [
        oral for oral in data["oral_dict_list"]
        if count_none_in_string(oral["attribute_add"], "否") == 0
    ]

    # Drop fields that are no longer needed and assign clip names
    oral_dict_list = data["oral_dict_list"]
    for i, oral in enumerate(oral_dict_list, start=1):
        oral.pop("spk", None)
        oral.pop("attribute", None)
        oral.pop("attribute_add", None)
        oral["clip_name"] = f"{Path(file_path).stem}_{i:02d}.mp4"

    logger.info(f"Sentences kept after second filter: {len(data['oral_dict_list'])}")

    with open(file_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)

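# Sketch of how the three steps above could chain together (hypothetical file
# layout; the only convention taken from the code is that label_data writes its
# output by replacing "filter_3" with "filter_4" in the path).
# filter_oral_data("output/filter_3/session_01.json")   # drop entries with >= 4 "none"
# label_data("output/filter_3/session_01.json")         # writes output/filter_4/session_01.json
# filter_label_data("output/filter_4/session_01.json")  # drop entries containing "否", assign clip names
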
def del_key(json_file):
    """Remove the top-level "text" and "timestamp" fields and each sentence's "timestamp", rewriting the file in place."""
    with open(json_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    data.pop("text", None)
    data.pop("timestamp", None)
    sentences = data["sentence_info"]
    for sentence in sentences:
        sentence.pop("timestamp", None)

    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False)

def str2timestamp(time_text):
    """Take a "<prefix>_<start>-<prefix>_<end>" string and return the midpoint of the two numbers as an int."""
    mid = [int(timestamp.split('_')[-1]) for timestamp in time_text.split('-')]
    return int((mid[0] + mid[1]) / 2)

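# Example (pure function, hypothetical input format): the numeric suffixes are
# 120 and 180, so the midpoint 150 is returned.
# str2timestamp("frame_120-frame_180")  # -> 150
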
def get_video_duration(video_path):
    with VideoFileClip(video_path) as video:
        duration = int(video.duration)
    return duration

def read_json_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
            return data
    except FileNotFoundError:
        print(f"Error: file '{file_path}' does not exist")
        raise
    except json.JSONDecodeError as e:
        print(f"Error: invalid JSON format - {e}")
        raise
    except Exception as e:
        print(f"Unexpected error while reading the JSON file: {e}")
        raise

def get_frame_number(filename):
    """Extract the frame number from a filename; return 0 if none is found."""
    match = re.search(r'frame_(\d+)', filename)
    if match:
        return int(match.group(1))
    return 0

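# Example (pure function): the digits after "frame_" are returned as an int,
# and filenames without a frame number sort first.
# get_frame_number("frame_00042.json")  # -> 42
# get_frame_number("cover.jpg")         # -> 0
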
def read_files_in_order(directory):
    """Read the files in a directory, ordered by their frame number."""
    # List every entry in the directory
    files = os.listdir(directory)
    # Sort by frame number
    sorted_files = sorted(files, key=get_frame_number)

    results = []
    for file in sorted_files:
        file_path = os.path.join(directory, file)
        if os.path.isfile(file_path):
            try:
                data = read_json_file(file_path)
                results.append((file, data))
            except Exception as e:
                print(f"Error while reading file {file}: {e}")

    return results

def filter_json_files(directory):
    filtered_results = []
    files = read_files_in_order(directory)
    if "for_show" in directory:
        sub_folder = "for_show/"
    else:
        sub_folder = ""

    # Walk every file in the directory
    for filename, content in tqdm(files):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    data = json.load(file)
                    # Keep frames with exactly one person, centered in the frame
                    if (data.get("是否有人") == "是" and
                            data.get("人物数量") == 1 and
                            data.get("人物位置") == "中间"):
                        image_path = f"/data/data/luosy/project/oral_local/data/key_frame/{sub_folder}" + filename.split('.')[0] + '.jpg'
                        filtered_results.append(image_path)
            except (FileNotFoundError, json.JSONDecodeError) as e:
                print(f"Error while reading file {filename}: {e}")

    return filtered_results

def find_arithmetic_sequences(input_list):
    """
    Find arithmetic progressions in the input list and return the (start, end)
    indices of every run that contains at least 4 elements.

    Args:
        input_list (list): input list

    Returns:
        list: list of (start_index, end_index) tuples for the progressions found
    """
    sequences = []
    n = len(input_list)
    if n < 4:
        return sequences  # fewer than 4 elements can never qualify

    for i in range(n - 3):  # up to the 4th element from the end
        for j in range(i + 1, n - 2):  # candidate second element
            diff = input_list[j] - input_list[i]  # common difference
            count = 2  # two elements found so far
            last_index = j  # index of the last matching element
            # Extend the run while consecutive differences match
            for k in range(j + 1, n):
                if input_list[k] - input_list[k - 1] == diff:
                    count += 1
                    last_index = k  # update the last matching index
                else:
                    break  # stop at the first mismatch
            if count >= 4:  # record the run once it reaches 4 elements
                sequences.append((i, last_index))

    return sequences

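# Example (pure function): 1, 3, 5, 7 is an arithmetic run of length 4 spanning
# indices 0 through 3, so one (start, end) pair is returned.
# find_arithmetic_sequences([1, 3, 5, 7, 20])  # -> [(0, 3)]
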
def find_longest_arithmetic_sequences(input_list):
    """
    Find the longest arithmetic progressions in the input list and return the
    (start, end) indices of the runs that contain at least 4 elements.

    Args:
        input_list (list): input list

    Returns:
        list: list of (start_index, end_index) tuples for the longest progressions
    """
    n = len(input_list)
    if n < 4:
        return []  # fewer than 4 elements can never qualify

    longest_sequences = []
    max_length = 0
    for i in range(n - 3):  # up to the 4th element from the end
        for j in range(i + 1, n - 2):  # candidate second element
            diff = input_list[j] - input_list[i]  # common difference
            count = 2  # two elements found so far
            last_index = j  # index of the last matching element
            # Extend the run while consecutive differences match
            for k in range(j + 1, n):
                if input_list[k] - input_list[k - 1] == diff:
                    count += 1
                    last_index = k  # update the last matching index
                else:
                    break  # stop at the first mismatch
            # Only runs with at least 4 elements are recorded
            if count >= 4:
                if count > max_length:  # found a longer run
                    max_length = count
                    longest_sequences = [(i, last_index)]
                elif count == max_length:  # same length: keep it as well
                    longest_sequences.append((i, last_index))

    return longest_sequences

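# Example (pure function): the longest run with a constant step is 1, 3, 5, 7
# (indices 1 through 4), so it is the only pair returned.
# find_longest_arithmetic_sequences([10, 1, 3, 5, 7, 99])  # -> [(1, 4)]
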
def extract_numbers(input_list):
    """
    Extract the digits from every element of the input list and return them as a
    new list of integers.

    Args:
        input_list (list): input list with elements of any type

    Returns:
        list: the extracted numbers
    """
    numbers = []
    for item in input_list:
        # Convert the element to a string and pull out every run of digits
        found_numbers = re.findall(r'\d+', str(item))
        # Convert the matches to ints and append them to the result
        numbers.extend(int(num) for num in found_numbers)

    return numbers

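# Example (pure function): every run of digits in each element becomes one int,
# in order of appearance.
# extract_numbers(["frame_12.jpg", "frame_7.jpg"])  # -> [12, 7]
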
def filter_json_files_for_show(directory):
    filtered_results = []
    files = read_files_in_order(directory)
    if "for_show" in directory:
        sub_folder = "for_show/"
    else:
        sub_folder = ""

    # Walk every file in the directory
    for filename, content in tqdm(files):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    data = json.load(file)
                    # Keep frames with a single centered person and all three
                    # exclusion flags (手拿衣服/手拿平板/卡码拍大) set to "否"
                    if (data.get("是否有人") == "是" and
                            data.get("人物数量") == 1 and
                            data.get("人物位置") == "中间" and
                            data.get("手拿衣服") == "否" and
                            data.get("手拿平板") == "否" and
                            data.get("卡码拍大") == "否"):
                        image_path = f"/data/data/luosy/project/oral_local/data/key_frame/{sub_folder}" + filename.split('.')[0] + '.jpg'
                        filtered_results.append(image_path)
            except (FileNotFoundError, json.JSONDecodeError) as e:
                print(f"Error while reading file {filename}: {e}")

    for_filter_list = extract_numbers(filtered_results)
    print(f"Frame numbers to filter: {for_filter_list}")
    filter_index = find_longest_arithmetic_sequences(for_filter_list)
    logger.info(f"Longest arithmetic run selected: {filter_index}")
    return filtered_results[filter_index[0][0]:filter_index[0][1] + 1]

def cut_timestamp(directory):
    results = []
    files = read_files_in_order(directory)

    # Walk every file in the directory
    for filename, content in files:
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    data = json.load(file)
                    # A cut point is the midpoint of a frame pair that shows the
                    # same person wearing a different outfit
                    if (data.get("同一个人") == "是" and data.get("穿同套衣服") == "否"):
                        timestamp = str2timestamp(data["对比图像"])
                        results.append(timestamp)
            except (FileNotFoundError, json.JSONDecodeError) as e:
                print(f"Error while reading file {filename}: {e}")

    logger.info(f"Cut positions: {results}")
    return results

def convert_timestamp(timestamps, total_duration):
    cut_ranges = []

    # Range from 0 to the first timestamp
    if timestamps:
        cut_ranges.append((0, timestamps[0]))

    # Ranges between adjacent timestamps
    for i in range(len(timestamps) - 1):
        start_time = timestamps[i]
        end_time = timestamps[i + 1]
        cut_ranges.append((start_time, end_time))

    # Range from the last timestamp to the total duration
    if timestamps:
        cut_ranges.append((timestamps[-1], total_duration))

    return cut_ranges

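# Example (pure function): two cut points inside a 120 s video give three
# consecutive segments that cover the whole duration.
# convert_timestamp([30, 75], 120)  # -> [(0, 30), (30, 75), (75, 120)]
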
def calculate_depth_mean(depth_image_path):
    """
    Compute the mean depth of a single-channel depth map.

    Args:
        depth_image_path (str): path to the depth image

    Returns:
        float: mean depth
    """
    # Load the depth image
    depth_image = Image.open(depth_image_path)
    # Convert it to a NumPy array
    depth_array = np.array(depth_image)
    logger.info(f"Depth map size: {depth_array.shape}")
    # Mean depth, ignoring invalid (zero) pixels
    depth_mean = np.mean(depth_array[depth_array > 0])
    return depth_mean

def calculate_depth_mean_pil(depth_image):
    """
    Compute the mean depth of a single-channel depth map given as a PIL image.

    Args:
        depth_image (PIL.Image.Image): depth image

    Returns:
        float: mean depth
    """
    # Convert to a NumPy array, dropping 200 columns on each side
    depth_array = np.array(depth_image)[:, 200:-200]
    logger.info(f"Depth mean computed over region: {depth_array.shape}")
    # Mean depth, ignoring invalid (zero) pixels
    depth_mean = np.mean(depth_array[depth_array > 0])
    return depth_mean

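# Example (sketch, hypothetical path): the center crop assumes the depth map is
# wider than 400 px, since 200 columns are discarded on each side.
# depth = Image.open("data/depth/frame_0001.png")
# calculate_depth_mean_pil(depth)  # -> mean of the non-zero center pixels
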
def find_show_cut(data, n=40):
    """
    Find the longest run in the list that first increases then decreases (or
    first decreases then increases), requiring the difference between the run's
    maximum and minimum values to be greater than n.

    Args:
        data (list): input data list
        n (int): threshold for the max-min difference

    Returns:
        tuple: (start_index, end_index) of the longest run, or None
    """
    if not data:
        return None

    longest_start = longest_end = -1
    longest_length = 0

    def check_difference(start, end):
        """Check whether max - min over the run exceeds n."""
        subarray = data[start:end + 1]
        return max(subarray) - min(subarray) > n

    # Increasing then decreasing
    for i in range(1, len(data)):
        if data[i] >= data[i - 1]:  # non-decreasing step
            start = i - 1
            while i < len(data) and data[i] >= data[i - 1]:  # walk the increasing part
                i += 1
            # i is now just past the increasing part
            while i < len(data) and data[i] <= data[i - 1]:  # walk the decreasing part
                i += 1
            end = i - 1  # end of the decreasing part
            # Keep the longest qualifying run
            if (end - start + 1) > longest_length and check_difference(start, end):
                longest_length = end - start + 1
                longest_start = start
                longest_end = end

    # Decreasing then increasing
    for i in range(1, len(data)):
        if data[i] <= data[i - 1]:  # non-increasing step
            start = i - 1
            while i < len(data) and data[i] <= data[i - 1]:  # walk the decreasing part
                i += 1
            # i is now just past the decreasing part
            while i < len(data) and data[i] >= data[i - 1]:  # walk the increasing part
                i += 1
            end = i - 1  # end of the increasing part
            # Keep the longest qualifying run
            if (end - start + 1) > longest_length and check_difference(start, end):
                longest_length = end - start + 1
                longest_start = start
                longest_end = end

    if longest_length > 0:
        return (longest_start, longest_end)
    else:
        return None

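# Example (pure function): the values rise from 10 to 90 and fall back to 20;
# the spread (90 - 10 = 80) exceeds the default threshold of 40, so the whole
# run from index 0 to index 5 is returned.
# find_show_cut([10, 30, 60, 90, 50, 20])  # -> (0, 5)
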

# Usage example
if __name__ == "__main__":
    json_directory = "/data/data/luosy/project/oral/data/img_caption"  # replace with your JSON file directory
    results = filter_json_files(json_directory)

    # Print the matching results
    for result in results:
        print(result)