test_api.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. # import requests
  2. # url = "http://localhost:8000/auto_post/xiaohongshu/post"
  3. # data = {
  4. # "image_paths": ["D:/桌面/研究生/组会/rednote/data/flower.jpg", "D:/桌面/研究生/组会/rednote/data/test_img.png"],
  5. # "title": "测试标题",
  6. # "description": "测试描述",
  7. # "topics": ["测试话题"],
  8. # "schedule_time": "2025-07-09 10:26"
  9. # }
  10. # response = requests.post(url, json=data)
  11. # print(response.json())
  12. # curl -X POST "http://localhost:5000/xiaohongshu/post" \
  13. # -H "Content-Type: application/json" \
  14. # -d '{
  15. # "image_paths": ["D:/桌面/研究生/组会/rednote/data/flower.jpg", "D:/桌面/研究生/组会/rednote/data/test_img.png"],
  16. # "title": "测试标题",
  17. # "description": "测试描述",
  18. # "topics": ["测试话题"],
  19. # "schedule_time": "2025-07-09 10:26"
  20. # }'
  21. # import requests
  22. # def download_video(url, filename):
  23. # try:
  24. # # 发起请求,设置stream=True以流式下载
  25. # response = requests.get(url, stream=True)
  26. # response.raise_for_status() # 检查请求是否成功
  27. # # 以二进制写入模式打开文件
  28. # with open(filename, 'wb') as f:
  29. # # 逐块写入数据,chunk_size可调整
  30. # for chunk in response.iter_content(chunk_size=8192):
  31. # if chunk:
  32. # f.write(chunk)
  33. # print(f"视频已成功下载到:{filename}")
  34. # except Exception as e:
  35. # print(f"下载过程中出现错误:{e}")
  36. # # 使用示例
  37. # video_url = "https://storage.googleapis.com/falserverless/example_outputs/wan-25-i2v-output.mp4" # 请替换为实际视频直链
  38. # download_video(video_url, "my_video.mp4")
  39. # import os
  40. # import fal_client
  41. # from pathlib import Path
  42. # from dotenv import load_dotenv
  43. # env_path = Path("./backend") / ".env"
  44. # load_dotenv(dotenv_path=env_path)
  45. # fal_client.fal_key = os.getenv('FAL_KEY2')
  46. # def on_queue_update(update):
  47. # if isinstance(update, fal_client.InProgress):
  48. # for log in update.logs:
  49. # print(log["message"])
  50. # url_one = fal_client.upload_file("003.jpg")
  51. # url_two = fal_client.upload_file("002.jpg")
  52. # get_cloth_prompt = "Remove the people in the image, but keep all the clothes worn by the people in the image, and display all the extracted clothes on a transparent human-shaped stand."
  53. # swap_cloth_prompt = "去除这个女人身上的所有衣服,然后把第二张图片中的衣服穿到这个女人身上,保持真实的光影、阴影和布料垂感。在无缝融合新衣物的同时,保持人物肤色、样貌的一致性;保持新衣物颜色、细节、设计和版型的一致性;最终效果应看起来像人物实际上正在穿这件新衣服。这个女人行走在森林中。"
  54. # for i in range(5):
  55. # result = fal_client.subscribe(
  56. # "fal-ai/nano-banana/edit",
  57. # arguments={
  58. # "prompt": get_cloth_prompt,
  59. # "image_urls": [url_one]
  60. # },
  61. # with_logs=True,
  62. # on_queue_update=on_queue_update,
  63. # )
  64. # print(f'{i} - ',result["images"][0]["url"])
  65. from openai import OpenAI
  66. import os
  67. import io
  68. import base64
  69. from PIL import Image
  70. from io import BytesIO
  71. def save_base64_to_image_pillow(base64_string, output_filename):
  72. """
  73. 使用Pillow库将Base64字符串保存为图片文件
  74. Args:
  75. base64_string (str): Base64编码的图片字符串,可包含或不包含Data URL前缀。
  76. output_filename (str): 要保存的图片文件名,如 'output.png'。
  77. """
  78. # 可选:如果Base64字符串包含Data URL前缀(如"data:image/png;base64,"),需要去除
  79. if ',' in base64_string:
  80. base64_string = base64_string.split(',')[1]
  81. # 1. 将Base64字符串解码为二进制数据
  82. image_data = base64.b64decode(base64_string)
  83. # 2. 将二进制数据转换为图像对象
  84. img = Image.open(BytesIO(image_data))
  85. # 3. 保存图像到文件
  86. img.save(output_filename)
  87. print(f"图片已保存为: {output_filename}")
  88. def encode_image(image_path: str) -> str:
  89. """
  90. 将图片文件转换为base64编码
  91. Args:
  92. image_path: 图片文件路径
  93. Returns:
  94. str: base64编码的图片数据
  95. Raises:
  96. FileNotFoundError: 图片文件不存在
  97. IOError: 读取或处理图片失败
  98. """
  99. if not os.path.exists(image_path):
  100. raise FileNotFoundError(f"Image file not found: {image_path}")
  101. with Image.open(image_path) as img:
  102. buffered = io.BytesIO()
  103. img.save(buffered, format="JPEG")
  104. return base64.b64encode(buffered.getvalue()).decode("utf-8")
  105. client = OpenAI(
  106. api_key="sk-qnsfJw0vsAitlnrXcOeBYrLbTv9LXfsN1m3jIUfMJagan5IR",
  107. base_url="https://api.openaius.com/v1"
  108. )
  109. user_prompt = "融合两张图片内容"
  110. image_url1 = encode_image("002.jpg")
  111. image_url2 = encode_image("003.jpg")
  112. print(len(image_url1))
  113. print(len(image_url2))
  114. response = client.chat.completions.create(
  115. model="gemini-2.5-flash-image-preview",
  116. messages=[
  117. {
  118. "role": "system",
  119. "content": "根据用户要求进行图片生成"
  120. },
  121. {
  122. "role": "user",
  123. "content": [
  124. {
  125. "type": "text",
  126. "text": user_prompt
  127. },
  128. {
  129. "type": "image_url",
  130. "image_url": {
  131. "url": image_url1
  132. }
  133. },
  134. {
  135. "type": "image_url",
  136. "image_url": {
  137. "url": image_url2
  138. }
  139. }
  140. ]
  141. }
  142. ]
  143. )
  144. # 写入单行字符串
  145. text = str(response)
  146. with open("output.txt", "w", encoding="utf-8") as file:
  147. file.write(text)
  148. result = response.choices[0].message.content
  149. save_base64_to_image_pillow(result, "output.png")
  150. print(result)