import os import time import fal_client from dotenv import load_dotenv from typing import Dict, Any, Optional from backend.utils.logger_config import setup_logger from backend.utils.system_config import Config from pathlib import Path env_path = Path("./backend") / ".env" load_dotenv(dotenv_path=env_path) logger = setup_logger(__name__) class VideoGenerator: """视频生成服务封装类""" def __init__(self): """初始化视频生成服务""" self.api_key = os.getenv("FAL_KEY") if not self.api_key: logger.warning("未设置FAL_KEY环境变量,无法使用视频生成服务") if self.api_key: fal_client.fal_key = self.api_key def generate_video( self, prompt: str, image_url: str, webhook_url: Optional[str] = None ) -> Dict[str, Any]: """ 生成视频 Args: prompt (str): 视频生成提示 image_url (str): 输入图片的URL webhook_url (Optional[str], optional): Webhook URL. Defaults to None. Returns: Dict[str, Any]: 视频生成结果 """ try: logger.info(f"开始提图生视频任务,prompt: {prompt}") # 验证API密钥 if not self.api_key: raise RuntimeError("未设置FAL_API_KEY环境变量,无法使用视频生成服务") # 构建请求参数 arguments = { "prompt": prompt, "image_url": image_url } # 添加webhook_url(如果提供) kwargs = {} if webhook_url: kwargs["webhook_url"] = webhook_url # 提交任务 handler = fal_client.submit( "fal-ai/wan-25-preview/image-to-video", arguments, **kwargs ) request_id = handler.request_id logger.info(f"图生视频任务提交成功,request_id: {request_id}") return { "success": True, "request_id": request_id, "status": "submitted", "arguments":arguments } except Exception as e: logger.error(f"图生视频任务提交失败:{str(e)}") return { "success": False, "error": str(e), "error_type": "submit_error" } def check_task_status(self, request_id: str) -> Dict[str, Any]: """检查任务状态 Args: request_id (str): 任务ID Returns: Dict[str, Any]: 任务状态 """ try: logger.info(f"开始检查任务状态,request_id: {request_id}") # 获取任务状态 status = fal_client.status("fal-ai/wan-25-preview/image-to-video", request_id, with_logs=True) logger.info(f"任务状态检查成功,request_id: {request_id}, status: {status}") return { "success": True, "request_id": request_id, "status": status } except Exception as e: logger.error(f"任务状态检查失败:{str(e)}") return { "success": False, "error": str(e), "error_type": "status_error" } def get_task_result(self, request_id: str) -> Dict[str, Any]: """获取任务结果 Args: request_id (str): 任务ID Returns: Dict[str, Any]: 任务结果 """ try: logger.info(f"开始获取任务结果,request_id: {request_id}") # 获取任务结果 result = fal_client.result("fal-ai/wan-25-preview/image-to-video", request_id) logger.info(f"任务结果获取成功,request_id: {request_id}, type_of_result: {type(result)}") return { "success": True, "request_id": request_id, "result": result } except Exception as e: logger.error(f"任务结果获取失败:{str(e)}") return { "success": False, "error": str(e), "error_type": "result_get_error" } def process_task_sync( self, prompt: str, image_url: str ) -> Dict[str, Any]: """ 同步处理任务 Args: prompt (str): 提示词 image_url (str): 输入图片的URL Returns: Dict[str, Any]: 任务结果 """ try: # 1、提交任务 submit_result = self.generate_video(prompt, image_url) if not submit_result["success"]: return submit_result request_id = submit_result["request_id"] # 2、等待结果 result = self.get_task_result(request_id) return result["result"]["video"]["url"] except Exception as e: logger.error(f"任务处理失败:{str(e)}") return { "success": False, "error": str(e), "error_type": "process_error" } # 创建全局服务实例 video_generator = VideoGenerator() if __name__ == "__main__": # 测试代码 prompt = "zoom out with rotating camera" image_url = "https://storage.googleapis.com/falserverless/model_tests/wan/dragon-warrior.jpg" # 生成视频 result = video_generator.process_task_sync(prompt, image_url) print(result) # 检查任务状态 print(result)