123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543 |
- from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status
- import socket
- import asyncio
- import time
- import json
- import multiprocessing
- from typing import Dict
- from uhf.reader import *
- from tools_use import ProductInfoTokenManager, ProductImage
- import uvicorn
- import signal
- import os
- app = FastAPI()
- # 存储活跃的进程
- active_processes: Dict[str, multiprocessing.Process] = {}
- processes_lock = asyncio.Lock() # 锁用于保护active_processes
- # 配置参数
- RECONNECT_DELAY = 0.1 # 重连延迟(秒)
- HEARTBEAT_INTERVAL = 2 # 主动发送心跳的间隔(秒)
- TCP_READ_TIMEOUT = 10 # TCP读取超时时间(秒)
- TCP_KEEP_ALIVE_IDLE = 3600 # TCP保持活跃的空闲时间(秒)
- TCP_KEEP_ALIVE_INTERVAL = 100 # TCP保持活跃的间隔(秒)
- TCP_KEEP_ALIVE_COUNT = 6 # TCP保持活跃的次数
- # 创建一个队列用于WebSocket和TCP进程间通信
- websocket_queue_dict = {}
- async def handle_epc(epc_code, websocket_queue):
- """处理EPC数据并将结果发送到队列"""
- try:
- product_info_instance = ProductInfoTokenManager()
- product_image_instance = ProductImage()
- barcode = await product_info_instance.get_barcode_from_epc(epc_code)
- product_info = await product_info_instance.get_product_info_by_barcode(barcode)
- product_image = await product_image_instance.get_product_image_by_barcode(barcode)
- response_data = {
- "code": 1,
- "message": "ok",
- "data": {
- "barcode": barcode,
- "product_info": product_info,
- "product_image": product_image
- }
- }
- websocket_queue.put(json.dumps(response_data))
- except Exception as e:
- print(f"处理EPC数据错误: {str(e)}")
- error_response = {
- "code": 0,
- "message": f"处理EPC数据错误: {str(e)}",
- "data": None
- }
- websocket_queue.put(json.dumps(error_response))
- async def connect_to_tcp(server, port_int):
- """创建TCP连接并设置参数"""
- reader, writer = await asyncio.open_connection(
- server,
- port_int,
- limit=999999,
- )
-
- # 设置 TCP keepalive - 提高稳定性
- sock = writer.get_extra_info('socket')
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-
- # 在支持的平台上设置更细粒度的keepalive参数
- if hasattr(socket, 'TCP_KEEPIDLE'):
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, TCP_KEEP_ALIVE_IDLE)
- if hasattr(socket, 'TCP_KEEPINTVL'):
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, TCP_KEEP_ALIVE_INTERVAL)
- if hasattr(socket, 'TCP_KEEPCNT'):
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, TCP_KEEP_ALIVE_COUNT)
-
- # 设置套接字选项以提高稳定性
- # 禁用Nagle算法,减少延迟
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-
- # 设置接收和发送缓冲区大小
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 262144) # 256KB
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 262144) # 256KB
-
- return reader, writer
- async def send_heartbeat(writer, client_id):
- """发送心跳包"""
- try:
- mess = MsgAppGetBaseVersion()
- mess.pack()
- writer.write(mess.toByte(False))
- await writer.drain()
- return True
- except Exception as e:
- print(f"{client_id} 发送心跳包失败: {str(e)}")
- return False
- async def initialize_rfid_reader(client_id, writer):
- """初始化RFID读取器设置"""
- try:
- # 发送初始库存EPC消息
- msg = MsgBaseInventoryEpc(antennaEnable=EnumG.AntennaNo_1.value,
- inventoryMode=EnumG.InventoryMode_Inventory.value)
- msg.pack()
- writer.write(msg.toByte(False))
- await writer.drain()
- print(f"{client_id} 发送初始化命令")
-
- # 等待短暂时间确保命令被处理
- await asyncio.sleep(0.5)
-
- # 再次发送一个心跳确认连接状态
- mess = MsgAppGetBaseVersion()
- mess.pack()
- writer.write(mess.toByte(False))
- await writer.drain()
-
- return True
- except Exception as e:
- print(f"{client_id} 初始化RFID读取器失败: {str(e)}")
- return False
- async def process_tcp_data(client_id, reader, writer, websocket_queue):
- """处理TCP数据流"""
- buffer = RingBuffer()
- epc_code_temp = ''
- last_heartbeat_time = time.time()
-
- while True:
- try:
- # 使用wait_for防止永久阻塞
- response = await asyncio.wait_for(reader.read(1024), timeout=TCP_READ_TIMEOUT)
-
- if not response:
- print(f"{client_id} TCP连接已关闭 (空响应)")
- return False # 需要重连
-
- # 处理接收到的数据
- buffer.writeData(response)
-
- if buffer.dataCount < 48:
- continue
-
- if buffer.indexData(0) != 0x5A:
- buffer.cleanData(1 * 8)
- continue
-
- if buffer.dataCount >= 56:
- temp = DynamicBuffer()
- temp.putBytes(buffer.readData(40))
-
- dataLen = buffer.readBit(16)
-
- if dataLen != 0:
- temp.putShort(dataLen)
- if (dataLen + 2) * 8 > buffer.dataCount:
- if dataLen > 1024:
- buffer.cleanData(1 * 8)
- else:
- buffer.subPos(temp.len)
- continue
- else:
- temp.putBytes(buffer.readData(dataLen * 8))
- data = buffer.readData(16)
- temp.putBytes(data)
-
- msg = Message().new(temp.hex)
- if msg:
- if msg.checkCrc():
- buffer.cleanAll()
- if msg.mt_8_11 == EnumG.Msg_Type_Bit_Base.value:
- if msg.msgId == EnumG.BaseLogMid_Epc.value:
- info = LogBaseEpcInfo()
- info.cData = msg.cData
- info.unPack()
- if info.epc != epc_code_temp:
- try:
- await handle_epc(epc_code=info.epc, websocket_queue=websocket_queue)
- print(f"{client_id} 发送EPC: {info.epc}")
- epc_code_temp = info.epc
- except Exception as e:
- print(f"处理EPC出错 {client_id}: {str(e)}")
-
- # 接收到EPC后发送版本查询作为响应
- await send_heartbeat(writer, client_id)
- last_heartbeat_time = time.time() # 更新心跳时间
-
- elif msg.mt_8_11 == EnumG.Msg_Type_Bit_App.value:
- if msg.msgId == EnumG.AppMid_Heartbeat.value:
- writer.write(msg.msgData)
- await writer.drain()
- last_heartbeat_time = time.time() # 更新心跳时间
- print(f"{client_id}: 收到并响应心跳")
-
- except asyncio.TimeoutError:
- # 只在超时时打印日志,然后继续发送心跳
- print(f"{client_id} 读取超时,发送心跳包")
- if not await send_heartbeat(writer, client_id):
- print(f"{client_id} 心跳发送失败,需要重连")
- return False # 需要重连
- last_heartbeat_time = time.time()
-
- except Exception as e:
- print(f"{client_id} TCP数据处理错误: {str(e)}")
- return False # 需要重连
- def tcp_process_handler(client_id, server, port_int, queue_to_websocket, queue_from_websocket):
- """运行在独立进程中的TCP处理函数"""
- # 设置进程信号处理
- def handle_signal(signum, frame):
- print(f"进程 {client_id} 收到信号 {signum},准备退出")
- os._exit(0)
-
- signal.signal(signal.SIGTERM, handle_signal)
- signal.signal(signal.SIGINT, handle_signal)
-
- # 创建新的事件循环
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
-
- async def process_main():
- reader = None
- writer = None
- reconnect_count = 0
-
- # 发送初始连接信息
- queue_to_websocket.put(json.dumps({
- "code": 1,
- "message": f"TCP处理进程已启动: {client_id}",
- "data": None
- }))
-
- while True:
- # 检查是否收到来自WebSocket的命令
- if not queue_from_websocket.empty():
- try:
- message = queue_from_websocket.get_nowait()
- if message == "CLOSE":
- print(f"收到关闭命令,进程 {client_id} 准备退出")
- break
- elif message.startswith("COMMAND:"):
- command = json.loads(message[8:])
- if command.get("command") == "reinitialize" and writer:
- if await initialize_rfid_reader(client_id, writer):
- queue_to_websocket.put(json.dumps({
- "code": 1,
- "message": "重新初始化RFID读取器成功",
- "data": None
- }))
- else:
- queue_to_websocket.put(json.dumps({
- "code": 0,
- "message": "重新初始化RFID读取器失败",
- "data": None
- }))
- except Exception as e:
- print(f"处理队列命令错误: {str(e)}")
-
- try:
- # 尝试连接或重连TCP
- if reader is None or writer is None:
- try:
- reader, writer = await connect_to_tcp(server, port_int)
- print(f"已连接到 TCP 服务器 {client_id}")
-
- # 初始化RFID读取器
- if not await initialize_rfid_reader(client_id, writer):
- raise Exception("RFID读取器初始化失败")
-
- # 发送连接成功通知
- if reconnect_count > 0:
- message = {
- "code": 1,
- "message": f"TCP重连成功 (第{reconnect_count}次尝试)",
- "data": None
- }
- else:
- message = {
- "code": 1,
- "message": f"成功连接到TCP服务器 {client_id}",
- "data": None
- }
- queue_to_websocket.put(json.dumps(message))
- reconnect_count = 0 # 连接成功后重置计数
-
- except Exception as e:
- reconnect_count += 1
- print(f"TCP连接失败 {client_id} (第{reconnect_count}次尝试): {str(e)}")
-
- # 通知客户端连接失败
- message = {
- "code": 0,
- "message": f"TCP连接失败 (第{reconnect_count}次尝试): {str(e)}",
- "data": None
- }
- queue_to_websocket.put(json.dumps(message))
-
- # 延迟后重试,延迟时间随重试次数增加但不超过30秒
- delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 30)
- await asyncio.sleep(delay)
- continue # 回到循环开始处重试连接
-
- # 处理TCP数据
- if not await process_tcp_data(client_id, reader, writer, queue_to_websocket):
- # TCP连接出现问题,需要重连
- reconnect_count += 1
- print(f"TCP连接异常,准备重连 {client_id} (第{reconnect_count}次尝试)")
-
- # 关闭当前的连接
- try:
- writer.close()
- await writer.wait_closed()
- except Exception:
- pass # 忽略关闭错误
-
- # 清空连接对象准备重连
- reader = None
- writer = None
-
- # 通知客户端需要重连
- message = {
- "code": 0,
- "message": f"TCP连接断开,准备重连 (第{reconnect_count}次尝试)",
- "data": None
- }
- queue_to_websocket.put(json.dumps(message))
-
- # 延迟后重试
- delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 15)
- await asyncio.sleep(delay)
- continue # 回到循环开始处重试连接
-
- except Exception as e:
- print(f"TCP处理循环异常 {client_id}: {str(e)}")
-
- # 关闭当前连接
- if writer:
- try:
- writer.close()
- await writer.wait_closed()
- except:
- pass
-
- # 准备重连
- reader = None
- writer = None
- reconnect_count += 1
-
- # 通知客户端错误情况
- message = {
- "code": 0,
- "message": f"TCP处理出错,准备重连 (第{reconnect_count}次尝试): {str(e)}",
- "data": None
- }
- queue_to_websocket.put(json.dumps(message))
-
- # 延迟后重试
- await asyncio.sleep(RECONNECT_DELAY)
-
- # 清理资源
- if writer:
- try:
- writer.close()
- await writer.wait_closed()
- except:
- pass
-
- print(f"TCP处理进程 {client_id} 结束")
-
- # 运行异步主函数
- loop.run_until_complete(process_main())
- loop.close()
- @app.websocket("/ws")
- async def websocket_endpoint(websocket: WebSocket, server: str, port: str):
- await websocket.accept()
- port_int = int(port)
- client_id = f"{server}:{port}"
- process = None
-
- # 为这个连接创建队列
- queue_to_websocket = multiprocessing.Queue()
- queue_from_websocket = multiprocessing.Queue()
-
- # 检查该client_id是否已经连接
- async with processes_lock:
- if client_id in active_processes:
- if active_processes[client_id].is_alive():
- # 拒绝重复连接
- await websocket.send_text(json.dumps({
- "code": 0,
- "message": f"拒绝连接:服务器 {client_id} 已有活跃连接",
- "data": None
- }))
- await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
- print(f"拒绝重复连接: {client_id}")
- return
- else:
- # 进程已死,清理它
- active_processes[client_id].terminate()
- active_processes.pop(client_id, None)
-
- try:
- # 创建和启动TCP处理进程
- process = multiprocessing.Process(
- target=tcp_process_handler,
- args=(client_id, server, port_int, queue_to_websocket, queue_from_websocket),
- daemon=True
- )
- process.start()
-
- # 将进程添加到活跃进程列表
- async with processes_lock:
- active_processes[client_id] = process
-
- # 创建任务来处理队列到WebSocket的转发
- async def forward_queue_to_websocket():
- while True:
- # 检查进程是否还活着
- if not process.is_alive():
- print(f"进程已终止: {client_id}")
- break
-
- # 检查队列
- if not queue_to_websocket.empty():
- try:
- message = queue_to_websocket.get_nowait()
- await websocket.send_text(message)
- except Exception as e:
- print(f"转发消息到WebSocket时出错: {str(e)}")
- break
- else:
- await asyncio.sleep(0.1) # 短暂休眠以减轻CPU负载
-
- # 启动转发任务
- forward_task = asyncio.create_task(forward_queue_to_websocket())
-
- # 等待客户端消息或连接断开
- try:
- while True:
- # 检查进程是否还活着
- if not process.is_alive():
- print(f"进程已终止,关闭WebSocket: {client_id}")
- await websocket.close(code=status.WS_1011_INTERNAL_ERROR)
- break
-
- # 使用超时来防止永久阻塞
- try:
- message = await asyncio.wait_for(websocket.receive_text(), timeout=2)
-
- # 处理客户端可能发送的命令
- try:
- data = json.loads(message)
- if isinstance(data, dict) and "command" in data:
- command = data["command"]
- # 转发命令到进程
- queue_from_websocket.put(f"COMMAND:{message}")
- except:
- pass
-
- except asyncio.TimeoutError:
- # 超时只是用来定期检查连接状态,不需要特殊处理
- pass
-
- except WebSocketDisconnect:
- print(f"WebSocket客户端断开连接: {client_id}")
-
- except Exception as e:
- print(f"处理WebSocket连接时出错: {client_id}, {str(e)}")
- try:
- await websocket.send_text(json.dumps({
- "code": 0,
- "message": f"处理连接时出错: {str(e)}",
- "data": None
- }))
- except:
- pass
- finally:
- # 取消转发任务
- if 'forward_task' in locals():
- forward_task.cancel()
- try:
- await forward_task
- except asyncio.CancelledError:
- pass
-
- # 通知进程关闭
- try:
- queue_from_websocket.put("CLOSE")
- # 给进程一些时间来自行清理
- await asyncio.sleep(1)
- except:
- pass
-
- # 如果进程仍在运行,则强制终止
- async with processes_lock:
- if client_id in active_processes:
- if active_processes[client_id].is_alive():
- active_processes[client_id].terminate()
- active_processes.pop(client_id, None)
-
- # 关闭WebSocket
- try:
- await websocket.close()
- except:
- pass
-
- print(f"连接已完全关闭: {client_id}")
- # 健康检查端点
- @app.get("/health")
- async def health_check():
- conn_info = {}
- async with processes_lock:
- for client_id, process in active_processes.items():
- conn_info[client_id] = {
- "process_alive": process.is_alive(),
- "process_pid": process.pid
- }
-
- return {
- "status": "ok",
- "active_processes": len(active_processes),
- "connections": conn_info,
- "timestamp": time.time()
- }
- # 关闭时的清理
- @app.on_event("shutdown")
- async def shutdown_event():
- print("服务器关闭,清理所有进程...")
- async with processes_lock:
- for client_id, process in active_processes.items():
- if process.is_alive():
- process.terminate()
- active_processes.clear()
- if __name__ == "__main__":
- # 确保多进程安全
- multiprocessing.set_start_method('spawn', force=True)
- uvicorn.run(app, host="0.0.0.0", port=7066)
|