import hashlib
import hmac
import uuid
from typing import Any, Dict, Optional

from backend.modules.database.operations import DatabaseOperations
from backend.utils.logger_config import setup_logger

logger = setup_logger(__name__)


class AuthService:
    """Service layer for user authentication and user management.

    Every public method except ``authenticate`` returns a dict with a boolean
    ``"success"`` key; failures additionally carry a human-readable
    ``"error"`` message, successes may carry ``"user"`` and/or ``"token"``.

    Sessions are kept in a plain in-process dict, so they are lost on restart
    and are not shared between processes.
    """

    def __init__(self, db_operations: Optional[DatabaseOperations] = None):
        """Create the service.

        Args:
            db_operations: Data-access object to use. When omitted (or falsy)
                a fresh ``DatabaseOperations()`` is constructed.
        """
        self.db_ops = db_operations or DatabaseOperations()
        # Simple in-memory session store mapping token -> user id.
        # For production, prefer Redis or JWT.
        self._sessions: Dict[str, Any] = {}

    def _hash_password(self, password: str) -> str:
        """Return the hex-encoded SHA-256 digest of the UTF-8 encoded password."""
        # NOTE(review): a single unsalted SHA-256 round is weak for password
        # storage; a salted KDF (e.g. hashlib.scrypt / pbkdf2_hmac) would be
        # preferable. It is left unchanged here because switching the scheme
        # would invalidate every hash already stored.
        return hashlib.sha256(password.encode('utf-8')).hexdigest()

    @staticmethod
    def _hashes_match(stored_hash: Any, candidate_hash: str) -> bool:
        """Compare two password hashes in constant time.

        A stored value that is not a ``str`` is treated as a mismatch, which
        is what the previous plain ``!=`` comparison amounted to.
        """
        if not isinstance(stored_hash, str):
            return False
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(
            stored_hash.encode('utf-8'), candidate_hash.encode('utf-8')
        )

    def register_user(self, username: str, password: str, is_admin: bool = False) -> Dict[str, Any]:
        """Create a new user account.

        Args:
            username: Login name; must be non-empty and not already taken.
            password: Plain-text password; must be non-empty. Only its hash
                is handed to the data layer.
            is_admin: Forwarded unchanged to ``db_ops.create_user``.

        Returns:
            ``{"success": True, "user": <result of create_user>}`` on success,
            otherwise ``{"success": False, "error": <message>}``.
        """
        if not username or not password:
            return {"success": False, "error": "用户名和密码不能为空"}
        if self.db_ops.get_user_by_username(username):
            return {"success": False, "error": "用户名已存在"}
        password_hash = self._hash_password(password)
        user = self.db_ops.create_user(username, password_hash, is_admin)
        logger.info(f"注册新用户: {username}")
        return {"success": True, "user": user}

    def login_user(self, username: str, password: str) -> Dict[str, Any]:
        """Verify credentials and open a session.

        Args:
            username: Login name to look up.
            password: Plain-text password to verify.

        Returns:
            ``{"success": True, "token": <str>, "user": <user record>}`` on
            success, otherwise ``{"success": False, "error": <message>}``.
            The user record is returned exactly as the data layer supplied
            it, so it still contains ``"password_hash"``; callers should not
            forward it to clients as-is.
        """
        user = self.db_ops.get_user_by_username(username)
        if not user:
            return {"success": False, "error": "用户不存在"}
        if not user["is_active"]:
            return {"success": False, "error": "用户已被禁用"}
        # A missing or non-string password can never match; report it as a
        # wrong password instead of crashing inside _hash_password.
        if not isinstance(password, str):
            return {"success": False, "error": "密码错误"}
        password_hash = self._hash_password(password)
        if not self._hashes_match(user["password_hash"], password_hash):
            return {"success": False, "error": "密码错误"}
        # Generate a simple random session token.
        token = str(uuid.uuid4())
        self._sessions[token] = user["id"]
        logger.info(f"用户登录: {username}")
        return {"success": True, "token": token, "user": user}

    def logout_user(self, token: str) -> Dict[str, Any]:
        """Close the session identified by ``token``.

        Returns:
            ``{"success": True}`` if the token was an active session,
            otherwise ``{"success": False, "error": <message>}``.
        """
        try:
            user_id = self._sessions.pop(token)
        except KeyError:
            return {"success": False, "error": "无效的token"}
        # Log the user id, not the token: the token is a bearer credential
        # and must not end up in log files.
        logger.info(f"用户退出: user_id={user_id}")
        return {"success": True}

    def get_user_info(self, user_id: int) -> Dict[str, Any]:
        """Fetch a user record by id.

        Returns:
            ``{"success": True, "user": <user record>}`` when
            ``db_ops.get_user_by_id`` returns a truthy value, otherwise
            ``{"success": False, "error": <message>}``.
        """
        user = self.db_ops.get_user_by_id(user_id)
        if not user:
            return {"success": False, "error": "用户不存在"}
        return {"success": True, "user": user}

    def update_user_info(self, user_id: int, **kwargs) -> Dict[str, Any]:
        """Update fields of an existing user.

        Args:
            user_id: Id of the user to update.
            **kwargs: Fields forwarded to ``db_ops.update_user``. A
                ``password`` entry is replaced by the corresponding
                ``password_hash`` before forwarding; it must be a non-empty
                string, matching the rule ``register_user`` enforces.

        Returns:
            ``{"success": True, "user": <result of update_user>}`` on success,
            otherwise ``{"success": False, "error": <message>}``.
        """
        # NOTE(review): every other keyword is passed through unfiltered -
        # confirm callers restrict which fields end users may set.
        if "password" in kwargs:
            new_password = kwargs.pop("password")
            if not isinstance(new_password, str) or not new_password:
                return {"success": False, "error": "密码不能为空"}
            kwargs["password_hash"] = self._hash_password(new_password)
        user = self.db_ops.update_user(user_id, **kwargs)
        if not user:
            return {"success": False, "error": "用户不存在或更新失败"}
        logger.info(f"更新用户信息: {user_id}")
        return {"success": True, "user": user}

    def delete_user(self, user_id: int) -> Dict[str, Any]:
        """Delete a user and drop any sessions that belong to them.

        Returns:
            ``{"success": True}`` when ``db_ops.delete_user`` returns a truthy
            value, otherwise ``{"success": False, "error": <message>}``.
        """
        result = self.db_ops.delete_user(user_id)
        if not result:
            return {"success": False, "error": "用户不存在或删除失败"}
        # Without this, authenticate() would keep resolving the deleted
        # user's tokens to an id that no longer exists.
        stale_tokens = [
            token for token, owner_id in self._sessions.items()
            if owner_id == user_id
        ]
        for token in stale_tokens:
            del self._sessions[token]
        logger.info(f"删除用户: {user_id}")
        return {"success": True}

    def authenticate(self, token: str) -> Optional[int]:
        """Return the user id bound to ``token``, or ``None`` if not logged in."""
        return self._sessions.get(token)


# Module-level shared service instance.
auth_service = AuthService()