gen_video.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. import os
  2. import time
  3. import fal_client
  4. from dotenv import load_dotenv
  5. from typing import Dict, Any, Optional
  6. from backend.utils.logger_config import setup_logger
  7. from backend.utils.system_config import Config
  8. from pathlib import Path
  9. env_path = Path("./backend") / ".env"
  10. load_dotenv(dotenv_path=env_path)
  11. logger = setup_logger(__name__)
  12. class VideoGenerator:
  13. """视频生成服务封装类"""
  14. def __init__(self):
  15. """初始化视频生成服务"""
  16. self.api_key = os.getenv("FAL_KEY")
  17. if not self.api_key:
  18. logger.warning("未设置FAL_KEY环境变量,无法使用视频生成服务")
  19. if self.api_key:
  20. fal_client.fal_key = self.api_key
  21. def generate_video(
  22. self,
  23. prompt: str,
  24. image_url: str,
  25. webhook_url: Optional[str] = None
  26. ) -> Dict[str, Any]:
  27. """
  28. 生成视频
  29. Args:
  30. prompt (str): 视频生成提示
  31. image_url (str): 输入图片的URL
  32. webhook_url (Optional[str], optional): Webhook URL. Defaults to None.
  33. Returns:
  34. Dict[str, Any]: 视频生成结果
  35. """
  36. try:
  37. logger.info(f"开始提图生视频任务,prompt: {prompt}")
  38. # 验证API密钥
  39. if not self.api_key:
  40. raise RuntimeError("未设置FAL_API_KEY环境变量,无法使用视频生成服务")
  41. # 构建请求参数
  42. arguments = {
  43. "prompt": prompt,
  44. "image_url": image_url
  45. }
  46. # 添加webhook_url(如果提供)
  47. kwargs = {}
  48. if webhook_url:
  49. kwargs["webhook_url"] = webhook_url
  50. # 提交任务
  51. handler = fal_client.submit(
  52. "fal-ai/wan-25-preview/image-to-video",
  53. arguments,
  54. **kwargs
  55. )
  56. request_id = handler.request_id
  57. logger.info(f"图生视频任务提交成功,request_id: {request_id}")
  58. return {
  59. "success": True,
  60. "request_id": request_id,
  61. "status": "submitted",
  62. "arguments":arguments
  63. }
  64. except Exception as e:
  65. logger.error(f"图生视频任务提交失败:{str(e)}")
  66. return {
  67. "success": False,
  68. "error": str(e),
  69. "error_type": "submit_error"
  70. }
  71. def check_task_status(self, request_id: str) -> Dict[str, Any]:
  72. """检查任务状态
  73. Args:
  74. request_id (str): 任务ID
  75. Returns:
  76. Dict[str, Any]: 任务状态
  77. """
  78. try:
  79. logger.info(f"开始检查任务状态,request_id: {request_id}")
  80. # 获取任务状态
  81. status = fal_client.status("fal-ai/wan-25-preview/image-to-video", request_id, with_logs=True)
  82. logger.info(f"任务状态检查成功,request_id: {request_id}, status: {status}")
  83. return {
  84. "success": True,
  85. "request_id": request_id,
  86. "status": status
  87. }
  88. except Exception as e:
  89. logger.error(f"任务状态检查失败:{str(e)}")
  90. return {
  91. "success": False,
  92. "error": str(e),
  93. "error_type": "status_error"
  94. }
  95. def get_task_result(self, request_id: str) -> Dict[str, Any]:
  96. """获取任务结果
  97. Args:
  98. request_id (str): 任务ID
  99. Returns:
  100. Dict[str, Any]: 任务结果
  101. """
  102. try:
  103. logger.info(f"开始获取任务结果,request_id: {request_id}")
  104. # 获取任务结果
  105. result = fal_client.result("fal-ai/wan-25-preview/image-to-video", request_id)
  106. logger.info(f"任务结果获取成功,request_id: {request_id}, type_of_result: {type(result)}")
  107. return {
  108. "success": True,
  109. "request_id": request_id,
  110. "result": result
  111. }
  112. except Exception as e:
  113. logger.error(f"任务结果获取失败:{str(e)}")
  114. return {
  115. "success": False,
  116. "error": str(e),
  117. "error_type": "result_get_error"
  118. }
  119. def process_task_sync(
  120. self,
  121. prompt: str,
  122. image_url: str
  123. ) -> Dict[str, Any]:
  124. """
  125. 同步处理任务
  126. Args:
  127. prompt (str): 提示词
  128. image_url (str): 输入图片的URL
  129. Returns:
  130. Dict[str, Any]: 任务结果
  131. """
  132. try:
  133. # 1、提交任务
  134. submit_result = self.generate_video(prompt, image_url)
  135. if not submit_result["success"]:
  136. return submit_result
  137. request_id = submit_result["request_id"]
  138. # 2、等待结果
  139. result = self.get_task_result(request_id)
  140. return result["result"]["video"]["url"]
  141. except Exception as e:
  142. logger.error(f"任务处理失败:{str(e)}")
  143. return {
  144. "success": False,
  145. "error": str(e),
  146. "error_type": "process_error"
  147. }
  148. # 创建全局服务实例
  149. video_generator = VideoGenerator()
  150. if __name__ == "__main__":
  151. # 测试代码
  152. prompt = "zoom out with rotating camera"
  153. image_url = "https://storage.googleapis.com/falserverless/model_tests/wan/dragon-warrior.jpg"
  154. # 生成视频
  155. result = video_generator.process_task_sync(prompt, image_url)
  156. print(result)
  157. # 检查任务状态
  158. print(result)