| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- import hashlib
- import uuid
- from typing import Optional, Dict, Any
- from backend.modules.database.operations import DatabaseOperations
- from backend.utils.logger_config import setup_logger
- logger = setup_logger(__name__)
- class AuthService:
- """
- 用户认证与管理服务层
- """
- def __init__(self, db_operations: Optional[DatabaseOperations] = None):
- self.db_ops = db_operations or DatabaseOperations()
- self._sessions = {} # 简单内存session管理,生产建议用redis/jwt
- def _hash_password(self, password: str) -> str:
- return hashlib.sha256(password.encode('utf-8')).hexdigest()
- def register_user(self, username: str, password: str, is_admin: bool = False) -> Dict[str, Any]:
- if not username or not password:
- return {"success": False, "error": "用户名和密码不能为空"}
- if self.db_ops.get_user_by_username(username):
- return {"success": False, "error": "用户名已存在"}
- password_hash = self._hash_password(password)
- user = self.db_ops.create_user(username, password_hash, is_admin)
- logger.info(f"注册新用户: {username}")
- return {"success": True, "user": user}
- def login_user(self, username: str, password: str) -> Dict[str, Any]:
- user = self.db_ops.get_user_by_username(username)
- if not user:
- return {"success": False, "error": "用户不存在"}
- if not user["is_active"]:
- return {"success": False, "error": "用户已被禁用"}
- password_hash = self._hash_password(password)
- if user["password_hash"] != password_hash:
- return {"success": False, "error": "密码错误"}
- # 生成简单session token
- token = str(uuid.uuid4())
- self._sessions[token] = user["id"]
- logger.info(f"用户登录: {username}")
- return {"success": True, "token": token, "user": user}
- def logout_user(self, token: str) -> Dict[str, Any]:
- if token in self._sessions:
- del self._sessions[token]
- logger.info(f"用户退出: token={token}")
- return {"success": True}
- return {"success": False, "error": "无效的token"}
- def get_user_info(self, user_id: int) -> Dict[str, Any]:
- user = self.db_ops.get_user_by_id(user_id)
- if not user:
- return {"success": False, "error": "用户不存在"}
- return {"success": True, "user": user}
- def update_user_info(self, user_id: int, **kwargs) -> Dict[str, Any]:
- if "password" in kwargs:
- kwargs["password_hash"] = self._hash_password(kwargs.pop("password"))
- user = self.db_ops.update_user(user_id, **kwargs)
- if not user:
- return {"success": False, "error": "用户不存在或更新失败"}
- logger.info(f"更新用户信息: {user_id}")
- return {"success": True, "user": user}
- def delete_user(self, user_id: int) -> Dict[str, Any]:
- result = self.db_ops.delete_user(user_id)
- if result:
- logger.info(f"删除用户: {user_id}")
- return {"success": True}
- return {"success": False, "error": "用户不存在或删除失败"}
- def authenticate(self, token: str) -> Optional[int]:
- """根据token获取用户ID,未登录返回None"""
- return self._sessions.get(token)
- # 全局服务实例
- auth_service = AuthService()
|