# utils.py
  1. import re
  2. import cv2
  3. from PIL import Image, ImageDraw, ImageFont
  4. import numpy as np
  5. import pymysql
  6. import pandas as pd
  7. import pymssql
  8. import mysql.connector
  9. import datetime
  10. from config import ocr, ttf_path, sql_config, color_config
  11. import time, os
  12. from PIL import ExifTags
  13. from sql_query import *
  14. class detection:
  15. def __init__(self) -> None:
  16. self.ocr = ocr
  17. def rotate_image(self, image, angle):
  18. # 获取原图尺寸
  19. (h, w) = image.shape[:2]
  20. # 计算旋转后的图像需要的尺寸
  21. diagonal = int(np.sqrt(h**2 + w**2))
  22. # 创建一个更大的正方形画布
  23. square = np.zeros((diagonal, diagonal, 3), dtype=np.uint8)
  24. # 计算原图需要平移的距离
  25. offset_x = (diagonal - w) // 2
  26. offset_y = (diagonal - h) // 2
  27. # 将原图放在新画布中心
  28. square[offset_y:offset_y+h, offset_x:offset_x+w] = image
  29. # 旋转图像
  30. center = (diagonal // 2, diagonal // 2)
  31. M = cv2.getRotationMatrix2D(center, angle, 1.0)
  32. rotated = cv2.warpAffine(square, M, (diagonal, diagonal))
  33. return rotated
  34. def rotate_image_second(self, image, angle):
  35. (h, w) = image.shape[:2]
  36. center = (w / 2, h / 2)
  37. M = cv2.getRotationMatrix2D(center, angle, 1.0)
  38. rotated = cv2.warpAffine(image, M, (w, h))
  39. return rotated
  40. # 检查是否符合条码的基本格式
  41. def is_valid_barcode(self, s: str):
  42. # 检查字符串长度是否大于15
  43. # if len(s) <= 15 or len(s) > 22:
  44. # return False
  45. # 检查字符串开头是否为"1" 或 "I"(有时候1会误识别为I,在这个场景下通常其实为1)
  46. # if not s.startswith('1'):
  47. if not (s.startswith('1') or s.startswith("I")):
  48. return False
  49. # 检查字符串中是否包含指定的尺码之一
  50. size_codes = {'XXS', 'XS', 'S', 'M', 'L', 'XL', 'XXL', 'F'}
  51. if any(code in s for code in size_codes):
  52. return True
  53. return False
  54. def check_and_return_string(self, data):
  55. try:
  56. # 将OCR结果转换为字符串
  57. data_str = str(data)
  58. # 使用正则表达式查找以"1"开头并被引号括起来的字符串
  59. # matches = re.findall(r"'(1.*?)'", data_str)
  60. matches = re.findall(r"'([1I].*?)'", data_str)
  61. for match in matches:
  62. if self.is_valid_barcode(match):
  63. # 如果条形码以"I"开头,将其转换为以"1"开头
  64. if match.startswith("I"):
  65. match = "1" + match[1:]
  66. return True, match
  67. return False, None
  68. except Exception as e:
  69. print(e)
  70. return False, None
  71. def detect_barcode_ocr(self, img_path):
  72. time1 = time.time()
  73. image = cv2.imread(img_path)
  74. # 定义旋转角度
  75. angles = [0, 180, 90, 270, 45, 135, 225, 315, 5, 10, 20, 30, 355, 350, 340, 330, 60, 75, 105, 120, 150, 165, 195, 210, 240, 255, 285]
  76. # 遍历角度进行 OCR 识别
  77. for angle in angles:
  78. rotated_image = self.rotate_image(image, angle)
  79. result = self.ocr.ocr(rotated_image, cls=True)
  80. has_barcode, barcode = self.check_and_return_string(result)
  81. if has_barcode:
  82. time2 = time.time()
  83. return barcode
  84. else:
  85. time2 = time.time()
  86. return None
  87. class image_handle:
  88. @staticmethod
  89. def order_points(points):
  90. # 初始化坐标点
  91. rect = np.zeros((4, 2), dtype="float32")
  92. # 顶点和
  93. s = points.sum(axis=1)
  94. rect[0] = points[np.argmin(s)]
  95. rect[2] = points[np.argmax(s)]
  96. # 顶点差
  97. diff = np.diff(points, axis=1)
  98. rect[1] = points[np.argmin(diff)]
  99. rect[3] = points[np.argmax(diff)]
  100. return rect
  101. @staticmethod
  102. def draw_box_and_text(image, bbox, text):
  103. # 使用 CV2 绘制矩形框
  104. cv2.rectangle(image, tuple(bbox[0]), tuple(bbox[2]), (0, 255, 0), 2)
  105. # 转换图像从 BGR 到 RGB
  106. rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
  107. pil_image = Image.fromarray(rgb_image)
  108. # 创建 ImageDraw 对象
  109. draw = ImageDraw.Draw(pil_image)
  110. # 设置字体
  111. font = ImageFont.truetype(ttf_path, 10, encoding="utf-8")
  112. # 计算文本位置(在框的上方)
  113. text_position = (bbox[0][0], bbox[0][1] - 15)
  114. # 绘制文本
  115. draw.text(text_position, text, (255, 0, 0), font=font)
  116. # 将图像转回 BGR 颜色空间
  117. result_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
  118. return result_image
  119. @staticmethod
  120. def crop_image_second(image, points):
  121. time1 = time.time()
  122. # 获取坐标点顺序
  123. rect = image_handle.order_points(points)
  124. (tl, tr, br, bl) = rect
  125. # 计算新图像的宽度
  126. widthA = np.sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2))
  127. widthB = np.sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2))
  128. maxWidth = max(int(widthA), int(widthB))
  129. # 计算新图像的高度
  130. heightA = np.sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2))
  131. heightB = np.sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2))
  132. maxHeight = max(int(heightA), int(heightB))
  133. # 构建目标点
  134. dst = np.array([
  135. [0, 0],
  136. [maxWidth - 1, 0],
  137. [maxWidth - 1, maxHeight - 1],
  138. [0, maxHeight - 1]], dtype="float32")
  139. # 计算透视变换矩阵
  140. M = cv2.getPerspectiveTransform(rect, dst)
  141. # 执行透视变换
  142. warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))
  143. time2 = time.time()
  144. # print(f"crop_second time: {time2 - time1} s!!")
  145. return warped
  146. @staticmethod
  147. def clean_old_images(directory, lifetime):
  148. current_time = time.time()
  149. for filename in os.listdir(directory):
  150. file_path = os.path.join(directory, filename)
  151. if os.path.isfile(file_path):
  152. file_creation_time = os.path.getctime(file_path)
  153. if current_time - file_creation_time > lifetime:
  154. os.remove(file_path)
  155. # print(f"Deleted old image: {file_path}")
  156. @staticmethod
  157. def correct_image_orientation(image):
  158. try:
  159. for orientation in ExifTags.TAGS.keys():
  160. if ExifTags.TAGS[orientation] == 'Orientation':
  161. break
  162. exif = image._getexif()
  163. if exif is not None:
  164. orientation = exif.get(orientation, 1)
  165. if orientation == 3:
  166. image = image.rotate(180, expand=True)
  167. elif orientation == 6:
  168. image = image.rotate(270, expand=True)
  169. elif orientation == 8:
  170. image = image.rotate(90, expand=True)
  171. except (AttributeError, KeyError, IndexError):
  172. # cases: image don't have getexif
  173. pass
  174. return image
  175. class Compare:
  176. @staticmethod
  177. def remove_whitespace(s):
  178. # 使用正则表达式匹配所有空白字符并替换为空字符串
  179. return re.sub(r'\s+', '', s)
  180. @staticmethod
  181. def replace_bracket(s):
  182. if s:
  183. # 定义一个正则表达式模式,匹配所有类型的括号
  184. pattern = r'[\(\)\[\]\{\}\(\)\【\】\{\}〈〉《》]'
  185. # 使用空字符串替换所有匹配到的括号
  186. result = re.sub(pattern, '', s)
  187. return result
  188. else:
  189. return s
  190. @staticmethod
  191. def convert_new_dic(dataset):
  192. new_dic = {}
  193. special_keys = {
  194. '保养说明': ('K\d{3}', ''),
  195. '温馨提示': ('H\d{3}', '')
  196. }
  197. for key, value in dataset.items():
  198. if value:
  199. value = Compare.remove_whitespace(value)
  200. for special_key, (pattern, initial) in special_keys.items():
  201. if key.startswith(special_key):
  202. value = re.sub(pattern, '', value)
  203. new_dic[special_key] = new_dic.get(special_key, initial) + value
  204. break
  205. else:
  206. new_dic[key] = value
  207. elif key.startswith('温馨提示') and '温馨提示' not in new_dic:
  208. new_dic['温馨提示'] = ''
  209. return new_dic
  210. @staticmethod
  211. def en_to_zh_punctuation(s):
  212. # 英文标点到中文标点的映射
  213. en_to_zh_map = {
  214. ',': ',', ';': ';', ':': ':', '~': '~', '(': '(', ')': ')'
  215. }
  216. # 替换所有出现的英文标点为对应的中文标点
  217. for en, zh in en_to_zh_map.items():
  218. s = s.replace(en, zh)
  219. return s
  220. @staticmethod
  221. def compare(ocr_result, dataset):
  222. extra = set(['XXS', 'XS', 'S', 'M', 'L', 'XL', 'XXL'])
  223. desc = set(['Z', 'G'])
  224. keyword = set(dataset.keys())
  225. dataset = Compare.convert_new_dic(dataset)
  226. dic = {}
  227. i = 0
  228. while i < len(ocr_result):
  229. text = ocr_result[i][0]
  230. if '零售价' in text:
  231. dic['零售价'] = next((l[0].strip('¥') for l in ocr_result if l[0].startswith('¥')), '')
  232. elif ':' in text:
  233. key, value = text.split(':', 1)
  234. if key in keyword:
  235. if value:
  236. dic[key] = Compare.en_to_zh_punctuation(value)
  237. else:
  238. j = i + 1
  239. while j < len(ocr_result) and ocr_result[j][0].split(':')[0] not in keyword | extra | desc:
  240. value += ocr_result[j][0]
  241. j += 1
  242. dic[key] = Compare.en_to_zh_punctuation(re.sub('·', '', value))
  243. i = j - 1
  244. elif text in extra:
  245. dic['尺码'] = text
  246. elif text[0] in ['(', '('] and text[1].isalpha() and '合格证' not in text:
  247. dic['desc11' if 'desc11' not in dic and u'\u4e00' <= text[1] <= u'\u9fa5' else 'desc5'] = text
  248. elif text in desc:
  249. dic['desc4'] = text
  250. i += 1
  251. if dic.get('desc5','') == dataset.get('desc11', ''):
  252. dic['desc5'] = dataset.get('desc11', '')
  253. if dic.get('desc11','') == dataset.get('desc5', ''):
  254. dic['desc11'] = dataset.get('desc5', '')
  255. dataset['价钱下的产品名'] = dataset.pop('desc5') if 'desc5' in dataset else ''
  256. dataset['备注二'] = dataset.pop('desc4') if 'desc4' in dataset else ''
  257. dataset['齐码'] = dataset.pop('desc11') if 'desc11' in dataset else ''
  258. dic['价钱下的产品名'] = dic.pop('desc5') if 'desc5' in dic else ''
  259. dic['备注二'] = dic.pop('desc4') if 'desc4' in dic else ''
  260. dic['齐码'] = dic.pop('desc11') if 'desc11' in dic else ''
  261. log = [
  262. {'name': key, 'value': [dic[key], dataset.get(key, '')]}
  263. for key in dic
  264. if Compare.replace_bracket(dic[key]) != Compare.replace_bracket(dataset.get(key, ''))
  265. ]
  266. return dataset, log
  267. class sql_product:
  268. @staticmethod
  269. def size_information(matio_id, color_id):
  270. item_id = matio_id.split('-')[0]
  271. with pymysql.connect(**sql_config) as conn:
  272. with conn.cursor(pymysql.cursors.DictCursor) as cur:
  273. cur.execute(size_first_sql.format(matio_id=matio_id, color_id=color_id))
  274. production = [
  275. {**({d['SIZE']: d['hao_type'].split('(' if '(' in d['hao_type'] else '(')[1].strip('))').strip()}),
  276. **{k: d[k] for k in ['size_id', 'clothes_type', 'language']}}
  277. for d in cur.fetchall()
  278. ]
  279. cur.execute(size_check_sql.format(item_id=item_id))
  280. base_size = {d["size_code"]: d['sizes'] for d in cur.fetchall()}
  281. logs = [d for d in production if any(d[k] != base_size.get(k) for k in d if k not in ['size_id', 'clothes_type', 'language'])]
  282. return '0' if logs else '1', logs or "'None'"
  283. @staticmethod
  284. def color_information(matio_id):
  285. with pymysql.connect(**sql_config) as mysql_conn, pymssql.connect(**color_config) as sql_server_conn:
  286. with mysql_conn.cursor(pymysql.cursors.DictCursor) as mysql_cur, sql_server_conn.cursor(as_dict=True) as sql_server_cur:
  287. mysql_cur.execute(color_sql.format(matio_id=matio_id))
  288. water_mark_result = mysql_cur.fetchall()
  289. if not water_mark_result:
  290. return '1', "'None'"
  291. sql_server_cur.execute(color_code_sql.format(code=water_mark_result[0]['color_id']))
  292. base_result = sql_server_cur.fetchone()
  293. logs = [
  294. {k: d[k] for k in ('color_name', 'language', 'color_id', 'clothes_type')}
  295. for d in water_mark_result
  296. if d['color_id'] != base_result['code'] or d['color_name'] != base_result['name']
  297. ]
  298. return '0' if logs else '1', logs or "'None'"
  299. @staticmethod
  300. def sql_information(barcode, barcode_type='RFID吊牌', matio_id=''):
  301. conn = mysql.connector.connect(**sql_config)
  302. if barcode_type == 'RFID吊牌':
  303. df_1 = pd.read_sql_query(QUERY_MATIO_ID.format(barcode=barcode), conn)
  304. if df_1.empty:
  305. conn.close()
  306. return {}, matio_id, '', datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
  307. matio_id, color_id, size = df_1.iloc[0][['matio_id', 'COLOR_ID', 'SIZE']]
  308. query_condition = query_condition_rfid
  309. elif barcode_type == '普通吊牌':
  310. color_id, size = barcode[9:12], barcode[12:]
  311. query_condition = query_condition_normal
  312. print(barcode)
  313. print(matio_id)
  314. print(color_id)
  315. print(size)
  316. conn.close()
  317. query_second = QUERY_BASE.format(matio_id=matio_id, color_id=color_id, size=size) + " " + query_condition
  318. print(query_second)
  319. with pymysql.connect(**sql_config) as connn:
  320. curr = connn.cursor()
  321. curr.execute(query_second)
  322. columns = [desc[0] for desc in curr.description]
  323. res = curr.fetchall()
  324. sql_time = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
  325. return (dict(zip(columns, res[0])), matio_id, color_id, sql_time) if res else ({}, matio_id, color_id, sql_time)
  326. @staticmethod
  327. def sql_matio_id(prefix_code):
  328. with pymysql.connect(**sql_config) as conn:
  329. with conn.cursor() as cur:
  330. cur.execute(matio_id_sql.format(prefix_code=prefix_code))
  331. return [row[0] for row in cur.fetchall()]
  332. def get_time(strs):
  333. dicts = {}
  334. dicts['year'] = int(strs.split(' ')[0].split('-')[0])
  335. dicts['month'] = int(strs.split(' ')[0].split('-')[1])
  336. dicts['day'] = int(strs.split(' ')[0].split('-')[2])
  337. dicts['hour'] = int(strs.split(' ')[1].split(':')[0])
  338. dicts['minute'] = int(strs.split(' ')[1].split(':')[1])
  339. dicts['second'] = int(strs.split(' ')[1].split(':')[2])
  340. return dicts
  341. if __name__ == '__main__':
  342. data_result, matio_id, color_id, _ = sql_product.sql_information(barcode='1CCCAB05005WS', barcode_type='普通吊牌', matio_id='1CCCAB050-01')
  343. print(data_result)
  344. # size_compare_flag, size_compare_logs = size_information(matio_id, color_id)
  345. # result = sql_matio_id('1C3JAA57000BL')
  346. # print(result)
  347. # from config import Matio
  348. # x = Matio()
  349. # print(x)
  350. pass