import json import re import io import os import base64 import requests import subprocess import shutil from PIL import Image from .logger_config import setup_logger logger = setup_logger(__name__) def string_to_json(markdown_string): try: json_content = re.sub(r'^```json|\n```$', '', markdown_string, flags=re.MULTILINE).strip() if not json_content: json_content = markdown_string # raise ValueError("字符串中未找到有效的JSON内容") # 解析JSON内容 json_data = json.loads(json_content) return json_data except Exception as e: logger.info(f"生成结果解析失败:\n{markdown_string}") def save_json_file(json_data, output_file): try: with open(output_file, mode='w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=4) logger.info(f"JSON文件保存成功:{output_file}") return output_file except Exception as e: logger.info(f"处理过程中出错: {e}") return False def save_string_as_json(markdown_string, output_file): """ 从Markdown格式的字符串中提取JSON内容并保存为JSON文件 参数: markdown_string (str): 包含Markdown代码块的字符串 output_file (str): 要保存的JSON文件路径 返回: bool: 保存成功返回True,失败返回False """ json_data = string_to_json(markdown_string) result = save_json_file(json_data, output_file) return result def encode_image(image_path: str) -> str: """ 将图片文件转换为base64编码 Args: image_path: 图片文件路径 Returns: str: base64编码的图片数据 Raises: FileNotFoundError: 图片文件不存在 IOError: 读取或处理图片失败 """ if not os.path.exists(image_path): raise FileNotFoundError(f"Image file not found: {image_path}") with Image.open(image_path) as img: buffered = io.BytesIO() img.save(buffered, format="JPEG") return base64.b64encode(buffered.getvalue()).decode("utf-8") def encode_video(video_path: str) -> str: """ 将视频文件转换为base64编码 Args: video_path: 视频文件路径 Returns: str: base64编码的视频数据 Raises: FileNotFoundError: 视频文件不存在 IOError: 读取文件失败 """ if not os.path.exists(video_path): raise FileNotFoundError(f"Video file not found: {video_path}") with open(video_path, "rb") as f: return base64.b64encode(f.read()).decode("utf-8") def download_image(image_url, output_path): """ 根据图片URL下载图片到本地指定路径 参数: image_url (str): 图片的URL地址 output_path (str): 本地保存路径(包含文件名和扩展名) 返回: bool: 下载成功返回True,失败返回False """ try: # 创建目录(如果不存在) os.makedirs(os.path.dirname(output_path), exist_ok=True) # 发送HTTP GET请求 response = requests.get(image_url, stream=True) response.raise_for_status() # 检查请求是否成功 # 以二进制写入模式保存图片 with open(output_path, 'wb') as f: for chunk in response.iter_content(1024): f.write(chunk) logger.info(f"图片已成功保存到: {output_path}") return True except requests.exceptions.RequestException as e: logger.info(f"下载图片时出错: {e}") except IOError as e: logger.info(f"保存图片时出错: {e}") except Exception as e: logger.info(f"发生未知错误: {e}") return False def download_video(video_url, output_path): """ 根据视频URL下载视频到本地指定路径 参数: video_url (str): 视频的URL地址 output_path (str): 本地保存路径(包含文件名和扩展名) 返回: bool: 下载成功返回True,失败返回False """ try: # 创建目录(如果不存在) os.makedirs(os.path.dirname(output_path), exist_ok=True) # 发送HTTP GET请求 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3' } response = requests.get(video_url, headers=headers, stream=True) response.raise_for_status() # 检查请求是否成功 # 获取文件总大小(用于进度显示) total_size = int(response.headers.get('content-length', 0)) # 以二进制写入模式保存视频 with open(output_path, 'wb') as f: downloaded_size = 0 for chunk in response.iter_content(chunk_size=8192): if chunk: # 过滤掉保持连接的新块 f.write(chunk) downloaded_size += len(chunk) logger.info(f"\n视频已成功保存到: {output_path}") return True except requests.exceptions.RequestException as e: logger.info(f"下载视频时出错: {e}") except IOError as e: logger.info(f"保存视频时出错: {e}") except Exception as e: logger.info(f"发生未知错误: {e}") return False # 读取JONS文件 def read_json_file(file_path): try: with open(file_path, 'r', encoding='utf-8') as json_file: data = json.load(json_file) return data except FileNotFoundError: logger.info(f"文件 {file_path} 未找到。") except json.JSONDecodeError: logger.info(f"文件 {file_path} 不是有效的JSON格式。") except Exception as e: logger.info(f"发生错误: {e}") def convert_webp_to_png(webp_path, png_path): """ Convert a WebP image to a PNG image. :param webp_path: Path to the input WebP file. :param png_path: Path where the output PNG file will be saved. """ # Open the WebP image file with Image.open(webp_path) as img: # Convert the image to RGB mode if necessary (some WebPs are in RGBA) if img.mode == 'RGBA': img = img.convert('RGB') # Save the image in PNG format img.save(png_path, 'PNG') def compress_video(input_path, output_path, crf=23, preset='medium'): """ 压缩视频文件 参数: input_path (str): 输入视频文件路径 output_path (str): 输出压缩后视频的保存路径 crf (int): 恒定速率因子,值越小质量越好(范围0-51,默认23) preset (str): 编码速度/压缩率的权衡(ultrafast, superfast, veryfast, fast, medium, slow, slower, veryslow) 越慢压缩率越高,但耗时更长(默认medium) 返回: bool: 压缩成功返回True,否则返回False """ # 检查ffmpeg是否安装 if not shutil.which('ffmpeg'): print("错误:未找到ffmpeg,请先安装ffmpeg并确保其在系统PATH中") return False # 检查输入文件是否存在 if not os.path.exists(input_path): print(f"错误:输入文件不存在 - {input_path}") return False # 创建输出目录(如果不存在) output_dir = os.path.dirname(output_path) if output_dir and not os.path.exists(output_dir): os.makedirs(output_dir, exist_ok=True) # ffmpeg命令:使用H.264编码压缩视频,保持原音频质量 cmd = [ 'ffmpeg', '-i', input_path, # 输入文件 '-vcodec', 'libx264', # 视频编码器 '-crf', str(crf), # 恒定速率因子 '-preset', preset, # 编码预设 '-acodec', 'copy', # 复制音频流(不重新编码) '-y', # 覆盖输出文件 output_path # 输出文件 ] try: # 执行命令 result = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # 检查执行结果 if result.returncode != 0: print(f"压缩失败:{result.stderr}") return False print(f"视频压缩成功,已保存至:{output_path}") return True except Exception as e: print(f"压缩过程中发生错误:{str(e)}") return False def efficient_sort(a, b): """ 高效排序算法,通过贪心策略 """ n = len(a) selected = [False] * n result = [] available = set() # 统计每个元素的依赖 dependencies = [] for i in range(n): # 如果a[i]在b[i]中,它可以立即被处理 immediate = a[i] in b[i] dependencies.append((immediate, i)) # 先处理可以立即处理的元素 for immediate, i in dependencies: if immediate and not selected[i]: result.append(i) selected[i] = True available.update(b[i]) # 然后处理其他元素 while len(result) < n: progress = False for i in range(n): if not selected[i] and a[i] in available: result.append(i) selected[i] = True available.update(b[i]) progress = True # 如果没有进展,选择第一个未处理的元素 if not progress: for i in range(n): if not selected[i]: result.append(i) selected[i] = True available.update(b[i]) break return result # 使用示例 if __name__ == "__main__": # markdown_str = """ # { # "name": "李四", # "age": 25, # "address": { # "city": "上海", # "district": "浦东新区" # }, # "hobbies": ["阅读", "编程", "旅行"] # } # """ # save_markdown_json_as_file(markdown_str, 'output.json') # 示例调用 image_url = "https://ark-project.tos-cn-beijing.volces.com/doc_image/seedream4_imageToimage.png" output_path = "./output/my_image.jpg" # 可以是相对路径或绝对路径 download_image(image_url, output_path)