tools.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. import json
  2. import re
  3. import io
  4. import os
  5. import base64
  6. import requests
  7. import subprocess
  8. import shutil
  9. from PIL import Image
  10. from .logger_config import setup_logger
  11. logger = setup_logger(__name__)
  12. def string_to_json(markdown_string):
  13. try:
  14. json_content = re.sub(r'^```json|\n```$', '', markdown_string, flags=re.MULTILINE).strip()
  15. if not json_content:
  16. json_content = markdown_string
  17. # raise ValueError("字符串中未找到有效的JSON内容")
  18. # 解析JSON内容
  19. json_data = json.loads(json_content)
  20. return json_data
  21. except Exception as e:
  22. logger.info(f"生成结果解析失败:\n{markdown_string}")
  23. def save_json_file(json_data, output_file):
  24. try:
  25. with open(output_file, mode='w', encoding='utf-8') as f:
  26. json.dump(json_data, f, ensure_ascii=False, indent=4)
  27. logger.info(f"JSON文件保存成功:{output_file}")
  28. return output_file
  29. except Exception as e:
  30. logger.info(f"处理过程中出错: {e}")
  31. return False
  32. def save_string_as_json(markdown_string, output_file):
  33. """
  34. 从Markdown格式的字符串中提取JSON内容并保存为JSON文件
  35. 参数:
  36. markdown_string (str): 包含Markdown代码块的字符串
  37. output_file (str): 要保存的JSON文件路径
  38. 返回:
  39. bool: 保存成功返回True,失败返回False
  40. """
  41. json_data = string_to_json(markdown_string)
  42. result = save_json_file(json_data, output_file)
  43. return result
  44. def encode_image(image_path: str) -> str:
  45. """
  46. 将图片文件转换为base64编码
  47. Args:
  48. image_path: 图片文件路径
  49. Returns:
  50. str: base64编码的图片数据
  51. Raises:
  52. FileNotFoundError: 图片文件不存在
  53. IOError: 读取或处理图片失败
  54. """
  55. if not os.path.exists(image_path):
  56. raise FileNotFoundError(f"Image file not found: {image_path}")
  57. with Image.open(image_path) as img:
  58. buffered = io.BytesIO()
  59. img.save(buffered, format="JPEG")
  60. return base64.b64encode(buffered.getvalue()).decode("utf-8")
  61. def encode_video(video_path: str) -> str:
  62. """
  63. 将视频文件转换为base64编码
  64. Args:
  65. video_path: 视频文件路径
  66. Returns:
  67. str: base64编码的视频数据
  68. Raises:
  69. FileNotFoundError: 视频文件不存在
  70. IOError: 读取文件失败
  71. """
  72. if not os.path.exists(video_path):
  73. raise FileNotFoundError(f"Video file not found: {video_path}")
  74. with open(video_path, "rb") as f:
  75. return base64.b64encode(f.read()).decode("utf-8")
  76. def download_image(image_url, output_path):
  77. """
  78. 根据图片URL下载图片到本地指定路径
  79. 参数:
  80. image_url (str): 图片的URL地址
  81. output_path (str): 本地保存路径(包含文件名和扩展名)
  82. 返回:
  83. bool: 下载成功返回True,失败返回False
  84. """
  85. try:
  86. # 创建目录(如果不存在)
  87. os.makedirs(os.path.dirname(output_path), exist_ok=True)
  88. # 发送HTTP GET请求
  89. response = requests.get(image_url, stream=True)
  90. response.raise_for_status() # 检查请求是否成功
  91. # 以二进制写入模式保存图片
  92. with open(output_path, 'wb') as f:
  93. for chunk in response.iter_content(1024):
  94. f.write(chunk)
  95. logger.info(f"图片已成功保存到: {output_path}")
  96. return True
  97. except requests.exceptions.RequestException as e:
  98. logger.info(f"下载图片时出错: {e}")
  99. except IOError as e:
  100. logger.info(f"保存图片时出错: {e}")
  101. except Exception as e:
  102. logger.info(f"发生未知错误: {e}")
  103. return False
  104. def download_video(video_url, output_path):
  105. """
  106. 根据视频URL下载视频到本地指定路径
  107. 参数:
  108. video_url (str): 视频的URL地址
  109. output_path (str): 本地保存路径(包含文件名和扩展名)
  110. 返回:
  111. bool: 下载成功返回True,失败返回False
  112. """
  113. try:
  114. # 创建目录(如果不存在)
  115. os.makedirs(os.path.dirname(output_path), exist_ok=True)
  116. # 发送HTTP GET请求
  117. headers = {
  118. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
  119. }
  120. response = requests.get(video_url, headers=headers, stream=True)
  121. response.raise_for_status() # 检查请求是否成功
  122. # 获取文件总大小(用于进度显示)
  123. total_size = int(response.headers.get('content-length', 0))
  124. # 以二进制写入模式保存视频
  125. with open(output_path, 'wb') as f:
  126. downloaded_size = 0
  127. for chunk in response.iter_content(chunk_size=8192):
  128. if chunk: # 过滤掉保持连接的新块
  129. f.write(chunk)
  130. downloaded_size += len(chunk)
  131. logger.info(f"\n视频已成功保存到: {output_path}")
  132. return True
  133. except requests.exceptions.RequestException as e:
  134. logger.info(f"下载视频时出错: {e}")
  135. except IOError as e:
  136. logger.info(f"保存视频时出错: {e}")
  137. except Exception as e:
  138. logger.info(f"发生未知错误: {e}")
  139. return False
  140. # 读取JONS文件
  141. def read_json_file(file_path):
  142. try:
  143. with open(file_path, 'r', encoding='utf-8') as json_file:
  144. data = json.load(json_file)
  145. return data
  146. except FileNotFoundError:
  147. logger.info(f"文件 {file_path} 未找到。")
  148. except json.JSONDecodeError:
  149. logger.info(f"文件 {file_path} 不是有效的JSON格式。")
  150. except Exception as e:
  151. logger.info(f"发生错误: {e}")
  152. def convert_webp_to_png(webp_path, png_path):
  153. """
  154. Convert a WebP image to a PNG image.
  155. :param webp_path: Path to the input WebP file.
  156. :param png_path: Path where the output PNG file will be saved.
  157. """
  158. # Open the WebP image file
  159. with Image.open(webp_path) as img:
  160. # Convert the image to RGB mode if necessary (some WebPs are in RGBA)
  161. if img.mode == 'RGBA':
  162. img = img.convert('RGB')
  163. # Save the image in PNG format
  164. img.save(png_path, 'PNG')
  165. def compress_video(input_path, output_path, crf=23, preset='medium'):
  166. """
  167. 压缩视频文件
  168. 参数:
  169. input_path (str): 输入视频文件路径
  170. output_path (str): 输出压缩后视频的保存路径
  171. crf (int): 恒定速率因子,值越小质量越好(范围0-51,默认23)
  172. preset (str): 编码速度/压缩率的权衡(ultrafast, superfast, veryfast, fast, medium, slow, slower, veryslow)
  173. 越慢压缩率越高,但耗时更长(默认medium)
  174. 返回:
  175. bool: 压缩成功返回True,否则返回False
  176. """
  177. # 检查ffmpeg是否安装
  178. if not shutil.which('ffmpeg'):
  179. print("错误:未找到ffmpeg,请先安装ffmpeg并确保其在系统PATH中")
  180. return False
  181. # 检查输入文件是否存在
  182. if not os.path.exists(input_path):
  183. print(f"错误:输入文件不存在 - {input_path}")
  184. return False
  185. # 创建输出目录(如果不存在)
  186. output_dir = os.path.dirname(output_path)
  187. if output_dir and not os.path.exists(output_dir):
  188. os.makedirs(output_dir, exist_ok=True)
  189. # ffmpeg命令:使用H.264编码压缩视频,保持原音频质量
  190. cmd = [
  191. 'ffmpeg',
  192. '-i', input_path, # 输入文件
  193. '-vcodec', 'libx264', # 视频编码器
  194. '-crf', str(crf), # 恒定速率因子
  195. '-preset', preset, # 编码预设
  196. '-acodec', 'copy', # 复制音频流(不重新编码)
  197. '-y', # 覆盖输出文件
  198. output_path # 输出文件
  199. ]
  200. try:
  201. # 执行命令
  202. result = subprocess.run(
  203. cmd,
  204. stdout=subprocess.PIPE,
  205. stderr=subprocess.PIPE,
  206. text=True
  207. )
  208. # 检查执行结果
  209. if result.returncode != 0:
  210. print(f"压缩失败:{result.stderr}")
  211. return False
  212. print(f"视频压缩成功,已保存至:{output_path}")
  213. return True
  214. except Exception as e:
  215. print(f"压缩过程中发生错误:{str(e)}")
  216. return False
  217. def efficient_sort(a, b):
  218. """
  219. 高效排序算法,通过贪心策略
  220. """
  221. n = len(a)
  222. selected = [False] * n
  223. result = []
  224. available = set()
  225. # 统计每个元素的依赖
  226. dependencies = []
  227. for i in range(n):
  228. # 如果a[i]在b[i]中,它可以立即被处理
  229. immediate = a[i] in b[i]
  230. dependencies.append((immediate, i))
  231. # 先处理可以立即处理的元素
  232. for immediate, i in dependencies:
  233. if immediate and not selected[i]:
  234. result.append(i)
  235. selected[i] = True
  236. available.update(b[i])
  237. # 然后处理其他元素
  238. while len(result) < n:
  239. progress = False
  240. for i in range(n):
  241. if not selected[i] and a[i] in available:
  242. result.append(i)
  243. selected[i] = True
  244. available.update(b[i])
  245. progress = True
  246. # 如果没有进展,选择第一个未处理的元素
  247. if not progress:
  248. for i in range(n):
  249. if not selected[i]:
  250. result.append(i)
  251. selected[i] = True
  252. available.update(b[i])
  253. break
  254. return result
  255. # 使用示例
  256. if __name__ == "__main__":
  257. # markdown_str = """
  258. # {
  259. # "name": "李四",
  260. # "age": 25,
  261. # "address": {
  262. # "city": "上海",
  263. # "district": "浦东新区"
  264. # },
  265. # "hobbies": ["阅读", "编程", "旅行"]
  266. # }
  267. # """
  268. # save_markdown_json_as_file(markdown_str, 'output.json')
  269. # 示例调用
  270. image_url = "https://ark-project.tos-cn-beijing.volces.com/doc_image/seedream4_imageToimage.png"
  271. output_path = "./output/my_image.jpg" # 可以是相对路径或绝对路径
  272. download_image(image_url, output_path)