tools_use.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. import time, requests, hashlib
  2. from datetime import datetime
  3. from config import image_app_key, image_app_secret, image_url
  4. from config import info_appkey, info_appsecret, info_url, info_token_url
  5. from PIL import ExifTags
  6. import mysql.connector
  7. class ProductInfoTokenManager:
  8. def __init__(self):
  9. self.token = None
  10. self.token_expiry = None
  11. self.info_token_url = info_token_url
  12. self.info_appkey = info_appkey
  13. self.info_appsecret = info_appsecret
  14. self.info_url = info_url
  15. def get_access_token(self):
  16. """
  17. 根据 appkey 和 appsecret 获取访问令牌。
  18. """
  19. payload = {'appKey': self.info_appkey, 'appSecret': self.info_appsecret}
  20. response = requests.post(self.info_token_url, json=payload)
  21. if response.status_code == 200:
  22. token_data = response.json()
  23. self.token = token_data['data']
  24. self.token_expiry = time.time() + 3600 # token 有效期为 1 小时
  25. else:
  26. raise Exception("Failed to retrieve access token")
  27. def get_valid_token(self):
  28. """
  29. 获取有效的访问令牌,如果 token 已过期,则重新获取。
  30. """
  31. if not self.token or time.time() >= self.token_expiry:
  32. self.get_access_token()
  33. return self.token
  34. def double_try(self, barcode):
  35. product_info = self.get_product_info_by_barcode(barcode)
  36. if product_info:
  37. return barcode, product_info
  38. product_info_try_1 = self.get_product_info_by_barcode(barcode.replace('5','6'))
  39. if product_info_try_1:
  40. return barcode.replace('5','6'), product_info_try_1
  41. product_info_try_2 = self.get_product_info_by_barcode(barcode.replace('6','5'))
  42. if product_info_try_2:
  43. return barcode.replace('6','5'), product_info_try_2
  44. product_info_try_3 = self.get_product_info_by_barcode(barcode.replace('B','8'))
  45. if product_info_try_3:
  46. return barcode.replace('B','8'), product_info_try_3
  47. product_info_try_4 = self.get_product_info_by_barcode(barcode.replace('8','B'))
  48. if product_info_try_4:
  49. return barcode.replace('8','B'), product_info_try_4
  50. else:
  51. return barcode, product_info
  52. async def get_product_info_by_barcode(self, barcode):
  53. """
  54. 根据条码编号获取商品信息。
  55. """
  56. time1 = time.time()
  57. good_code = barcode[:9] # 必须9
  58. token = self.get_valid_token()
  59. headers = {'Authorization': f'Bearer {token}'}
  60. payload = {'goodsCode': [good_code]}
  61. # payload = {'goodsCode': ['Gjf052400966']}
  62. response = requests.post(self.info_url, headers=headers, json=payload)
  63. time2 = time.time()
  64. # print(f"get_info_time: {time2 - time1} s!!")
  65. if response.status_code == 200:
  66. product_info = response.json()
  67. if product_info.get('data'):
  68. return product_info
  69. else:
  70. return None
  71. else:
  72. return None
  73. async def get_barcode_from_epc(self, epc_code):
  74. """
  75. 根据EPC码获取barcode
  76. """
  77. time1 = time.time()
  78. token = self.get_valid_token()
  79. headers = {'Authorization': f'{token}'}
  80. payload = {'rfid': [epc_code]}
  81. # payload = {'goodsCode': ['Gjf052400966']}
  82. response = requests.post('http://gbp-api.gloria.com.cn:21015/api/label/query', headers=headers, json=payload)
  83. time2 = time.time()
  84. # print(f"get_info_time: {time2 - time1} s!!")
  85. if response.status_code == 200:
  86. product_info = response.json()
  87. data = product_info.get('data')
  88. if data:
  89. result = data[0].get('labelCode')
  90. return result
  91. else:
  92. return None
  93. else:
  94. return None
  95. class ProductImage:
  96. def __init__(self) -> None:
  97. self.image_app_key = image_app_key
  98. self.image_app_secret = image_app_secret
  99. self.image_url = image_url
  100. def generate_sign(self, timestamp):
  101. encode_str = self.image_app_key + self.image_app_secret + timestamp
  102. md5_hash = hashlib.md5(encode_str.encode('utf-8')).hexdigest()
  103. return md5_hash
  104. async def get_product_image_by_barcode(self, barcode):
  105. time1 = time.time()
  106. part_number = barcode[:12] # 9不包括颜色,12包括颜色
  107. # Generate the current timestamp
  108. timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  109. # Generate the sign
  110. sign = self.generate_sign(timestamp)
  111. # Set the headers
  112. headers = {
  113. 'appKey': self.image_app_key,
  114. 'timestamp': timestamp,
  115. 'sign': sign
  116. }
  117. # Set the query parameters
  118. params = {
  119. 'partNumber': part_number
  120. }
  121. # Define the URL
  122. # Send the GET request
  123. response = requests.get(self.image_url, headers=headers, params=params)
  124. time2 = time.time()
  125. # print(f"get_image_time: {time2 - time1} s!!")
  126. # Check the response
  127. if response.status_code == 200:
  128. result = response.json()
  129. if result.get('msg') != '暂无图片、稍后再试' and result.get('imgPath'):
  130. return response.json()
  131. else:
  132. return None
  133. else:
  134. return None
  135. if __name__ == '__main__':
  136. import asyncio
  137. from concurrent.futures import ThreadPoolExecutor
  138. product_info_instance = ProductInfoTokenManager()
  139. product_image_instance = ProductImage()
  140. barcode = product_info_instance.get_barcode_from_epc(epc_code="eaaeaeea0000000000651b07".upper())
  141. print(barcode)
  142. product_info = product_info_instance.get_product_info_by_barcode(barcode=barcode)
  143. print(product_info)
  144. product_image = product_image_instance.get_product_image_by_barcode(barcode=barcode)
  145. print(product_image)
  146. # executor = ThreadPoolExecutor(max_workers=15)
  147. # async def search(message):
  148. # goods_code = message.get('goods_code')
  149. # product_info = await asyncio.get_event_loop().run_in_executor(
  150. # executor, product_info_instance.get_product_info_by_barcode, goods_code)
  151. # color_data = product_info.get('data',[{}])[0].get('colorData',[{}])
  152. # goods_name = product_info.get('data',[{}])[0].get('goodsName',[{}])
  153. # color_codes = {goods_code + item.get('colorCode'): item.get('colorName') for item in color_data if item.get('colorCode')}
  154. # result = []
  155. # for code, name in color_codes.items():
  156. # image_temp = await asyncio.get_event_loop().run_in_executor(
  157. # executor, product_image_instance.get_product_image_by_barcode, code)
  158. # print(image_temp)
  159. # result.append({'goodsCode':goods_code, 'goodsName':goods_name, 'colorCode':code[-3:], 'imgPath':image_temp.get('imgPath'), 'colorName':name})
  160. # return result
  161. # async def main():
  162. # result = await search({'goods_code': '1BCC6N250'})
  163. # print(result)
  164. asyncio.run(main())