base_client.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. """
  2. API客户端基类
  3. 提供通用API调用功能,包括错误处理、重试机制、日志记录等
  4. """
  5. import time
  6. import logging
  7. from typing import Any, Dict, Optional, Callable
  8. from dataclasses import dataclass
  9. import requests
  10. from requests.adapters import HTTPAdapter
  11. from urllib3.util import Retry
  12. from taskflow.logger import get_logger
  13. logger = get_logger("api_modules.base_client")
  14. @dataclass
  15. class RetryConfig:
  16. """
  17. 重试配置类
  18. 该类用于配置 API 请求的重试机制,包括最大重试次数、退避因子、需要重试的 HTTP 状态码及需要重试的异常类型。
  19. 属性:
  20. max_retries (int): 最大重试次数,默认为 3。当请求失败达到该次数后不再重试。
  21. backoff_factor (float): 退避因子,控制每次重试间的等待时长。默认为 1.0。
  22. retry_on_status (tuple): 需要进行重试的 HTTP 状态码,默认为 (500, 502, 503, 504)。
  23. retry_on_exception (tuple): 需要重试的异常类型,例如连接超时或连接错误,默认为 (requests.exceptions.ConnectionError, requests.exceptions.Timeout)。
  24. """
  25. max_retries: int = 3
  26. backoff_factor: float = 1.0
  27. retry_on_status: tuple = (500, 502, 503, 504)
  28. retry_on_exception: tuple = (requests.exceptions.ConnectionError, requests.exceptions.Timeout)
  29. class APIError(Exception):
  30. """API调用异常"""
  31. def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[Dict] = None):
  32. """
  33. 初始化API错误
  34. Args:
  35. message: 错误消息
  36. status_code: HTTP状态码
  37. response: 响应内容
  38. """
  39. super().__init__(message)
  40. self.message = message
  41. self.status_code = status_code
  42. self.response = response
  43. def __str__(self):
  44. if self.status_code:
  45. return f"{self.message} (Status: {self.status_code})"
  46. return self.message
  47. class APIClient:
  48. """
  49. API客户端基类
  50. 提供通用的API调用功能:
  51. - 统一的请求接口
  52. - 自动重试机制
  53. - 错误处理
  54. - 日志记录
  55. - 超时控制
  56. 使用示例:
  57. >>> client = APIClient(base_url="https://api.example.com", api_key="your_key")
  58. >>> response = client.post("/endpoint", json={"data": "value"})
  59. """
  60. def __init__(
  61. self,
  62. base_url: str,
  63. api_key: Optional[str] = None,
  64. timeout: int = 300,
  65. retry_config: Optional[RetryConfig] = None,
  66. headers: Optional[Dict[str, str]] = None
  67. ):
  68. """
  69. 初始化API客户端
  70. Args:
  71. base_url: API基础URL
  72. api_key: API密钥(可选,也可以通过headers传入)
  73. timeout: 请求超时时间(秒)
  74. retry_config: 重试配置
  75. headers: 默认请求头
  76. """
  77. self.base_url = base_url.rstrip('/')
  78. self.api_key = api_key
  79. self.timeout = timeout
  80. self.retry_config = retry_config or RetryConfig()
  81. # 设置默认请求头
  82. self.default_headers = {
  83. "Content-Type": "application/json",
  84. **({} if headers is None else headers)
  85. }
  86. if api_key:
  87. self.default_headers["Authorization"] = f"Bearer {api_key}"
  88. # 创建session并配置重试
  89. self.session = requests.Session()
  90. self._setup_retry()
  91. logger.info(f"初始化API客户端: {self.base_url}")
  92. def _setup_retry(self):
  93. """配置重试机制"""
  94. retry = Retry(
  95. total=self.retry_config.max_retries,
  96. backoff_factor=self.retry_config.backoff_factor,
  97. status_forcelist=self.retry_config.retry_on_status,
  98. raise_on_status=False,
  99. )
  100. adapter = HTTPAdapter(max_retries=retry)
  101. self.session.mount('https://', adapter)
  102. self.session.mount('http://', adapter)
  103. def _build_url(self, endpoint: str) -> str:
  104. """
  105. 构建完整的URL
  106. Args:
  107. endpoint: API端点路径
  108. Returns:
  109. 完整的URL
  110. """
  111. endpoint = endpoint.lstrip('/')
  112. return f"{self.base_url}/{endpoint}"
  113. def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
  114. """
  115. 处理API响应
  116. Args:
  117. response: requests响应对象
  118. Returns:
  119. 解析后的响应数据
  120. Raises:
  121. APIError: 如果请求失败
  122. """
  123. try:
  124. response.raise_for_status()
  125. except requests.exceptions.HTTPError as e:
  126. # 尝试解析错误响应
  127. error_detail = None
  128. try:
  129. error_detail = response.json()
  130. except:
  131. error_detail = response.text
  132. raise APIError(
  133. message=f"API请求失败:{str(e)}",
  134. status_code=response.status_code,
  135. response=error_detail
  136. )
  137. # 解析响应数据
  138. try:
  139. return response.json()
  140. except ValueError:
  141. return {"content": response.text}
  142. def _log_request(self, method: str, url: str, **kwargs):
  143. """记录请求日志"""
  144. logger.debug(f"{method} {url}")
  145. if "json" in kwargs:
  146. logger.debug(f"请求体: {kwargs['json']}")
  147. def _log_response(self, response: requests.Response):
  148. """记录响应日志"""
  149. logger.debug(f"响应状态: {response.status_code}")
  150. try:
  151. logger.debug(f"响应体: {response.json()}")
  152. except:
  153. logger.debug(f"响应体: {response.text[:200]}")
  154. def request(
  155. self,
  156. method: str,
  157. endpoint: str,
  158. headers: Optional[Dict[str, str]] = None,
  159. **kwargs
  160. ) -> Dict[str, Any]:
  161. """
  162. 发送API请求
  163. Args:
  164. method: HTTP方法(GET, POST, PUT, DELETE等)
  165. endpoint: API端点路径
  166. headers: 额外的请求头(会与默认请求头合并)
  167. **kwargs: 传递给requests的其他参数
  168. Returns:
  169. API响应数据
  170. Raises:
  171. APIError: 如果请求失败
  172. """
  173. url = self._build_url(endpoint)
  174. # 合并请求头
  175. request_headers = {**self.default_headers}
  176. if headers:
  177. request_headers.update(headers)
  178. # 记录请求
  179. self._log_request(method, url, **kwargs)
  180. try:
  181. response = self.session.request(
  182. method=method,
  183. url=url,
  184. headers=request_headers,
  185. timeout=self.timeout,
  186. **kwargs
  187. )
  188. # 记录响应
  189. self._log_response(response)
  190. return self._handle_response(response)
  191. except requests.exceptions.RequestException as e:
  192. logger.error(f"请求异常: {e}")
  193. raise APIError(f"网络请求失败: {str(e)}")
  194. def get(self, endpoint: str, **kwargs) -> Dict[str, Any]:
  195. """发送GET请求"""
  196. return self.request("GET", endpoint, **kwargs)
  197. def post(self, endpoint: str, **kwargs) -> Dict[str, Any]:
  198. """发送POST请求"""
  199. return self.request("POST", endpoint, **kwargs)
  200. def put(self, endpoint: str, **kwargs) -> Dict[str, Any]:
  201. """发送PUT请求"""
  202. return self.request("PUT", endpoint, **kwargs)
  203. def delete(self, endpoint: str, **kwargs) -> Dict[str, Any]:
  204. """发送DELETE请求"""
  205. return self.request("DELETE", endpoint, **kwargs)
  206. def patch(self, endpoint: str, **kwargs) -> Dict[str, Any]:
  207. """发送PATCH请求"""
  208. return self.request("PATCH", endpoint, **kwargs)
  209. def close(self):
  210. """关闭session"""
  211. self.session.close()
  212. logger.info("API客户端已关闭")
  213. def __enter__(self):
  214. """上下文管理器入口"""
  215. return self
  216. def __exit__(self, exc_type, exc_val, exc_tb):
  217. """上下文管理器出口"""
  218. self.close()