import re import cv2 from PIL import Image, ImageDraw, ImageFont import numpy as np import pymysql import pandas as pd import pymssql import mysql.connector import datetime from config import ocr, ttf_path, sql_config, color_config import time, os from PIL import ExifTags from sql_query import * class detection: def __init__(self) -> None: self.ocr = ocr def rotate_image(self, image, angle): # 获取原图尺寸 (h, w) = image.shape[:2] # 计算旋转后的图像需要的尺寸 diagonal = int(np.sqrt(h**2 + w**2)) # 创建一个更大的正方形画布 square = np.zeros((diagonal, diagonal, 3), dtype=np.uint8) # 计算原图需要平移的距离 offset_x = (diagonal - w) // 2 offset_y = (diagonal - h) // 2 # 将原图放在新画布中心 square[offset_y:offset_y+h, offset_x:offset_x+w] = image # 旋转图像 center = (diagonal // 2, diagonal // 2) M = cv2.getRotationMatrix2D(center, angle, 1.0) rotated = cv2.warpAffine(square, M, (diagonal, diagonal)) return rotated def rotate_image_second(self, image, angle): (h, w) = image.shape[:2] center = (w / 2, h / 2) M = cv2.getRotationMatrix2D(center, angle, 1.0) rotated = cv2.warpAffine(image, M, (w, h)) return rotated # 检查是否符合条码的基本格式 def is_valid_barcode(self, s: str): # 检查字符串长度是否大于15 # if len(s) <= 15 or len(s) > 22: # return False # 检查字符串开头是否为"1" 或 "I"(有时候1会误识别为I,在这个场景下通常其实为1) # if not s.startswith('1'): if not (s.startswith('1') or s.startswith("I")): return False # 检查字符串中是否包含指定的尺码之一 size_codes = {'XXS', 'XS', 'S', 'M', 'L', 'XL', 'XXL', 'F'} if any(code in s for code in size_codes): return True return False def check_and_return_string(self, data): try: # 将OCR结果转换为字符串 data_str = str(data) # 使用正则表达式查找以"1"开头并被引号括起来的字符串 # matches = re.findall(r"'(1.*?)'", data_str) matches = re.findall(r"'([1I].*?)'", data_str) for match in matches: if self.is_valid_barcode(match): # 如果条形码以"I"开头,将其转换为以"1"开头 if match.startswith("I"): match = "1" + match[1:] return True, match return False, None except Exception as e: print(e) return False, None def detect_barcode_ocr(self, img_path): time1 = time.time() image = cv2.imread(img_path) # 定义旋转角度 angles = [0, 180, 90, 270, 45, 135, 225, 315, 5, 10, 20, 30, 355, 350, 340, 330, 60, 75, 105, 120, 150, 165, 195, 210, 240, 255, 285] # 遍历角度进行 OCR 识别 for angle in angles: rotated_image = self.rotate_image(image, angle) result = self.ocr.ocr(rotated_image, cls=True) has_barcode, barcode = self.check_and_return_string(result) if has_barcode: time2 = time.time() return barcode else: time2 = time.time() return None class image_handle: @staticmethod def order_points(points): # 初始化坐标点 rect = np.zeros((4, 2), dtype="float32") # 顶点和 s = points.sum(axis=1) rect[0] = points[np.argmin(s)] rect[2] = points[np.argmax(s)] # 顶点差 diff = np.diff(points, axis=1) rect[1] = points[np.argmin(diff)] rect[3] = points[np.argmax(diff)] return rect @staticmethod def draw_box_and_text(image, bbox, text): # 使用 CV2 绘制矩形框 cv2.rectangle(image, tuple(bbox[0]), tuple(bbox[2]), (0, 255, 0), 2) # 转换图像从 BGR 到 RGB rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(rgb_image) # 创建 ImageDraw 对象 draw = ImageDraw.Draw(pil_image) # 设置字体 font = ImageFont.truetype(ttf_path, 10, encoding="utf-8") # 计算文本位置(在框的上方) text_position = (bbox[0][0], bbox[0][1] - 15) # 绘制文本 draw.text(text_position, text, (255, 0, 0), font=font) # 将图像转回 BGR 颜色空间 result_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR) return result_image @staticmethod def crop_image_second(image, points): time1 = time.time() # 获取坐标点顺序 rect = image_handle.order_points(points) (tl, tr, br, bl) = rect # 计算新图像的宽度 widthA = np.sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2)) widthB = np.sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2)) maxWidth = max(int(widthA), int(widthB)) # 计算新图像的高度 heightA = np.sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2)) heightB = np.sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2)) maxHeight = max(int(heightA), int(heightB)) # 构建目标点 dst = np.array([ [0, 0], [maxWidth - 1, 0], [maxWidth - 1, maxHeight - 1], [0, maxHeight - 1]], dtype="float32") # 计算透视变换矩阵 M = cv2.getPerspectiveTransform(rect, dst) # 执行透视变换 warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight)) time2 = time.time() # print(f"crop_second time: {time2 - time1} s!!") return warped @staticmethod def clean_old_images(directory, lifetime): current_time = time.time() for filename in os.listdir(directory): file_path = os.path.join(directory, filename) if os.path.isfile(file_path): file_creation_time = os.path.getctime(file_path) if current_time - file_creation_time > lifetime: os.remove(file_path) # print(f"Deleted old image: {file_path}") @staticmethod def correct_image_orientation(image): try: for orientation in ExifTags.TAGS.keys(): if ExifTags.TAGS[orientation] == 'Orientation': break exif = image._getexif() if exif is not None: orientation = exif.get(orientation, 1) if orientation == 3: image = image.rotate(180, expand=True) elif orientation == 6: image = image.rotate(270, expand=True) elif orientation == 8: image = image.rotate(90, expand=True) except (AttributeError, KeyError, IndexError): # cases: image don't have getexif pass return image class Compare: @staticmethod def remove_whitespace(s): # 使用正则表达式匹配所有空白字符并替换为空字符串 return re.sub(r'\s+', '', s) @staticmethod def replace_bracket(s): if s: # 定义一个正则表达式模式,匹配所有类型的括号 pattern = r'[\(\)\[\]\{\}\(\)\【\】\{\}〈〉《》]' # 使用空字符串替换所有匹配到的括号 result = re.sub(pattern, '', s) return result else: return s @staticmethod def convert_new_dic(dataset): new_dic = {} special_keys = { '保养说明': ('K\d{3}', ''), '温馨提示': ('H\d{3}', '') } for key, value in dataset.items(): if value: value = Compare.remove_whitespace(value) for special_key, (pattern, initial) in special_keys.items(): if key.startswith(special_key): value = re.sub(pattern, '', value) new_dic[special_key] = new_dic.get(special_key, initial) + value break else: new_dic[key] = value elif key.startswith('温馨提示') and '温馨提示' not in new_dic: new_dic['温馨提示'] = '' return new_dic @staticmethod def en_to_zh_punctuation(s): # 英文标点到中文标点的映射 en_to_zh_map = { ',': ',', ';': ';', ':': ':', '~': '~', '(': '(', ')': ')' } # 替换所有出现的英文标点为对应的中文标点 for en, zh in en_to_zh_map.items(): s = s.replace(en, zh) return s @staticmethod def compare(ocr_result, dataset): extra = set(['XXS', 'XS', 'S', 'M', 'L', 'XL', 'XXL']) desc = set(['Z', 'G']) keyword = set(dataset.keys()) dataset = Compare.convert_new_dic(dataset) dic = {} i = 0 while i < len(ocr_result): text = ocr_result[i][0] if '零售价' in text: dic['零售价'] = next((l[0].strip('¥') for l in ocr_result if l[0].startswith('¥')), '') elif ':' in text: key, value = text.split(':', 1) if key in keyword: if value: dic[key] = Compare.en_to_zh_punctuation(value) else: j = i + 1 while j < len(ocr_result) and ocr_result[j][0].split(':')[0] not in keyword | extra | desc: value += ocr_result[j][0] j += 1 dic[key] = Compare.en_to_zh_punctuation(re.sub('·', '', value)) i = j - 1 elif text in extra: dic['尺码'] = text elif text[0] in ['(', '('] and text[1].isalpha() and '合格证' not in text: dic['desc11' if 'desc11' not in dic and u'\u4e00' <= text[1] <= u'\u9fa5' else 'desc5'] = text elif text in desc: dic['desc4'] = text i += 1 if dic.get('desc5','') == dataset.get('desc11', ''): dic['desc5'] = dataset.get('desc11', '') if dic.get('desc11','') == dataset.get('desc5', ''): dic['desc11'] = dataset.get('desc5', '') dataset['价钱下的产品名'] = dataset.pop('desc5') if 'desc5' in dataset else '' dataset['备注二'] = dataset.pop('desc4') if 'desc4' in dataset else '' dataset['齐码'] = dataset.pop('desc11') if 'desc11' in dataset else '' dic['价钱下的产品名'] = dic.pop('desc5') if 'desc5' in dic else '' dic['备注二'] = dic.pop('desc4') if 'desc4' in dic else '' dic['齐码'] = dic.pop('desc11') if 'desc11' in dic else '' log = [ {'name': key, 'value': [dic[key], dataset.get(key, '')]} for key in dic if Compare.replace_bracket(dic[key]) != Compare.replace_bracket(dataset.get(key, '')) ] return dataset, log class sql_product: @staticmethod def size_information(matio_id, color_id): item_id = matio_id.split('-')[0] with pymysql.connect(**sql_config) as conn: with conn.cursor(pymysql.cursors.DictCursor) as cur: cur.execute(size_first_sql.format(matio_id=matio_id, color_id=color_id)) production = [ {**({d['SIZE']: d['hao_type'].split('(' if '(' in d['hao_type'] else '(')[1].strip('))').strip()}), **{k: d[k] for k in ['size_id', 'clothes_type', 'language']}} for d in cur.fetchall() ] cur.execute(size_check_sql.format(item_id=item_id)) base_size = {d["size_code"]: d['sizes'] for d in cur.fetchall()} logs = [d for d in production if any(d[k] != base_size.get(k) for k in d if k not in ['size_id', 'clothes_type', 'language'])] return '0' if logs else '1', logs or "'None'" @staticmethod def color_information(matio_id): with pymysql.connect(**sql_config) as mysql_conn, pymssql.connect(**color_config) as sql_server_conn: with mysql_conn.cursor(pymysql.cursors.DictCursor) as mysql_cur, sql_server_conn.cursor(as_dict=True) as sql_server_cur: mysql_cur.execute(color_sql.format(matio_id=matio_id)) water_mark_result = mysql_cur.fetchall() if not water_mark_result: return '1', "'None'" sql_server_cur.execute(color_code_sql.format(code=water_mark_result[0]['color_id'])) base_result = sql_server_cur.fetchone() logs = [ {k: d[k] for k in ('color_name', 'language', 'color_id', 'clothes_type')} for d in water_mark_result if d['color_id'] != base_result['code'] or d['color_name'] != base_result['name'] ] return '0' if logs else '1', logs or "'None'" @staticmethod def sql_information(barcode, barcode_type='RFID吊牌', matio_id=''): conn = mysql.connector.connect(**sql_config) if barcode_type == 'RFID吊牌': df_1 = pd.read_sql_query(QUERY_MATIO_ID.format(barcode=barcode), conn) if df_1.empty: conn.close() return {}, matio_id, '', datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") matio_id, color_id, size = df_1.iloc[0][['matio_id', 'COLOR_ID', 'SIZE']] query_condition = query_condition_rfid elif barcode_type == '普通吊牌': color_id, size = barcode[9:12], barcode[12:] query_condition = query_condition_normal print(barcode) print(matio_id) print(color_id) print(size) conn.close() query_second = QUERY_BASE.format(matio_id=matio_id, color_id=color_id, size=size) + " " + query_condition print(query_second) with pymysql.connect(**sql_config) as connn: curr = connn.cursor() curr.execute(query_second) columns = [desc[0] for desc in curr.description] res = curr.fetchall() sql_time = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") return (dict(zip(columns, res[0])), matio_id, color_id, sql_time) if res else ({}, matio_id, color_id, sql_time) @staticmethod def sql_matio_id(prefix_code): with pymysql.connect(**sql_config) as conn: with conn.cursor() as cur: cur.execute(matio_id_sql.format(prefix_code=prefix_code)) return [row[0] for row in cur.fetchall()] def get_time(strs): dicts = {} dicts['year'] = int(strs.split(' ')[0].split('-')[0]) dicts['month'] = int(strs.split(' ')[0].split('-')[1]) dicts['day'] = int(strs.split(' ')[0].split('-')[2]) dicts['hour'] = int(strs.split(' ')[1].split(':')[0]) dicts['minute'] = int(strs.split(' ')[1].split(':')[1]) dicts['second'] = int(strs.split(' ')[1].split(':')[2]) return dicts if __name__ == '__main__': data_result, matio_id, color_id, _ = sql_product.sql_information(barcode='1CCCAB05005WS', barcode_type='普通吊牌', matio_id='1CCCAB050-01') print(data_result) # size_compare_flag, size_compare_logs = size_information(matio_id, color_id) # result = sql_matio_id('1C3JAA57000BL') # print(result) # from config import Matio # x = Matio() # print(x) pass