| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- import os
- import tos
- from dotenv import load_dotenv
- from .logger_config import setup_logger
- logger = setup_logger(__name__)
- # 加载环境变量
- load_dotenv()
- # 从环境变量获取 AK 和 SK 信息
- ak = os.getenv('TOS_ACCESS_KEY')
- sk = os.getenv('TOS_SECRET_KEY')
- # 存储桶配置信息
- endpoint = "https://tos-cn-guangzhou.volces.com"
- region = "cn-guangzhou"
- bucket_name = "guide-material"
- def upload_file_to_tos(file_name: str) -> str:
- """
- 上传文件到TOS存储桶并返回访问URL
-
- Args:
- file_name (str): 本地文件的完整路径
-
- Returns:
- str: 上传文件的访问URL
-
- Raises:
- Exception: 上传过程中的任何错误
- """
- try:
- # 从文件路径中提取文件名作为object_key
- object_key = "oral-clips/" + os.path.basename(file_name)
- logger.info(f'开始上传文件: {file_name}')
- logger.info(f'文件将保存为: {object_key}')
-
- # 创建客户端并上传文件
- client = tos.TosClientV2(ak, sk, endpoint, region)
- client.put_object_from_file(bucket_name, object_key, file_name)
-
- # 生成访问URL
- object_url = f"https://testdgxcx-oss.gloria.com.cn/{object_key}"
- logger.info(f'文件上传成功,访问URL: {object_url}')
- return object_url
-
- except tos.exceptions.TosClientError as e:
- error_msg = f'上传失败,客户端错误: message={e.message}, cause={e.cause}'
- logger.error(error_msg)
- raise Exception(error_msg)
- except tos.exceptions.TosServerError as e:
- error_msg = f'上传失败,服务端错误: code={e.code}, request_id={e.request_id}, message={e.message}, status_code={e.status_code}, ec={e.ec}, request_url={e.request_url}'
- logger.error(error_msg)
- raise Exception(error_msg)
- except Exception as e:
- error_msg = f'上传失败,未知错误: {str(e)}'
- logger.error(error_msg)
- raise Exception(error_msg)
- # 使用示例
- if __name__ == "__main__":
- # python -m utils.upload
- test_file = "./output/1ACLAB10A86Y.xlsx"
- try:
- url = upload_file_to_tos(test_file)
- print(f"文件上传成功,访问URL: {url}")
- except Exception as e:
- print(f"上传失败: {str(e)}")
|