import time import json import urllib.request import urllib.parse from backend.utils.logger_config import setup_logger from backend.utils.system_config import Config logger = setup_logger(__name__) def queue_prompt(textPrompt: str, config: Config) -> dict: """ 向服务器队列发送提示词 Args: textPrompt: 提示词 config: 配置 Returns: dict: 提示词ID """ p = {"prompt": textPrompt, "client_id": config.client_id} data = json.dumps(p).encode('utf-8') req = urllib.request.Request(f"http://{config.server_address}/prompt", data=data) logger.info(f"Queue prompt: {textPrompt}") try: response = urllib.request.urlopen(req) return json.loads(response.read()) except Exception as e: logger.error(f"Failed to queue prompt: {e}") raise def get_image(fileName: str, subFolder: str, folder_type: str, config: Config) -> bytes: data = {"filename": fileName, "subfolder": subFolder, "type": folder_type} url_values = urllib.parse.urlencode(data) try: with urllib.request.urlopen(f"http://{config.server_address}/view?{url_values}") as response: return response.read() except Exception as e: logger.error(f"Failed to get image: {e}") raise def get_history(prompt_id: str, config: Config) -> dict: try: with urllib.request.urlopen(f"http://{config.server_address}/history/{prompt_id}") as response: return json.loads(response.read()) except Exception as e: logger.error(f"Failed to get history: {e}") raise def process_images(images: list, config) -> list: images_output = [] for image in images: image_data = get_image(image['filename'], image['subfolder'], image['type'], config) images_output.append(image_data) return images_output def process_videos(videos: list, config) -> list: videos_output = [] for video in videos: video_data = get_image(video['filename'], video['subfolder'], video['type'], config) videos_output.append(video_data) return videos_output def get_images(prompt, config: Config, timeout=160): prompt_id = queue_prompt(prompt, config)['prompt_id'] output_images = {} history_prompt = "" completed = False start_time = time.time() logger.info(f"Start time: {start_time}") while not completed: try: history = get_history(prompt_id, config)[prompt_id] if history is not None: for node_id, node_output in history['outputs'].items(): if 'images' in node_output: output_images[node_id] = process_images(node_output['images'], config) completed = True if 'gifs' in node_output: output_images[node_id] = process_videos(node_output['gifs'], config) completed = True if node_id == '246': history_prompt = node_output['string'][0] completed = True completed = True if time.time() - start_time > timeout: logger.warning(f'Fallback to history by timeout') completed = True logger.info(f"Time cost: {time.time() - start_time}") except Exception as e: pass logger.info('Image acquisition completed') return output_images, history_prompt