Forráskód Böngészése

Merge branch 'feature/eta2.5.9_api_stat' into debug

xyxie 4 napja
szülő
commit
b3eee5756c
2 módosított fájl, 20 hozzáadás és 1 törlés
  1. 19 1
      services/websocket_msg.go
  2. 1 0
      utils/constants.go

+ 19 - 1
services/websocket_msg.go

@@ -39,6 +39,15 @@ func DealEdbInspectionMessage(connKey string, conn *websocket.Conn, adminId int)
 		}
 	}()
 
+	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId)
+
+	// // 设置连接关闭处理器
+	// conn.SetCloseHandler(func(code int, text string) error {
+	// 	utils.FileLog.Info("长连接关闭, adminId:%d", adminId)
+	// 	cancel()
+	// 	return nil
+	// })
+
 	// 添加错误恢复机制
 	defer func() {
 		if r := recover(); r != nil {
@@ -59,7 +68,8 @@ func DealEdbInspectionMessage(connKey string, conn *websocket.Conn, adminId int)
 		case <-ctx.Done():
 			return
 		default:
-			
+			// 使用带超时的 Redis 操作
+			err := utils.Rc.BrpopWithTimeout(cacheKey, 30*time.Second, func(b []byte) {
 				utils.FileLog.Info("收到巡检信息开始处理, adminId:%d", adminId)
 
 				messageList, err := data.GetHistoryInspectionMessages(adminId)
@@ -112,6 +122,14 @@ func DealEdbInspectionMessage(connKey string, conn *websocket.Conn, adminId int)
 						utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
 					}
 				}
+			})
+
+			if err != nil && err.Error() != "redis: nil" {
+				utils.FileLog.Error("Redis operation failed: %v", err)
+				continue
+			}else{
+				utils.FileLog.Info("巡检信息处理完成, adminId:%d", adminId)
+			}
 		}
 	}
 }

+ 1 - 0
utils/constants.go

@@ -278,6 +278,7 @@ const (
 	CACHE_CHART_AUTH                        = "eta:chart:auth:"                       //图表数据授权
 	CACHE_REPORT_SHARE_AUTH                 = "eta:report:auth:share:"                //报告短链与报告图表授权映射key
 	CACHE_REPORT_AUTH                       = "eta:report:auth:"                      //报告图表数据授权
+	CACHE_EDB_INSPECTION_MESSAGE            = "eta:edb:inspection:message:"          //巡检消息队列
 )
 
 // 模板消息推送类型