| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- import time
- import json
- import urllib.request
- import urllib.parse
- from backend.utils.logger_config import setup_logger
- from backend.utils.system_config import Config
- logger = setup_logger(__name__)
def queue_prompt(textPrompt: str, config: Config) -> dict:
    """Submit a prompt to the server's queue.

    Args:
        textPrompt: Prompt payload; it is JSON-serialised as the "prompt"
            field of the request body.
        config: Configuration supplying ``server_address`` and ``client_id``.

    Returns:
        dict: The server's JSON response, decoded. ``get_images`` reads the
        ``prompt_id`` entry from it.

    Raises:
        Exception: Any failure while sending the request or decoding the
            response is logged and re-raised unchanged.
    """
    payload = {"prompt": textPrompt, "client_id": config.client_id}
    data = json.dumps(payload).encode('utf-8')
    # Supplying ``data`` makes urllib issue a POST request.
    req = urllib.request.Request(f"http://{config.server_address}/prompt", data=data)
    logger.info(f"Queue prompt: {textPrompt}")
    try:
        # The context manager closes the HTTP response even if reading or
        # JSON decoding fails, so the connection is not leaked.
        with urllib.request.urlopen(req) as response:
            return json.loads(response.read())
    except Exception as e:
        logger.error(f"Failed to queue prompt: {e}")
        raise
def get_image(fileName: str, subFolder: str, folder_type: str, config: Config) -> bytes:
    """Download the raw bytes of one file from the server's ``/view`` endpoint.

    Args:
        fileName: Sent as the ``filename`` query parameter.
        subFolder: Sent as the ``subfolder`` query parameter.
        folder_type: Sent as the ``type`` query parameter.
        config: Configuration supplying ``server_address``.

    Returns:
        bytes: The response body exactly as read from the server.

    Raises:
        Exception: Any failure while fetching is logged and re-raised.
    """
    url_values = urllib.parse.urlencode(
        {"filename": fileName, "subfolder": subFolder, "type": folder_type}
    )

    try:
        url = f"http://{config.server_address}/view?{url_values}"
        with urllib.request.urlopen(url) as response:
            body = response.read()
    except Exception as e:
        logger.error(f"Failed to get image: {e}")
        raise
    return body
def get_history(prompt_id: str, config: Config) -> dict:
    """Fetch the server's history record for a queued prompt.

    Args:
        prompt_id: Identifier appended to the ``/history/`` URL path.
        config: Configuration supplying ``server_address``.

    Returns:
        dict: The decoded JSON body of the response.

    Raises:
        Exception: Any failure while fetching or decoding is logged and
            re-raised.
    """
    try:
        url = f"http://{config.server_address}/history/{prompt_id}"
        with urllib.request.urlopen(url) as response:
            raw = response.read()
        return json.loads(raw)
    except Exception as e:
        logger.error(f"Failed to get history: {e}")
        raise
def process_images(images: list, config) -> list:
    """Download every image described in ``images``.

    Args:
        images: Descriptors, each providing ``filename``, ``subfolder`` and
            ``type`` keys.
        config: Configuration forwarded to ``get_image``.

    Returns:
        list: Downloaded payloads, in the same order as ``images``.
    """
    return [
        get_image(entry['filename'], entry['subfolder'], entry['type'], config)
        for entry in images
    ]
def process_videos(videos: list, config) -> list:
    """Download every video described in ``videos``.

    Retrieval goes through ``get_image``: each descriptor's ``filename``,
    ``subfolder`` and ``type`` are passed to it unchanged.

    Args:
        videos: Descriptors, each providing ``filename``, ``subfolder`` and
            ``type`` keys.
        config: Configuration forwarded to ``get_image``.

    Returns:
        list: Downloaded payloads, in the same order as ``videos``.
    """
    return [
        get_image(clip['filename'], clip['subfolder'], clip['type'], config)
        for clip in videos
    ]
def get_images(prompt, config: Config, timeout=160, poll_interval=1.0):
    """Queue a prompt, then poll the server's history until its outputs are available.

    Args:
        prompt: Prompt payload forwarded to ``queue_prompt``.
        config: Configuration forwarded to every server call.
        timeout: Maximum number of seconds to keep polling before giving up.
        poll_interval: Seconds to wait between two polling attempts.

    Returns:
        tuple: ``(output_images, history_prompt)``. ``output_images`` maps a
        node id to the list of payloads downloaded for that node (a node's
        ``gifs`` entry replaces its ``images`` entry when both are present).
        ``history_prompt`` is the first ``string`` item of node ``'246'``,
        or ``""`` if that node never appeared. When the timeout expires,
        whatever has been collected so far is returned.

    Raises:
        Exception: Propagated from ``queue_prompt`` when the prompt cannot
            be queued. Failures while polling are logged and retried until
            the timeout expires.
    """
    prompt_id = queue_prompt(prompt, config)['prompt_id']
    output_images = {}
    history_prompt = ""
    start_time = time.time()
    logger.info(f"Start time: {start_time}")

    while True:
        try:
            # A missing entry means the server has no record for this prompt
            # yet; treat it as "not ready" instead of raising KeyError.
            history = get_history(prompt_id, config).get(prompt_id)
            if history is not None:
                for node_id, node_output in history['outputs'].items():
                    if 'images' in node_output:
                        output_images[node_id] = process_images(node_output['images'], config)
                    if 'gifs' in node_output:
                        output_images[node_id] = process_videos(node_output['gifs'], config)
                    # NOTE(review): '246' is a hard-coded node id, presumably the
                    # text-output node of one specific workflow — confirm.
                    if node_id == '246':
                        history_prompt = node_output['string'][0]
                break
        except Exception as e:
            # Best-effort polling: report the failure and retry rather than
            # silently swallowing it.
            logger.warning(f"Polling history for prompt {prompt_id} failed, retrying: {e}")

        # Checked on every iteration, outside the try block, so a failing
        # poll can no longer bypass the timeout and loop forever.
        if time.time() - start_time > timeout:
            logger.warning('Fallback to history by timeout')
            break

        # Pause between polls so the server is not hammered in a busy loop.
        time.sleep(max(poll_interval, 0))

    logger.info(f"Time cost: {time.time() - start_time}")
    logger.info('Image acquisition completed')
    return output_images, history_prompt
|