from backend.modules.database.connection import DatabaseConnection from backend.modules.database.operations import DatabaseOperations from backend.modules.database.models import ImageRecord, User from backend.utils.logger_config import setup_logger from typing import Optional, List, Dict, Any import os import uuid logger = setup_logger(__name__) """ 实现用户素材管理服务:用户上传素材,用户管理素材,用户删除素材 """ class UserMaterialService: def __init__(self): self.db_ops = DatabaseOperations() self.upload_dir = os.path.join(os.getcwd(), 'backend', 'data', 'materials') os.makedirs(self.upload_dir, exist_ok=True) def upload_material(self, user_id: int, file_bytes: bytes, image_type: str, original_filename: str) -> Dict[str, Any]: """ 用户上传素材 :param user_id: 用户ID :param file_bytes: 文件内容(二进制) :param image_type: 图片类型(face/cloth/result/original) :param original_filename: 原始文件名 :return: 上传结果 """ logger.info(f"用户{user_id}上传素材: {original_filename}, 类型: {image_type}") user = self.db_ops.get_user_by_id(user_id) if not user: return {"success": False, "error": "用户不存在"} if not user.get("is_active", False): return {"success": False, "error": "用户已被禁用"} # 生成唯一文件名 ext = os.path.splitext(original_filename)[-1] unique_name = f"{user_id}_{uuid.uuid4().hex}{ext}" save_path = os.path.join(self.upload_dir, unique_name) try: with open(save_path, 'wb') as f: f.write(file_bytes) except Exception as e: logger.error(f"文件保存失败: {e}") return {"success": False, "error": "文件保存失败"} # 在数据库中记录素材信息 image_record = self.db_ops.create_image_record( user_id=user_id, image_type=image_type, original_filename=original_filename, stored_path=save_path ) if not image_record: return {"success": False, "error": "数据库记录失败"} # 生成可访问的URL file_name = os.path.basename(save_path) file_url = f"/materials/{file_name}" return { "success": True, "material_id": image_record['id'], "file_path": save_path, "file_url": file_url, "original_filename": original_filename } def list_materials(self, user_id: int, material_type: Optional[str] = None, page: int = 1, page_size: int = 20) -> Dict[str, Any]: """ 获取用户素材列表 :param user_id: 用户ID :param material_type: 素材类型(可选) :param page: 页码 :param page_size: 每页数量 :return: 素材列表 """ logger.info(f"获取用户{user_id}的素材列表, 类型: {material_type}, 页码: {page}") user = self.db_ops.get_user_by_id(user_id) if not user: return {"success": False, "error": "用户不存在"} if not user.get("is_active", False): return {"success": False, "error": "用户已被禁用"} result = self.db_ops.get_user_images(user_id, material_type, page, page_size) # 保持与旧版本兼容的数据格式 return { "success": True, "images": result.get("images", []), "total": result.get("total", 0), "page": result.get("page", 1), "page_size": result.get("page_size", 20), "total_pages": result.get("total_pages", 1) } def delete_material(self, user_id: int, material_id: int) -> Dict[str, Any]: """ 删除用户素材 :param user_id: 用户ID :param material_id: 素材ID :return: 删除结果 """ logger.info(f"用户{user_id}请求删除素材{material_id}") user = self.db_ops.get_user_by_id(user_id) if not user: return {"success": False, "error": "用户不存在"} if not user.get("is_active", False): return {"success": False, "error": "用户已被禁用"} image = self.db_ops.get_image_record_by_id(material_id) if not image: return {"success": False, "error": "素材不存在"} if image['user_id'] != user_id: return {"success": False, "error": "无权删除他人素材"} # 删除文件 try: if os.path.exists(image['stored_path']): os.remove(image['stored_path']) except Exception as e: logger.error(f"文件删除失败: {e}") return {"success": False, "error": "文件删除失败"} # 删除数据库记录 success = self.db_ops.delete_image_record(material_id) if not success: return {"success": False, "error": "数据库删除失败"} return {"success": True, "message": "素材删除成功"} def update_material(self, user_id: int, material_id: int, data: dict) -> Dict[str, Any]: """ 更新用户素材信息 :param user_id: 用户ID :param material_id: 素材ID :param data: 更新数据 :return: 更新结果 """ logger.info(f"用户{user_id}请求更新素材{material_id}") user = self.db_ops.get_user_by_id(user_id) if not user: return {"success": False, "error": "用户不存在"} if not user.get("is_active", False): return {"success": False, "error": "用户已被禁用"} image = self.db_ops.get_image_record_by_id(material_id) if not image: return {"success": False, "error": "素材不存在"} if image['user_id'] != user_id: return {"success": False, "error": "无权更新他人素材"} # 更新数据库记录 success = self.db_ops.update_image_record(material_id, data) if not success: return {"success": False, "error": "数据库更新失败"} return {"success": True, "message": "素材更新成功"} user_material_service = UserMaterialService()