from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status import socket import asyncio import time import json import multiprocessing from typing import Dict from uhf.reader import * from tools_use import ProductInfoTokenManager, ProductImage import uvicorn import signal import os app = FastAPI() # 存储活跃的进程 active_processes: Dict[str, multiprocessing.Process] = {} processes_lock = asyncio.Lock() # 锁用于保护active_processes # 配置参数 RECONNECT_DELAY = 0.1 # 重连延迟(秒) HEARTBEAT_INTERVAL = 2 # 主动发送心跳的间隔(秒) TCP_READ_TIMEOUT = 10 # TCP读取超时时间(秒) TCP_KEEP_ALIVE_IDLE = 3600 # TCP保持活跃的空闲时间(秒) TCP_KEEP_ALIVE_INTERVAL = 100 # TCP保持活跃的间隔(秒) TCP_KEEP_ALIVE_COUNT = 6 # TCP保持活跃的次数 # 创建一个队列用于WebSocket和TCP进程间通信 websocket_queue_dict = {} async def handle_epc(epc_code, websocket_queue): """处理EPC数据并将结果发送到队列""" try: product_info_instance = ProductInfoTokenManager() product_image_instance = ProductImage() barcode = await product_info_instance.get_barcode_from_epc(epc_code) product_info = await product_info_instance.get_product_info_by_barcode(barcode) product_image = await product_image_instance.get_product_image_by_barcode(barcode) response_data = { "code": 1, "message": "ok", "data": { "barcode": barcode, "product_info": product_info, "product_image": product_image } } websocket_queue.put(json.dumps(response_data)) except Exception as e: print(f"处理EPC数据错误: {str(e)}") error_response = { "code": 0, "message": f"处理EPC数据错误: {str(e)}", "data": None } websocket_queue.put(json.dumps(error_response)) async def connect_to_tcp(server, port_int): """创建TCP连接并设置参数""" reader, writer = await asyncio.open_connection( server, port_int, limit=999999, ) # 设置 TCP keepalive - 提高稳定性 sock = writer.get_extra_info('socket') sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在支持的平台上设置更细粒度的keepalive参数 if hasattr(socket, 'TCP_KEEPIDLE'): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, TCP_KEEP_ALIVE_IDLE) if hasattr(socket, 'TCP_KEEPINTVL'): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, TCP_KEEP_ALIVE_INTERVAL) if hasattr(socket, 'TCP_KEEPCNT'): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, TCP_KEEP_ALIVE_COUNT) # 设置套接字选项以提高稳定性 # 禁用Nagle算法,减少延迟 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # 设置接收和发送缓冲区大小 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 262144) # 256KB sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 262144) # 256KB return reader, writer async def send_heartbeat(writer, client_id): """发送心跳包""" try: mess = MsgAppGetBaseVersion() mess.pack() writer.write(mess.toByte(False)) await writer.drain() return True except Exception as e: print(f"{client_id} 发送心跳包失败: {str(e)}") return False async def initialize_rfid_reader(client_id, writer): """初始化RFID读取器设置""" try: # 发送初始库存EPC消息 msg = MsgBaseInventoryEpc(antennaEnable=EnumG.AntennaNo_1.value, inventoryMode=EnumG.InventoryMode_Inventory.value) msg.pack() writer.write(msg.toByte(False)) await writer.drain() print(f"{client_id} 发送初始化命令") # 等待短暂时间确保命令被处理 await asyncio.sleep(0.5) # 再次发送一个心跳确认连接状态 mess = MsgAppGetBaseVersion() mess.pack() writer.write(mess.toByte(False)) await writer.drain() return True except Exception as e: print(f"{client_id} 初始化RFID读取器失败: {str(e)}") return False async def process_tcp_data(client_id, reader, writer, websocket_queue): """处理TCP数据流""" buffer = RingBuffer() epc_code_temp = '' last_heartbeat_time = time.time() while True: try: # 使用wait_for防止永久阻塞 response = await asyncio.wait_for(reader.read(1024), timeout=TCP_READ_TIMEOUT) if not response: print(f"{client_id} TCP连接已关闭 (空响应)") return False # 需要重连 # 处理接收到的数据 buffer.writeData(response) if buffer.dataCount < 48: continue if buffer.indexData(0) != 0x5A: buffer.cleanData(1 * 8) continue if buffer.dataCount >= 56: temp = DynamicBuffer() temp.putBytes(buffer.readData(40)) dataLen = buffer.readBit(16) if dataLen != 0: temp.putShort(dataLen) if (dataLen + 2) * 8 > buffer.dataCount: if dataLen > 1024: buffer.cleanData(1 * 8) else: buffer.subPos(temp.len) continue else: temp.putBytes(buffer.readData(dataLen * 8)) data = buffer.readData(16) temp.putBytes(data) msg = Message().new(temp.hex) if msg: if msg.checkCrc(): buffer.cleanAll() if msg.mt_8_11 == EnumG.Msg_Type_Bit_Base.value: if msg.msgId == EnumG.BaseLogMid_Epc.value: info = LogBaseEpcInfo() info.cData = msg.cData info.unPack() if info.epc != epc_code_temp: try: await handle_epc(epc_code=info.epc, websocket_queue=websocket_queue) print(f"{client_id} 发送EPC: {info.epc}") epc_code_temp = info.epc except Exception as e: print(f"处理EPC出错 {client_id}: {str(e)}") # 接收到EPC后发送版本查询作为响应 await send_heartbeat(writer, client_id) last_heartbeat_time = time.time() # 更新心跳时间 elif msg.mt_8_11 == EnumG.Msg_Type_Bit_App.value: if msg.msgId == EnumG.AppMid_Heartbeat.value: writer.write(msg.msgData) await writer.drain() last_heartbeat_time = time.time() # 更新心跳时间 print(f"{client_id}: 收到并响应心跳") except asyncio.TimeoutError: # 只在超时时打印日志,然后继续发送心跳 print(f"{client_id} 读取超时,发送心跳包") if not await send_heartbeat(writer, client_id): print(f"{client_id} 心跳发送失败,需要重连") return False # 需要重连 last_heartbeat_time = time.time() except Exception as e: print(f"{client_id} TCP数据处理错误: {str(e)}") return False # 需要重连 def tcp_process_handler(client_id, server, port_int, queue_to_websocket, queue_from_websocket): """运行在独立进程中的TCP处理函数""" # 设置进程信号处理 def handle_signal(signum, frame): print(f"进程 {client_id} 收到信号 {signum},准备退出") os._exit(0) signal.signal(signal.SIGTERM, handle_signal) signal.signal(signal.SIGINT, handle_signal) # 创建新的事件循环 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) async def process_main(): reader = None writer = None reconnect_count = 0 # 发送初始连接信息 queue_to_websocket.put(json.dumps({ "code": 1, "message": f"TCP处理进程已启动: {client_id}", "data": None })) while True: # 检查是否收到来自WebSocket的命令 if not queue_from_websocket.empty(): try: message = queue_from_websocket.get_nowait() if message == "CLOSE": print(f"收到关闭命令,进程 {client_id} 准备退出") break elif message.startswith("COMMAND:"): command = json.loads(message[8:]) if command.get("command") == "reinitialize" and writer: if await initialize_rfid_reader(client_id, writer): queue_to_websocket.put(json.dumps({ "code": 1, "message": "重新初始化RFID读取器成功", "data": None })) else: queue_to_websocket.put(json.dumps({ "code": 0, "message": "重新初始化RFID读取器失败", "data": None })) except Exception as e: print(f"处理队列命令错误: {str(e)}") try: # 尝试连接或重连TCP if reader is None or writer is None: try: reader, writer = await connect_to_tcp(server, port_int) print(f"已连接到 TCP 服务器 {client_id}") # 初始化RFID读取器 if not await initialize_rfid_reader(client_id, writer): raise Exception("RFID读取器初始化失败") # 发送连接成功通知 if reconnect_count > 0: message = { "code": 1, "message": f"TCP重连成功 (第{reconnect_count}次尝试)", "data": None } else: message = { "code": 1, "message": f"成功连接到TCP服务器 {client_id}", "data": None } queue_to_websocket.put(json.dumps(message)) reconnect_count = 0 # 连接成功后重置计数 except Exception as e: reconnect_count += 1 print(f"TCP连接失败 {client_id} (第{reconnect_count}次尝试): {str(e)}") # 通知客户端连接失败 message = { "code": 0, "message": f"TCP连接失败 (第{reconnect_count}次尝试): {str(e)}", "data": None } queue_to_websocket.put(json.dumps(message)) # 延迟后重试,延迟时间随重试次数增加但不超过30秒 delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 30) await asyncio.sleep(delay) continue # 回到循环开始处重试连接 # 处理TCP数据 if not await process_tcp_data(client_id, reader, writer, queue_to_websocket): # TCP连接出现问题,需要重连 reconnect_count += 1 print(f"TCP连接异常,准备重连 {client_id} (第{reconnect_count}次尝试)") # 关闭当前的连接 try: writer.close() await writer.wait_closed() except Exception: pass # 忽略关闭错误 # 清空连接对象准备重连 reader = None writer = None # 通知客户端需要重连 message = { "code": 0, "message": f"TCP连接断开,准备重连 (第{reconnect_count}次尝试)", "data": None } queue_to_websocket.put(json.dumps(message)) # 延迟后重试 delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 15) await asyncio.sleep(delay) continue # 回到循环开始处重试连接 except Exception as e: print(f"TCP处理循环异常 {client_id}: {str(e)}") # 关闭当前连接 if writer: try: writer.close() await writer.wait_closed() except: pass # 准备重连 reader = None writer = None reconnect_count += 1 # 通知客户端错误情况 message = { "code": 0, "message": f"TCP处理出错,准备重连 (第{reconnect_count}次尝试): {str(e)}", "data": None } queue_to_websocket.put(json.dumps(message)) # 延迟后重试 await asyncio.sleep(RECONNECT_DELAY) # 清理资源 if writer: try: writer.close() await writer.wait_closed() except: pass print(f"TCP处理进程 {client_id} 结束") # 运行异步主函数 loop.run_until_complete(process_main()) loop.close() @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket, server: str, port: str): await websocket.accept() port_int = int(port) client_id = f"{server}:{port}" process = None # 为这个连接创建队列 queue_to_websocket = multiprocessing.Queue() queue_from_websocket = multiprocessing.Queue() # 检查该client_id是否已经连接 async with processes_lock: if client_id in active_processes: if active_processes[client_id].is_alive(): # 拒绝重复连接 await websocket.send_text(json.dumps({ "code": 0, "message": f"拒绝连接:服务器 {client_id} 已有活跃连接", "data": None })) await websocket.close(code=status.WS_1008_POLICY_VIOLATION) print(f"拒绝重复连接: {client_id}") return else: # 进程已死,清理它 active_processes[client_id].terminate() active_processes.pop(client_id, None) try: # 创建和启动TCP处理进程 process = multiprocessing.Process( target=tcp_process_handler, args=(client_id, server, port_int, queue_to_websocket, queue_from_websocket), daemon=True ) process.start() # 将进程添加到活跃进程列表 async with processes_lock: active_processes[client_id] = process # 创建任务来处理队列到WebSocket的转发 async def forward_queue_to_websocket(): while True: # 检查进程是否还活着 if not process.is_alive(): print(f"进程已终止: {client_id}") break # 检查队列 if not queue_to_websocket.empty(): try: message = queue_to_websocket.get_nowait() await websocket.send_text(message) except Exception as e: print(f"转发消息到WebSocket时出错: {str(e)}") break else: await asyncio.sleep(0.1) # 短暂休眠以减轻CPU负载 # 启动转发任务 forward_task = asyncio.create_task(forward_queue_to_websocket()) # 等待客户端消息或连接断开 try: while True: # 检查进程是否还活着 if not process.is_alive(): print(f"进程已终止,关闭WebSocket: {client_id}") await websocket.close(code=status.WS_1011_INTERNAL_ERROR) break # 使用超时来防止永久阻塞 try: message = await asyncio.wait_for(websocket.receive_text(), timeout=2) # 处理客户端可能发送的命令 try: data = json.loads(message) if isinstance(data, dict) and "command" in data: command = data["command"] # 转发命令到进程 queue_from_websocket.put(f"COMMAND:{message}") except: pass except asyncio.TimeoutError: # 超时只是用来定期检查连接状态,不需要特殊处理 pass except WebSocketDisconnect: print(f"WebSocket客户端断开连接: {client_id}") except Exception as e: print(f"处理WebSocket连接时出错: {client_id}, {str(e)}") try: await websocket.send_text(json.dumps({ "code": 0, "message": f"处理连接时出错: {str(e)}", "data": None })) except: pass finally: # 取消转发任务 if 'forward_task' in locals(): forward_task.cancel() try: await forward_task except asyncio.CancelledError: pass # 通知进程关闭 try: queue_from_websocket.put("CLOSE") # 给进程一些时间来自行清理 await asyncio.sleep(1) except: pass # 如果进程仍在运行,则强制终止 async with processes_lock: if client_id in active_processes: if active_processes[client_id].is_alive(): active_processes[client_id].terminate() active_processes.pop(client_id, None) # 关闭WebSocket try: await websocket.close() except: pass print(f"连接已完全关闭: {client_id}") # 健康检查端点 @app.get("/health") async def health_check(): conn_info = {} async with processes_lock: for client_id, process in active_processes.items(): conn_info[client_id] = { "process_alive": process.is_alive(), "process_pid": process.pid } return { "status": "ok", "active_processes": len(active_processes), "connections": conn_info, "timestamp": time.time() } # 关闭时的清理 @app.on_event("shutdown") async def shutdown_event(): print("服务器关闭,清理所有进程...") async with processes_lock: for client_id, process in active_processes.items(): if process.is_alive(): process.terminate() active_processes.clear() if __name__ == "__main__": # 确保多进程安全 multiprocessing.set_start_method('spawn', force=True) uvicorn.run(app, host="0.0.0.0", port=7066)