""" AI换衣服API接口 提供RESTful API接口,支持AI换衣服、历史记录查询等功能 """ import os import uvicorn from pydantic import BaseModel, Field from sqlalchemy import func, and_ from datetime import datetime, timedelta from typing import Optional, List, Dict, Any, Union from starlette.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from fastapi.staticfiles import StaticFiles from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, Form, Body, APIRouter from backend.services.ai_swap_cloth_service import AISwapClothService, process_swap_cloth_with_record from backend.modules.database.operations import DatabaseOperations from backend.utils.logger_config import setup_logger from backend.api.auth_api import router as auth_router from backend.api.user_material_api import router as user_material_router from backend.api.user_template_api import router as user_template_router from backend.modules.database.models import ProcessRecord from backend.api.auto_post_api import router as auto_post_router logger = setup_logger(__name__) router = APIRouter() # 创建服务实例 ai_swap_cloth_service = AISwapClothService() db_ops = DatabaseOperations() # 启动任务队列服务 from backend.services.task_queue_service import get_task_queue_service task_queue_service = get_task_queue_service() task_queue_service.start() # ==================== 数据模型 ==================== class SwapClothRequest(BaseModel): user_id: int = Field(..., description="用户ID") raw_image_id: int = Field(..., description="原始图片ID") cloth_image_id: int = Field(..., description="衣服图片ID") quantity: Optional[int] = Field(1, ge=1, le=10, description="每组生成数量") class SwapClothResponse(BaseModel): success: bool task_id: Optional[str] = None process_record_id: Optional[int] = None result_image_id: Optional[int] = None copywriter_text: Optional[str] = None history_prompt: Optional[str] = None error: Optional[str] = None error_type: Optional[str] = None # ==================== 中间件和依赖 ==================== async def verify_user(user_id: int): """验证用户是否存在且有效""" user = db_ops.get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail=f"用户ID {user_id} 不存在") if not user.get("is_active", False): raise HTTPException(status_code=403, detail="用户账户已被禁用") return user # ==================== API路由 ==================== @router.post("/api/v1/swap-cloth", response_model=SwapClothResponse, tags=["AI换衣服"]) async def process_swap_cloth(request: SwapClothRequest): """ 执行AI换衣服(异步任务) - **user_id**: 用户ID - **raw_image_id**: 原始图片ID - **cloth_image_id**: 衣服图片ID - **quantity**: 每组生成数量(可选) """ try: # 验证用户 await verify_user(request.user_id) # 提交任务到队列(非阻塞) task_id = ai_swap_cloth_service.submit_swap_cloth_task( user_id=request.user_id, raw_image_id=request.raw_image_id, cloth_image_id=request.cloth_image_id, quantity=request.quantity or 1 ) logger.info(f"用户 {request.user_id} 换衣服任务已提交,任务ID: {task_id}") return SwapClothResponse( success=True, task_id=task_id, process_record_id=None, result_image_id=None, copywriter_text=None, history_prompt=None ) except HTTPException: raise except Exception as e: logger.error(f"AI换衣服API处理异常: {str(e)}", exc_info=True) raise HTTPException(status_code=500, detail=f"服务器内部错误: {str(e)}")