Parcourir la source

Merge branch 'feature/eta2.5.9_api_stat' into debug

xyxie il y a 3 jours
Parent
commit
0407db1b21
2 fichiers modifiés avec 22 ajouts et 11 suppressions
  1. 21 10
      services/websocket_msg.go
  2. 1 1
      utils/constants.go

+ 21 - 10
services/websocket_msg.go

@@ -62,10 +62,12 @@ func DealEdbInspectionMessage(adminId int) {
 				return
 			}
 			// 使用带超时的 Redis 操作
-			
-			err := utils.Rc.BrpopWithTimeout(cacheKey, 30*time.Second, func(b []byte) {
+			val := utils.Rc.Get(cacheKey)
+			if val == "" {
+				//utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId)
+				continue
+			}
 				utils.FileLog.Info("收到巡检信息开始处理, adminId:%d", adminId)
-
 				messageList, err := data.GetHistoryInspectionMessages(adminId)
 				if err != nil {
 					utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
@@ -77,6 +79,13 @@ func DealEdbInspectionMessage(adminId int) {
 				}
 
 				readList := make([]int64, 0)
+				// 检查连接状态
+				// conn := global.MonitorMessageConn[adminId]
+				// if conn == nil {
+				// 	utils.FileLog.Error("发送消息时发现连接已断开, adminId:%d", adminId)
+				// 	cancel()
+				// 	return
+				// }
 				// 只处理第一条消息的发送,其他消息只标记为已读
 				for i, msg := range messageList {
 					if i == 0 {
@@ -101,6 +110,7 @@ func DealEdbInspectionMessage(adminId int) {
 							cancel()
 							return
 						}
+
 						utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
 					}
 					readList = append(readList, msg.MessageId)
@@ -112,14 +122,14 @@ func DealEdbInspectionMessage(adminId int) {
 						utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
 					}
 				}
-			})
+			//})
 
-			if err != nil && err.Error() != "redis: nil" {
-				utils.FileLog.Error("Redis operation failed: %v", err)
-				continue
-			}else {
-				utils.FileLog.Info("巡检信息处理完成, adminId:%d", adminId)
-			}
+			// if err != nil && err.Error() != "redis: nil" {
+			// 	utils.FileLog.Error("Redis operation failed: %v", err)
+			// 	continue
+			// }else {
+			// 	utils.FileLog.Info("巡检信息处理完成, adminId:%d", adminId)
+			// }
 		}
 	}
 }
@@ -128,6 +138,7 @@ func WriteWebSocketMessageAsync(ctx context.Context, adminId int, resp interface
 	errChan := make(chan error, 1)
 	var wsWriteMutex sync.Mutex
 	isClose := false
+	
 	go func() {
 		wsWriteMutex.Lock()
 		defer wsWriteMutex.Unlock()

+ 1 - 1
utils/constants.go

@@ -278,7 +278,7 @@ const (
 	CACHE_CHART_AUTH                        = "eta:chart:auth:"                       //图表数据授权
 	CACHE_REPORT_SHARE_AUTH                 = "eta:report:auth:share:"                //报告短链与报告图表授权映射key
 	CACHE_REPORT_AUTH                       = "eta:report:auth:"                      //报告图表数据授权
-	CACHE_EDB_INSPECTION_MESSAGE            = "eta:edb_inspection:message:"          //巡检消息队列
+	CACHE_EDB_INSPECTION_MESSAGE            = "eta:edb:inspection:message:"          //巡检消息队列
 )
 
 // 模板消息推送类型