post_data.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import os
  2. import tos
  3. from dotenv import load_dotenv
  4. from .logger_config import setup_logger
  5. logger = setup_logger(__name__)
  6. # 加载环境变量
  7. load_dotenv()
  8. # 从环境变量获取 AK 和 SK 信息
  9. ak = os.getenv('TOS_ACCESS_KEY')
  10. sk = os.getenv('TOS_SECRET_KEY')
  11. # 存储桶配置信息
  12. endpoint = "https://tos-cn-guangzhou.volces.com"
  13. region = "cn-guangzhou"
  14. bucket_name = "guide-material"
  15. def upload_file_to_tos(file_name: str) -> str:
  16. """
  17. 上传文件到TOS存储桶并返回访问URL
  18. Args:
  19. file_name (str): 本地文件的完整路径
  20. Returns:
  21. str: 上传文件的访问URL
  22. Raises:
  23. Exception: 上传过程中的任何错误
  24. """
  25. try:
  26. # 从文件路径中提取文件名作为object_key
  27. object_key = "oral-clips/" + os.path.basename(file_name)
  28. logger.info(f'开始上传文件: {file_name}')
  29. logger.info(f'文件将保存为: {object_key}')
  30. # 创建客户端并上传文件
  31. client = tos.TosClientV2(ak, sk, endpoint, region)
  32. client.put_object_from_file(bucket_name, object_key, file_name)
  33. # 生成访问URL
  34. object_url = f"https://testdgxcx-oss.gloria.com.cn/{object_key}"
  35. logger.info(f'文件上传成功,访问URL: {object_url}')
  36. return object_url
  37. except tos.exceptions.TosClientError as e:
  38. error_msg = f'上传失败,客户端错误: message={e.message}, cause={e.cause}'
  39. logger.error(error_msg)
  40. raise Exception(error_msg)
  41. except tos.exceptions.TosServerError as e:
  42. error_msg = f'上传失败,服务端错误: code={e.code}, request_id={e.request_id}, message={e.message}, status_code={e.status_code}, ec={e.ec}, request_url={e.request_url}'
  43. logger.error(error_msg)
  44. raise Exception(error_msg)
  45. except Exception as e:
  46. error_msg = f'上传失败,未知错误: {str(e)}'
  47. logger.error(error_msg)
  48. raise Exception(error_msg)
  49. # 使用示例
  50. if __name__ == "__main__":
  51. # python -m utils.upload
  52. test_file = "./output/1ACLAB10A86Y.xlsx"
  53. try:
  54. url = upload_file_to_tos(test_file)
  55. print(f"文件上传成功,访问URL: {url}")
  56. except Exception as e:
  57. print(f"上传失败: {str(e)}")