rfid_server.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status
  2. import socket
  3. import asyncio
  4. import time
  5. import json
  6. from typing import Dict
  7. from uhf.reader import *
  8. from tools_use import ProductInfoTokenManager, ProductImage
  9. GClient
  10. app = FastAPI()
  11. # 存储活跃的TCP连接
  12. active_connections: Dict[str, tuple] = {} # {client_id: (reader, writer, websocket, task)}
  13. connection_lock = asyncio.Lock() # 锁用于保护active_connections
  14. # 配置参数
  15. RECONNECT_DELAY = 0.1 # 重连延迟(秒)
  16. HEARTBEAT_INTERVAL = 2 # 主动发送心跳的间隔(秒)
  17. TCP_READ_TIMEOUT = 10 # TCP读取超时时间(秒)
  18. TCP_KEEP_ALIVE_IDLE = 3600 # TCP保持活跃的空闲时间(秒)
  19. TCP_KEEP_ALIVE_INTERVAL = 100 # TCP保持活跃的间隔(秒)
  20. TCP_KEEP_ALIVE_COUNT = 6 # TCP保持活跃的次数
  21. async def handle_epc(epc_code, websocket):
  22. try:
  23. product_info_instance = ProductInfoTokenManager()
  24. product_image_instance = ProductImage()
  25. barcode = await product_info_instance.get_barcode_from_epc(epc_code)
  26. product_info = await product_info_instance.get_product_info_by_barcode(barcode)
  27. product_image = await product_image_instance.get_product_image_by_barcode(barcode)
  28. response_data = {
  29. "code": 1,
  30. "message": "ok",
  31. "data": {
  32. "barcode": barcode,
  33. "product_info": product_info,
  34. "product_image": product_image
  35. }
  36. }
  37. await websocket.send_text(json.dumps(response_data))
  38. except Exception as e:
  39. print(f"处理EPC数据错误: {str(e)}")
  40. error_response = {
  41. "code": 0,
  42. "message": f"处理EPC数据错误: {str(e)}",
  43. "data": None
  44. }
  45. try:
  46. await websocket.send_text(json.dumps(error_response))
  47. except RuntimeError:
  48. # WebSocket可能已关闭
  49. print(f"无法发送错误响应,WebSocket可能已关闭")
  50. async def close_tcp_connection(client_id):
  51. """关闭TCP连接并从活跃连接列表中移除"""
  52. async with connection_lock:
  53. if client_id in active_connections:
  54. _, writer, _, task = active_connections[client_id]
  55. try:
  56. # 取消正在运行的任务
  57. if task and not task.done():
  58. task.cancel()
  59. try:
  60. await task
  61. except asyncio.CancelledError:
  62. pass
  63. # 关闭TCP连接
  64. writer.close()
  65. await writer.wait_closed()
  66. print(f"已关闭TCP连接: {client_id}")
  67. except Exception as e:
  68. print(f"关闭TCP连接时出错: {client_id}, 错误: {str(e)}")
  69. finally:
  70. active_connections.pop(client_id, None)
  71. async def is_websocket_alive(websocket):
  72. """检查WebSocket是否仍然活跃"""
  73. try:
  74. # 简单的非阻塞检查
  75. return websocket.client_state != WebSocketDisconnect
  76. except:
  77. return False
  78. async def send_websocket_message(websocket, message):
  79. """安全地发送WebSocket消息"""
  80. try:
  81. await websocket.send_text(json.dumps(message))
  82. return True
  83. except Exception:
  84. return False
  85. async def connect_to_tcp(server, port_int):
  86. """创建TCP连接并设置参数"""
  87. reader, writer = await asyncio.open_connection(
  88. server,
  89. port_int,
  90. limit=999999,
  91. )
  92. # 设置 TCP keepalive - 提高稳定性
  93. sock = writer.get_extra_info('socket')
  94. sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
  95. # 在支持的平台上设置更细粒度的keepalive参数
  96. if hasattr(socket, 'TCP_KEEPIDLE'):
  97. sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, TCP_KEEP_ALIVE_IDLE)
  98. if hasattr(socket, 'TCP_KEEPINTVL'):
  99. sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, TCP_KEEP_ALIVE_INTERVAL)
  100. if hasattr(socket, 'TCP_KEEPCNT'):
  101. sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, TCP_KEEP_ALIVE_COUNT)
  102. # 设置套接字选项以提高稳定性
  103. # 禁用Nagle算法,减少延迟
  104. sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  105. # 设置接收和发送缓冲区大小
  106. sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 262144) # 256KB
  107. sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 262144) # 256KB
  108. return reader, writer
  109. async def send_heartbeat(writer, client_id):
  110. """发送心跳包"""
  111. try:
  112. mess = MsgAppGetBaseVersion()
  113. mess.pack()
  114. writer.write(mess.toByte(False))
  115. await writer.drain()
  116. return True
  117. except Exception as e:
  118. print(f"{client_id} 发送心跳包失败: {str(e)}")
  119. return False
  120. async def process_tcp_data(client_id, reader, writer, websocket):
  121. """处理TCP数据流"""
  122. buffer = RingBuffer()
  123. epc_code_temp = ''
  124. last_heartbeat_time = time.time()
  125. while True:
  126. # 定期发送心跳,无需等待超时
  127. # current_time = time.time()
  128. # if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL:
  129. # if not await send_heartbeat(writer, client_id):
  130. # print(f"{client_id} 定期心跳发送失败,需要重连")
  131. # return False # 需要重连
  132. # last_heartbeat_time = current_time
  133. try:
  134. # 使用wait_for防止永久阻塞
  135. response = await asyncio.wait_for(reader.read(1024), timeout=TCP_READ_TIMEOUT)
  136. if not response:
  137. print(f"{client_id} TCP连接已关闭 (空响应)")
  138. return False # 需要重连
  139. # 处理接收到的数据
  140. buffer.writeData(response)
  141. if buffer.dataCount < 48:
  142. continue
  143. if buffer.indexData(0) != 0x5A:
  144. buffer.cleanData(1 * 8)
  145. continue
  146. if buffer.dataCount >= 56:
  147. temp = DynamicBuffer()
  148. temp.putBytes(buffer.readData(40))
  149. dataLen = buffer.readBit(16)
  150. if dataLen != 0:
  151. temp.putShort(dataLen)
  152. if (dataLen + 2) * 8 > buffer.dataCount:
  153. if dataLen > 1024:
  154. buffer.cleanData(1 * 8)
  155. else:
  156. buffer.subPos(temp.len)
  157. continue
  158. else:
  159. temp.putBytes(buffer.readData(dataLen * 8))
  160. data = buffer.readData(16)
  161. temp.putBytes(data)
  162. msg = Message().new(temp.hex)
  163. if msg:
  164. if msg.checkCrc():
  165. buffer.cleanAll()
  166. print(f'mt_8_11******{msg.mt_8_11}')
  167. print(f'msgId******{msg.msgId}')
  168. if msg.mt_8_11 == EnumG.Msg_Type_Bit_Base.value:
  169. if msg.msgId == EnumG.BaseLogMid_Epc.value:
  170. info = LogBaseEpcInfo()
  171. info.cData = msg.cData
  172. info.unPack()
  173. if info.epc != epc_code_temp:
  174. try:
  175. await handle_epc(epc_code=info.epc, websocket=websocket)
  176. print(f"{client_id} 发送EPC: {info.epc}")
  177. epc_code_temp = info.epc
  178. except Exception as e:
  179. print(f"处理EPC出错 {client_id}: {str(e)}")
  180. # 这里不断开TCP连接,因为WebSocket只是数据传递渠道
  181. # 接收到EPC后发送版本查询作为响应
  182. await send_heartbeat(writer, client_id)
  183. last_heartbeat_time = time.time() # 更新心跳时间
  184. elif msg.mt_8_11 == EnumG.Msg_Type_Bit_App.value:
  185. if msg.msgId == EnumG.AppMid_Heartbeat.value:
  186. writer.write(msg.msgData)
  187. await writer.drain()
  188. last_heartbeat_time = time.time() # 更新心跳时间
  189. print(f"{client_id}: 收到并响应心跳")
  190. except asyncio.TimeoutError:
  191. # 只在超时时打印日志,然后继续发送心跳
  192. print(f"{client_id} 读取超时,发送心跳包")
  193. if not await send_heartbeat(writer, client_id):
  194. print(f"{client_id} 心跳发送失败,需要重连")
  195. return False # 需要重连
  196. last_heartbeat_time = time.time()
  197. except Exception as e:
  198. print(f"{client_id} TCP数据处理错误: {str(e)}")
  199. return False # 需要重连
  200. async def initialize_rfid_reader(client_id, writer):
  201. """初始化RFID读取器设置"""
  202. try:
  203. # 发送初始库存EPC消息
  204. msg = MsgBaseInventoryEpc(antennaEnable=EnumG.AntennaNo_1.value,
  205. inventoryMode=EnumG.InventoryMode_Inventory.value)
  206. msg.pack()
  207. writer.write(msg.toByte(False))
  208. await writer.drain()
  209. print(f"{client_id} 发送初始化命令")
  210. # 等待短暂时间确保命令被处理
  211. await asyncio.sleep(0.5)
  212. # 再次发送一个心跳确认连接状态
  213. mess = MsgAppGetBaseVersion()
  214. mess.pack()
  215. writer.write(mess.toByte(False))
  216. await writer.drain()
  217. return True
  218. except Exception as e:
  219. print(f"{client_id} 初始化RFID读取器失败: {str(e)}")
  220. return False
  221. async def tcp_connection_handler(client_id, websocket):
  222. """处理TCP连接的主循环,包括无限重连逻辑"""
  223. server, port_str = client_id.split(":")
  224. port_int = int(port_str)
  225. reconnect_count = 0
  226. reader = None
  227. writer = None
  228. while True:
  229. # 检查WebSocket是否仍然连接
  230. try:
  231. # 如果WebSocket已断开,则退出循环
  232. ping_data = json.dumps({"type": "ping"})
  233. await websocket.send_text(ping_data)
  234. except Exception:
  235. print(f"WebSocket已断开,停止TCP处理: {client_id}")
  236. break
  237. try:
  238. # 尝试连接或重连TCP
  239. if reader is None or writer is None:
  240. try:
  241. reader, writer = await connect_to_tcp(server, port_int)
  242. print(f"已连接到 TCP 服务器 {client_id}")
  243. # 初始化RFID读取器
  244. if not await initialize_rfid_reader(client_id, writer):
  245. raise Exception("RFID读取器初始化失败")
  246. # 更新或添加到活跃连接列表
  247. async with connection_lock:
  248. if client_id in active_connections:
  249. old_reader, old_writer, ws, task = active_connections[client_id]
  250. active_connections[client_id] = (reader, writer, ws, task)
  251. # 发送连接成功通知
  252. if reconnect_count > 0:
  253. message = {
  254. "code": 1,
  255. "message": f"TCP重连成功 (第{reconnect_count}次尝试)",
  256. "data": None
  257. }
  258. else:
  259. message = {
  260. "code": 1,
  261. "message": f"成功连接到TCP服务器 {client_id}",
  262. "data": None
  263. }
  264. await send_websocket_message(websocket, message)
  265. reconnect_count = 0 # 连接成功后重置计数
  266. except Exception as e:
  267. reconnect_count += 1
  268. print(f"TCP连接失败 {client_id} (第{reconnect_count}次尝试): {str(e)}")
  269. # 通知客户端连接失败
  270. message = {
  271. "code": 0,
  272. "message": f"TCP连接失败 (第{reconnect_count}次尝试): {str(e)}",
  273. "data": None
  274. }
  275. await send_websocket_message(websocket, message)
  276. # 延迟后重试,延迟时间随重试次数增加但不超过30秒
  277. delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 30)
  278. await asyncio.sleep(delay)
  279. continue # 回到循环开始处重试连接
  280. # 处理TCP数据
  281. if not await process_tcp_data(client_id, reader, writer, websocket):
  282. # TCP连接出现问题,需要重连
  283. reconnect_count += 1
  284. print(f"TCP连接异常,准备重连 {client_id} (第{reconnect_count}次尝试)")
  285. # 关闭当前的连接
  286. try:
  287. writer.close()
  288. await writer.wait_closed()
  289. except Exception:
  290. pass # 忽略关闭错误
  291. # 清空连接对象准备重连
  292. reader = None
  293. writer = None
  294. # 通知客户端需要重连
  295. message = {
  296. "code": 0,
  297. "message": f"TCP连接断开,准备重连 (第{reconnect_count}次尝试)",
  298. "data": None
  299. }
  300. await send_websocket_message(websocket, message)
  301. # 延迟后重试
  302. delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 15)
  303. await asyncio.sleep(delay)
  304. continue # 回到循环开始处重试连接
  305. except Exception as e:
  306. print(f"TCP处理循环异常 {client_id}: {str(e)}")
  307. # 关闭当前连接
  308. if writer:
  309. try:
  310. writer.close()
  311. await writer.wait_closed()
  312. except:
  313. pass
  314. # 准备重连
  315. reader = None
  316. writer = None
  317. reconnect_count += 1
  318. # 通知客户端错误情况
  319. message = {
  320. "code": 0,
  321. "message": f"TCP处理出错,准备重连 (第{reconnect_count}次尝试): {str(e)}",
  322. "data": None
  323. }
  324. await send_websocket_message(websocket, message)
  325. # 延迟后重试
  326. await asyncio.sleep(RECONNECT_DELAY)
  327. # WebSocket已断开,清理TCP连接
  328. if writer:
  329. try:
  330. writer.close()
  331. await writer.wait_closed()
  332. except:
  333. pass
  334. @app.websocket("/ws")
  335. async def websocket_endpoint(websocket: WebSocket, server: str, port: str):
  336. await websocket.accept()
  337. port_int = int(port)
  338. client_id = f"{server}:{port}"
  339. tcp_task = None
  340. # 检查该client_id是否已经连接
  341. async with connection_lock:
  342. if client_id in active_connections:
  343. # 拒绝重复连接
  344. await websocket.send_text(json.dumps({
  345. "code": 0,
  346. "message": f"拒绝连接:服务器 {client_id} 已有活跃连接",
  347. "data": None
  348. }))
  349. await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
  350. print(f"拒绝重复连接: {client_id}")
  351. return
  352. try:
  353. # 创建TCP处理任务
  354. tcp_task = asyncio.create_task(
  355. tcp_connection_handler(client_id, websocket)
  356. )
  357. # 将连接添加到活跃连接列表
  358. async with connection_lock:
  359. # TCP连接会在handler中创建,所以这里先放入None
  360. active_connections[client_id] = (None, None, websocket, tcp_task)
  361. # 等待客户端消息或连接断开
  362. try:
  363. while True:
  364. # 使用超时来防止永久阻塞
  365. try:
  366. # print('1111')
  367. message = await asyncio.wait_for(websocket.receive_text(), timeout=2)
  368. # print(f"收到WebSocket消息: {client_id}: {message}")
  369. # 处理客户端可能发送的命令
  370. try:
  371. data = json.loads(message)
  372. if isinstance(data, dict) and "command" in data:
  373. command = data["command"]
  374. # 处理客户端命令,例如手动重新初始化
  375. if command == "reinitialize":
  376. async with connection_lock:
  377. if client_id in active_connections:
  378. _, writer, _, _ = active_connections[client_id]
  379. if writer:
  380. await initialize_rfid_reader(client_id, writer)
  381. await websocket.send_text(json.dumps({
  382. "code": 1,
  383. "message": "重新初始化RFID读取器",
  384. "data": None
  385. }))
  386. except:
  387. pass
  388. except asyncio.TimeoutError:
  389. # 超时只是用来定期检查连接状态,不需要特殊处理
  390. pass
  391. except WebSocketDisconnect:
  392. print(f"WebSocket客户端断开连接: {client_id}")
  393. except Exception as e:
  394. print(f"处理WebSocket连接时出错: {client_id}, {str(e)}")
  395. try:
  396. await websocket.send_text(json.dumps({
  397. "code": 0,
  398. "message": f"处理连接时出错: {str(e)}",
  399. "data": None
  400. }))
  401. except:
  402. pass
  403. finally:
  404. # 清理资源
  405. await close_tcp_connection(client_id)
  406. try:
  407. await websocket.close()
  408. except:
  409. pass
  410. print(f"连接已完全关闭: {client_id}")
  411. # 健康检查端点
  412. @app.get("/health")
  413. async def health_check():
  414. conn_info = {}
  415. async with connection_lock:
  416. for client_id in active_connections:
  417. reader, writer, websocket, task = active_connections[client_id]
  418. conn_info[client_id] = {
  419. "tcp_connected": writer is not None,
  420. "websocket_connected": websocket is not None,
  421. "task_running": task is not None and not task.done()
  422. }
  423. return {
  424. "status": "ok",
  425. "active_connections": len(active_connections),
  426. "connections": conn_info,
  427. "timestamp": time.time()
  428. }
  429. if __name__ == "__main__":
  430. import uvicorn
  431. uvicorn.run(app, host="0.0.0.0", port=7066)