123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- import time, requests, hashlib
- from datetime import datetime
- from config import image_app_key, image_app_secret, image_url
- from config import info_appkey, info_appsecret, info_url, info_token_url
- from PIL import ExifTags
- import mysql.connector
- class ProductInfoTokenManager:
- def __init__(self):
- self.token = None
- self.token_expiry = None
- self.info_token_url = info_token_url
- self.info_appkey = info_appkey
- self.info_appsecret = info_appsecret
- self.info_url = info_url
- def get_access_token(self):
- """
- 根据 appkey 和 appsecret 获取访问令牌。
- """
-
- payload = {'appKey': self.info_appkey, 'appSecret': self.info_appsecret}
- response = requests.post(self.info_token_url, json=payload)
- if response.status_code == 200:
- token_data = response.json()
- self.token = token_data['data']
- self.token_expiry = time.time() + 3600 # token 有效期为 1 小时
- else:
- raise Exception("Failed to retrieve access token")
- def get_valid_token(self):
- """
- 获取有效的访问令牌,如果 token 已过期,则重新获取。
- """
- if not self.token or time.time() >= self.token_expiry:
- self.get_access_token()
- return self.token
- def double_try(self, barcode):
- product_info = self.get_product_info_by_barcode(barcode)
- if product_info:
- return barcode, product_info
- product_info_try_1 = self.get_product_info_by_barcode(barcode.replace('5','6'))
- if product_info_try_1:
- return barcode.replace('5','6'), product_info_try_1
- product_info_try_2 = self.get_product_info_by_barcode(barcode.replace('6','5'))
- if product_info_try_2:
- return barcode.replace('6','5'), product_info_try_2
- product_info_try_3 = self.get_product_info_by_barcode(barcode.replace('B','8'))
- if product_info_try_3:
- return barcode.replace('B','8'), product_info_try_3
- product_info_try_4 = self.get_product_info_by_barcode(barcode.replace('8','B'))
- if product_info_try_4:
- return barcode.replace('8','B'), product_info_try_4
- else:
- return barcode, product_info
-
- async def get_product_info_by_barcode(self, barcode):
- """
- 根据条码编号获取商品信息。
- """
- time1 = time.time()
- good_code = barcode[:9] # 必须9
- token = self.get_valid_token()
- headers = {'Authorization': f'Bearer {token}'}
- payload = {'goodsCode': [good_code]}
- # payload = {'goodsCode': ['Gjf052400966']}
- response = requests.post(self.info_url, headers=headers, json=payload)
- time2 = time.time()
- # print(f"get_info_time: {time2 - time1} s!!")
- if response.status_code == 200:
- product_info = response.json()
- if product_info.get('data'):
- return product_info
- else:
- return None
- else:
- return None
- async def get_barcode_from_epc(self, epc_code):
- """
- 根据EPC码获取barcode
- """
- time1 = time.time()
- token = self.get_valid_token()
- headers = {'Authorization': f'{token}'}
- payload = {'rfid': [epc_code]}
- # payload = {'goodsCode': ['Gjf052400966']}
- response = requests.post('http://gbp-api.gloria.com.cn:21015/api/label/query', headers=headers, json=payload)
- time2 = time.time()
- # print(f"get_info_time: {time2 - time1} s!!")
- if response.status_code == 200:
- product_info = response.json()
- data = product_info.get('data')
- if data:
- result = data[0].get('labelCode')
- return result
- else:
- return None
- else:
- return None
- class ProductImage:
- def __init__(self) -> None:
- self.image_app_key = image_app_key
- self.image_app_secret = image_app_secret
- self.image_url = image_url
- def generate_sign(self, timestamp):
- encode_str = self.image_app_key + self.image_app_secret + timestamp
- md5_hash = hashlib.md5(encode_str.encode('utf-8')).hexdigest()
- return md5_hash
- async def get_product_image_by_barcode(self, barcode):
- time1 = time.time()
- part_number = barcode[:12] # 9不包括颜色,12包括颜色
- # Generate the current timestamp
- timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- # Generate the sign
- sign = self.generate_sign(timestamp)
- # Set the headers
- headers = {
- 'appKey': self.image_app_key,
- 'timestamp': timestamp,
- 'sign': sign
- }
- # Set the query parameters
- params = {
- 'partNumber': part_number
- }
- # Define the URL
- # Send the GET request
- response = requests.get(self.image_url, headers=headers, params=params)
- time2 = time.time()
- # print(f"get_image_time: {time2 - time1} s!!")
- # Check the response
- if response.status_code == 200:
- result = response.json()
- if result.get('msg') != '暂无图片、稍后再试' and result.get('imgPath'):
- return response.json()
- else:
- return None
- else:
- return None
- if __name__ == '__main__':
- import asyncio
- from concurrent.futures import ThreadPoolExecutor
- product_info_instance = ProductInfoTokenManager()
- product_image_instance = ProductImage()
- barcode = product_info_instance.get_barcode_from_epc(epc_code="eaaeaeea0000000000651b07".upper())
- print(barcode)
- product_info = product_info_instance.get_product_info_by_barcode(barcode=barcode)
- print(product_info)
- product_image = product_image_instance.get_product_image_by_barcode(barcode=barcode)
- print(product_image)
- # executor = ThreadPoolExecutor(max_workers=15)
- # async def search(message):
- # goods_code = message.get('goods_code')
- # product_info = await asyncio.get_event_loop().run_in_executor(
- # executor, product_info_instance.get_product_info_by_barcode, goods_code)
- # color_data = product_info.get('data',[{}])[0].get('colorData',[{}])
- # goods_name = product_info.get('data',[{}])[0].get('goodsName',[{}])
- # color_codes = {goods_code + item.get('colorCode'): item.get('colorName') for item in color_data if item.get('colorCode')}
- # result = []
- # for code, name in color_codes.items():
- # image_temp = await asyncio.get_event_loop().run_in_executor(
- # executor, product_image_instance.get_product_image_by_barcode, code)
- # print(image_temp)
- # result.append({'goodsCode':goods_code, 'goodsName':goods_name, 'colorCode':code[-3:], 'imgPath':image_temp.get('imgPath'), 'colorName':name})
- # return result
- # async def main():
- # result = await search({'goods_code': '1BCC6N250'})
- # print(result)
- asyncio.run(main())
-
|