""" 异步API客户端基类 提供通用异步API调用功能,包括错误处理、重试机制、日志记录等 使用 aiohttp 实现异步HTTP请求 """ import asyncio import logging from typing import Any, Dict, Optional, Callable from dataclasses import dataclass import aiohttp from aiohttp import ClientSession, ClientTimeout from taskflow.logger import get_logger logger = get_logger("api_modules.base_client_async") @dataclass class RetryConfig: """ 重试配置类 该类用于配置 API 请求的重试机制,包括最大重试次数、退避因子、需要重试的 HTTP 状态码及需要重试的异常类型。 属性: max_retries (int): 最大重试次数,默认为 3。当请求失败达到该次数后不再重试。 backoff_factor (float): 退避因子,控制每次重试间的等待时长。默认为 1.0。 retry_on_status (tuple): 需要进行重试的 HTTP 状态码,默认为 (500, 502, 503, 504)。 retry_on_exception (tuple): 需要重试的异常类型,例如连接超时或连接错误,默认为 (aiohttp.ClientError, asyncio.TimeoutError)。 """ max_retries: int = 3 backoff_factor: float = 1.0 retry_on_status: tuple = (500, 502, 503, 504) retry_on_exception: tuple = (aiohttp.ClientError, asyncio.TimeoutError) class APIError(Exception): """API调用异常""" def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[Dict] = None): """ 初始化API错误 Args: message: 错误消息 status_code: HTTP状态码 response: 响应内容 """ super().__init__(message) self.message = message self.status_code = status_code self.response = response def __str__(self): if self.status_code: return f"{self.message} (Status: {self.status_code})" return self.message class AsyncAPIClient: """ 异步API客户端基类 提供通用的异步API调用功能: - 统一的请求接口 - 自动重试机制 - 错误处理 - 日志记录 - 超时控制 使用示例: >>> async with AsyncAPIClient(base_url="https://api.example.com", api_key="your_key") as client: ... response = await client.post("/endpoint", json={"data": "value"}) """ def __init__( self, base_url: str, api_key: Optional[str] = None, timeout: int = 300, retry_config: Optional[RetryConfig] = None, headers: Optional[Dict[str, str]] = None ): """ 初始化异步API客户端 Args: base_url: API基础URL api_key: API密钥(可选,也可以通过headers传入) timeout: 请求超时时间(秒) retry_config: 重试配置 headers: 默认请求头 """ self.base_url = base_url.rstrip('/') self.api_key = api_key self.timeout = timeout self.retry_config = retry_config or RetryConfig() # 设置默认请求头 self.default_headers = { "Content-Type": "application/json", **({} if headers is None else headers) } if api_key: self.default_headers["Authorization"] = f"Bearer {api_key}" # 创建session(在异步上下文中创建) self._session: Optional[ClientSession] = None logger.info(f"初始化异步API客户端: {self.base_url}") async def _get_session(self) -> ClientSession: """获取或创建session""" if self._session is None or self._session.closed: timeout = ClientTimeout(total=self.timeout) self._session = ClientSession( timeout=timeout, headers=self.default_headers ) return self._session def _build_url(self, endpoint: str) -> str: """ 构建完整的URL Args: endpoint: API端点路径 Returns: 完整的URL """ endpoint = endpoint.lstrip('/') return f"{self.base_url}/{endpoint}" async def _handle_response(self, response: aiohttp.ClientResponse) -> Dict[str, Any]: """ 处理API响应 Args: response: aiohttp响应对象 Returns: 解析后的响应数据 Raises: APIError: 如果请求失败 """ try: response.raise_for_status() except aiohttp.ClientResponseError as e: # 尝试解析错误响应 error_detail = None try: error_detail = await response.json() logger.error(f"API错误响应 (JSON): {error_detail}") except: try: error_detail = await response.text() logger.error(f"API错误响应 (Text): {error_detail}") except: error_detail = str(e) logger.error(f"API错误响应 (String): {error_detail}") raise APIError( message=f"API请求失败:{str(e)}", status_code=response.status, response=error_detail ) # 解析响应数据 try: return await response.json() except aiohttp.ContentTypeError: text = await response.text() return {"content": text} def _log_request(self, method: str, url: str, **kwargs): """记录请求日志""" logger.debug(f"{method} {url}") if "json" in kwargs: logger.debug(f"请求体: {kwargs['json']}") def _log_response(self, status: int, response_data: Any = None): """记录响应日志""" logger.debug(f"响应状态: {status}") if response_data: logger.debug(f"响应体: {response_data}") async def _request_with_retry( self, method: str, url: str, headers: Optional[Dict[str, str]] = None, **kwargs ) -> Dict[str, Any]: """ 带重试机制的请求 Args: method: HTTP方法 url: 完整URL headers: 额外的请求头 **kwargs: 传递给aiohttp的其他参数 Returns: API响应数据 """ session = await self._get_session() # 合并请求头 request_headers = {**self.default_headers} if headers: request_headers.update(headers) # 记录请求 self._log_request(method, url, **kwargs) last_exception = None for attempt in range(self.retry_config.max_retries + 1): try: async with session.request( method=method, url=url, headers=request_headers, **kwargs ) as response: response_data = await self._handle_response(response) self._log_response(response.status, response_data) return response_data except Exception as e: last_exception = e # 检查是否需要重试 should_retry = False # 检查状态码 if isinstance(e, APIError) and e.status_code: if e.status_code in self.retry_config.retry_on_status: should_retry = True # 检查异常类型 if isinstance(e, self.retry_config.retry_on_exception): should_retry = True # 如果不需要重试或已达到最大重试次数,直接抛出异常 if not should_retry or attempt >= self.retry_config.max_retries: break # 计算退避时间 wait_time = self.retry_config.backoff_factor * (2 ** attempt) logger.warning(f"请求失败,{wait_time}秒后重试 (尝试 {attempt + 1}/{self.retry_config.max_retries + 1}): {e}") await asyncio.sleep(wait_time) # 所有重试都失败 logger.error(f"请求异常: {last_exception}") if isinstance(last_exception, APIError): raise last_exception raise APIError(f"网络请求失败: {str(last_exception)}") async def request( self, method: str, endpoint: str, headers: Optional[Dict[str, str]] = None, **kwargs ) -> Dict[str, Any]: """ 发送异步API请求 Args: method: HTTP方法(GET, POST, PUT, DELETE等) endpoint: API端点路径 headers: 额外的请求头(会与默认请求头合并) **kwargs: 传递给aiohttp的其他参数 Returns: API响应数据 Raises: APIError: 如果请求失败 """ url = self._build_url(endpoint) return await self._request_with_retry(method, url, headers, **kwargs) async def get(self, endpoint: str, **kwargs) -> Dict[str, Any]: """发送异步GET请求""" return await self.request("GET", endpoint, **kwargs) async def post(self, endpoint: str, **kwargs) -> Dict[str, Any]: """发送异步POST请求""" return await self.request("POST", endpoint, **kwargs) async def put(self, endpoint: str, **kwargs) -> Dict[str, Any]: """发送异步PUT请求""" return await self.request("PUT", endpoint, **kwargs) async def delete(self, endpoint: str, **kwargs) -> Dict[str, Any]: """发送异步DELETE请求""" return await self.request("DELETE", endpoint, **kwargs) async def patch(self, endpoint: str, **kwargs) -> Dict[str, Any]: """发送异步PATCH请求""" return await self.request("PATCH", endpoint, **kwargs) async def close(self): """关闭session""" if self._session and not self._session.closed: await self._session.close() logger.info("异步API客户端已关闭") async def __aenter__(self): """异步上下文管理器入口""" await self._get_session() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口""" await self.close()