| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- from backend.modules.database.connection import DatabaseConnection
- from backend.modules.database.operations import DatabaseOperations
- from backend.modules.database.models import ImageRecord, User
- from backend.utils.logger_config import setup_logger
- from typing import Optional, List, Dict, Any
- import os
- import uuid
- logger = setup_logger(__name__)
- """
- 实现用户素材管理服务:用户上传素材,用户管理素材,用户删除素材
- """
- class UserMaterialService:
- def __init__(self):
- self.db_ops = DatabaseOperations()
- self.upload_dir = os.path.join(os.getcwd(), 'backend', 'data', 'materials')
- os.makedirs(self.upload_dir, exist_ok=True)
- def upload_material(self, user_id: int, file_bytes: bytes, image_type: str, original_filename: str) -> Dict[str, Any]:
- """
- 用户上传素材
- :param user_id: 用户ID
- :param file_bytes: 文件内容(二进制)
- :param image_type: 图片类型(face/cloth/result/original)
- :param original_filename: 原始文件名
- :return: 上传结果
- """
- logger.info(f"用户{user_id}上传素材: {original_filename}, 类型: {image_type}")
- user = self.db_ops.get_user_by_id(user_id)
- if not user:
- return {"success": False, "error": "用户不存在"}
- if not user.get("is_active", False):
- return {"success": False, "error": "用户已被禁用"}
- # 生成唯一文件名
- ext = os.path.splitext(original_filename)[-1]
- unique_name = f"{user_id}_{uuid.uuid4().hex}{ext}"
- save_path = os.path.join(self.upload_dir, unique_name)
- try:
- with open(save_path, 'wb') as f:
- f.write(file_bytes)
- except Exception as e:
- logger.error(f"文件保存失败: {e}")
- return {"success": False, "error": "文件保存失败"}
- # 在数据库中记录素材信息
- image_record = self.db_ops.create_image_record(
- user_id=user_id,
- image_type=image_type,
- original_filename=original_filename,
- stored_path=save_path
- )
- if not image_record:
- return {"success": False, "error": "数据库记录失败"}
-
- # 生成可访问的URL
- file_name = os.path.basename(save_path)
- file_url = f"/materials/{file_name}"
-
- return {
- "success": True,
- "material_id": image_record['id'],
- "file_path": save_path,
- "file_url": file_url,
- "original_filename": original_filename
- }
- def list_materials(self, user_id: int, material_type: Optional[str] = None, page: int = 1, page_size: int = 20) -> Dict[str, Any]:
- """
- 获取用户素材列表
- :param user_id: 用户ID
- :param material_type: 素材类型(可选)
- :param page: 页码
- :param page_size: 每页数量
- :return: 素材列表
- """
- logger.info(f"获取用户{user_id}的素材列表, 类型: {material_type}, 页码: {page}")
- user = self.db_ops.get_user_by_id(user_id)
- if not user:
- return {"success": False, "error": "用户不存在"}
- if not user.get("is_active", False):
- return {"success": False, "error": "用户已被禁用"}
- result = self.db_ops.get_user_images(user_id, material_type, page, page_size)
- # 保持与旧版本兼容的数据格式
- return {
- "success": True,
- "images": result.get("images", []),
- "total": result.get("total", 0),
- "page": result.get("page", 1),
- "page_size": result.get("page_size", 20),
- "total_pages": result.get("total_pages", 1)
- }
- def delete_material(self, user_id: int, material_id: int) -> Dict[str, Any]:
- """
- 删除用户素材
- :param user_id: 用户ID
- :param material_id: 素材ID
- :return: 删除结果
- """
- logger.info(f"用户{user_id}请求删除素材{material_id}")
- user = self.db_ops.get_user_by_id(user_id)
- if not user:
- return {"success": False, "error": "用户不存在"}
- if not user.get("is_active", False):
- return {"success": False, "error": "用户已被禁用"}
- image = self.db_ops.get_image_record_by_id(material_id)
- if not image:
- return {"success": False, "error": "素材不存在"}
- if image['user_id'] != user_id:
- return {"success": False, "error": "无权删除他人素材"}
- # 删除文件
- try:
- if os.path.exists(image['stored_path']):
- os.remove(image['stored_path'])
- except Exception as e:
- logger.error(f"文件删除失败: {e}")
- return {"success": False, "error": "文件删除失败"}
- # 删除数据库记录
- success = self.db_ops.delete_image_record(material_id)
- if not success:
- return {"success": False, "error": "数据库删除失败"}
- return {"success": True, "message": "素材删除成功"}
- def update_material(self, user_id: int, material_id: int, data: dict) -> Dict[str, Any]:
- """
- 更新用户素材信息
- :param user_id: 用户ID
- :param material_id: 素材ID
- :param data: 更新数据
- :return: 更新结果
- """
- logger.info(f"用户{user_id}请求更新素材{material_id}")
- user = self.db_ops.get_user_by_id(user_id)
- if not user:
- return {"success": False, "error": "用户不存在"}
- if not user.get("is_active", False):
- return {"success": False, "error": "用户已被禁用"}
-
- image = self.db_ops.get_image_record_by_id(material_id)
- if not image:
- return {"success": False, "error": "素材不存在"}
- if image['user_id'] != user_id:
- return {"success": False, "error": "无权更新他人素材"}
-
- # 更新数据库记录
- success = self.db_ops.update_image_record(material_id, data)
- if not success:
- return {"success": False, "error": "数据库更新失败"}
-
- return {"success": True, "message": "素材更新成功"}
- user_material_service = UserMaterialService()
|