123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- import os, io, time, json, csv, ast, traceback, cv2, asyncio, datetime
- from typing import Optional
- from concurrent.futures import ThreadPoolExecutor
- import numpy as np
- import pandas as pd
- from PIL import Image
- from fastapi import FastAPI, File, UploadFile, Form
- from fastapi.responses import JSONResponse
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.staticfiles import StaticFiles
- from config import (
- ocr_images_dir, model, header, port, file_url,
- Search, ID, Matio, Item
- )
- from utils import get_time, detection, sql_product, image_handle, Compare
- detect_instance = detection()
- ocr = detect_instance.ocr
- app = FastAPI()
- executor = ThreadPoolExecutor(max_workers=15)
- camera_connections = {}
- app.mount(header, StaticFiles(directory=ocr_images_dir), name="static")
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- @app.post('/detect_barcode')
- async def detect_barcode(file: UploadFile = File(...)):
- try:
- # 确保目录存在
- tmp_dir = os.path.join(ocr_images_dir, 'tmp_images')
- os.makedirs(tmp_dir, exist_ok=True)
-
- # 读取并处理上传的图像
- contents = await file.read()
- upload_timestamp = datetime.datetime.now().strftime("%Y_%m_%d-%H__%M__%S")
- image = Image.open(io.BytesIO(contents))
- image = image_handle.correct_image_orientation(image)
- image = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR)
-
- # 保存原始图像
- image_origin_path = os.path.join(tmp_dir, f"origin_images_{upload_timestamp}.png")
- cv2.imwrite(image_origin_path, image)
- time1 = time.time()
- # 预测并处理结果
- results = model.predict(image, conf=0.3)
- print(f"predict time is:{time.time() - time1}")
-
- for result in results:
- for obb in result.obb:
- if int(obb.cls.item()) == 15 and float(obb.conf.item()) > 0:
- points = obb.xyxyxyxy[0].cpu().numpy()
- cropped_image = image_handle.crop_image_second(image.copy(), points)
- barcode_image_path = os.path.join(tmp_dir, f"barcode_images_{upload_timestamp}.png")
- cv2.imwrite(barcode_image_path, cropped_image)
- barcode = await asyncio.get_event_loop().run_in_executor(
- executor, detect_instance.detect_barcode_ocr, barcode_image_path)
- if barcode:
- return {'barcode': barcode, 'image_origin_path': image_origin_path, 'upload_timestamp': upload_timestamp}
-
- return {'barcode': None, 'image_origin_path': image_origin_path, 'upload_timestamp': upload_timestamp}
- except Exception as e:
- print(e)
- return None, None, None
- @app.post('/get_barcode')
- async def get_barcode(message: Optional[str] = Form(None), file: Optional[UploadFile] = File(None)):
- try:
- result_dir = os.path.join(ocr_images_dir, 'results')
- os.makedirs(result_dir, exist_ok=True)
- if message:
- item_dict = json.loads(message)
- message_new = Matio(**item_dict)
- barcode, image_origin_path, upload_timestamp, barcode_type, matio_id = message_new.barcode, message_new.image_origin_path, message_new.upload_time, message_new.barcode_type, message_new.matio_id
- elif file:
- barcode_data = await detect_barcode(file)
- barcode, image_origin_path, upload_timestamp = barcode_data.get('barcode'), barcode_data.get('image_origin_path'), barcode_data.get('upload_timestamp')
- barcode_type, matio_id = Matio().barcode_type, Matio().matio_id
-
- if not barcode:
- return JSONResponse(content={"code": 0, "decs": "未识别到barcode,请重新上传"}, status_code=500)
-
- image = cv2.imread(image_origin_path)
- image_width, image_height = image.shape[1], image.shape[0]
-
- detection_timestamp = datetime.datetime.now().strftime("%Y_%m_%d-%H__%M__%S")
- data_result, matio_id, color_id, _ = sql_product.sql_information(barcode=barcode, barcode_type=barcode_type, matio_id=matio_id)
-
- results_dir = os.path.join(result_dir, matio_id)
- os.makedirs(results_dir, exist_ok=True)
-
- result_image_path = os.path.join(results_dir, f"ocr_result_images_{detection_timestamp}.png")
- regular_image_path = os.path.join(results_dir, f"regular_images_{upload_timestamp}.png")
- cv2.imwrite(regular_image_path, image)
-
- ocr_result = []
- result = ocr.ocr(regular_image_path, cls=True)
-
- for line in result[0]:
- bbox = [[int(x) for x in point] for point in line[0]]
- text = line[1][0]
- image = image_handle.draw_box_and_text(image, bbox, text)
- ocr_result.append([text])
-
- resize = cv2.resize(image, (image_width, image_height), interpolation=cv2.INTER_AREA)
- cv2.imwrite(result_image_path, resize)
-
- ocr_image_url = file_url + result_image_path.replace(ocr_images_dir, '/')
- regular_image_url = file_url + regular_image_path.replace(ocr_images_dir, '/')
-
- data_set, log = Compare.compare(ocr_result=ocr_result, dataset=data_result)
- color_set = {k: data_set.get(k) for k in ['color_name', 'color_id', '产品名称', 'language']}
- color_set['颜色'] = color_set.pop('color_name') if 'color_name' in color_set else ''
- color_set['色号'] = color_set.pop('color_id') if 'color_id' in color_set else ''
- color_set['语言'] = color_set.pop('language') if 'language' in color_set else ''
- name_set = {k: data_set.get(k) for k in ['号型', 'size_id', '产品名称', 'language']}
- if 'size_id' in name_set:
- name_set.pop('size_id')
- name_set['语言'] = name_set.pop('language') if 'language' in name_set else ''
- for k in ['color_name', 'color_id', '号型', 'size_id', '产品名称', 'language']:
- data_set.pop(k, None)
-
- size_compare_flag, size_compare_logs = sql_product.size_information(matio_id, color_id)
- color_compare_flag, color_compare_logs = sql_product.color_information(matio_id)
-
- csv_file_path = os.path.join(ocr_images_dir, 'history.csv')
- dicts = {
- "id": str(len(pd.read_csv(csv_file_path)) if os.path.exists(csv_file_path) else '0'),
- "matio_id": matio_id,
- "item_num": matio_id.split('-')[0],
- "difference": '1' if log else '0',
- "upload_time": upload_timestamp.replace('__', ':').replace('_', '/').replace('-', ' '),
- "ocr_time": detection_timestamp.replace('__', ':').replace('_', '/').replace('-', ' '),
- "logs": log,
- "regular_image": regular_image_url,
- "ocr_image": ocr_image_url,
- "size_compare_flag": size_compare_flag,
- "size_compare_logs": size_compare_logs if size_compare_logs else "'None'",
- "color_compare_flag": color_compare_flag,
- "color_compare_logs": color_compare_logs if color_compare_logs else "'None'",
- "barcode_type": barcode_type,
- "data_set": data_set,
- "color_set": color_set,
- "name_set": name_set
- }
-
- with open(csv_file_path, 'a', newline='', encoding='utf-8') as csv_file:
- writer = csv.DictWriter(csv_file, fieldnames=dicts.keys())
- if csv_file.tell() == 0:
- writer.writeheader()
- writer.writerow(dicts)
-
- return JSONResponse(content={"code": 1, "decs": "识别成功"}, status_code=200)
-
- except Exception as e:
- traceback.print_exc()
- return JSONResponse(content={"code": 0, "decs": "失败,出现错误,请重试"}, status_code=500)
- @app.post("/test/")
- async def test(file: Optional[UploadFile] = File(None),
- item: Optional[str] = Form(None)):
- if item:
- item_dict = json.loads(item)
- item_model = Item(**item_dict)
- return {"item": item_model.name + item_model.description, "filename": file}
- else:
- return {'status': 'succeed', "filename":file.filename}
- @app.post('/search')
- async def search_info(message: Search):
- try:
- csv_path = os.path.join(ocr_images_dir, 'history.csv')
- if not os.path.exists(csv_path):
- return {"code": "1", "decs": None, "data": None}
-
- df = pd.read_csv(csv_path)
- df_length = len(df)
-
- # 应用过滤条件
- filters = {
- 'barcode_type': message.barcode_type,
- 'matio_id': message.matio_id,
- 'item_num': message.item_num,
- 'difference': int(message.difference) if message.difference else None
- }
- for col, condition in filters.items():
-
- if condition or condition==0:
- df = df[df[col] == condition]
- # 时间过滤
- for time_col, start_time, end_time in [
- ('upload_time', message.uploadStartTime, message.uploadEndTime),
- ('ocr_time', message.ocrStartTime, message.ocrEndTime)
- ]:
- if start_time and end_time:
- df[time_col] = pd.to_datetime(df[time_col])
- start = datetime.datetime(**get_time(start_time))
- end = datetime.datetime(**get_time(end_time))
- df = df[(df[time_col] >= start) & (df[time_col] <= end)]
- df[time_col] = df[time_col].dt.strftime("%Y/%m/%d %H:%M:%S")
-
- # 排序和分页
- df = df.sort_values(by="upload_time", ascending=False)
- start = (message.pageNum - 1) * message.pageSize
- end = start + message.pageSize
- page_data = df.iloc[start:end].to_dict(orient="records")
-
- # 处理数据
- for record in page_data:
- for key, value in record.items():
- try:
- record[key] = ast.literal_eval(value)
- except (SyntaxError, ValueError):
- pass
-
- data = {
- "records": page_data,
- "total": str(df_length),
- "size": str(message.pageSize),
- "current": str(message.pageNum),
- "orders": [],
- "optimizeCountSql": True,
- "searchCount": True,
- "countId": '',
- "maxLimit": '',
- "pages": str(df_length // message.pageSize + 1)
- }
-
- return {"code": "1", "decs": None, "data": data}
-
- except Exception as e:
- traceback.print_exc()
- return JSONResponse(content={"code": "0", "decs": str(e)}, status_code=500)
- @app.post('/get_matio_id')
- async def get_matio_id(file: UploadFile = File(...)):
- try:
- if not file:
- return {"code": "1", "matio_list": []}
-
- barcode_data = await detect_barcode(file)
- barcode = barcode_data.get('barcode')
- prefix_code_list = sql_product.sql_matio_id(prefix_code=barcode)
-
- return {
- "code": "1",
- "matio_list": prefix_code_list,
- 'image_origin_path': barcode_data.get('image_origin_path'),
- 'upload_timestamp': barcode_data.get('upload_timestamp'),
- 'barcode': barcode
- }
-
- except Exception as e:
- return JSONResponse(content={"code": "0", "decs": str(e)}, status_code=500)
- @app.post('/show')
- async def show_info(message: ID):
- try:
- csv_path = os.path.join(ocr_images_dir, 'history.csv')
- df = pd.read_csv(csv_path)
- data = df[df['id'] == int(message.id)].iloc[0].to_dict()
-
- for key, value in data.items():
- try:
- data[key] = ast.literal_eval(value)
- except (ValueError, SyntaxError):
- if key == 'logs':
- data[key] = json.loads(value.replace("'", '"'))
-
- return {"code": "1", "decs": None, "data": {"records": data}}
- except Exception as e:
- return JSONResponse(content={"code": "0", "decs": str(e)}, status_code=500)
- @app.post('/history')
- async def history_info(message: ID):
- try:
- csv_path = os.path.join(ocr_images_dir, 'history.csv')
- df = pd.read_csv(csv_path)
-
- # 获取指定 id 对应的 matio_id
- matio_id = df.loc[df['id'] == int(message.id), 'matio_id'].iloc[0]
-
- # 筛选并排序数据
- data = df[df['matio_id'] == matio_id].sort_values(by="upload_time", ascending=False)
-
- # 格式化时间并转换为字典
- data['upload_time'] = pd.to_datetime(data["upload_time"]).dt.strftime("%Y/%m/%d %H:%M:%S")
- records = data.to_dict(orient='records')
-
- # 处理 logs 字段
- for record in records:
- record['logs'] = json.loads(record['logs'].replace("'", '"'))
-
- return {"code": "1", "decs": None, "data": {"records": records}}
-
- except Exception as e:
- traceback.print_exc()
- return JSONResponse(content={"code": "0", "decs": str(e)}, status_code=500)
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=port)
|