| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- import os
- import time
- import fal_client
- from dotenv import load_dotenv
- from typing import Dict, Any, Optional
- from backend.utils.logger_config import setup_logger
- from backend.utils.system_config import Config
- from pathlib import Path
- env_path = Path("./backend") / ".env"
- load_dotenv(dotenv_path=env_path)
- logger = setup_logger(__name__)
- class VideoGenerator:
- """视频生成服务封装类"""
- def __init__(self):
- """初始化视频生成服务"""
- self.api_key = os.getenv("FAL_KEY")
- if not self.api_key:
- logger.warning("未设置FAL_KEY环境变量,无法使用视频生成服务")
- if self.api_key:
- fal_client.fal_key = self.api_key
- def generate_video(
- self,
- prompt: str,
- image_url: str,
- webhook_url: Optional[str] = None
- ) -> Dict[str, Any]:
-
- """
- 生成视频
- Args:
- prompt (str): 视频生成提示
- image_url (str): 输入图片的URL
- webhook_url (Optional[str], optional): Webhook URL. Defaults to None.
- Returns:
- Dict[str, Any]: 视频生成结果
- """
- try:
- logger.info(f"开始提图生视频任务,prompt: {prompt}")
- # 验证API密钥
- if not self.api_key:
- raise RuntimeError("未设置FAL_API_KEY环境变量,无法使用视频生成服务")
- # 构建请求参数
- arguments = {
- "prompt": prompt,
- "image_url": image_url
- }
- # 添加webhook_url(如果提供)
- kwargs = {}
- if webhook_url:
- kwargs["webhook_url"] = webhook_url
- # 提交任务
- handler = fal_client.submit(
- "fal-ai/wan-25-preview/image-to-video",
- arguments,
- **kwargs
- )
- request_id = handler.request_id
- logger.info(f"图生视频任务提交成功,request_id: {request_id}")
- return {
- "success": True,
- "request_id": request_id,
- "status": "submitted",
- "arguments":arguments
- }
- except Exception as e:
- logger.error(f"图生视频任务提交失败:{str(e)}")
- return {
- "success": False,
- "error": str(e),
- "error_type": "submit_error"
- }
- def check_task_status(self, request_id: str) -> Dict[str, Any]:
- """检查任务状态
- Args:
- request_id (str): 任务ID
- Returns:
- Dict[str, Any]: 任务状态
- """
- try:
- logger.info(f"开始检查任务状态,request_id: {request_id}")
- # 获取任务状态
- status = fal_client.status("fal-ai/wan-25-preview/image-to-video", request_id, with_logs=True)
-
- logger.info(f"任务状态检查成功,request_id: {request_id}, status: {status}")
- return {
- "success": True,
- "request_id": request_id,
- "status": status
- }
-
- except Exception as e:
- logger.error(f"任务状态检查失败:{str(e)}")
- return {
- "success": False,
- "error": str(e),
- "error_type": "status_error"
- }
- def get_task_result(self, request_id: str) -> Dict[str, Any]:
- """获取任务结果
- Args:
- request_id (str): 任务ID
- Returns:
- Dict[str, Any]: 任务结果
- """
- try:
- logger.info(f"开始获取任务结果,request_id: {request_id}")
- # 获取任务结果
- result = fal_client.result("fal-ai/wan-25-preview/image-to-video", request_id)
- logger.info(f"任务结果获取成功,request_id: {request_id}, type_of_result: {type(result)}")
- return {
- "success": True,
- "request_id": request_id,
- "result": result
- }
-
- except Exception as e:
- logger.error(f"任务结果获取失败:{str(e)}")
- return {
- "success": False,
- "error": str(e),
- "error_type": "result_get_error"
- }
- def process_task_sync(
- self,
- prompt: str,
- image_url: str
- ) -> Dict[str, Any]:
- """
- 同步处理任务
- Args:
- prompt (str): 提示词
- image_url (str): 输入图片的URL
- Returns:
- Dict[str, Any]: 任务结果
- """
- try:
- # 1、提交任务
- submit_result = self.generate_video(prompt, image_url)
- if not submit_result["success"]:
- return submit_result
-
- request_id = submit_result["request_id"]
- # 2、等待结果
- result = self.get_task_result(request_id)
- return result["result"]["video"]["url"]
- except Exception as e:
- logger.error(f"任务处理失败:{str(e)}")
- return {
- "success": False,
- "error": str(e),
- "error_type": "process_error"
- }
- # 创建全局服务实例
- video_generator = VideoGenerator()
- if __name__ == "__main__":
- # 测试代码
- prompt = "zoom out with rotating camera"
- image_url = "https://storage.googleapis.com/falserverless/model_tests/wan/dragon-warrior.jpg"
- # 生成视频
- result = video_generator.process_task_sync(prompt, image_url)
- print(result)
- # 检查任务状态
- print(result)
|