from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status import socket import asyncio import time import json from typing import Dict from uhf.reader import * from tools_use import ProductInfoTokenManager, ProductImage GClient app = FastAPI() # 存储活跃的TCP连接 active_connections: Dict[str, tuple] = {} # {client_id: (reader, writer, websocket, task)} connection_lock = asyncio.Lock() # 锁用于保护active_connections # 配置参数 RECONNECT_DELAY = 0.1 # 重连延迟(秒) HEARTBEAT_INTERVAL = 2 # 主动发送心跳的间隔(秒) TCP_READ_TIMEOUT = 10 # TCP读取超时时间(秒) TCP_KEEP_ALIVE_IDLE = 3600 # TCP保持活跃的空闲时间(秒) TCP_KEEP_ALIVE_INTERVAL = 100 # TCP保持活跃的间隔(秒) TCP_KEEP_ALIVE_COUNT = 6 # TCP保持活跃的次数 async def handle_epc(epc_code, websocket): try: product_info_instance = ProductInfoTokenManager() product_image_instance = ProductImage() barcode = await product_info_instance.get_barcode_from_epc(epc_code) product_info = await product_info_instance.get_product_info_by_barcode(barcode) product_image = await product_image_instance.get_product_image_by_barcode(barcode) response_data = { "code": 1, "message": "ok", "data": { "barcode": barcode, "product_info": product_info, "product_image": product_image } } await websocket.send_text(json.dumps(response_data)) except Exception as e: print(f"处理EPC数据错误: {str(e)}") error_response = { "code": 0, "message": f"处理EPC数据错误: {str(e)}", "data": None } try: await websocket.send_text(json.dumps(error_response)) except RuntimeError: # WebSocket可能已关闭 print(f"无法发送错误响应,WebSocket可能已关闭") async def close_tcp_connection(client_id): """关闭TCP连接并从活跃连接列表中移除""" async with connection_lock: if client_id in active_connections: _, writer, _, task = active_connections[client_id] try: # 取消正在运行的任务 if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass # 关闭TCP连接 writer.close() await writer.wait_closed() print(f"已关闭TCP连接: {client_id}") except Exception as e: print(f"关闭TCP连接时出错: {client_id}, 错误: {str(e)}") finally: active_connections.pop(client_id, None) async def is_websocket_alive(websocket): """检查WebSocket是否仍然活跃""" try: # 简单的非阻塞检查 return websocket.client_state != WebSocketDisconnect except: return False async def send_websocket_message(websocket, message): """安全地发送WebSocket消息""" try: await websocket.send_text(json.dumps(message)) return True except Exception: return False async def connect_to_tcp(server, port_int): """创建TCP连接并设置参数""" reader, writer = await asyncio.open_connection( server, port_int, limit=999999, ) # 设置 TCP keepalive - 提高稳定性 sock = writer.get_extra_info('socket') sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在支持的平台上设置更细粒度的keepalive参数 if hasattr(socket, 'TCP_KEEPIDLE'): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, TCP_KEEP_ALIVE_IDLE) if hasattr(socket, 'TCP_KEEPINTVL'): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, TCP_KEEP_ALIVE_INTERVAL) if hasattr(socket, 'TCP_KEEPCNT'): sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, TCP_KEEP_ALIVE_COUNT) # 设置套接字选项以提高稳定性 # 禁用Nagle算法,减少延迟 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # 设置接收和发送缓冲区大小 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 262144) # 256KB sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 262144) # 256KB return reader, writer async def send_heartbeat(writer, client_id): """发送心跳包""" try: mess = MsgAppGetBaseVersion() mess.pack() writer.write(mess.toByte(False)) await writer.drain() return True except Exception as e: print(f"{client_id} 发送心跳包失败: {str(e)}") return False async def process_tcp_data(client_id, reader, writer, websocket): """处理TCP数据流""" buffer = RingBuffer() epc_code_temp = '' last_heartbeat_time = time.time() while True: # 定期发送心跳,无需等待超时 # current_time = time.time() # if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL: # if not await send_heartbeat(writer, client_id): # print(f"{client_id} 定期心跳发送失败,需要重连") # return False # 需要重连 # last_heartbeat_time = current_time try: # 使用wait_for防止永久阻塞 response = await asyncio.wait_for(reader.read(1024), timeout=TCP_READ_TIMEOUT) if not response: print(f"{client_id} TCP连接已关闭 (空响应)") return False # 需要重连 # 处理接收到的数据 buffer.writeData(response) if buffer.dataCount < 48: continue if buffer.indexData(0) != 0x5A: buffer.cleanData(1 * 8) continue if buffer.dataCount >= 56: temp = DynamicBuffer() temp.putBytes(buffer.readData(40)) dataLen = buffer.readBit(16) if dataLen != 0: temp.putShort(dataLen) if (dataLen + 2) * 8 > buffer.dataCount: if dataLen > 1024: buffer.cleanData(1 * 8) else: buffer.subPos(temp.len) continue else: temp.putBytes(buffer.readData(dataLen * 8)) data = buffer.readData(16) temp.putBytes(data) msg = Message().new(temp.hex) if msg: if msg.checkCrc(): buffer.cleanAll() print(f'mt_8_11******{msg.mt_8_11}') print(f'msgId******{msg.msgId}') if msg.mt_8_11 == EnumG.Msg_Type_Bit_Base.value: if msg.msgId == EnumG.BaseLogMid_Epc.value: info = LogBaseEpcInfo() info.cData = msg.cData info.unPack() if info.epc != epc_code_temp: try: await handle_epc(epc_code=info.epc, websocket=websocket) print(f"{client_id} 发送EPC: {info.epc}") epc_code_temp = info.epc except Exception as e: print(f"处理EPC出错 {client_id}: {str(e)}") # 这里不断开TCP连接,因为WebSocket只是数据传递渠道 # 接收到EPC后发送版本查询作为响应 await send_heartbeat(writer, client_id) last_heartbeat_time = time.time() # 更新心跳时间 elif msg.mt_8_11 == EnumG.Msg_Type_Bit_App.value: if msg.msgId == EnumG.AppMid_Heartbeat.value: writer.write(msg.msgData) await writer.drain() last_heartbeat_time = time.time() # 更新心跳时间 print(f"{client_id}: 收到并响应心跳") except asyncio.TimeoutError: # 只在超时时打印日志,然后继续发送心跳 print(f"{client_id} 读取超时,发送心跳包") if not await send_heartbeat(writer, client_id): print(f"{client_id} 心跳发送失败,需要重连") return False # 需要重连 last_heartbeat_time = time.time() except Exception as e: print(f"{client_id} TCP数据处理错误: {str(e)}") return False # 需要重连 async def initialize_rfid_reader(client_id, writer): """初始化RFID读取器设置""" try: # 发送初始库存EPC消息 msg = MsgBaseInventoryEpc(antennaEnable=EnumG.AntennaNo_1.value, inventoryMode=EnumG.InventoryMode_Inventory.value) msg.pack() writer.write(msg.toByte(False)) await writer.drain() print(f"{client_id} 发送初始化命令") # 等待短暂时间确保命令被处理 await asyncio.sleep(0.5) # 再次发送一个心跳确认连接状态 mess = MsgAppGetBaseVersion() mess.pack() writer.write(mess.toByte(False)) await writer.drain() return True except Exception as e: print(f"{client_id} 初始化RFID读取器失败: {str(e)}") return False async def tcp_connection_handler(client_id, websocket): """处理TCP连接的主循环,包括无限重连逻辑""" server, port_str = client_id.split(":") port_int = int(port_str) reconnect_count = 0 reader = None writer = None while True: # 检查WebSocket是否仍然连接 try: # 如果WebSocket已断开,则退出循环 ping_data = json.dumps({"type": "ping"}) await websocket.send_text(ping_data) except Exception: print(f"WebSocket已断开,停止TCP处理: {client_id}") break try: # 尝试连接或重连TCP if reader is None or writer is None: try: reader, writer = await connect_to_tcp(server, port_int) print(f"已连接到 TCP 服务器 {client_id}") # 初始化RFID读取器 if not await initialize_rfid_reader(client_id, writer): raise Exception("RFID读取器初始化失败") # 更新或添加到活跃连接列表 async with connection_lock: if client_id in active_connections: old_reader, old_writer, ws, task = active_connections[client_id] active_connections[client_id] = (reader, writer, ws, task) # 发送连接成功通知 if reconnect_count > 0: message = { "code": 1, "message": f"TCP重连成功 (第{reconnect_count}次尝试)", "data": None } else: message = { "code": 1, "message": f"成功连接到TCP服务器 {client_id}", "data": None } await send_websocket_message(websocket, message) reconnect_count = 0 # 连接成功后重置计数 except Exception as e: reconnect_count += 1 print(f"TCP连接失败 {client_id} (第{reconnect_count}次尝试): {str(e)}") # 通知客户端连接失败 message = { "code": 0, "message": f"TCP连接失败 (第{reconnect_count}次尝试): {str(e)}", "data": None } await send_websocket_message(websocket, message) # 延迟后重试,延迟时间随重试次数增加但不超过30秒 delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 30) await asyncio.sleep(delay) continue # 回到循环开始处重试连接 # 处理TCP数据 if not await process_tcp_data(client_id, reader, writer, websocket): # TCP连接出现问题,需要重连 reconnect_count += 1 print(f"TCP连接异常,准备重连 {client_id} (第{reconnect_count}次尝试)") # 关闭当前的连接 try: writer.close() await writer.wait_closed() except Exception: pass # 忽略关闭错误 # 清空连接对象准备重连 reader = None writer = None # 通知客户端需要重连 message = { "code": 0, "message": f"TCP连接断开,准备重连 (第{reconnect_count}次尝试)", "data": None } await send_websocket_message(websocket, message) # 延迟后重试 delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 15) await asyncio.sleep(delay) continue # 回到循环开始处重试连接 except Exception as e: print(f"TCP处理循环异常 {client_id}: {str(e)}") # 关闭当前连接 if writer: try: writer.close() await writer.wait_closed() except: pass # 准备重连 reader = None writer = None reconnect_count += 1 # 通知客户端错误情况 message = { "code": 0, "message": f"TCP处理出错,准备重连 (第{reconnect_count}次尝试): {str(e)}", "data": None } await send_websocket_message(websocket, message) # 延迟后重试 await asyncio.sleep(RECONNECT_DELAY) # WebSocket已断开,清理TCP连接 if writer: try: writer.close() await writer.wait_closed() except: pass @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket, server: str, port: str): await websocket.accept() port_int = int(port) client_id = f"{server}:{port}" tcp_task = None # 检查该client_id是否已经连接 async with connection_lock: if client_id in active_connections: # 拒绝重复连接 await websocket.send_text(json.dumps({ "code": 0, "message": f"拒绝连接:服务器 {client_id} 已有活跃连接", "data": None })) await websocket.close(code=status.WS_1008_POLICY_VIOLATION) print(f"拒绝重复连接: {client_id}") return try: # 创建TCP处理任务 tcp_task = asyncio.create_task( tcp_connection_handler(client_id, websocket) ) # 将连接添加到活跃连接列表 async with connection_lock: # TCP连接会在handler中创建,所以这里先放入None active_connections[client_id] = (None, None, websocket, tcp_task) # 等待客户端消息或连接断开 try: while True: # 使用超时来防止永久阻塞 try: # print('1111') message = await asyncio.wait_for(websocket.receive_text(), timeout=2) # print(f"收到WebSocket消息: {client_id}: {message}") # 处理客户端可能发送的命令 try: data = json.loads(message) if isinstance(data, dict) and "command" in data: command = data["command"] # 处理客户端命令,例如手动重新初始化 if command == "reinitialize": async with connection_lock: if client_id in active_connections: _, writer, _, _ = active_connections[client_id] if writer: await initialize_rfid_reader(client_id, writer) await websocket.send_text(json.dumps({ "code": 1, "message": "重新初始化RFID读取器", "data": None })) except: pass except asyncio.TimeoutError: # 超时只是用来定期检查连接状态,不需要特殊处理 pass except WebSocketDisconnect: print(f"WebSocket客户端断开连接: {client_id}") except Exception as e: print(f"处理WebSocket连接时出错: {client_id}, {str(e)}") try: await websocket.send_text(json.dumps({ "code": 0, "message": f"处理连接时出错: {str(e)}", "data": None })) except: pass finally: # 清理资源 await close_tcp_connection(client_id) try: await websocket.close() except: pass print(f"连接已完全关闭: {client_id}") # 健康检查端点 @app.get("/health") async def health_check(): conn_info = {} async with connection_lock: for client_id in active_connections: reader, writer, websocket, task = active_connections[client_id] conn_info[client_id] = { "tcp_connected": writer is not None, "websocket_connected": websocket is not None, "task_running": task is not None and not task.done() } return { "status": "ok", "active_connections": len(active_connections), "connections": conn_info, "timestamp": time.time() } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7066)