"""Clients for the product-info, RFID-label and product-image HTTP APIs."""

import hashlib
import time
from datetime import datetime
from typing import Optional, Tuple

import mysql.connector
import requests
from PIL import ExifTags

from config import image_app_key, image_app_secret, image_url
from config import info_appkey, info_appsecret, info_url, info_token_url

# Upper bound (seconds) for every outbound HTTP call so a stalled server
# cannot hang the caller forever; `requests` has no timeout by default.
REQUEST_TIMEOUT = 10


class ProductInfoTokenManager:
    """Fetch product information and RFID labels using a cached access token.

    The token is requested lazily and re-requested once it is older than
    ``TOKEN_TTL_SECONDS``.
    """

    # The original code treats a token as valid for one hour.
    TOKEN_TTL_SECONDS = 3600

    # Endpoint used to translate an EPC (RFID) code into a label code.
    LABEL_QUERY_URL = 'http://gbp-api.gloria.com.cn:21015/api/label/query'

    # Character swaps tried by ``double_try`` when the barcode as given is
    # not found (presumably compensating for misread look-alike characters
    # -- TODO confirm with the scanner/OCR owner).
    _BARCODE_SUBSTITUTIONS = (('5', '6'), ('6', '5'), ('B', '8'), ('8', 'B'))

    def __init__(self):
        self.token = None
        self.token_expiry = None
        self.info_token_url = info_token_url
        self.info_appkey = info_appkey
        self.info_appsecret = info_appsecret
        self.info_url = info_url

    def get_access_token(self):
        """Request a new access token using the app key and app secret.

        On success ``self.token`` and ``self.token_expiry`` are updated.

        Raises:
            Exception: if the token endpoint does not answer with HTTP 200
                or the response carries no token in its ``data`` field.
        """
        payload = {'appKey': self.info_appkey, 'appSecret': self.info_appsecret}
        response = requests.post(
            self.info_token_url, json=payload, timeout=REQUEST_TIMEOUT)
        if response.status_code != 200:
            raise Exception("Failed to retrieve access token")

        token = response.json().get('data')
        if not token:
            # A 200 without a usable token must not be cached as "valid".
            raise Exception("Failed to retrieve access token")

        self.token = token
        self.token_expiry = time.time() + self.TOKEN_TTL_SECONDS

    def get_valid_token(self):
        """Return a usable access token, refreshing it first if it is
        missing or has expired."""
        if not self.token or time.time() >= self.token_expiry:
            self.get_access_token()
        return self.token

    def double_try(self, barcode):
        """Look up product info, retrying with look-alike character swaps.

        The barcode is tried as given first, then once for each pair in
        ``_BARCODE_SUBSTITUTIONS``. Substitutions that yield a barcode that
        was already tried are skipped to avoid redundant requests.

        Args:
            barcode: the barcode string to look up.

        Returns:
            ``(matched_barcode, product_info)`` for the first variant that
            returns data, or ``(barcode, None)`` if none does.
        """
        candidates = [barcode]
        candidates.extend(
            barcode.replace(old, new)
            for old, new in self._BARCODE_SUBSTITUTIONS
        )

        tried = set()
        for candidate in candidates:
            if candidate in tried:
                continue
            tried.add(candidate)
            # Use the synchronous helper: this method is not a coroutine,
            # so it must not call the ``async def`` wrapper.
            product_info = self._fetch_product_info(candidate)
            if product_info:
                return candidate, product_info
        return barcode, None

    def _fetch_product_info(self, barcode) -> Optional[dict]:
        """Blocking lookup of product info; see ``get_product_info_by_barcode``."""
        if not barcode:
            return None
        good_code = barcode[:9]  # the goods code must be exactly the first 9 characters
        token = self.get_valid_token()
        headers = {'Authorization': f'Bearer {token}'}
        payload = {'goodsCode': [good_code]}
        response = requests.post(
            self.info_url, headers=headers, json=payload,
            timeout=REQUEST_TIMEOUT)
        if response.status_code != 200:
            return None
        product_info = response.json()
        return product_info if product_info.get('data') else None

    async def get_product_info_by_barcode(self, barcode):
        """Fetch product information for a barcode.

        Only the first 9 characters of ``barcode`` are sent as the goods
        code.

        NOTE: the HTTP request is made with blocking ``requests`` and
        therefore blocks the event loop while it runs.

        Returns:
            The decoded JSON response when it has a non-empty ``data``
            field, otherwise ``None`` (also for non-200 responses or an
            empty barcode).
        """
        return self._fetch_product_info(barcode)

    def _fetch_barcode_from_epc(self, epc_code) -> Optional[str]:
        """Blocking EPC-to-label lookup; see ``get_barcode_from_epc``."""
        token = self.get_valid_token()
        # NOTE(review): unlike the product-info call, this endpoint is sent
        # the raw token without a "Bearer " prefix -- confirm this is what
        # the label API expects.
        headers = {'Authorization': f'{token}'}
        payload = {'rfid': [epc_code]}
        response = requests.post(
            self.LABEL_QUERY_URL, headers=headers, json=payload,
            timeout=REQUEST_TIMEOUT)
        if response.status_code != 200:
            return None
        data = response.json().get('data')
        if not data:
            return None
        return data[0].get('labelCode')

    async def get_barcode_from_epc(self, epc_code):
        """Resolve an EPC code to its barcode (the ``labelCode`` field).

        NOTE: the HTTP request is made with blocking ``requests`` and
        therefore blocks the event loop while it runs.

        Returns:
            The ``labelCode`` of the first entry in the response's ``data``
            list, or ``None`` when the request fails or returns no data.
        """
        return self._fetch_barcode_from_epc(epc_code)


class ProductImage:
    """Client for the product-image API, authenticated by a signed header."""

    def __init__(self) -> None:
        self.image_app_key = image_app_key
        self.image_app_secret = image_app_secret
        self.image_url = image_url

    def generate_sign(self, timestamp):
        """Return the request signature: the hex MD5 digest of
        ``app_key + app_secret + timestamp`` (UTF-8 encoded)."""
        encode_str = self.image_app_key + self.image_app_secret + timestamp
        return hashlib.md5(encode_str.encode('utf-8')).hexdigest()

    def _fetch_product_image(self, barcode) -> Optional[dict]:
        """Blocking image lookup; see ``get_product_image_by_barcode``."""
        if not barcode:
            return None
        # 9 characters exclude the colour code, 12 include it.
        part_number = barcode[:12]

        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        headers = {
            'appKey': self.image_app_key,
            'timestamp': timestamp,
            'sign': self.generate_sign(timestamp),
        }
        params = {'partNumber': part_number}

        response = requests.get(
            self.image_url, headers=headers, params=params,
            timeout=REQUEST_TIMEOUT)
        if response.status_code != 200:
            return None

        result = response.json()
        # The service reports "no image yet, try again later" via `msg`.
        if result.get('msg') != '暂无图片、稍后再试' and result.get('imgPath'):
            return result
        return None

    async def get_product_image_by_barcode(self, barcode):
        """Fetch image metadata for a barcode.

        Only the first 12 characters of ``barcode`` (goods code plus colour
        code) are sent as the part number.

        NOTE: the HTTP request is made with blocking ``requests`` and
        therefore blocks the event loop while it runs.

        Returns:
            The decoded JSON response when it contains an ``imgPath`` and
            is not the "no image" message, otherwise ``None``.
        """
        return self._fetch_product_image(barcode)


if __name__ == '__main__':
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    product_info_instance = ProductInfoTokenManager()
    product_image_instance = ProductImage()

    async def main():
        """Smoke test: EPC -> barcode -> product info and image."""
        barcode = await product_info_instance.get_barcode_from_epc(
            epc_code="eaaeaeea0000000000651b07".upper())
        print(barcode)
        if not barcode:
            # Nothing to look up when the EPC code is unknown.
            return

        product_info = await product_info_instance.get_product_info_by_barcode(
            barcode=barcode)
        print(product_info)

        product_image = await product_image_instance.get_product_image_by_barcode(
            barcode=barcode)
        print(product_image)

    # Example (kept for reference): fan the blocking lookups out over a
    # thread pool. It uses the synchronous helpers because coroutine
    # functions cannot be handed to run_in_executor.
    # executor = ThreadPoolExecutor(max_workers=15)
    # async def search(message):
    #     goods_code = message.get('goods_code')
    #     product_info = await asyncio.get_event_loop().run_in_executor(
    #         executor, product_info_instance._fetch_product_info, goods_code)
    #     color_data = product_info.get('data',[{}])[0].get('colorData',[{}])
    #     goods_name = product_info.get('data',[{}])[0].get('goodsName',[{}])
    #     color_codes = {goods_code + item.get('colorCode'): item.get('colorName')
    #                    for item in color_data if item.get('colorCode')}
    #     result = []
    #     for code, name in color_codes.items():
    #         image_temp = await asyncio.get_event_loop().run_in_executor(
    #             executor, product_image_instance._fetch_product_image, code)
    #         print(image_temp)
    #         result.append({'goodsCode':goods_code, 'goodsName':goods_name,
    #                        'colorCode':code[-3:], 'imgPath':image_temp.get('imgPath'),
    #                        'colorName':name})
    #     return result
    # To try it: result = await search({'goods_code': '1BCC6N250'})

    asyncio.run(main())