| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- import json
- import random
- import websocket
- from PIL import Image
- from datetime import datetime
- from backend.modules.comfyui.image_tools import upload_image, show_gif
- from backend.modules.comfyui.network_tools import get_images
- from backend.utils.logger_config import setup_logger
- from backend.utils.system_config import Config
- from backend.config.prompt_config import FACE_PROMPT, CLOTH_PROMPT, PROMPT_PROMPT
- system_config = Config('./backend/config/ai_swap_face_config.json')
- logger = setup_logger(__name__)
- def parse_workflow(raw_img, face_img, config) -> dict:
- try:
- # 上传原始图片
- logger.info('开始上传原始图片...')
- input_raw_img = upload_image(raw_img, config)
- logger.info('原始图片上传成功')
- # 上传人脸图片
- logger.info('开始上传人脸图片...')
- input_face_img = upload_image(face_img, config)
- logger.info('人脸图片上传成功')
- logger.info(f'Work flow args config: workflow-{config.workflowfile}')
- # 加载工作流配置
- try:
- with open(config.workflowfile, 'r', encoding='utf-8') as workflow:
- workflow_config = json.load(workflow)
- except FileNotFoundError:
- logger.error(f'工作流配置文件不存在: {config.workflowfile}')
- raise RuntimeError(f'工作流配置文件不存在: {config.workflowfile}')
- except json.JSONDecodeError as e:
- logger.error(f'工作流配置文件格式错误: {str(e)}')
- raise RuntimeError(f'工作流配置文件格式错误: {str(e)}')
- # 更新工作流参数
- workflow_config["247"]["inputs"]["image"] = input_raw_img
- workflow_config["245"]["inputs"]["image"] = input_face_img
- logger.info("开始执行工作流...")
- return get_images(workflow_config, config)
-
- except Exception as e:
- logger.error(f"工作流解析失败: {str(e)}")
- raise RuntimeError(f"工作流解析失败: {str(e)}")
-
- def ai_swap_face_process(raw_img, face_img) -> list:
- ws = websocket.WebSocket()
- host_url = f"ws://{system_config.server_address}/ws?clientID={system_config.client_id}"
- try:
- ws.connect(host_url)
- logger.info(f'Request to: {host_url}')
- except Exception as e:
- logger.error(f"无法连接到ComfyUI服务器:{host_url}, 错误:{str(e)}")
- raise RuntimeError(f"ComfyUI服务器连接失败: {str(e)}")
- try:
- images, history_prompt = parse_workflow(raw_img, face_img, system_config)
- logger.info(f"工作流解析完成,获取到{len(images)}个节点输出")
- except Exception as e:
- logger.error(f"工作流解析失败: {str(e)}")
- raise RuntimeError(f"工作流解析失败: {str(e)}")
-
- images_cc = []
- if not images:
- logger.error("没有获取到任何图片输出,请检查ComfyUI服务器状态和工作流配置")
- raise RuntimeError("AI处理未返回任何结果,请检查服务器状态")
- for node_id in images:
- logger.info(f"处理节点 {node_id},包含 {len(images[node_id])} 张图片")
- for image_data in images[node_id]:
- timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
- GIF_LOCATION = f"{system_config.output_dir}/{system_config.idx}_{timestamp}.png"
- try:
- with open(GIF_LOCATION, "wb") as binary_file:
- binary_file.write(image_data)
- show_gif(GIF_LOCATION)
- system_config.idx += 1
- images_cc.append(Image.open(GIF_LOCATION))
- logger.info(f"图片保存成功: {GIF_LOCATION}")
- except Exception as e:
- logger.error(f"图片保存失败: {str(e)}")
- raise RuntimeError(f"图片保存失败: {str(e)}")
-
- logger.info(f'Prompt queue finished! The return result: {len(images_cc)} images')
-
- # 检查是否有处理结果
- if not images_cc:
- logger.error("没有生成任何图片结果")
- raise RuntimeError("AI处理未生成任何图片结果,请检查输入参数和服务器状态")
-
- return images_cc[0], history_prompt
- if __name__ == "__main__":
- import numpy as np
- raw_img = np.array(Image.open('backend/data/02.jpg'))
- face_img = np.array(Image.open('backend/data/face.jpg'))
- images_cc, history_prompt = ai_swap_face_process(raw_img, face_img)
- print(f"历史提示词: {history_prompt}")
|