xyxie пре 4 дана
родитељ
комит
6445ac874f
1 измењених фајлова са 49 додато и 46 уклоњено
  1. 49 46
      services/websocket_msg.go

+ 49 - 46
services/websocket_msg.go

@@ -62,61 +62,64 @@ func DealEdbInspectionMessage(adminId int) {
 				return
 			}
 			// 使用带超时的 Redis 操作
-			val := utils.Rc.Get(cacheKey)
-			if val == nil {
-				utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId)
-				continue
-			}
-			messageList, err := data.GetHistoryInspectionMessages(adminId)
-			if err != nil {
-				utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
-				continue
-			}
-			if len(messageList) == 0 {
-				utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId)
-				continue
-			}
+			
+			err := utils.Rc.BrpopWithTimeout(cacheKey, 30*time.Second, func(b []byte) {
+				utils.FileLog.Info("收到巡检信息开始处理, adminId:%d", adminId)
 
-			readList := make([]int64, 0)
-			// 只处理第一条消息的发送,其他消息只标记为已读
-			for i, msg := range messageList {
-				if i == 0 {
-					respData, err := data.SendInspectionMessages(adminId, msg)
-					if err != nil {
-						utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-						continue
-					}
+				messageList, err := data.GetHistoryInspectionMessages(adminId)
+				if err != nil {
+					utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
+					return
+				}
+				if len(messageList) == 0 {
+					utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId)
+					return
+				}
 
-					resp := models.WebsocketMessageResponse{
-						MessageType: 1,
-						Data:       respData,
+				readList := make([]int64, 0)
+				// 只处理第一条消息的发送,其他消息只标记为已读
+				for i, msg := range messageList {
+					if i == 0 {
+						respData, err := data.SendInspectionMessages(adminId, msg)
+						if err != nil {
+							utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+							continue
+						}
+
+						resp := models.WebsocketMessageResponse{
+							MessageType: 1,
+							Data:       respData,
+						}
+
+						err, isClose := WriteWebSocketMessageAsync(ctx, adminId, resp)
+						if err != nil {
+							utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+							continue
+						}
+						if isClose {
+							utils.FileLog.Error("巡检信息发送失败,连接已断开, adminId:%d", adminId)
+							cancel()
+							return
+						}
+						utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
 					}
+					readList = append(readList, msg.MessageId)
+				}
 
-					err, isClose := WriteWebSocketMessageAsync(ctx, adminId, resp)
+				if len(readList) > 0 {
+					_, err = data.ReadEdbInspectionMessageList(readList, adminId)
 					if err != nil {
-						utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-						continue
+						utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
 					}
-					if isClose {
-						utils.FileLog.Error("检查连接状态 发送消息时发现连接已断开, adminId:%d", adminId)
-						cancel()
-						return
-					}
-
-					utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
 				}
-				readList = append(readList, msg.MessageId)
-			}
+			})
 
-			if len(readList) > 0 {
-				_, err = data.ReadEdbInspectionMessageList(readList, adminId)
-				if err != nil {
-					utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
-				}
+			if err != nil && err.Error() != "redis: nil" {
+				utils.FileLog.Error("Redis operation failed: %v", err)
+				continue
+			}else {
+				utils.FileLog.Info("巡检信息处理完成, adminId:%d", adminId)
 			}
-		
-			utils.FileLog.Info("巡检信息处理完成, adminId:%d", adminId)
-		
 		}
 	}
 }