|
@@ -0,0 +1,422 @@
|
|
|
|
+import re
|
|
|
|
+import cv2
|
|
|
|
+from PIL import Image, ImageDraw, ImageFont
|
|
|
|
+import numpy as np
|
|
|
|
+import pymysql
|
|
|
|
+import pandas as pd
|
|
|
|
+import pymssql
|
|
|
|
+import mysql.connector
|
|
|
|
+import datetime
|
|
|
|
+from config import ocr, ttf_path, sql_config, color_config
|
|
|
|
+import time, os
|
|
|
|
+from PIL import ExifTags
|
|
|
|
+from sql_query import *
|
|
|
|
+
|
|
|
|
+class detection:
|
|
|
|
+ def __init__(self) -> None:
|
|
|
|
+ self.ocr = ocr
|
|
|
|
+ def rotate_image(self, image, angle):
|
|
|
|
+ # 获取原图尺寸
|
|
|
|
+ (h, w) = image.shape[:2]
|
|
|
|
+
|
|
|
|
+ # 计算旋转后的图像需要的尺寸
|
|
|
|
+ diagonal = int(np.sqrt(h**2 + w**2))
|
|
|
|
+
|
|
|
|
+ # 创建一个更大的正方形画布
|
|
|
|
+ square = np.zeros((diagonal, diagonal, 3), dtype=np.uint8)
|
|
|
|
+
|
|
|
|
+ # 计算原图需要平移的距离
|
|
|
|
+ offset_x = (diagonal - w) // 2
|
|
|
|
+ offset_y = (diagonal - h) // 2
|
|
|
|
+
|
|
|
|
+ # 将原图放在新画布中心
|
|
|
|
+ square[offset_y:offset_y+h, offset_x:offset_x+w] = image
|
|
|
|
+
|
|
|
|
+ # 旋转图像
|
|
|
|
+ center = (diagonal // 2, diagonal // 2)
|
|
|
|
+ M = cv2.getRotationMatrix2D(center, angle, 1.0)
|
|
|
|
+ rotated = cv2.warpAffine(square, M, (diagonal, diagonal))
|
|
|
|
+
|
|
|
|
+ return rotated
|
|
|
|
+ def rotate_image_second(self, image, angle):
|
|
|
|
+ (h, w) = image.shape[:2]
|
|
|
|
+ center = (w / 2, h / 2)
|
|
|
|
+ M = cv2.getRotationMatrix2D(center, angle, 1.0)
|
|
|
|
+ rotated = cv2.warpAffine(image, M, (w, h))
|
|
|
|
+ return rotated
|
|
|
|
+
|
|
|
|
+ # 检查是否符合条码的基本格式
|
|
|
|
+ def is_valid_barcode(self, s: str):
|
|
|
|
+ # 检查字符串长度是否大于15
|
|
|
|
+ # if len(s) <= 15 or len(s) > 22:
|
|
|
|
+ # return False
|
|
|
|
+ # 检查字符串开头是否为"1" 或 "I"(有时候1会误识别为I,在这个场景下通常其实为1)
|
|
|
|
+ # if not s.startswith('1'):
|
|
|
|
+ if not (s.startswith('1') or s.startswith("I")):
|
|
|
|
+ return False
|
|
|
|
+ # 检查字符串中是否包含指定的尺码之一
|
|
|
|
+ size_codes = {'XXS', 'XS', 'S', 'M', 'L', 'XL', 'XXL', 'F'}
|
|
|
|
+ if any(code in s for code in size_codes):
|
|
|
|
+ return True
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ def check_and_return_string(self, data):
|
|
|
|
+ try:
|
|
|
|
+ # 将OCR结果转换为字符串
|
|
|
|
+ data_str = str(data)
|
|
|
|
+ # 使用正则表达式查找以"1"开头并被引号括起来的字符串
|
|
|
|
+ # matches = re.findall(r"'(1.*?)'", data_str)
|
|
|
|
+ matches = re.findall(r"'([1I].*?)'", data_str)
|
|
|
|
+ for match in matches:
|
|
|
|
+ if self.is_valid_barcode(match):
|
|
|
|
+ # 如果条形码以"I"开头,将其转换为以"1"开头
|
|
|
|
+ if match.startswith("I"):
|
|
|
|
+ match = "1" + match[1:]
|
|
|
|
+ return True, match
|
|
|
|
+ return False, None
|
|
|
|
+ except Exception as e:
|
|
|
|
+ print(e)
|
|
|
|
+ return False, None
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ def detect_barcode_ocr(self, img_path):
|
|
|
|
+ time1 = time.time()
|
|
|
|
+ image = cv2.imread(img_path)
|
|
|
|
+ # 定义旋转角度
|
|
|
|
+ angles = [0, 180, 90, 270, 45, 135, 225, 315, 5, 10, 20, 30, 355, 350, 340, 330, 60, 75, 105, 120, 150, 165, 195, 210, 240, 255, 285]
|
|
|
|
+ # 遍历角度进行 OCR 识别
|
|
|
|
+ for angle in angles:
|
|
|
|
+ rotated_image = self.rotate_image(image, angle)
|
|
|
|
+ result = self.ocr.ocr(rotated_image, cls=True)
|
|
|
|
+ has_barcode, barcode = self.check_and_return_string(result)
|
|
|
|
+ if has_barcode:
|
|
|
|
+ time2 = time.time()
|
|
|
|
+ return barcode
|
|
|
|
+ else:
|
|
|
|
+ time2 = time.time()
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
+class image_handle:
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def order_points(points):
|
|
|
|
+ # 初始化坐标点
|
|
|
|
+ rect = np.zeros((4, 2), dtype="float32")
|
|
|
|
+
|
|
|
|
+ # 顶点和
|
|
|
|
+ s = points.sum(axis=1)
|
|
|
|
+ rect[0] = points[np.argmin(s)]
|
|
|
|
+ rect[2] = points[np.argmax(s)]
|
|
|
|
+
|
|
|
|
+ # 顶点差
|
|
|
|
+ diff = np.diff(points, axis=1)
|
|
|
|
+ rect[1] = points[np.argmin(diff)]
|
|
|
|
+ rect[3] = points[np.argmax(diff)]
|
|
|
|
+
|
|
|
|
+ return rect
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def draw_box_and_text(image, bbox, text):
|
|
|
|
+ # 使用 CV2 绘制矩形框
|
|
|
|
+ cv2.rectangle(image, tuple(bbox[0]), tuple(bbox[2]), (0, 255, 0), 2)
|
|
|
|
+
|
|
|
|
+ # 转换图像从 BGR 到 RGB
|
|
|
|
+ rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
|
|
|
+ pil_image = Image.fromarray(rgb_image)
|
|
|
|
+
|
|
|
|
+ # 创建 ImageDraw 对象
|
|
|
|
+ draw = ImageDraw.Draw(pil_image)
|
|
|
|
+
|
|
|
|
+ # 设置字体
|
|
|
|
+ font = ImageFont.truetype(ttf_path, 10, encoding="utf-8")
|
|
|
|
+
|
|
|
|
+ # 计算文本位置(在框的上方)
|
|
|
|
+ text_position = (bbox[0][0], bbox[0][1] - 15)
|
|
|
|
+
|
|
|
|
+ # 绘制文本
|
|
|
|
+ draw.text(text_position, text, (255, 0, 0), font=font)
|
|
|
|
+
|
|
|
|
+ # 将图像转回 BGR 颜色空间
|
|
|
|
+ result_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
|
|
|
|
+
|
|
|
|
+ return result_image
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def crop_image_second(image, points):
|
|
|
|
+ time1 = time.time()
|
|
|
|
+ # 获取坐标点顺序
|
|
|
|
+ rect = image_handle.order_points(points)
|
|
|
|
+ (tl, tr, br, bl) = rect
|
|
|
|
+
|
|
|
|
+ # 计算新图像的宽度
|
|
|
|
+ widthA = np.sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2))
|
|
|
|
+ widthB = np.sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2))
|
|
|
|
+ maxWidth = max(int(widthA), int(widthB))
|
|
|
|
+
|
|
|
|
+ # 计算新图像的高度
|
|
|
|
+ heightA = np.sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2))
|
|
|
|
+ heightB = np.sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2))
|
|
|
|
+ maxHeight = max(int(heightA), int(heightB))
|
|
|
|
+
|
|
|
|
+ # 构建目标点
|
|
|
|
+ dst = np.array([
|
|
|
|
+ [0, 0],
|
|
|
|
+ [maxWidth - 1, 0],
|
|
|
|
+ [maxWidth - 1, maxHeight - 1],
|
|
|
|
+ [0, maxHeight - 1]], dtype="float32")
|
|
|
|
+
|
|
|
|
+ # 计算透视变换矩阵
|
|
|
|
+ M = cv2.getPerspectiveTransform(rect, dst)
|
|
|
|
+
|
|
|
|
+ # 执行透视变换
|
|
|
|
+ warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))
|
|
|
|
+ time2 = time.time()
|
|
|
|
+ # print(f"crop_second time: {time2 - time1} s!!")
|
|
|
|
+ return warped
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def clean_old_images(directory, lifetime):
|
|
|
|
+ current_time = time.time()
|
|
|
|
+ for filename in os.listdir(directory):
|
|
|
|
+ file_path = os.path.join(directory, filename)
|
|
|
|
+ if os.path.isfile(file_path):
|
|
|
|
+ file_creation_time = os.path.getctime(file_path)
|
|
|
|
+ if current_time - file_creation_time > lifetime:
|
|
|
|
+ os.remove(file_path)
|
|
|
|
+ # print(f"Deleted old image: {file_path}")
|
|
|
|
+ @staticmethod
|
|
|
|
+ def correct_image_orientation(image):
|
|
|
|
+ try:
|
|
|
|
+ for orientation in ExifTags.TAGS.keys():
|
|
|
|
+ if ExifTags.TAGS[orientation] == 'Orientation':
|
|
|
|
+ break
|
|
|
|
+
|
|
|
|
+ exif = image._getexif()
|
|
|
|
+ if exif is not None:
|
|
|
|
+ orientation = exif.get(orientation, 1)
|
|
|
|
+ if orientation == 3:
|
|
|
|
+ image = image.rotate(180, expand=True)
|
|
|
|
+ elif orientation == 6:
|
|
|
|
+ image = image.rotate(270, expand=True)
|
|
|
|
+ elif orientation == 8:
|
|
|
|
+ image = image.rotate(90, expand=True)
|
|
|
|
+ except (AttributeError, KeyError, IndexError):
|
|
|
|
+ # cases: image don't have getexif
|
|
|
|
+ pass
|
|
|
|
+ return image
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class Compare:
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def remove_whitespace(s):
|
|
|
|
+ # 使用正则表达式匹配所有空白字符并替换为空字符串
|
|
|
|
+ return re.sub(r'\s+', '', s)
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def replace_bracket(s):
|
|
|
|
+ if s:
|
|
|
|
+ # 定义一个正则表达式模式,匹配所有类型的括号
|
|
|
|
+ pattern = r'[\(\)\[\]\{\}\(\)\【\】\{\}〈〉《》]'
|
|
|
|
+ # 使用空字符串替换所有匹配到的括号
|
|
|
|
+ result = re.sub(pattern, '', s)
|
|
|
|
+ return result
|
|
|
|
+ else:
|
|
|
|
+ return s
|
|
|
|
+ @staticmethod
|
|
|
|
+ def convert_new_dic(dataset):
|
|
|
|
+ new_dic = {}
|
|
|
|
+ special_keys = {
|
|
|
|
+ '保养说明': ('K\d{3}', ''),
|
|
|
|
+ '温馨提示': ('H\d{3}', '')
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for key, value in dataset.items():
|
|
|
|
+ if value:
|
|
|
|
+ value = Compare.remove_whitespace(value)
|
|
|
|
+ for special_key, (pattern, initial) in special_keys.items():
|
|
|
|
+ if key.startswith(special_key):
|
|
|
|
+ value = re.sub(pattern, '', value)
|
|
|
|
+ new_dic[special_key] = new_dic.get(special_key, initial) + value
|
|
|
|
+ break
|
|
|
|
+ else:
|
|
|
|
+ new_dic[key] = value
|
|
|
|
+ elif key.startswith('温馨提示') and '温馨提示' not in new_dic:
|
|
|
|
+ new_dic['温馨提示'] = ''
|
|
|
|
+
|
|
|
|
+ return new_dic
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def en_to_zh_punctuation(s):
|
|
|
|
+ # 英文标点到中文标点的映射
|
|
|
|
+ en_to_zh_map = {
|
|
|
|
+ ',': ',', ';': ';', ':': ':', '~': '~', '(': '(', ')': ')'
|
|
|
|
+ }
|
|
|
|
+ # 替换所有出现的英文标点为对应的中文标点
|
|
|
|
+ for en, zh in en_to_zh_map.items():
|
|
|
|
+ s = s.replace(en, zh)
|
|
|
|
+ return s
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def compare(ocr_result, dataset):
|
|
|
|
+ extra = set(['XXS', 'XS', 'S', 'M', 'L', 'XL', 'XXL'])
|
|
|
|
+ desc = set(['Z', 'G'])
|
|
|
|
+ keyword = set(dataset.keys())
|
|
|
|
+ dataset = Compare.convert_new_dic(dataset)
|
|
|
|
+ dic = {}
|
|
|
|
+ i = 0
|
|
|
|
+
|
|
|
|
+ while i < len(ocr_result):
|
|
|
|
+ text = ocr_result[i][0]
|
|
|
|
+
|
|
|
|
+ if '零售价' in text:
|
|
|
|
+ dic['零售价'] = next((l[0].strip('¥') for l in ocr_result if l[0].startswith('¥')), '')
|
|
|
|
+ elif ':' in text:
|
|
|
|
+ key, value = text.split(':', 1)
|
|
|
|
+ if key in keyword:
|
|
|
|
+ if value:
|
|
|
|
+ dic[key] = Compare.en_to_zh_punctuation(value)
|
|
|
|
+ else:
|
|
|
|
+ j = i + 1
|
|
|
|
+ while j < len(ocr_result) and ocr_result[j][0].split(':')[0] not in keyword | extra | desc:
|
|
|
|
+ value += ocr_result[j][0]
|
|
|
|
+ j += 1
|
|
|
|
+ dic[key] = Compare.en_to_zh_punctuation(re.sub('·', '', value))
|
|
|
|
+ i = j - 1
|
|
|
|
+ elif text in extra:
|
|
|
|
+ dic['尺码'] = text
|
|
|
|
+ elif text[0] in ['(', '('] and text[1].isalpha() and '合格证' not in text:
|
|
|
|
+ dic['desc11' if 'desc11' not in dic and u'\u4e00' <= text[1] <= u'\u9fa5' else 'desc5'] = text
|
|
|
|
+ elif text in desc:
|
|
|
|
+ dic['desc4'] = text
|
|
|
|
+
|
|
|
|
+ i += 1
|
|
|
|
+ if dic.get('desc5','') == dataset.get('desc11', ''):
|
|
|
|
+ dic['desc5'] = dataset.get('desc11', '')
|
|
|
|
+ if dic.get('desc11','') == dataset.get('desc5', ''):
|
|
|
|
+ dic['desc11'] = dataset.get('desc5', '')
|
|
|
|
+
|
|
|
|
+ dataset['价钱下的产品名'] = dataset.pop('desc5') if 'desc5' in dataset else ''
|
|
|
|
+ dataset['备注二'] = dataset.pop('desc4') if 'desc4' in dataset else ''
|
|
|
|
+ dataset['齐码'] = dataset.pop('desc11') if 'desc11' in dataset else ''
|
|
|
|
+
|
|
|
|
+ dic['价钱下的产品名'] = dic.pop('desc5') if 'desc5' in dic else ''
|
|
|
|
+ dic['备注二'] = dic.pop('desc4') if 'desc4' in dic else ''
|
|
|
|
+ dic['齐码'] = dic.pop('desc11') if 'desc11' in dic else ''
|
|
|
|
+
|
|
|
|
+ log = [
|
|
|
|
+ {'name': key, 'value': [dic[key], dataset.get(key, '')]}
|
|
|
|
+ for key in dic
|
|
|
|
+ if Compare.replace_bracket(dic[key]) != Compare.replace_bracket(dataset.get(key, ''))
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ return dataset, log
|
|
|
|
+
|
|
|
|
+class sql_product:
|
|
|
|
+ @staticmethod
|
|
|
|
+ def size_information(matio_id, color_id):
|
|
|
|
+ item_id = matio_id.split('-')[0]
|
|
|
|
+
|
|
|
|
+ with pymysql.connect(**sql_config) as conn:
|
|
|
|
+ with conn.cursor(pymysql.cursors.DictCursor) as cur:
|
|
|
|
+ cur.execute(size_first_sql.format(matio_id=matio_id, color_id=color_id))
|
|
|
|
+ production = [
|
|
|
|
+ {**({d['SIZE']: d['hao_type'].split('(' if '(' in d['hao_type'] else '(')[1].strip('))').strip()}),
|
|
|
|
+ **{k: d[k] for k in ['size_id', 'clothes_type', 'language']}}
|
|
|
|
+ for d in cur.fetchall()
|
|
|
|
+ ]
|
|
|
|
+ cur.execute(size_check_sql.format(item_id=item_id))
|
|
|
|
+ base_size = {d["size_code"]: d['sizes'] for d in cur.fetchall()}
|
|
|
|
+ logs = [d for d in production if any(d[k] != base_size.get(k) for k in d if k not in ['size_id', 'clothes_type', 'language'])]
|
|
|
|
+ return '0' if logs else '1', logs or "'None'"
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def color_information(matio_id):
|
|
|
|
+ with pymysql.connect(**sql_config) as mysql_conn, pymssql.connect(**color_config) as sql_server_conn:
|
|
|
|
+ with mysql_conn.cursor(pymysql.cursors.DictCursor) as mysql_cur, sql_server_conn.cursor(as_dict=True) as sql_server_cur:
|
|
|
|
+ mysql_cur.execute(color_sql.format(matio_id=matio_id))
|
|
|
|
+ water_mark_result = mysql_cur.fetchall()
|
|
|
|
+
|
|
|
|
+ if not water_mark_result:
|
|
|
|
+ return '1', "'None'"
|
|
|
|
+
|
|
|
|
+ sql_server_cur.execute(color_code_sql.format(code=water_mark_result[0]['color_id']))
|
|
|
|
+ base_result = sql_server_cur.fetchone()
|
|
|
|
+
|
|
|
|
+ logs = [
|
|
|
|
+ {k: d[k] for k in ('color_name', 'language', 'color_id', 'clothes_type')}
|
|
|
|
+ for d in water_mark_result
|
|
|
|
+ if d['color_id'] != base_result['code'] or d['color_name'] != base_result['name']
|
|
|
|
+ ]
|
|
|
|
+
|
|
|
|
+ return '0' if logs else '1', logs or "'None'"
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def sql_information(barcode, barcode_type='RFID吊牌', matio_id=''):
|
|
|
|
+
|
|
|
|
+ conn = mysql.connector.connect(**sql_config)
|
|
|
|
+
|
|
|
|
+ if barcode_type == 'RFID吊牌':
|
|
|
|
+ df_1 = pd.read_sql_query(QUERY_MATIO_ID.format(barcode=barcode), conn)
|
|
|
|
+ if df_1.empty:
|
|
|
|
+ conn.close()
|
|
|
|
+ return {}, matio_id, '', datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
|
|
|
|
+ matio_id, color_id, size = df_1.iloc[0][['matio_id', 'COLOR_ID', 'SIZE']]
|
|
|
|
+ query_condition = query_condition_rfid
|
|
|
|
+ elif barcode_type == '普通吊牌':
|
|
|
|
+ color_id, size = barcode[9:12], barcode[12:]
|
|
|
|
+ query_condition = query_condition_normal
|
|
|
|
+ print(barcode)
|
|
|
|
+ print(matio_id)
|
|
|
|
+ print(color_id)
|
|
|
|
+ print(size)
|
|
|
|
+ conn.close()
|
|
|
|
+ query_second = QUERY_BASE.format(matio_id=matio_id, color_id=color_id, size=size) + " " + query_condition
|
|
|
|
+ print(query_second)
|
|
|
|
+ with pymysql.connect(**sql_config) as connn:
|
|
|
|
+ curr = connn.cursor()
|
|
|
|
+ curr.execute(query_second)
|
|
|
|
+ columns = [desc[0] for desc in curr.description]
|
|
|
|
+ res = curr.fetchall()
|
|
|
|
+
|
|
|
|
+ sql_time = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
|
|
|
|
+
|
|
|
|
+ return (dict(zip(columns, res[0])), matio_id, color_id, sql_time) if res else ({}, matio_id, color_id, sql_time)
|
|
|
|
+
|
|
|
|
+ @staticmethod
|
|
|
|
+ def sql_matio_id(prefix_code):
|
|
|
|
+ with pymysql.connect(**sql_config) as conn:
|
|
|
|
+ with conn.cursor() as cur:
|
|
|
|
+ cur.execute(matio_id_sql.format(prefix_code=prefix_code))
|
|
|
|
+ return [row[0] for row in cur.fetchall()]
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def get_time(strs):
|
|
|
|
+ dicts = {}
|
|
|
|
+ dicts['year'] = int(strs.split(' ')[0].split('-')[0])
|
|
|
|
+ dicts['month'] = int(strs.split(' ')[0].split('-')[1])
|
|
|
|
+ dicts['day'] = int(strs.split(' ')[0].split('-')[2])
|
|
|
|
+ dicts['hour'] = int(strs.split(' ')[1].split(':')[0])
|
|
|
|
+ dicts['minute'] = int(strs.split(' ')[1].split(':')[1])
|
|
|
|
+ dicts['second'] = int(strs.split(' ')[1].split(':')[2])
|
|
|
|
+ return dicts
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+if __name__ == '__main__':
|
|
|
|
+ data_result, matio_id, color_id, _ = sql_product.sql_information(barcode='1CCCAB05005WS', barcode_type='普通吊牌', matio_id='1CCCAB050-01')
|
|
|
|
+ print(data_result)
|
|
|
|
+ # size_compare_flag, size_compare_logs = size_information(matio_id, color_id)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ # result = sql_matio_id('1C3JAA57000BL')
|
|
|
|
+ # print(result)
|
|
|
|
+ # from config import Matio
|
|
|
|
+ # x = Matio()
|
|
|
|
+ # print(x)
|
|
|
|
+ pass
|