common.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620
  1. import os
  2. import re
  3. import json
  4. from pathlib import Path
  5. from moviepy.editor import VideoFileClip
  6. from PIL import Image
  7. import numpy as np
  8. from tqdm import tqdm
  9. from .logger_config import setup_logger
  10. from .llm_label import text_classifer
  11. logger = setup_logger(__name__)
  12. def zoom_and_crop_video(input_path, output_path, zoom_factor):
  13. """
  14. 放大视频画面并裁剪中心区域,保持原分辨率
  15. :param input_path: 输入视频路径
  16. :param output_path: 输出视频路径
  17. :param zoom_factor: 缩放倍数(需≥1)
  18. """
  19. # 加载视频并获取原始尺寸
  20. clip = VideoFileClip(input_path)
  21. original_width, original_height = clip.size
  22. if zoom_factor < 1:
  23. raise ValueError("缩放倍数必须≥1,否则无法裁剪出原始分辨率")
  24. # 放大视频
  25. zoomed_clip = clip.resize(zoom_factor)
  26. zoomed_width, zoomed_height = zoomed_clip.size
  27. # 计算裁剪区域中心坐标
  28. x_center = zoomed_width // 2
  29. y_center = zoomed_height // 2
  30. half_w = original_width // 2
  31. half_h = original_height // 2
  32. # 确定裁剪范围(确保不越界)
  33. x1 = max(0, x_center - half_w)
  34. y1 = max(0, y_center - half_h)
  35. x2 = min(zoomed_width, x_center + half_w)
  36. y2 = min(zoomed_height, y_center + half_h)
  37. # 裁剪并保存
  38. cropped_clip = zoomed_clip.crop(x1=x1, y1=y1, x2=x2, y2=y2)
  39. cropped_clip.write_videofile(
  40. output_path,
  41. codec='libx264',
  42. audio_codec='aac',
  43. bitrate='8000k' # 提高码率以保持清晰度[[20]]
  44. )
  45. # 释放资源
  46. clip.close()
  47. zoomed_clip.close()
  48. cropped_clip.close()
  49. def find_vidoe2cut(json_path, video_list):
  50. videos = video_list["面料"] + video_list["版型"] + video_list["工艺"]
  51. print("find_vidoe2cut videos:", videos)
  52. with open(json_path, 'r', encoding='utf-8') as file:
  53. data = json.load(file)
  54. oral_list = data["oral_dict_list"]
  55. new_dict_list = []
  56. for oral in oral_list:
  57. if oral["clip_name"] in videos:
  58. new_dict_list.append(oral)
  59. data["oral_dict_list"] = new_dict_list
  60. new_json_path = f"output/for_cut/{os.path.splitext(os.path.basename(json_path))[0]}.json"
  61. with open(new_json_path, 'w', encoding='utf-8') as file:
  62. json.dump(data, file, ensure_ascii=False, indent=4)
  63. print("find_vidoe2cut data:", data)
  64. def read_video_list(file_path):
  65. with open(file_path, 'r', encoding='utf-8') as file:
  66. data = json.load(file)
  67. return data["脚本"][0]
  68. def count_none_in_string(input_string, text):
  69. """
  70. 计算字符串中 "none" 的出现次数。
  71. Args:
  72. input_string (str): 输入字符串
  73. Returns:
  74. int: "none" 的出现次数
  75. """
  76. return input_string.lower().count(text)
  77. def filter_oral_data(file_path):
  78. """
  79. 从 JSON 文件中读取数据,并过滤出 attribute 中 "none" 出现次数小于 4 的内容,同时保留其他内容。
  80. Args:
  81. file_path (str): JSON 文件的路径
  82. Returns:
  83. dict: 更新后的 JSON 数据
  84. """
  85. with open(file_path, 'r', encoding='utf-8') as file:
  86. data = json.load(file) # 读取 JSON 数据
  87. # 过滤 oral_dict_list
  88. data["oral_dict_list"] = [
  89. oral for oral in data["oral_dict_list"]
  90. if count_none_in_string(oral["attribute"], "none") < 4
  91. ]
  92. logger.info(f"筛选打标语句:{len(data['oral_dict_list'])} 条")
  93. with open(file_path, 'w', encoding='utf-8') as file:
  94. json.dump(data, file, ensure_ascii=False, indent=4)
  95. def label_data(file_path):
  96. logger.info(f"语句二次打标")
  97. with open(file_path, 'r', encoding='utf-8') as file:
  98. data = json.load(file) # 读取 JSON 数据
  99. sentences= data["oral_dict_list"]
  100. for sentence in tqdm(sentences):
  101. attribute = text_classifer(sentence["text"])
  102. sentence["attribute_add"] = attribute
  103. output_path = str(file_path).replace("filter_3", "filter_4")
  104. with open(output_path, 'w', encoding='utf-8') as file:
  105. json.dump(data, file, ensure_ascii=False, indent=4)
  106. def filter_label_data(file_path):
  107. """
  108. 从 JSON 文件中读取数据,并过滤出 attribute_add 中 "否" 出现次数为0的内容
  109. Args:
  110. file_path (str): JSON 文件的路径
  111. Returns:
  112. dict: 更新后的 JSON 数据
  113. """
  114. with open(file_path, 'r', encoding='utf-8') as file:
  115. data = json.load(file) # 读取 JSON 数据
  116. # 过滤 oral_dict_list
  117. data["oral_dict_list"] = [
  118. oral for oral in data["oral_dict_list"]
  119. if count_none_in_string(oral["attribute_add"], "否") == 0
  120. ]
  121. # 删除无用字段
  122. oral_dict_list = data["oral_dict_list"]
  123. for i, oral in enumerate(oral_dict_list, start=1):
  124. oral.pop("spk", None)
  125. oral.pop("attribute", None)
  126. oral.pop("attribute_add", None)
  127. oral["clip_name"] = f"{Path(file_path).stem}_{i:02d}.mp4"
  128. logger.info(f"二次筛选打标语句:{len(data['oral_dict_list'])} 条")
  129. with open(file_path, 'w', encoding='utf-8') as file:
  130. json.dump(data, file, ensure_ascii=False, indent=4)
  131. def del_key(json_file):
  132. with open(json_file, "r", encoding="utf-8") as f:
  133. data = json.load(f)
  134. data.pop("text", None)
  135. data.pop("timestamp", None)
  136. sentences = data["sentence_info"]
  137. for sentence in sentences:
  138. sentence.pop("timestamp", None)
  139. with open(json_file, "w", encoding="utf-8") as f:
  140. json.dump(data, f, ensure_ascii=False)
  141. def str2timestamp(time_text):
  142. mid = [int(timestamp.split('_')[-1]) for timestamp in time_text.split('-')]
  143. return int((mid[0] + mid[1]) / 2)
  144. def get_video_duration(video_path):
  145. with VideoFileClip(video_path) as video:
  146. duration = int(video.duration)
  147. return duration
  148. def read_json_file(file_path):
  149. try:
  150. with open(file_path, 'r', encoding='utf-8') as file:
  151. data = json.load(file)
  152. return data
  153. except FileNotFoundError:
  154. print(f"错误: 文件 '{file_path}' 不存在")
  155. raise
  156. except json.JSONDecodeError as e:
  157. print(f"错误: JSON格式不正确 - {e}")
  158. raise
  159. except Exception as e:
  160. print(f"读取JSON文件时发生错误: {e}")
  161. raise
  162. def get_frame_number(filename):
  163. """从文件名中提取帧号数字"""
  164. match = re.search(r'frame_(\d+)', filename)
  165. if match:
  166. return int(match.group(1))
  167. return 0
  168. def read_files_in_order(directory):
  169. """按照frame序号顺序读取目录下的文件"""
  170. # 获取目录下所有文件
  171. files = os.listdir(directory)
  172. # 按照frame号码排序
  173. sorted_files = sorted(files, key=get_frame_number)
  174. results = []
  175. for file in sorted_files:
  176. file_path = os.path.join(directory, file)
  177. if os.path.isfile(file_path):
  178. try:
  179. data = read_json_file(file_path)
  180. results.append((file, data))
  181. except Exception as e:
  182. print(f"读取文件 {file} 时出错: {e}")
  183. return results
  184. def filter_json_files(directory):
  185. filtered_results = []
  186. files = read_files_in_order(directory)
  187. if "for_show" in directory:
  188. sub_folder = "for_show/"
  189. else:
  190. sub_folder = ""
  191. # 遍历目录下的所有文件
  192. for filename, content in tqdm(files):
  193. if filename.endswith('.json'):
  194. file_path = os.path.join(directory, filename)
  195. try:
  196. with open(file_path, 'r', encoding='utf-8') as file:
  197. data = json.load(file)
  198. # 检查条件
  199. if (data.get("是否有人") == "是" and
  200. data.get("人物数量") == 1 and
  201. data.get("人物位置") == "中间"):
  202. image_path = f"/data/data/luosy/project/oral_local/data/key_frame/{sub_folder}" + filename.split('.')[0] + '.jpg'
  203. filtered_results.append(image_path)
  204. except (FileNotFoundError, json.JSONDecodeError) as e:
  205. print(f"读取文件 {filename} 时出错: {e}")
  206. return filtered_results
  207. def find_arithmetic_sequences(input_list):
  208. """
  209. 查找输入列表中的等差数列,返回至少包含4个元素的等差数列的索引。
  210. Args:
  211. input_list (list): 输入列表
  212. Returns:
  213. list: 包含等差数列的起始和结束索引的元组列表
  214. """
  215. sequences = []
  216. n = len(input_list)
  217. if n < 4:
  218. return sequences # 如果列表长度小于4,直接返回空列表
  219. for i in range(n - 3): # 遍历到倒数第4个元素
  220. for j in range(i + 1, n - 2): # 遍历后续元素
  221. diff = input_list[j] - input_list[i] # 计算公差
  222. count = 2 # 已经找到两个元素
  223. last_index = j # 记录最后一个符合条件的索引
  224. # 检查后续元素是否符合等差数列
  225. for k in range(j + 1, n):
  226. if input_list[k] - input_list[k - 1] == diff:
  227. count += 1
  228. last_index = k # 更新最后一个符合条件的索引
  229. else:
  230. break # 一旦不符合,退出内层循环
  231. if count >= 4: # 如果找到的元素数量达到4个,记录索引
  232. sequences.append((i, last_index))
  233. return sequences
  234. def find_longest_arithmetic_sequences(input_list):
  235. """
  236. 查找输入列表中的最长等差数列,返回至少包含3个元素的等差数列的索引。
  237. Args:
  238. input_list (list): 输入列表
  239. Returns:
  240. list: 包含最长等差数列的起始和结束索引的元组列表
  241. """
  242. n = len(input_list)
  243. if n < 3:
  244. return [] # 如果列表长度小于4,直接返回空列表
  245. longest_sequences = []
  246. max_length = 0
  247. for i in range(n - 3): # 遍历到倒数第4个元素
  248. for j in range(i + 1, n - 2): # 遍历后续元素
  249. diff = input_list[j] - input_list[i] # 计算公差
  250. count = 2 # 已经找到两个元素
  251. last_index = j # 记录最后一个符合条件的索引
  252. # 检查后续元素是否符合等差数列
  253. for k in range(j + 1, n):
  254. if input_list[k] - input_list[k - 1] == diff:
  255. count += 1
  256. last_index = k # 更新最后一个符合条件的索引
  257. else:
  258. break # 一旦不符合,退出内层循环
  259. # 只在找到的元素数量达到4个时记录索引
  260. if count >= 4:
  261. if count > max_length: # 找到更长的序列
  262. max_length = count
  263. longest_sequences = [(i, last_index)]
  264. elif count == max_length: # 如果长度相同,添加到结果中
  265. longest_sequences.append((i, last_index))
  266. return longest_sequences
  267. def extract_numbers(input_list):
  268. """
  269. 从输入列表中提取每个元素的数字部分,返回一个新列表。
  270. Args:
  271. input_list (list): 输入列表,包含各种类型的元素
  272. Returns:
  273. list: 提取出的数字列表
  274. """
  275. numbers = []
  276. for item in input_list:
  277. # 将元素转换为字符串并使用正则表达式提取数字
  278. found_numbers = re.findall(r'\d+', str(item))
  279. # 将找到的数字转换为整数并添加到结果列表中
  280. numbers.extend(int(num) for num in found_numbers)
  281. return numbers
  282. def filter_json_files_for_show(directory):
  283. filtered_results = []
  284. files = read_files_in_order(directory)
  285. if "for_show" in directory:
  286. sub_folder = "for_show/"
  287. else:
  288. sub_folder = ""
  289. # 遍历目录下的所有文件
  290. for filename, content in tqdm(files):
  291. if filename.endswith('.json'):
  292. file_path = os.path.join(directory, filename)
  293. try:
  294. with open(file_path, 'r', encoding='utf-8') as file:
  295. data = json.load(file)
  296. # 检查条件
  297. if (data.get("是否有人") == "是" and
  298. data.get("人物数量") == 1 and
  299. data.get("人物位置") == "中间" and
  300. data.get("手拿衣服") == "否" and
  301. data.get("手拿平板") == "否" and
  302. data.get("卡码拍大") == "否"):
  303. image_path = f"/data/data/luosy/project/oral_local/data/key_frame/{sub_folder}" + filename.split('.')[0] + '.jpg'
  304. filtered_results.append(image_path)
  305. except (FileNotFoundError, json.JSONDecodeError) as e:
  306. print(f"读取文件 {filename} 时出错: {e}")
  307. for_filter_list = extract_numbers(filtered_results)
  308. print(f"过滤列表:{for_filter_list}")
  309. filter_index = find_longest_arithmetic_sequences(for_filter_list)
  310. logger.info(f"筛选最长等差列表:{filter_index}")
  311. return filtered_results[filter_index[0][0]:filter_index[0][1]+1]
  312. def cut_timestamp(directory):
  313. results = []
  314. files = read_files_in_order(directory)
  315. # 遍历目录下的所有文件
  316. for filename, content in files:
  317. if filename.endswith('.json'):
  318. file_path = os.path.join(directory, filename)
  319. try:
  320. with open(file_path, 'r', encoding='utf-8') as file:
  321. data = json.load(file)
  322. # 检查条件
  323. if (data.get("同一个人") == "是" and data.get("穿同套衣服") == "否"):
  324. cut_timestamp = str2timestamp(data["对比图像"])
  325. results.append(cut_timestamp)
  326. except (FileNotFoundError, json.JSONDecodeError) as e:
  327. print(f"读取文件 {filename} 时出错: {e}")
  328. logger.info(f"裁切位置:{results}")
  329. return results
  330. def convert_timestamp(timestamps, total_duration):
  331. cut_ranges = []
  332. # 添加从 0 到第一个时间戳的范围
  333. if timestamps:
  334. cut_ranges.append((0, timestamps[0]))
  335. # 添加相邻时间戳之间的范围
  336. for i in range(len(timestamps) - 1):
  337. start_time = timestamps[i]
  338. end_time = timestamps[i + 1]
  339. cut_ranges.append((start_time, end_time))
  340. # 添加最后一个时间戳到视频总时长的范围
  341. if timestamps:
  342. cut_ranges.append((timestamps[-1], total_duration))
  343. return cut_ranges
  344. def calculate_depth_mean(depth_image_path):
  345. """
  346. 计算单通道深度图的深度均值
  347. Args:
  348. depth_image_path (str): 深度图像的路径
  349. Returns:
  350. float: 深度均值
  351. """
  352. # 读取深度图像
  353. depth_image = Image.open(depth_image_path)
  354. # 将图像转换为 NumPy 数组
  355. depth_array = np.array(depth_image)
  356. logger.info(f"深度图尺寸:{depth_image.shape}")
  357. # 计算深度均值,忽略无效值(如 0)
  358. depth_mean = np.mean(depth_array[depth_array > 0]) # 只计算大于0的值
  359. return depth_mean
  360. def calculate_depth_mean_pil(depth_image):
  361. """
  362. 计算单通道深度图的深度均值
  363. Args:
  364. depth_image_path (str): 深度图像的路径
  365. Returns:
  366. float: 深度均值
  367. """
  368. # 读取深度图像
  369. # depth_image = Image.open(depth_image_path)
  370. # 将图像转换为 NumPy 数组
  371. depth_array = np.array(depth_image)[:, 200:-200]
  372. logger.info(f"深度图均值计算范围:{depth_array.shape}")
  373. # 计算深度均值,忽略无效值(如 0)
  374. depth_mean = np.mean(depth_array[depth_array > 0]) # 只计算大于0的值
  375. return depth_mean
  376. # def find_show_cut(data):
  377. # """
  378. # 找出列表中最长的先递增后递减或先递减后递增的元素索引
  379. # Args:
  380. # data (list): 输入数据列表
  381. # Returns:
  382. # tuple: 包含最长序列的起始和结束索引
  383. # """
  384. # if not data:
  385. # return None
  386. # longest_start = longest_end = -1
  387. # longest_length = 0
  388. # n = len(data)
  389. # # 先递增后递减
  390. # for i in range(1, n):
  391. # if data[i] >= data[i - 1]: # 递增或相等
  392. # start = i - 1
  393. # while i < n and data[i] >= data[i - 1]: # 找到递增序列
  394. # i += 1
  395. # # 现在 i 是递增序列的结束位置
  396. # while i < n and data[i] <= data[i - 1]: # 找到递减序列
  397. # i += 1
  398. # end = i - 1 # 递减序列的结束位置
  399. # # 更新最长序列
  400. # if (end - start + 1) > longest_length:
  401. # longest_length = end - start + 1
  402. # longest_start = start
  403. # longest_end = end
  404. # # 先递减后递增
  405. # for i in range(1, n):
  406. # if data[i] <= data[i - 1]: # 递减或相等
  407. # start = i - 1
  408. # while i < n and data[i] <= data[i - 1]: # 找到递减序列
  409. # i += 1
  410. # # 现在 i 是递减序列的结束位置
  411. # while i < n and data[i] >= data[i - 1]: # 找到递增序列
  412. # i += 1
  413. # end = i - 1 # 递增序列的结束位置
  414. # # 更新最长序列
  415. # if (end - start + 1) > longest_length:
  416. # longest_length = end - start + 1
  417. # longest_start = start
  418. # longest_end = end
  419. # if longest_length > 0:
  420. # return (longest_start, longest_end)
  421. # else:
  422. # return None
  423. def find_show_cut(data, n=40):
  424. """
  425. 找出列表中最长的先递增后递减或先递减后递增的元素索引,并且要求找出的元素最大值与最小值的差值要大于n。
  426. Args:
  427. data (list): 输入数据列表
  428. n (int): 最大值与最小值的差值阈值
  429. Returns:
  430. tuple: 包含最长序列的起始和结束索引
  431. """
  432. if not data:
  433. return None
  434. longest_start = longest_end = -1
  435. longest_length = 0
  436. def check_difference(start, end):
  437. """检查最大值与最小值的差值是否大于n"""
  438. subarray = data[start:end + 1]
  439. return max(subarray) - min(subarray) > n
  440. # 先递增后递减
  441. for i in range(1, len(data)):
  442. if data[i] >= data[i - 1]: # 递增或相等
  443. start = i - 1
  444. while i < len(data) and data[i] >= data[i - 1]: # 找到递增序列
  445. i += 1
  446. # 现在 i 是递增序列的结束位置
  447. while i < len(data) and data[i] <= data[i - 1]: # 找到递减序列
  448. i += 1
  449. end = i - 1 # 递减序列的结束位置
  450. # 更新最长序列
  451. if (end - start + 1) > longest_length and check_difference(start, end):
  452. longest_length = end - start + 1
  453. longest_start = start
  454. longest_end = end
  455. # 先递减后递增
  456. for i in range(1, len(data)):
  457. if data[i] <= data[i - 1]: # 递减或相等
  458. start = i - 1
  459. while i < len(data) and data[i] <= data[i - 1]: # 找到递减序列
  460. i += 1
  461. # 现在 i 是递减序列的结束位置
  462. while i < len(data) and data[i] >= data[i - 1]: # 找到递增序列
  463. i += 1
  464. end = i - 1 # 递增序列的结束位置
  465. # 更新最长序列
  466. if (end - start + 1) > longest_length and check_difference(start, end):
  467. longest_length = end - start + 1
  468. longest_start = start
  469. longest_end = end
  470. if longest_length > 0:
  471. return (longest_start, longest_end)
  472. else:
  473. return None
  474. # 使用示例
  475. if __name__ == "__main__":
  476. json_directory = "/data/data/luosy/project/oral/data/img_caption" # 替换为你的 JSON 文件目录
  477. results = filter_json_files(json_directory)
  478. # 打印符合条件的结果
  479. for result in results:
  480. print(result)