# auth_service.py (3.3 KB)
import hashlib
import hmac
import uuid
from typing import Optional, Dict, Any

from backend.modules.database.operations import DatabaseOperations
from backend.utils.logger_config import setup_logger
# Module-level logger, created by the project's setup_logger helper with this
# module's name; used by AuthService to record account and session events.
logger = setup_logger(__name__)
  7. class AuthService:
  8. """
  9. 用户认证与管理服务层
  10. """
  11. def __init__(self, db_operations: Optional[DatabaseOperations] = None):
  12. self.db_ops = db_operations or DatabaseOperations()
  13. self._sessions = {} # 简单内存session管理,生产建议用redis/jwt
  14. def _hash_password(self, password: str) -> str:
  15. return hashlib.sha256(password.encode('utf-8')).hexdigest()
  16. def register_user(self, username: str, password: str, is_admin: bool = False) -> Dict[str, Any]:
  17. if not username or not password:
  18. return {"success": False, "error": "用户名和密码不能为空"}
  19. if self.db_ops.get_user_by_username(username):
  20. return {"success": False, "error": "用户名已存在"}
  21. password_hash = self._hash_password(password)
  22. user = self.db_ops.create_user(username, password_hash, is_admin)
  23. logger.info(f"注册新用户: {username}")
  24. return {"success": True, "user": user}
  25. def login_user(self, username: str, password: str) -> Dict[str, Any]:
  26. user = self.db_ops.get_user_by_username(username)
  27. if not user:
  28. return {"success": False, "error": "用户不存在"}
  29. if not user["is_active"]:
  30. return {"success": False, "error": "用户已被禁用"}
  31. password_hash = self._hash_password(password)
  32. if user["password_hash"] != password_hash:
  33. return {"success": False, "error": "密码错误"}
  34. # 生成简单session token
  35. token = str(uuid.uuid4())
  36. self._sessions[token] = user["id"]
  37. logger.info(f"用户登录: {username}")
  38. return {"success": True, "token": token, "user": user}
  39. def logout_user(self, token: str) -> Dict[str, Any]:
  40. if token in self._sessions:
  41. del self._sessions[token]
  42. logger.info(f"用户退出: token={token}")
  43. return {"success": True}
  44. return {"success": False, "error": "无效的token"}
  45. def get_user_info(self, user_id: int) -> Dict[str, Any]:
  46. user = self.db_ops.get_user_by_id(user_id)
  47. if not user:
  48. return {"success": False, "error": "用户不存在"}
  49. return {"success": True, "user": user}
  50. def update_user_info(self, user_id: int, **kwargs) -> Dict[str, Any]:
  51. if "password" in kwargs:
  52. kwargs["password_hash"] = self._hash_password(kwargs.pop("password"))
  53. user = self.db_ops.update_user(user_id, **kwargs)
  54. if not user:
  55. return {"success": False, "error": "用户不存在或更新失败"}
  56. logger.info(f"更新用户信息: {user_id}")
  57. return {"success": True, "user": user}
  58. def delete_user(self, user_id: int) -> Dict[str, Any]:
  59. result = self.db_ops.delete_user(user_id)
  60. if result:
  61. logger.info(f"删除用户: {user_id}")
  62. return {"success": True}
  63. return {"success": False, "error": "用户不存在或删除失败"}
  64. def authenticate(self, token: str) -> Optional[int]:
  65. """根据token获取用户ID,未登录返回None"""
  66. return self._sessions.get(token)
# Global service instance, built at import time with no arguments, so it
# constructs its own default DatabaseOperations.
auth_service = AuthService()