| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- """
- 异步API客户端基类
- 提供通用异步API调用功能,包括错误处理、重试机制、日志记录等
- 使用 aiohttp 实现异步HTTP请求
- """
- import asyncio
- import logging
- from typing import Any, Dict, Optional, Callable
- from dataclasses import dataclass
- import aiohttp
- from aiohttp import ClientSession, ClientTimeout
- from taskflow.logger import get_logger
- logger = get_logger("api_modules.base_client_async")
- @dataclass
- class RetryConfig:
- """
- 重试配置类
- 该类用于配置 API 请求的重试机制,包括最大重试次数、退避因子、需要重试的 HTTP 状态码及需要重试的异常类型。
- 属性:
- max_retries (int): 最大重试次数,默认为 3。当请求失败达到该次数后不再重试。
- backoff_factor (float): 退避因子,控制每次重试间的等待时长。默认为 1.0。
- retry_on_status (tuple): 需要进行重试的 HTTP 状态码,默认为 (500, 502, 503, 504)。
- retry_on_exception (tuple): 需要重试的异常类型,例如连接超时或连接错误,默认为 (aiohttp.ClientError, asyncio.TimeoutError)。
- """
- max_retries: int = 3
- backoff_factor: float = 1.0
- retry_on_status: tuple = (500, 502, 503, 504)
- retry_on_exception: tuple = (aiohttp.ClientError, asyncio.TimeoutError)
- class APIError(Exception):
- """API调用异常"""
-
- def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[Dict] = None):
- """
- 初始化API错误
-
- Args:
- message: 错误消息
- status_code: HTTP状态码
- response: 响应内容
- """
- super().__init__(message)
- self.message = message
- self.status_code = status_code
- self.response = response
-
- def __str__(self):
- if self.status_code:
- return f"{self.message} (Status: {self.status_code})"
- return self.message
- class AsyncAPIClient:
- """
- 异步API客户端基类
-
- 提供通用的异步API调用功能:
- - 统一的请求接口
- - 自动重试机制
- - 错误处理
- - 日志记录
- - 超时控制
-
- 使用示例:
- >>> async with AsyncAPIClient(base_url="https://api.example.com", api_key="your_key") as client:
- ... response = await client.post("/endpoint", json={"data": "value"})
- """
- def __init__(
- self,
- base_url: str,
- api_key: Optional[str] = None,
- timeout: int = 300,
- retry_config: Optional[RetryConfig] = None,
- headers: Optional[Dict[str, str]] = None
- ):
- """
- 初始化异步API客户端
-
- Args:
- base_url: API基础URL
- api_key: API密钥(可选,也可以通过headers传入)
- timeout: 请求超时时间(秒)
- retry_config: 重试配置
- headers: 默认请求头
- """
- self.base_url = base_url.rstrip('/')
- self.api_key = api_key
- self.timeout = timeout
- self.retry_config = retry_config or RetryConfig()
-
- # 设置默认请求头
- self.default_headers = {
- "Content-Type": "application/json",
- **({} if headers is None else headers)
- }
-
- if api_key:
- self.default_headers["Authorization"] = f"Bearer {api_key}"
-
- # 创建session(在异步上下文中创建)
- self._session: Optional[ClientSession] = None
-
- logger.info(f"初始化异步API客户端: {self.base_url}")
- async def _get_session(self) -> ClientSession:
- """获取或创建session"""
- if self._session is None or self._session.closed:
- timeout = ClientTimeout(total=self.timeout)
- self._session = ClientSession(
- timeout=timeout,
- headers=self.default_headers
- )
- return self._session
- def _build_url(self, endpoint: str) -> str:
- """
- 构建完整的URL
-
- Args:
- endpoint: API端点路径
-
- Returns:
- 完整的URL
- """
- endpoint = endpoint.lstrip('/')
- return f"{self.base_url}/{endpoint}"
- async def _handle_response(self, response: aiohttp.ClientResponse) -> Dict[str, Any]:
- """
- 处理API响应
-
- Args:
- response: aiohttp响应对象
-
- Returns:
- 解析后的响应数据
-
- Raises:
- APIError: 如果请求失败
- """
- try:
- response.raise_for_status()
- except aiohttp.ClientResponseError as e:
- # 尝试解析错误响应
- error_detail = None
- try:
- error_detail = await response.json()
- logger.error(f"API错误响应 (JSON): {error_detail}")
- except:
- try:
- error_detail = await response.text()
- logger.error(f"API错误响应 (Text): {error_detail}")
- except:
- error_detail = str(e)
- logger.error(f"API错误响应 (String): {error_detail}")
- raise APIError(
- message=f"API请求失败:{str(e)}",
- status_code=response.status,
- response=error_detail
- )
-
- # 解析响应数据
- try:
- return await response.json()
- except aiohttp.ContentTypeError:
- text = await response.text()
- return {"content": text}
- def _log_request(self, method: str, url: str, **kwargs):
- """记录请求日志"""
- logger.debug(f"{method} {url}")
- if "json" in kwargs:
- logger.debug(f"请求体: {kwargs['json']}")
- def _log_response(self, status: int, response_data: Any = None):
- """记录响应日志"""
- logger.debug(f"响应状态: {status}")
- if response_data:
- logger.debug(f"响应体: {response_data}")
- async def _request_with_retry(
- self,
- method: str,
- url: str,
- headers: Optional[Dict[str, str]] = None,
- **kwargs
- ) -> Dict[str, Any]:
- """
- 带重试机制的请求
-
- Args:
- method: HTTP方法
- url: 完整URL
- headers: 额外的请求头
- **kwargs: 传递给aiohttp的其他参数
-
- Returns:
- API响应数据
- """
- session = await self._get_session()
-
- # 合并请求头
- request_headers = {**self.default_headers}
- if headers:
- request_headers.update(headers)
-
- # 记录请求
- self._log_request(method, url, **kwargs)
-
- last_exception = None
-
- for attempt in range(self.retry_config.max_retries + 1):
- try:
- async with session.request(
- method=method,
- url=url,
- headers=request_headers,
- **kwargs
- ) as response:
- response_data = await self._handle_response(response)
- self._log_response(response.status, response_data)
- return response_data
-
- except Exception as e:
- last_exception = e
-
- # 检查是否需要重试
- should_retry = False
-
- # 检查状态码
- if isinstance(e, APIError) and e.status_code:
- if e.status_code in self.retry_config.retry_on_status:
- should_retry = True
-
- # 检查异常类型
- if isinstance(e, self.retry_config.retry_on_exception):
- should_retry = True
-
- # 如果不需要重试或已达到最大重试次数,直接抛出异常
- if not should_retry or attempt >= self.retry_config.max_retries:
- break
-
- # 计算退避时间
- wait_time = self.retry_config.backoff_factor * (2 ** attempt)
- logger.warning(f"请求失败,{wait_time}秒后重试 (尝试 {attempt + 1}/{self.retry_config.max_retries + 1}): {e}")
- await asyncio.sleep(wait_time)
-
- # 所有重试都失败
- logger.error(f"请求异常: {last_exception}")
- if isinstance(last_exception, APIError):
- raise last_exception
- raise APIError(f"网络请求失败: {str(last_exception)}")
- async def request(
- self,
- method: str,
- endpoint: str,
- headers: Optional[Dict[str, str]] = None,
- **kwargs
- ) -> Dict[str, Any]:
- """
- 发送异步API请求
-
- Args:
- method: HTTP方法(GET, POST, PUT, DELETE等)
- endpoint: API端点路径
- headers: 额外的请求头(会与默认请求头合并)
- **kwargs: 传递给aiohttp的其他参数
-
- Returns:
- API响应数据
-
- Raises:
- APIError: 如果请求失败
- """
- url = self._build_url(endpoint)
- return await self._request_with_retry(method, url, headers, **kwargs)
- async def get(self, endpoint: str, **kwargs) -> Dict[str, Any]:
- """发送异步GET请求"""
- return await self.request("GET", endpoint, **kwargs)
-
- async def post(self, endpoint: str, **kwargs) -> Dict[str, Any]:
- """发送异步POST请求"""
- return await self.request("POST", endpoint, **kwargs)
-
- async def put(self, endpoint: str, **kwargs) -> Dict[str, Any]:
- """发送异步PUT请求"""
- return await self.request("PUT", endpoint, **kwargs)
-
- async def delete(self, endpoint: str, **kwargs) -> Dict[str, Any]:
- """发送异步DELETE请求"""
- return await self.request("DELETE", endpoint, **kwargs)
-
- async def patch(self, endpoint: str, **kwargs) -> Dict[str, Any]:
- """发送异步PATCH请求"""
- return await self.request("PATCH", endpoint, **kwargs)
-
- async def close(self):
- """关闭session"""
- if self._session and not self._session.closed:
- await self._session.close()
- logger.info("异步API客户端已关闭")
-
- async def __aenter__(self):
- """异步上下文管理器入口"""
- await self._get_session()
- return self
-
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- """异步上下文管理器出口"""
- await self.close()
|