| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- import json
- import re
- import io
- import os
- import base64
- import requests
- import subprocess
- import shutil
- from PIL import Image
- from .logger_config import setup_logger
- logger = setup_logger(__name__)
def string_to_json(markdown_string):
    """
    Parse JSON from a string that may be wrapped in a Markdown code fence.

    Raw JSON is accepted as-is. A surrounding fence is removed first; the
    opening fence may be tagged (```json, matched case-insensitively) or
    bare (```).

    Args:
        markdown_string (str): Raw JSON text, or JSON inside a code fence.

    Returns:
        The decoded JSON value, or None when the input cannot be parsed.
        Failures are logged rather than raised.
    """
    try:
        # Remove an opening fence at the start of a line (with or without the
        # "json" tag) and a closing fence at the end of a line.
        json_content = re.sub(
            r'^```(?:json)?|\n```$',
            '',
            markdown_string,
            flags=re.MULTILINE | re.IGNORECASE,
        ).strip()
        if not json_content:
            # Nothing left after stripping: fall back to the untouched input
            # so the decoder reports the failure on the original text.
            json_content = markdown_string

        return json.loads(json_content)

    except Exception:
        logger.info(f"生成结果解析失败:\n{markdown_string}")
        return None
def save_json_file(json_data, output_file):
    """
    Write ``json_data`` to ``output_file`` as indented UTF-8 JSON.

    Args:
        json_data: Any JSON-serializable value.
        output_file (str): Destination path; its parent directory is created
            when it does not exist yet.

    Returns:
        The ``output_file`` path on success, or False when serialization or
        writing fails (the error is logged, not raised).
    """
    try:
        # Serialize first: if the data is not serializable we fail before the
        # destination is opened in 'w' mode, so an existing file is not
        # truncated and no half-written file is left behind.
        payload = json.dumps(json_data, ensure_ascii=False, indent=4)

        # A bare filename has an empty dirname, which makedirs would reject.
        parent_dir = os.path.dirname(output_file)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(output_file, mode='w', encoding='utf-8') as f:
            f.write(payload)
        logger.info(f"JSON文件保存成功:{output_file}")
        return output_file
    except Exception as e:
        logger.info(f"处理过程中出错: {e}")
        return False
def save_string_as_json(markdown_string, output_file):
    """
    Extract JSON from a Markdown-formatted string and save it to a JSON file.

    Args:
        markdown_string (str): Text containing JSON, optionally inside a
            Markdown code fence.
        output_file (str): Path of the JSON file to write.

    Returns:
        The ``output_file`` path when the file was written, or False when the
        text could not be parsed or the file could not be saved.

    Note:
        ``string_to_json`` signals a parse failure by returning None, so a
        payload that decodes to a top-level JSON ``null`` is treated as a
        failure as well and is not written.
    """
    json_data = string_to_json(markdown_string)
    if json_data is None:
        # Parsing failed (already logged by string_to_json). Do not write a
        # meaningless "null" file and report it as a success.
        return False
    return save_json_file(json_data, output_file)
def encode_image(image_path: str) -> str:
    """
    Re-encode an image file as JPEG and return it as a base64 string.

    Args:
        image_path: Path to the image file.

    Returns:
        str: Base64 text of the JPEG-encoded image.

    Raises:
        FileNotFoundError: The image file does not exist.
        IOError: The image could not be read or encoded.
    """
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image file not found: {image_path}")

    # Modes the JPEG writer can store directly; anything else (RGBA, LA, P,
    # ...) is converted to RGB first, otherwise saving fails with OSError.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")

    with Image.open(image_path) as img:
        jpeg_ready = img if img.mode in jpeg_modes else img.convert("RGB")
        buffered = io.BytesIO()
        jpeg_ready.save(buffered, format="JPEG")
        return base64.b64encode(buffered.getvalue()).decode("utf-8")
def encode_video(video_path: str) -> str:
    """
    Read a video file and return its raw bytes as base64 text.

    Args:
        video_path: Path to the video file.

    Returns:
        str: Base64 encoding of the file contents.

    Raises:
        FileNotFoundError: The video file does not exist.
        IOError: The file could not be read.
    """
    video_exists = os.path.exists(video_path)
    if not video_exists:
        raise FileNotFoundError(f"Video file not found: {video_path}")

    # The whole file is loaded into memory before encoding.
    with open(video_path, "rb") as stream:
        raw_bytes = stream.read()

    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def download_image(image_url, output_path, timeout=30):
    """
    Download an image from a URL and store it at a local path.

    Args:
        image_url (str): URL of the image.
        output_path (str): Local destination path, including file name and
            extension. Missing parent directories are created.
        timeout (float | tuple): Timeout in seconds passed to
            ``requests.get`` so a stalled server cannot block forever.

    Returns:
        bool: True when the image was saved, False on any failure (the error
        is logged, not raised).
    """
    try:
        # A bare filename has an empty dirname, which makedirs would reject.
        target_dir = os.path.dirname(output_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        # The context manager releases the streamed connection even when
        # writing fails part-way through.
        with requests.get(image_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # turn HTTP error statuses into exceptions

            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)

        logger.info(f"图片已成功保存到: {output_path}")
        return True

    except requests.exceptions.RequestException as e:
        logger.info(f"下载图片时出错: {e}")
    except IOError as e:
        logger.info(f"保存图片时出错: {e}")
    except Exception as e:
        logger.info(f"发生未知错误: {e}")

    return False
def download_video(video_url, output_path, timeout=30):
    """
    Download a video from a URL and store it at a local path.

    Args:
        video_url (str): URL of the video.
        output_path (str): Local destination path, including file name and
            extension. Missing parent directories are created.
        timeout (float | tuple): Timeout in seconds passed to
            ``requests.get`` so a stalled server cannot block forever.

    Returns:
        bool: True when the video was saved, False on any failure (the error
        is logged, not raised).
    """
    try:
        # A bare filename has an empty dirname, which makedirs would reject.
        target_dir = os.path.dirname(output_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        # Send a browser-like User-Agent with the request.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
        }

        # The context manager releases the streamed connection even when
        # writing fails part-way through.
        with requests.get(video_url, headers=headers, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # turn HTTP error statuses into exceptions

            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)

        logger.info(f"\n视频已成功保存到: {output_path}")
        return True

    except requests.exceptions.RequestException as e:
        logger.info(f"下载视频时出错: {e}")
    except IOError as e:
        logger.info(f"保存视频时出错: {e}")
    except Exception as e:
        logger.info(f"发生未知错误: {e}")

    return False
# Read a JSON file
def read_json_file(file_path):
    """
    Load and decode a UTF-8 JSON file.

    Args:
        file_path (str): Path of the JSON file to read.

    Returns:
        The decoded JSON value, or None when the file is missing, is not
        valid JSON, or any other error occurs (each case is logged).
    """
    parsed = None
    try:
        with open(file_path, mode='r', encoding='utf-8') as handle:
            parsed = json.loads(handle.read())
    except FileNotFoundError:
        logger.info(f"文件 {file_path} 未找到。")
    except json.JSONDecodeError:
        logger.info(f"文件 {file_path} 不是有效的JSON格式。")
    except Exception as e:
        logger.info(f"发生错误: {e}")
    return parsed
def convert_webp_to_png(webp_path, png_path):
    """
    Re-save a WebP image as a PNG file.

    An image opened in RGBA mode is converted to RGB before saving, so its
    alpha channel is not carried over into the PNG.

    :param webp_path: Path to the input WebP file.
    :param png_path: Path where the output PNG file will be written.
    """
    with Image.open(webp_path) as source:
        has_alpha_mode = source.mode == 'RGBA'
        target = source.convert('RGB') if has_alpha_mode else source
        target.save(png_path, 'PNG')
def compress_video(input_path, output_path, crf=23, preset='medium'):
    """
    Compress a video by re-encoding it with ffmpeg (libx264).

    The audio stream is copied without re-encoding, and an existing output
    file is overwritten.

    Args:
        input_path (str): Path of the source video.
        output_path (str): Path where the compressed video is written.
        crf (int): Constant rate factor passed to ffmpeg; lower values give
            better quality (range 0-51, default 23).
        preset (str): Encoder speed/compression trade-off (ultrafast,
            superfast, veryfast, fast, medium, slow, slower, veryslow).
            Slower presets compress better but take longer.

    Returns:
        bool: True when ffmpeg exits successfully, False otherwise. Problems
        are reported with ``print``.
    """
    # ffmpeg has to be reachable through PATH.
    if shutil.which('ffmpeg') is None:
        print("错误:未找到ffmpeg,请先安装ffmpeg并确保其在系统PATH中")
        return False

    # The source video has to exist.
    if not os.path.exists(input_path):
        print(f"错误:输入文件不存在 - {input_path}")
        return False

    # Create the destination directory only when one is named and missing.
    target_dir = os.path.dirname(output_path)
    needs_dir = bool(target_dir) and not os.path.exists(target_dir)
    if needs_dir:
        os.makedirs(target_dir, exist_ok=True)

    # H.264 video, audio copied as-is, overwrite the output (-y).
    ffmpeg_args = ['ffmpeg', '-i', input_path]
    ffmpeg_args += ['-vcodec', 'libx264', '-crf', str(crf), '-preset', preset]
    ffmpeg_args += ['-acodec', 'copy', '-y', output_path]

    try:
        completed = subprocess.run(ffmpeg_args, capture_output=True, text=True)

        if completed.returncode == 0:
            print(f"视频压缩成功,已保存至:{output_path}")
            return True

        print(f"压缩失败:{completed.stderr}")
        return False

    except Exception as exc:
        print(f"压缩过程中发生错误:{exc}")
        return False
def efficient_sort(a, b):
    """
    Greedily order the indices ``0 .. len(a) - 1``.

    Picking index ``i`` adds every element of ``b[i]`` to a pool of available
    values. Indices whose ``a[i]`` is contained in its own ``b[i]`` are picked
    first, in index order. After that, repeated passes pick every remaining
    index whose ``a[i]`` is already in the pool; when a pass picks nothing,
    the lowest remaining index is picked anyway so the loop always advances.

    Args:
        a: Sequence of hashable values, one per index.
        b: Sequence of collections of hashable values, same length as ``a``.

    Returns:
        list[int]: Every index exactly once, in the order it was picked.
    """
    count = len(a)
    done = [False] * count
    order = []
    unlocked = set()

    def take(idx):
        # Record the pick and release everything b[idx] provides.
        order.append(idx)
        done[idx] = True
        unlocked.update(b[idx])

    # Decide up front which entries are satisfied by their own b[i].
    self_satisfied = [a[idx] in b[idx] for idx in range(count)]
    for idx, ready in enumerate(self_satisfied):
        if ready:
            take(idx)

    while len(order) < count:
        picked_before = len(order)

        # The pool grows during the pass, so a pick can enable a later index.
        for idx in range(count):
            if done[idx]:
                continue
            if a[idx] in unlocked:
                take(idx)

        # Stalled: force the first index that is still pending.
        if len(order) == picked_before:
            take(done.index(False))

    return order
# Usage example
if __name__ == "__main__":
    # Example of saving fenced/raw JSON text to a file (kept disabled):
    # markdown_str = """
    # {
    #     "name": "Li Si",
    #     "age": 25,
    #     "address": {
    #         "city": "Shanghai",
    #         "district": "Pudong"
    #     },
    #     "hobbies": ["reading", "programming", "travel"]
    # }
    # """
    # save_string_as_json(markdown_str, 'output.json')

    # Example call: fetch a sample image into ./output/.
    image_url = "https://ark-project.tos-cn-beijing.volces.com/doc_image/seedream4_imageToimage.png"
    output_path = "./output/my_image.jpg"  # may be a relative or an absolute path
    download_image(image_url=image_url, output_path=output_path)
|