123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status
- import socket
- import asyncio
- import time
- import json
- from typing import Dict
- from uhf.reader import *
- from tools_use import ProductInfoTokenManager, ProductImage
- GClient
- app = FastAPI()
- # 存储活跃的TCP连接
- active_connections: Dict[str, tuple] = {} # {client_id: (reader, writer, websocket, task)}
- connection_lock = asyncio.Lock() # 锁用于保护active_connections
- # 配置参数
- RECONNECT_DELAY = 0.1 # 重连延迟(秒)
- HEARTBEAT_INTERVAL = 2 # 主动发送心跳的间隔(秒)
- TCP_READ_TIMEOUT = 10 # TCP读取超时时间(秒)
- TCP_KEEP_ALIVE_IDLE = 3600 # TCP保持活跃的空闲时间(秒)
- TCP_KEEP_ALIVE_INTERVAL = 100 # TCP保持活跃的间隔(秒)
- TCP_KEEP_ALIVE_COUNT = 6 # TCP保持活跃的次数
- async def handle_epc(epc_code, websocket):
- try:
- product_info_instance = ProductInfoTokenManager()
- product_image_instance = ProductImage()
- barcode = await product_info_instance.get_barcode_from_epc(epc_code)
- product_info = await product_info_instance.get_product_info_by_barcode(barcode)
- product_image = await product_image_instance.get_product_image_by_barcode(barcode)
- response_data = {
- "code": 1,
- "message": "ok",
- "data": {
- "barcode": barcode,
- "product_info": product_info,
- "product_image": product_image
- }
- }
- await websocket.send_text(json.dumps(response_data))
- except Exception as e:
- print(f"处理EPC数据错误: {str(e)}")
- error_response = {
- "code": 0,
- "message": f"处理EPC数据错误: {str(e)}",
- "data": None
- }
- try:
- await websocket.send_text(json.dumps(error_response))
- except RuntimeError:
- # WebSocket可能已关闭
- print(f"无法发送错误响应,WebSocket可能已关闭")
- async def close_tcp_connection(client_id):
- """关闭TCP连接并从活跃连接列表中移除"""
- async with connection_lock:
- if client_id in active_connections:
- _, writer, _, task = active_connections[client_id]
- try:
- # 取消正在运行的任务
- if task and not task.done():
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass
-
- # 关闭TCP连接
- writer.close()
- await writer.wait_closed()
- print(f"已关闭TCP连接: {client_id}")
- except Exception as e:
- print(f"关闭TCP连接时出错: {client_id}, 错误: {str(e)}")
- finally:
- active_connections.pop(client_id, None)
- async def is_websocket_alive(websocket):
- """检查WebSocket是否仍然活跃"""
- try:
- # 简单的非阻塞检查
- return websocket.client_state != WebSocketDisconnect
- except:
- return False
- async def send_websocket_message(websocket, message):
- """安全地发送WebSocket消息"""
- try:
- await websocket.send_text(json.dumps(message))
- return True
- except Exception:
- return False
- async def connect_to_tcp(server, port_int):
- """创建TCP连接并设置参数"""
- reader, writer = await asyncio.open_connection(
- server,
- port_int,
- limit=999999,
- )
-
- # 设置 TCP keepalive - 提高稳定性
- sock = writer.get_extra_info('socket')
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-
- # 在支持的平台上设置更细粒度的keepalive参数
- if hasattr(socket, 'TCP_KEEPIDLE'):
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, TCP_KEEP_ALIVE_IDLE)
- if hasattr(socket, 'TCP_KEEPINTVL'):
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, TCP_KEEP_ALIVE_INTERVAL)
- if hasattr(socket, 'TCP_KEEPCNT'):
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, TCP_KEEP_ALIVE_COUNT)
-
- # 设置套接字选项以提高稳定性
- # 禁用Nagle算法,减少延迟
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-
- # 设置接收和发送缓冲区大小
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 262144) # 256KB
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 262144) # 256KB
-
- return reader, writer
- async def send_heartbeat(writer, client_id):
- """发送心跳包"""
- try:
- mess = MsgAppGetBaseVersion()
- mess.pack()
- writer.write(mess.toByte(False))
- await writer.drain()
- return True
- except Exception as e:
- print(f"{client_id} 发送心跳包失败: {str(e)}")
- return False
- async def process_tcp_data(client_id, reader, writer, websocket):
- """处理TCP数据流"""
- buffer = RingBuffer()
- epc_code_temp = ''
- last_heartbeat_time = time.time()
-
- while True:
- # 定期发送心跳,无需等待超时
- # current_time = time.time()
- # if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL:
- # if not await send_heartbeat(writer, client_id):
- # print(f"{client_id} 定期心跳发送失败,需要重连")
- # return False # 需要重连
- # last_heartbeat_time = current_time
-
- try:
- # 使用wait_for防止永久阻塞
- response = await asyncio.wait_for(reader.read(1024), timeout=TCP_READ_TIMEOUT)
-
- if not response:
- print(f"{client_id} TCP连接已关闭 (空响应)")
- return False # 需要重连
-
- # 处理接收到的数据
- buffer.writeData(response)
-
- if buffer.dataCount < 48:
- continue
-
- if buffer.indexData(0) != 0x5A:
- buffer.cleanData(1 * 8)
- continue
-
- if buffer.dataCount >= 56:
- temp = DynamicBuffer()
- temp.putBytes(buffer.readData(40))
-
- dataLen = buffer.readBit(16)
-
- if dataLen != 0:
- temp.putShort(dataLen)
- if (dataLen + 2) * 8 > buffer.dataCount:
- if dataLen > 1024:
- buffer.cleanData(1 * 8)
- else:
- buffer.subPos(temp.len)
- continue
- else:
- temp.putBytes(buffer.readData(dataLen * 8))
- data = buffer.readData(16)
- temp.putBytes(data)
-
- msg = Message().new(temp.hex)
- if msg:
- if msg.checkCrc():
- buffer.cleanAll()
- print(f'mt_8_11******{msg.mt_8_11}')
- print(f'msgId******{msg.msgId}')
- if msg.mt_8_11 == EnumG.Msg_Type_Bit_Base.value:
- if msg.msgId == EnumG.BaseLogMid_Epc.value:
- info = LogBaseEpcInfo()
- info.cData = msg.cData
- info.unPack()
- if info.epc != epc_code_temp:
- try:
- await handle_epc(epc_code=info.epc, websocket=websocket)
- print(f"{client_id} 发送EPC: {info.epc}")
- epc_code_temp = info.epc
- except Exception as e:
- print(f"处理EPC出错 {client_id}: {str(e)}")
- # 这里不断开TCP连接,因为WebSocket只是数据传递渠道
-
- # 接收到EPC后发送版本查询作为响应
- await send_heartbeat(writer, client_id)
- last_heartbeat_time = time.time() # 更新心跳时间
-
- elif msg.mt_8_11 == EnumG.Msg_Type_Bit_App.value:
- if msg.msgId == EnumG.AppMid_Heartbeat.value:
- writer.write(msg.msgData)
- await writer.drain()
- last_heartbeat_time = time.time() # 更新心跳时间
- print(f"{client_id}: 收到并响应心跳")
-
- except asyncio.TimeoutError:
- # 只在超时时打印日志,然后继续发送心跳
- print(f"{client_id} 读取超时,发送心跳包")
- if not await send_heartbeat(writer, client_id):
- print(f"{client_id} 心跳发送失败,需要重连")
- return False # 需要重连
- last_heartbeat_time = time.time()
-
- except Exception as e:
- print(f"{client_id} TCP数据处理错误: {str(e)}")
- return False # 需要重连
- async def initialize_rfid_reader(client_id, writer):
- """初始化RFID读取器设置"""
- try:
- # 发送初始库存EPC消息
- msg = MsgBaseInventoryEpc(antennaEnable=EnumG.AntennaNo_1.value,
- inventoryMode=EnumG.InventoryMode_Inventory.value)
- msg.pack()
- writer.write(msg.toByte(False))
- await writer.drain()
- print(f"{client_id} 发送初始化命令")
-
- # 等待短暂时间确保命令被处理
- await asyncio.sleep(0.5)
-
- # 再次发送一个心跳确认连接状态
- mess = MsgAppGetBaseVersion()
- mess.pack()
- writer.write(mess.toByte(False))
- await writer.drain()
-
- return True
- except Exception as e:
- print(f"{client_id} 初始化RFID读取器失败: {str(e)}")
- return False
- async def tcp_connection_handler(client_id, websocket):
- """处理TCP连接的主循环,包括无限重连逻辑"""
- server, port_str = client_id.split(":")
- port_int = int(port_str)
- reconnect_count = 0
- reader = None
- writer = None
-
- while True:
- # 检查WebSocket是否仍然连接
- try:
- # 如果WebSocket已断开,则退出循环
- ping_data = json.dumps({"type": "ping"})
- await websocket.send_text(ping_data)
- except Exception:
- print(f"WebSocket已断开,停止TCP处理: {client_id}")
- break
-
- try:
- # 尝试连接或重连TCP
- if reader is None or writer is None:
- try:
- reader, writer = await connect_to_tcp(server, port_int)
- print(f"已连接到 TCP 服务器 {client_id}")
-
- # 初始化RFID读取器
- if not await initialize_rfid_reader(client_id, writer):
- raise Exception("RFID读取器初始化失败")
-
- # 更新或添加到活跃连接列表
- async with connection_lock:
- if client_id in active_connections:
- old_reader, old_writer, ws, task = active_connections[client_id]
- active_connections[client_id] = (reader, writer, ws, task)
-
- # 发送连接成功通知
- if reconnect_count > 0:
- message = {
- "code": 1,
- "message": f"TCP重连成功 (第{reconnect_count}次尝试)",
- "data": None
- }
- else:
- message = {
- "code": 1,
- "message": f"成功连接到TCP服务器 {client_id}",
- "data": None
- }
- await send_websocket_message(websocket, message)
- reconnect_count = 0 # 连接成功后重置计数
-
- except Exception as e:
- reconnect_count += 1
- print(f"TCP连接失败 {client_id} (第{reconnect_count}次尝试): {str(e)}")
-
- # 通知客户端连接失败
- message = {
- "code": 0,
- "message": f"TCP连接失败 (第{reconnect_count}次尝试): {str(e)}",
- "data": None
- }
- await send_websocket_message(websocket, message)
-
- # 延迟后重试,延迟时间随重试次数增加但不超过30秒
- delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 30)
- await asyncio.sleep(delay)
- continue # 回到循环开始处重试连接
-
- # 处理TCP数据
- if not await process_tcp_data(client_id, reader, writer, websocket):
- # TCP连接出现问题,需要重连
- reconnect_count += 1
- print(f"TCP连接异常,准备重连 {client_id} (第{reconnect_count}次尝试)")
-
- # 关闭当前的连接
- try:
- writer.close()
- await writer.wait_closed()
- except Exception:
- pass # 忽略关闭错误
-
- # 清空连接对象准备重连
- reader = None
- writer = None
-
- # 通知客户端需要重连
- message = {
- "code": 0,
- "message": f"TCP连接断开,准备重连 (第{reconnect_count}次尝试)",
- "data": None
- }
- await send_websocket_message(websocket, message)
-
- # 延迟后重试
- delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 15)
- await asyncio.sleep(delay)
- continue # 回到循环开始处重试连接
-
- except Exception as e:
- print(f"TCP处理循环异常 {client_id}: {str(e)}")
-
- # 关闭当前连接
- if writer:
- try:
- writer.close()
- await writer.wait_closed()
- except:
- pass
-
- # 准备重连
- reader = None
- writer = None
- reconnect_count += 1
-
- # 通知客户端错误情况
- message = {
- "code": 0,
- "message": f"TCP处理出错,准备重连 (第{reconnect_count}次尝试): {str(e)}",
- "data": None
- }
- await send_websocket_message(websocket, message)
-
- # 延迟后重试
- await asyncio.sleep(RECONNECT_DELAY)
-
- # WebSocket已断开,清理TCP连接
- if writer:
- try:
- writer.close()
- await writer.wait_closed()
- except:
- pass
- @app.websocket("/ws")
- async def websocket_endpoint(websocket: WebSocket, server: str, port: str):
- await websocket.accept()
- port_int = int(port)
- client_id = f"{server}:{port}"
- tcp_task = None
-
- # 检查该client_id是否已经连接
- async with connection_lock:
- if client_id in active_connections:
- # 拒绝重复连接
- await websocket.send_text(json.dumps({
- "code": 0,
- "message": f"拒绝连接:服务器 {client_id} 已有活跃连接",
- "data": None
- }))
- await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
- print(f"拒绝重复连接: {client_id}")
- return
-
- try:
- # 创建TCP处理任务
- tcp_task = asyncio.create_task(
- tcp_connection_handler(client_id, websocket)
- )
-
- # 将连接添加到活跃连接列表
- async with connection_lock:
- # TCP连接会在handler中创建,所以这里先放入None
- active_connections[client_id] = (None, None, websocket, tcp_task)
-
- # 等待客户端消息或连接断开
- try:
- while True:
- # 使用超时来防止永久阻塞
- try:
- # print('1111')
- message = await asyncio.wait_for(websocket.receive_text(), timeout=2)
- # print(f"收到WebSocket消息: {client_id}: {message}")
-
- # 处理客户端可能发送的命令
- try:
- data = json.loads(message)
- if isinstance(data, dict) and "command" in data:
- command = data["command"]
-
- # 处理客户端命令,例如手动重新初始化
- if command == "reinitialize":
- async with connection_lock:
- if client_id in active_connections:
- _, writer, _, _ = active_connections[client_id]
- if writer:
- await initialize_rfid_reader(client_id, writer)
- await websocket.send_text(json.dumps({
- "code": 1,
- "message": "重新初始化RFID读取器",
- "data": None
- }))
- except:
- pass
-
- except asyncio.TimeoutError:
- # 超时只是用来定期检查连接状态,不需要特殊处理
- pass
-
- except WebSocketDisconnect:
- print(f"WebSocket客户端断开连接: {client_id}")
-
- except Exception as e:
- print(f"处理WebSocket连接时出错: {client_id}, {str(e)}")
- try:
- await websocket.send_text(json.dumps({
- "code": 0,
- "message": f"处理连接时出错: {str(e)}",
- "data": None
- }))
- except:
- pass
- finally:
- # 清理资源
- await close_tcp_connection(client_id)
- try:
- await websocket.close()
- except:
- pass
- print(f"连接已完全关闭: {client_id}")
- # 健康检查端点
- @app.get("/health")
- async def health_check():
- conn_info = {}
- async with connection_lock:
- for client_id in active_connections:
- reader, writer, websocket, task = active_connections[client_id]
- conn_info[client_id] = {
- "tcp_connected": writer is not None,
- "websocket_connected": websocket is not None,
- "task_running": task is not None and not task.done()
- }
-
- return {
- "status": "ok",
- "active_connections": len(active_connections),
- "connections": conn_info,
- "timestamp": time.time()
- }
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=7066)
|