multiprocess_server.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status
  2. import socket
  3. import asyncio
  4. import time
  5. import json
  6. import multiprocessing
  7. from typing import Dict
  8. from uhf.reader import *
  9. from tools_use import ProductInfoTokenManager, ProductImage
  10. import uvicorn
  11. import signal
  12. import os
  # FastAPI application object; the route decorators in this module attach to it.
  app = FastAPI()

  # Worker processes keyed by client id ("server:port").
  active_processes: Dict[str, multiprocessing.Process] = {}
  # Lock protecting active_processes.
  processes_lock = asyncio.Lock()

  # --- Configuration parameters -------------------------------------------
  # Reconnect delay (seconds).
  RECONNECT_DELAY = 0.1
  # Interval for actively sending heartbeats (seconds).
  # NOTE(review): not referenced anywhere in the visible code.
  HEARTBEAT_INTERVAL = 2
  # TCP read timeout (seconds).
  TCP_READ_TIMEOUT = 10
  # TCP keepalive idle time (seconds).
  TCP_KEEP_ALIVE_IDLE = 3600
  # TCP keepalive probe interval (seconds).
  TCP_KEEP_ALIVE_INTERVAL = 100
  # TCP keepalive probe count.
  TCP_KEEP_ALIVE_COUNT = 6

  # Container intended for the queues used between the WebSocket side and the
  # TCP processes.
  # NOTE(review): not referenced anywhere in the visible code.
  websocket_queue_dict = {}
  async def handle_epc(epc_code, websocket_queue):
      """Resolve an EPC to product data and put the JSON result on the queue.

      The EPC is turned into a barcode, then product info and a product image
      are fetched for that barcode. On success a ``code: 1`` payload is queued;
      any exception raised along the way is caught, printed, and reported as a
      ``code: 0`` payload instead of being propagated.

      Args:
          epc_code: EPC value passed to ``get_barcode_from_epc``.
          websocket_queue: Object with a ``put`` method that receives the
              JSON-encoded response string.
      """
      try:
          info_manager = ProductInfoTokenManager()
          image_manager = ProductImage()

          barcode = await info_manager.get_barcode_from_epc(epc_code)
          info = await info_manager.get_product_info_by_barcode(barcode)
          image = await image_manager.get_product_image_by_barcode(barcode)

          success_payload = json.dumps({
              "code": 1,
              "message": "ok",
              "data": {
                  "barcode": barcode,
                  "product_info": info,
                  "product_image": image,
              },
          })
          websocket_queue.put(success_payload)
      except Exception as exc:
          failure_text = f"处理EPC数据错误: {exc}"
          print(failure_text)
          websocket_queue.put(json.dumps({
              "code": 0,
              "message": failure_text,
              "data": None,
          }))
  async def connect_to_tcp(server, port_int):
      """Open a TCP stream connection and tune its socket options.

      Enables keepalive (with the finer-grained idle/interval/count settings on
      platforms whose ``socket`` module exposes them), disables Nagle's
      algorithm, and requests 256 KB receive and send buffers.

      If the socket cannot be configured, the freshly opened connection is
      closed before the error is re-raised, so a failed attempt does not leak
      the transport.

      Args:
          server: Host passed to ``asyncio.open_connection``.
          port_int: Port passed to ``asyncio.open_connection``.

      Returns:
          The ``(reader, writer)`` pair from ``asyncio.open_connection``.

      Raises:
          OSError: If connecting or setting a socket option fails.
          RuntimeError: If the transport does not expose its socket.
      """
      reader, writer = await asyncio.open_connection(
          server,
          port_int,
          limit=999999,
      )
      try:
          sock = writer.get_extra_info('socket')
          if sock is None:
              raise RuntimeError("transport does not expose its underlying socket")

          # TCP keepalive, to improve stability.
          sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
          # Finer-grained keepalive parameters exist only on some platforms.
          keepalive_tuning = (
              ("TCP_KEEPIDLE", TCP_KEEP_ALIVE_IDLE),
              ("TCP_KEEPINTVL", TCP_KEEP_ALIVE_INTERVAL),
              ("TCP_KEEPCNT", TCP_KEEP_ALIVE_COUNT),
          )
          for option_name, option_value in keepalive_tuning:
              option = getattr(socket, option_name, None)
              if option is not None:
                  sock.setsockopt(socket.IPPROTO_TCP, option, option_value)

          # Disable Nagle's algorithm to reduce latency.
          sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
          # Receive and send buffer sizes: 256 KB each.
          sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 262144)
          sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 262144)
      except Exception:
          # Do not leave a half-configured connection open behind the caller.
          writer.close()
          raise
      return reader, writer
  async def send_heartbeat(writer, client_id):
      """Send a heartbeat packet (a ``MsgAppGetBaseVersion`` message).

      Args:
          writer: Stream writer the packed message is written to and drained on.
          client_id: Identifier used only in the failure log line.

      Returns:
          True when the message was written and drained, False if any
          exception occurred (the exception is printed, not raised).
      """
      try:
          version_query = MsgAppGetBaseVersion()
          version_query.pack()
          writer.write(version_query.toByte(False))
          await writer.drain()
      except Exception as exc:
          print(f"{client_id} 发送心跳包失败: {exc}")
          return False
      else:
          return True
  async def initialize_rfid_reader(client_id, writer):
      """Send the initial commands that set up the RFID reader.

      Writes a ``MsgBaseInventoryEpc`` request (antenna 1, inventory mode),
      waits half a second, then writes a ``MsgAppGetBaseVersion`` message to
      confirm the connection state.

      Args:
          client_id: Identifier used only in log lines.
          writer: Stream writer both messages are written to.

      Returns:
          True when both messages were written and drained, False if any
          exception occurred (the exception is printed, not raised).
      """
      try:
          # Initial EPC inventory request.
          inventory_request = MsgBaseInventoryEpc(
              antennaEnable=EnumG.AntennaNo_1.value,
              inventoryMode=EnumG.InventoryMode_Inventory.value,
          )
          inventory_request.pack()
          writer.write(inventory_request.toByte(False))
          await writer.drain()
          print(f"{client_id} 发送初始化命令")

          # Short pause so the command can be processed.
          await asyncio.sleep(0.5)

          # Follow up with one heartbeat to confirm the connection state.
          version_query = MsgAppGetBaseVersion()
          version_query.pack()
          writer.write(version_query.toByte(False))
          await writer.drain()
      except Exception as exc:
          print(f"{client_id} 初始化RFID读取器失败: {exc}")
          return False
      return True
  async def _dispatch_reader_message(client_id, msg, writer, websocket_queue, last_epc):
      """React to one CRC-verified reader message; return the last EPC published."""
      if msg.mt_8_11 == EnumG.Msg_Type_Bit_Base.value:
          if msg.msgId == EnumG.BaseLogMid_Epc.value:
              info = LogBaseEpcInfo()
              info.cData = msg.cData
              info.unPack()
              # Only publish when the EPC differs from the previous one.
              if info.epc != last_epc:
                  try:
                      await handle_epc(epc_code=info.epc, websocket_queue=websocket_queue)
                      print(f"{client_id} 发送EPC: {info.epc}")
                      last_epc = info.epc
                  except Exception as e:
                      print(f"处理EPC出错 {client_id}: {e}")
              # After receiving an EPC, answer with a version query.
              # NOTE(review): placement reconstructed from a flattened listing;
              # assumed to run for every EPC message, not only new ones - confirm.
              await send_heartbeat(writer, client_id)
      elif msg.mt_8_11 == EnumG.Msg_Type_Bit_App.value:
          if msg.msgId == EnumG.AppMid_Heartbeat.value:
              # Echo the reader's heartbeat back to it.
              writer.write(msg.msgData)
              await writer.drain()
              print(f"{client_id}: 收到并响应心跳")
      return last_epc


  async def process_tcp_data(client_id, reader, writer, websocket_queue):
      """Read the reader's TCP stream, parse frames and act on them.

      Loops until the connection is considered broken. Each iteration reads up
      to 1024 bytes (bounded by ``TCP_READ_TIMEOUT``), appends them to a ring
      buffer and makes one attempt to extract a frame. EPC messages are passed
      to ``handle_epc`` (consecutive duplicates are skipped) and heartbeat
      messages are echoed back. A read timeout triggers ``send_heartbeat``.

      Args:
          client_id: Identifier used in log lines.
          reader: Stream reader for the TCP connection.
          writer: Stream writer for the TCP connection.
          websocket_queue: Queue handed to ``handle_epc`` for results.

      Returns:
          False when the caller should reconnect: the peer closed the stream,
          a heartbeat could not be sent after a timeout, or an unexpected
          exception occurred. The function does not return otherwise.
      """
      # NOTE(review): the statement nesting below was reconstructed from a
      # listing that had lost its indentation - confirm against the real file.
      buffer = RingBuffer()
      last_epc = ''

      while True:
          try:
              # wait_for keeps a silent connection from blocking forever.
              response = await asyncio.wait_for(reader.read(1024), timeout=TCP_READ_TIMEOUT)
              if not response:
                  print(f"{client_id} TCP连接已关闭 (空响应)")
                  return False  # caller must reconnect

              buffer.writeData(response)
              # The sizes compared here (48, 56, n * 8) look like bit counts -
              # presumably dataCount is in bits; TODO confirm.
              # NOTE(review): only one parse attempt is made per read; every
              # `continue` goes back to waiting for more bytes.
              if buffer.dataCount < 48:
                  continue
              if buffer.indexData(0) != 0x5A:
                  # Not at the expected 0x5A start value: drop 8 and retry later.
                  buffer.cleanData(1 * 8)
                  continue
              if buffer.dataCount >= 56:
                  temp = DynamicBuffer()
                  temp.putBytes(buffer.readData(40))
                  dataLen = buffer.readBit(16)
                  if dataLen != 0:
                      temp.putShort(dataLen)
                      if (dataLen + 2) * 8 > buffer.dataCount:
                          # Payload plus trailing 16 bits not fully buffered yet.
                          if dataLen > 1024:
                              buffer.cleanData(1 * 8)
                          else:
                              buffer.subPos(temp.len)
                          continue
                      temp.putBytes(buffer.readData(dataLen * 8))
                      temp.putBytes(buffer.readData(16))
                      msg = Message().new(temp.hex)
                      if msg and msg.checkCrc():
                          buffer.cleanAll()
                          last_epc = await _dispatch_reader_message(
                              client_id, msg, writer, websocket_queue, last_epc
                          )
          except asyncio.TimeoutError:
              # Nothing arrived in time: log it and probe with a heartbeat.
              print(f"{client_id} 读取超时,发送心跳包")
              if not await send_heartbeat(writer, client_id):
                  print(f"{client_id} 心跳发送失败,需要重连")
                  return False  # caller must reconnect
          except Exception as e:
              print(f"{client_id} TCP数据处理错误: {e}")
              return False  # caller must reconnect
  def tcp_process_handler(client_id, server, port_int, queue_to_websocket, queue_from_websocket):
      """TCP handling function that runs in its own process.

      Installs SIGTERM/SIGINT handlers that exit immediately, creates a private
      event loop, then keeps a TCP connection to ``server:port_int`` alive:
      connect, initialise the RFID reader, process incoming data, and reconnect
      after failures. Status messages are JSON strings put on
      ``queue_to_websocket``.

      Commands from ``queue_from_websocket`` are polled both between connection
      attempts and while data is being processed, so ``"CLOSE"`` and
      ``"COMMAND:{...}"`` messages take effect on a healthy connection too.

      Args:
          client_id: Identifier used in log and status messages.
          server: Host to connect to.
          port_int: Port to connect to.
          queue_to_websocket: Queue receiving JSON status/result strings.
          queue_from_websocket: Queue delivering ``"CLOSE"`` or
              ``"COMMAND:<json>"`` strings.
      """
      # Seconds between command-queue checks while a connection is being served.
      command_poll_interval = 0.1

      def handle_signal(signum, frame):
          print(f"进程 {client_id} 收到信号 {signum},准备退出")
          os._exit(0)

      signal.signal(signal.SIGTERM, handle_signal)
      signal.signal(signal.SIGINT, handle_signal)

      # This process gets its own event loop.
      loop = asyncio.new_event_loop()
      asyncio.set_event_loop(loop)

      def notify(code, text):
          """Queue one status message for the WebSocket side."""
          queue_to_websocket.put(json.dumps({
              "code": code,
              "message": text,
              "data": None
          }))

      async def close_writer(writer):
          """Close *writer* if present; closing is best-effort."""
          if writer is None:
              return
          try:
              writer.close()
              await writer.wait_closed()
          except Exception:
              pass  # the link is being discarded anyway

      async def handle_pending_commands(writer):
          """Process all queued commands; return True once CLOSE was received."""
          while not queue_from_websocket.empty():
              try:
                  message = queue_from_websocket.get_nowait()
                  if message == "CLOSE":
                      print(f"收到关闭命令,进程 {client_id} 准备退出")
                      return True
                  if message.startswith("COMMAND:"):
                      command = json.loads(message[8:])
                      if command.get("command") == "reinitialize" and writer:
                          if await initialize_rfid_reader(client_id, writer):
                              notify(1, "重新初始化RFID读取器成功")
                          else:
                              notify(0, "重新初始化RFID读取器失败")
              except Exception as e:
                  print(f"处理队列命令错误: {e}")
          return False

      async def serve_connection(reader, writer):
          """Run process_tcp_data while still polling the command queue.

          Returns ``(result, close_requested)`` where ``result`` is the value
          returned by ``process_tcp_data``.
          """
          data_task = asyncio.create_task(
              process_tcp_data(client_id, reader, writer, queue_to_websocket)
          )
          try:
              while True:
                  done, _ = await asyncio.wait({data_task}, timeout=command_poll_interval)
                  if done:
                      return data_task.result(), False
                  if await handle_pending_commands(writer):
                      return True, True
          finally:
              if not data_task.done():
                  data_task.cancel()
                  try:
                      await data_task
                  except asyncio.CancelledError:
                      pass

      async def process_main():
          reader = None
          writer = None
          reconnect_count = 0

          notify(1, f"TCP处理进程已启动: {client_id}")

          while True:
              if await handle_pending_commands(writer):
                  break

              try:
                  # Connect, or reconnect after a failure.
                  if reader is None or writer is None:
                      try:
                          reader, writer = await connect_to_tcp(server, port_int)
                          print(f"已连接到 TCP 服务器 {client_id}")

                          if not await initialize_rfid_reader(client_id, writer):
                              raise Exception("RFID读取器初始化失败")

                          if reconnect_count > 0:
                              notify(1, f"TCP重连成功 (第{reconnect_count}次尝试)")
                          else:
                              notify(1, f"成功连接到TCP服务器 {client_id}")
                          reconnect_count = 0  # reset after a successful connect
                      except Exception as e:
                          reconnect_count += 1
                          print(f"TCP连接失败 {client_id} (第{reconnect_count}次尝试): {e}")
                          # A connection whose initialisation failed must not be
                          # reused: drop it so the next pass really reconnects.
                          await close_writer(writer)
                          reader = None
                          writer = None
                          notify(0, f"TCP连接失败 (第{reconnect_count}次尝试): {e}")
                          # Delay grows with the attempt count, capped at 30 s.
                          delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 30)
                          await asyncio.sleep(delay)
                          continue

                  healthy, close_requested = await serve_connection(reader, writer)
                  if close_requested:
                      break
                  if not healthy:
                      # The TCP link broke: tear it down and reconnect.
                      reconnect_count += 1
                      print(f"TCP连接异常,准备重连 {client_id} (第{reconnect_count}次尝试)")
                      await close_writer(writer)
                      reader = None
                      writer = None
                      notify(0, f"TCP连接断开,准备重连 (第{reconnect_count}次尝试)")
                      delay = min(RECONNECT_DELAY * min(reconnect_count, 5), 15)
                      await asyncio.sleep(delay)
                      continue
              except Exception as e:
                  print(f"TCP处理循环异常 {client_id}: {e}")
                  await close_writer(writer)
                  reader = None
                  writer = None
                  reconnect_count += 1
                  notify(0, f"TCP处理出错,准备重连 (第{reconnect_count}次尝试): {e}")
                  await asyncio.sleep(RECONNECT_DELAY)

          # Release the connection before the process ends.
          await close_writer(writer)
          print(f"TCP处理进程 {client_id} 结束")

      try:
          loop.run_until_complete(process_main())
      finally:
          loop.close()
  @app.websocket("/ws")
  async def websocket_endpoint(websocket: WebSocket, server: str, port: str):
      """Bridge one WebSocket client to a dedicated TCP worker process.

      Accepts the socket, refuses the connection when a live worker already
      exists for ``server:port``, otherwise starts a worker process running
      ``tcp_process_handler`` and then:

      * forwards every string the worker queues to the WebSocket, and
      * forwards client JSON objects containing a ``"command"`` key to the
        worker as ``"COMMAND:<original text>"``.

      When the client disconnects, the worker dies, or an error occurs, the
      worker is asked to close, given up to one second to exit, terminated if
      still alive, and both queues are closed.

      Args:
          websocket: The incoming WebSocket connection.
          server: Target host (query parameter).
          port: Target port as text (query parameter); must parse as an int.
      """
      await websocket.accept()
      try:
          port_int = int(port)
      except ValueError:
          await websocket.send_text(json.dumps({
              "code": 0,
              "message": f"无效的端口: {port}",
              "data": None
          }))
          await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
          return

      client_id = f"{server}:{port}"
      process = None
      worker_started = False
      startup_error = None
      forward_task = None
      queue_to_websocket = None
      queue_from_websocket = None

      # Check for a duplicate and register the new worker in one critical
      # section (no awaits inside), so two simultaneous connections for the
      # same client_id cannot both pass the check.
      async with processes_lock:
          existing = active_processes.get(client_id)
          duplicate = existing is not None and existing.is_alive()
          if not duplicate:
              # Either no entry or a dead worker: drop the stale entry.
              active_processes.pop(client_id, None)
              try:
                  queue_to_websocket = multiprocessing.Queue()
                  queue_from_websocket = multiprocessing.Queue()
                  process = multiprocessing.Process(
                      target=tcp_process_handler,
                      args=(client_id, server, port_int, queue_to_websocket, queue_from_websocket),
                      daemon=True
                  )
                  process.start()
              except Exception as exc:
                  startup_error = exc  # reported below, outside the lock
              else:
                  worker_started = True
                  active_processes[client_id] = process

      if duplicate:
          # Reject outside the lock so slow network I/O cannot block others.
          await websocket.send_text(json.dumps({
              "code": 0,
              "message": f"拒绝连接:服务器 {client_id} 已有活跃连接",
              "data": None
          }))
          await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
          print(f"拒绝重复连接: {client_id}")
          return

      try:
          if startup_error is not None:
              raise startup_error

          async def forward_queue_to_websocket():
              """Relay worker messages to the client until the worker dies."""
              while True:
                  if not process.is_alive():
                      print(f"进程已终止: {client_id}")
                      break
                  if not queue_to_websocket.empty():
                      try:
                          message = queue_to_websocket.get_nowait()
                          await websocket.send_text(message)
                      except Exception as e:
                          print(f"转发消息到WebSocket时出错: {e}")
                          break
                  else:
                      await asyncio.sleep(0.1)  # brief sleep to limit CPU use

          forward_task = asyncio.create_task(forward_queue_to_websocket())

          # Wait for client messages or a disconnect.
          while True:
              if not process.is_alive():
                  print(f"进程已终止,关闭WebSocket: {client_id}")
                  await websocket.close(code=status.WS_1011_INTERNAL_ERROR)
                  break

              # The timeout only exists so worker liveness is re-checked.
              try:
                  message = await asyncio.wait_for(websocket.receive_text(), timeout=2)
              except asyncio.TimeoutError:
                  continue

              try:
                  data = json.loads(message)
              except (ValueError, RecursionError):
                  continue  # text that is not usable JSON is ignored
              if isinstance(data, dict) and "command" in data:
                  queue_from_websocket.put(f"COMMAND:{message}")

      except WebSocketDisconnect:
          print(f"WebSocket客户端断开连接: {client_id}")
      except Exception as e:
          print(f"处理WebSocket连接时出错: {client_id}, {e}")
          try:
              await websocket.send_text(json.dumps({
                  "code": 0,
                  "message": f"处理连接时出错: {e}",
                  "data": None
              }))
          except Exception:
              pass  # the socket may already be unusable
      finally:
          if forward_task is not None:
              forward_task.cancel()
              try:
                  await forward_task
              except asyncio.CancelledError:
                  pass

          if worker_started:
              # Ask the worker to stop and give it up to one second to do so.
              try:
                  queue_from_websocket.put("CLOSE")
                  loop = asyncio.get_running_loop()
                  deadline = loop.time() + 1
                  while process.is_alive() and loop.time() < deadline:
                      await asyncio.sleep(0.05)
              except Exception:
                  pass

              async with processes_lock:
                  if process.is_alive():
                      process.terminate()
                  # Only remove the entry if it is still this connection's worker.
                  if active_processes.get(client_id) is process:
                      active_processes.pop(client_id, None)

          # Release the queue resources held by this process.
          for ipc_queue in (queue_to_websocket, queue_from_websocket):
              if ipc_queue is not None:
                  try:
                      ipc_queue.close()
                  except Exception:
                      pass

          try:
              await websocket.close()
          except Exception:
              pass  # already closed or disconnected
          print(f"连接已完全关闭: {client_id}")
  # Health-check endpoint.
  @app.get("/health")
  async def health_check():
      """Report the registered worker processes.

      Returns:
          A dict with a fixed ``"status"`` of ``"ok"``, the number of entries in
          ``active_processes``, a per-client mapping with each worker's
          liveness and pid, and the current ``time.time()`` value.
      """
      async with processes_lock:
          connections = {
              name: {
                  "process_alive": worker.is_alive(),
                  "process_pid": worker.pid,
              }
              for name, worker in active_processes.items()
          }

      report = {
          "status": "ok",
          "active_processes": len(active_processes),
          "connections": connections,
          "timestamp": time.time(),
      }
      return report
  # Cleanup on shutdown.
  @app.on_event("shutdown")
  async def shutdown_event():
      """Terminate every worker that is still alive and clear the registry."""
      print("服务器关闭,清理所有进程...")
      async with processes_lock:
          for worker in active_processes.values():
              if worker.is_alive():
                  worker.terminate()
          active_processes.clear()
  def _run_server():
      """Force the 'spawn' start method, then serve the app on 0.0.0.0:7066."""
      # Set the start method first, for multiprocessing safety.
      multiprocessing.set_start_method('spawn', force=True)
      uvicorn.run(app, host="0.0.0.0", port=7066)


  if __name__ == "__main__":
      _run_server()
  466. uvicorn.run(app, host="0.0.0.0", port=7066)