"""FastAPI service: barcode detection, OCR of label images and history lookup.

Each successful ``/get_barcode`` run appends one row to ``history.csv`` under
``ocr_images_dir``; ``/search``, ``/show`` and ``/history`` read that file.
"""
import ast
import asyncio
import csv
import datetime
import io
import json
import os
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

import cv2
import numpy as np
import pandas as pd
from PIL import Image
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

from config import (
    ocr_images_dir, model, header, port, file_url, Search, ID, Matio, Item
)
from utils import get_time, detection, sql_product, image_handle, Compare

# Format used for timestamps embedded in file names and passed between endpoints.
TIMESTAMP_FORMAT = "%Y_%m_%d-%H__%M__%S"
# Format used for time columns returned to clients.
DISPLAY_TIME_FORMAT = "%Y/%m/%d %H:%M:%S"
# Append-only record of every OCR run.
HISTORY_CSV_PATH = os.path.join(ocr_images_dir, 'history.csv')

detect_instance = detection()
ocr = detect_instance.ocr

app = FastAPI()
executor = ThreadPoolExecutor(max_workers=15)
camera_connections = {}

app.mount(header, StaticFiles(directory=ocr_images_dir), name="static")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def _display_time(stamp):
    """Rewrite a ``TIMESTAMP_FORMAT`` string as ``YYYY/MM/DD HH:MM:SS``."""
    return stamp.replace('__', ':').replace('_', '/').replace('-', ' ')


@app.post('/detect_barcode')
async def detect_barcode(file: UploadFile = File(...)):
    """Save the uploaded image, locate a barcode region and decode it.

    Returns a dict with keys ``barcode``, ``image_origin_path`` and
    ``upload_timestamp``. ``barcode`` is ``None`` when nothing was decoded.
    On any error all three values are ``None``; the dict shape is kept so that
    callers in this module, which use ``.get(...)`` on the result, keep working.
    """
    try:
        # Make sure the scratch directory exists.
        tmp_dir = os.path.join(ocr_images_dir, 'tmp_images')
        os.makedirs(tmp_dir, exist_ok=True)

        # Read the upload and convert it to an OpenCV BGR array.
        contents = await file.read()
        upload_timestamp = datetime.datetime.now().strftime(TIMESTAMP_FORMAT)
        image = Image.open(io.BytesIO(contents))
        image = image_handle.correct_image_orientation(image)
        image = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR)

        # Persist the original image; later steps refer to it by path.
        image_origin_path = os.path.join(tmp_dir, f"origin_images_{upload_timestamp}.png")
        cv2.imwrite(image_origin_path, image)

        # Run the detector and time it.
        time1 = time.time()
        results = model.predict(image, conf=0.3)
        print(f"predict time is:{time.time() - time1}")

        loop = asyncio.get_running_loop()
        for result in results:
            for obb in result.obb:
                # NOTE(review): class id 15 is presumably the barcode class - confirm against the model.
                if int(obb.cls.item()) == 15 and float(obb.conf.item()) > 0:
                    points = obb.xyxyxyxy[0].cpu().numpy()
                    cropped_image = image_handle.crop_image_second(image.copy(), points)
                    barcode_image_path = os.path.join(tmp_dir, f"barcode_images_{upload_timestamp}.png")
                    cv2.imwrite(barcode_image_path, cropped_image)
                    # Decode in the thread pool so the event loop is not blocked.
                    barcode = await loop.run_in_executor(
                        executor, detect_instance.detect_barcode_ocr, barcode_image_path)
                    if barcode:
                        return {
                            'barcode': barcode,
                            'image_origin_path': image_origin_path,
                            'upload_timestamp': upload_timestamp,
                        }
        return {
            'barcode': None,
            'image_origin_path': image_origin_path,
            'upload_timestamp': upload_timestamp,
        }
    except Exception:
        traceback.print_exc()
        return {'barcode': None, 'image_origin_path': None, 'upload_timestamp': None}


@app.post('/get_barcode')
async def get_barcode(message: Optional[str] = Form(None), file: Optional[UploadFile] = File(None)):
    """Run OCR on a label image, compare it with product data and log the result.

    Input is either ``message`` (a JSON string used to build a ``Matio``) or
    ``file`` (an image that is first passed through ``detect_barcode``).
    Appends one row to the history CSV and returns a JSON status response.
    """
    try:
        result_dir = os.path.join(ocr_images_dir, 'results')
        os.makedirs(result_dir, exist_ok=True)

        # Defaults so that a request with neither input falls through to the
        # "barcode not recognized" response instead of an unbound-name error.
        barcode = image_origin_path = upload_timestamp = None
        barcode_type = matio_id = None
        if message:
            message_new = Matio(**json.loads(message))
            barcode = message_new.barcode
            image_origin_path = message_new.image_origin_path
            upload_timestamp = message_new.upload_time
            barcode_type = message_new.barcode_type
            matio_id = message_new.matio_id
        elif file:
            barcode_data = await detect_barcode(file)
            barcode = barcode_data.get('barcode')
            image_origin_path = barcode_data.get('image_origin_path')
            upload_timestamp = barcode_data.get('upload_timestamp')
            defaults = Matio()
            barcode_type, matio_id = defaults.barcode_type, defaults.matio_id

        if not barcode:
            return JSONResponse(content={"code": 0, "decs": "未识别到barcode,请重新上传"}, status_code=500)

        image = cv2.imread(image_origin_path)
        image_width, image_height = image.shape[1], image.shape[0]
        detection_timestamp = datetime.datetime.now().strftime(TIMESTAMP_FORMAT)

        data_result, matio_id, color_id, _ = sql_product.sql_information(
            barcode=barcode, barcode_type=barcode_type, matio_id=matio_id)

        results_dir = os.path.join(result_dir, matio_id)
        os.makedirs(results_dir, exist_ok=True)
        result_image_path = os.path.join(results_dir, f"ocr_result_images_{detection_timestamp}.png")
        regular_image_path = os.path.join(results_dir, f"regular_images_{upload_timestamp}.png")
        cv2.imwrite(regular_image_path, image)

        # OCR the saved image and draw every recognized box/text onto it.
        ocr_result = []
        result = ocr.ocr(regular_image_path, cls=True)
        for line in result[0]:
            bbox = [[int(x) for x in point] for point in line[0]]
            text = line[1][0]
            image = image_handle.draw_box_and_text(image, bbox, text)
            ocr_result.append([text])

        resize = cv2.resize(image, (image_width, image_height), interpolation=cv2.INTER_AREA)
        cv2.imwrite(result_image_path, resize)

        ocr_image_url = file_url + result_image_path.replace(ocr_images_dir, '/')
        regular_image_url = file_url + regular_image_path.replace(ocr_images_dir, '/')

        data_set, log = Compare.compare(ocr_result=ocr_result, dataset=data_result)

        # Split the colour- and size-related fields out of data_set. Key order
        # matters because these dicts are written to the CSV as their string form.
        color_set = {
            '产品名称': data_set.get('产品名称'),
            '颜色': data_set.get('color_name'),
            '色号': data_set.get('color_id'),
            '语言': data_set.get('language'),
        }
        name_set = {
            '号型': data_set.get('号型'),
            '产品名称': data_set.get('产品名称'),
            '语言': data_set.get('language'),
        }
        for k in ['color_name', 'color_id', '号型', 'size_id', '产品名称', 'language']:
            data_set.pop(k, None)

        size_compare_flag, size_compare_logs = sql_product.size_information(matio_id, color_id)
        color_compare_flag, color_compare_logs = sql_product.color_information(matio_id)

        csv_file_path = HISTORY_CSV_PATH
        # The row id is the number of rows already stored.
        row_id = len(pd.read_csv(csv_file_path)) if os.path.exists(csv_file_path) else '0'
        dicts = {
            "id": str(row_id),
            "matio_id": matio_id,
            "item_num": matio_id.split('-')[0],
            "difference": '1' if log else '0',
            "upload_time": _display_time(upload_timestamp),
            "ocr_time": _display_time(detection_timestamp),
            "logs": log,
            "regular_image": regular_image_url,
            "ocr_image": ocr_image_url,
            "size_compare_flag": size_compare_flag,
            "size_compare_logs": size_compare_logs if size_compare_logs else "'None'",
            "color_compare_flag": color_compare_flag,
            "color_compare_logs": color_compare_logs if color_compare_logs else "'None'",
            "barcode_type": barcode_type,
            "data_set": data_set,
            "color_set": color_set,
            "name_set": name_set,
        }

        with open(csv_file_path, 'a', newline='', encoding='utf-8') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=dicts.keys())
            # Write the header only when the file is still empty.
            if csv_file.tell() == 0:
                writer.writeheader()
            writer.writerow(dicts)

        return JSONResponse(content={"code": 1, "decs": "识别成功"}, status_code=200)
    except Exception:
        traceback.print_exc()
        return JSONResponse(content={"code": 0, "decs": "失败,出现错误,请重试"}, status_code=500)


@app.post("/test/")
async def test(file: Optional[UploadFile] = File(None), item: Optional[str] = Form(None)):
    """Echo endpoint: report the uploaded file name and, if given, the parsed item."""
    # Return the file *name*; the UploadFile object itself is not JSON data,
    # and ``file`` may legitimately be absent.
    filename = file.filename if file is not None else None
    if item:
        item_model = Item(**json.loads(item))
        return {"item": item_model.name + item_model.description, "filename": filename}
    return {'status': 'succeed', "filename": filename}


@app.post('/search')
async def search_info(message: Search):
    """Filter, sort and paginate the history CSV according to ``message``.

    ``total`` and ``pages`` in the response describe the filtered result set.
    """
    try:
        csv_path = HISTORY_CSV_PATH
        if not os.path.exists(csv_path):
            return {"code": "1", "decs": None, "data": None}

        df = pd.read_csv(csv_path)

        # Equality filters. ``difference`` may be 0, which is a valid filter
        # value, so test it against None/'' rather than by truthiness.
        difference = message.difference
        filters = {
            'barcode_type': message.barcode_type,
            'matio_id': message.matio_id,
            'item_num': message.item_num,
            'difference': None if difference is None or difference == '' else int(difference),
        }
        for col, condition in filters.items():
            if condition or condition == 0:
                df = df[df[col] == condition]

        # Time-range filters.
        time_ranges = [
            ('upload_time', message.uploadStartTime, message.uploadEndTime),
            ('ocr_time', message.ocrStartTime, message.ocrEndTime),
        ]
        for time_col, start_time, end_time in time_ranges:
            if start_time and end_time:
                range_start = datetime.datetime(**get_time(start_time))
                range_end = datetime.datetime(**get_time(end_time))
                stamps = pd.to_datetime(df[time_col])
                in_range = (stamps >= range_start) & (stamps <= range_end)
                # Copy so the column assignment below targets an independent frame.
                df = df[in_range].copy()
                df[time_col] = stamps[in_range].dt.strftime(DISPLAY_TIME_FORMAT)

        # Count after filtering so pagination matches what is returned.
        total = len(df)

        # Sort and paginate.
        df = df.sort_values(by="upload_time", ascending=False)
        first = (message.pageNum - 1) * message.pageSize
        last = first + message.pageSize
        page_data = df.iloc[first:last].to_dict(orient="records")

        # Cells that hold Python literals (dicts, lists, ...) are parsed back;
        # anything else is left untouched.
        for record in page_data:
            for key, value in record.items():
                try:
                    record[key] = ast.literal_eval(value)
                except (SyntaxError, ValueError):
                    pass

        # Ceiling division; at least one page is reported even when empty.
        pages = max(1, -(-total // message.pageSize))
        data = {
            "records": page_data,
            "total": str(total),
            "size": str(message.pageSize),
            "current": str(message.pageNum),
            "orders": [],
            "optimizeCountSql": True,
            "searchCount": True,
            "countId": '',
            "maxLimit": '',
            "pages": str(pages),
        }
        return {"code": "1", "decs": None, "data": data}
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(content={"code": "0", "decs": str(e)}, status_code=500)


@app.post('/get_matio_id')
async def get_matio_id(file: UploadFile = File(...)):
    """Detect the barcode in ``file`` and look up matching matio ids by prefix."""
    try:
        if not file:
            return {"code": "1", "matio_list": []}

        barcode_data = await detect_barcode(file)
        barcode = barcode_data.get('barcode')
        prefix_code_list = sql_product.sql_matio_id(prefix_code=barcode)
        return {
            "code": "1",
            "matio_list": prefix_code_list,
            'image_origin_path': barcode_data.get('image_origin_path'),
            'upload_timestamp': barcode_data.get('upload_timestamp'),
            'barcode': barcode,
        }
    except Exception as e:
        return JSONResponse(content={"code": "0", "decs": str(e)}, status_code=500)


@app.post('/show')
async def show_info(message: ID):
    """Return the single history row whose ``id`` equals ``message.id``."""
    try:
        df = pd.read_csv(HISTORY_CSV_PATH)
        data = df[df['id'] == int(message.id)].iloc[0].to_dict()
        for key, value in data.items():
            try:
                data[key] = ast.literal_eval(value)
            except (ValueError, SyntaxError):
                # ``logs`` that is not a Python literal is retried as JSON
                # after swapping single quotes for double quotes.
                if key == 'logs':
                    data[key] = json.loads(value.replace("'", '"'))
        return {"code": "1", "decs": None, "data": {"records": data}}
    except Exception as e:
        return JSONResponse(content={"code": "0", "decs": str(e)}, status_code=500)


@app.post('/history')
async def history_info(message: ID):
    """Return every history row sharing the ``matio_id`` of row ``message.id``."""
    try:
        df = pd.read_csv(HISTORY_CSV_PATH)

        # Find the matio_id that belongs to the requested id.
        matio_id = df.loc[df['id'] == int(message.id), 'matio_id'].iloc[0]

        # Select all rows for that matio_id, newest upload first.
        data = df[df['matio_id'] == matio_id].sort_values(by="upload_time", ascending=False)

        # Normalize the time format and convert to plain dicts.
        data['upload_time'] = pd.to_datetime(data["upload_time"]).dt.strftime(DISPLAY_TIME_FORMAT)
        records = data.to_dict(orient='records')

        # Parse the ``logs`` column back into structured data.
        for record in records:
            record['logs'] = json.loads(record['logs'].replace("'", '"'))

        return {"code": "1", "decs": None, "data": {"records": records}}
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(content={"code": "0", "decs": str(e)}, status_code=500)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=port)