Browse Source

Merge branch 'feature/eta2.5.9_api_stat' into debug

xyxie 4 ngày trước cách đây
mục cha
commit
670382e27c
1 tập tin đã thay đổi với 52 bổ sung57 xóa
  1. 52 57
      services/websocket_msg.go

+ 52 - 57
services/websocket_msg.go

@@ -6,12 +6,11 @@ import (
 	"eta/eta_api/services/data"
 	"eta/eta_api/utils"
 	"fmt"
+	"runtime"
 	"sync"
 	"time"
-	"runtime"
 
 	"context"
-	
 )
 
 func DealWebSocketMsg(adminId int) {
@@ -63,80 +62,76 @@ func DealEdbInspectionMessage(adminId int) {
 				return
 			}
 			// 使用带超时的 Redis 操作
-			err := utils.Rc.BrpopWithTimeout(cacheKey, 30*time.Second, func(b []byte) {
-				utils.FileLog.Info("收到巡检信息开始处理, adminId:%d", adminId)
-
-				messageList, err := data.GetHistoryInspectionMessages(adminId)
-				if err != nil {
-					utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
-					return
-				}
-				if len(messageList) == 0 {
-					utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId)
-					return
-				}
-
-				readList := make([]int64, 0)
-				// 检查连接状态
-				conn := global.MonitorMessageConn[adminId]
-				if conn == nil {
-					utils.FileLog.Error("发送消息时发现连接已断开, adminId:%d", adminId)
-					cancel()
-					return
-				}
-				// 只处理第一条消息的发送,其他消息只标记为已读
-				for i, msg := range messageList {
-					if i == 0 {
-						respData, err := data.SendInspectionMessages(adminId, msg)
-						if err != nil {
-							utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-							continue
-						}
-
-						resp := models.WebsocketMessageResponse{
-							MessageType: 1,
-							Data:       respData,
-						}
+			val := utils.Rc.Get(cacheKey)
+			if val == nil {
+				utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId)
+				continue
+			}
+			messageList, err := data.GetHistoryInspectionMessages(adminId)
+			if err != nil {
+				utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
+				continue
+			}
+			if len(messageList) == 0 {
+				utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId)
+				continue
+			}
 
-						err = WriteWebSocketMessageAsync(ctx, adminId, resp)
-						if err != nil {
-							utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-							continue
-						}
+			readList := make([]int64, 0)
+			// 只处理第一条消息的发送,其他消息只标记为已读
+			for i, msg := range messageList {
+				if i == 0 {
+					respData, err := data.SendInspectionMessages(adminId, msg)
+					if err != nil {
+						utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+						continue
+					}
 
-						utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
+					resp := models.WebsocketMessageResponse{
+						MessageType: 1,
+						Data:       respData,
 					}
-					readList = append(readList, msg.MessageId)
-				}
 
-				if len(readList) > 0 {
-					_, err = data.ReadEdbInspectionMessageList(readList, adminId)
+					err, isClose := WriteWebSocketMessageAsync(ctx, adminId, resp)
 					if err != nil {
-						utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
+						utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+						continue
+					}
+					if isClose {
+						utils.FileLog.Error("检查连接状态 发送消息时发现连接已断开, adminId:%d", adminId)
+						cancel()
+						return
 					}
+
+					utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
 				}
-			})
+				readList = append(readList, msg.MessageId)
+			}
 
-			if err != nil && err.Error() != "redis: nil" {
-				utils.FileLog.Error("Redis operation failed: %v", err)
-				continue
-			}else {
-				utils.FileLog.Info("巡检信息处理完成, adminId:%d", adminId)
+			if len(readList) > 0 {
+				_, err = data.ReadEdbInspectionMessageList(readList, adminId)
+				if err != nil {
+					utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
+				}
 			}
+		
+			utils.FileLog.Info("巡检信息处理完成, adminId:%d", adminId)
+		
 		}
 	}
 }
 
-func WriteWebSocketMessageAsync(ctx context.Context, adminId int, resp interface{}) error {
+func WriteWebSocketMessageAsync(ctx context.Context, adminId int, resp interface{}) (error, bool) {
 	errChan := make(chan error, 1)
 	var wsWriteMutex sync.Mutex
-	
+	isClose := false
 	go func() {
 		wsWriteMutex.Lock()
 		defer wsWriteMutex.Unlock()
 		
 		conn := global.MonitorMessageConn[adminId]
 		if conn == nil {
+			isClose = true
 			errChan <- fmt.Errorf("connection closed for adminId: %d", adminId)
 			return
 		}
@@ -149,9 +144,9 @@ func WriteWebSocketMessageAsync(ctx context.Context, adminId int, resp interface
 	select {
 	case err := <-errChan:
 		utils.FileLog.Error("WriteWebSocketMessageAsync errChan: %v", err)
-		return err
+		return err, isClose
 	case <-ctx.Done():
 		utils.FileLog.Error("WriteWebSocketMessageAsync ctx.Done(): %v", ctx.Err())
-		return ctx.Err()
+		return ctx.Err(), isClose
 	}
 }