Selaa lähdekoodia

Merge branch 'feature/eta2.5.9_api_stat' into debug

xyxie 4 päivää sitten
vanhempi
commit
24e17c5b63
1 muutettua tiedostoa jossa 42 lisäystä ja 112 poistoa
  1. 42 112
      services/websocket_msg.go

+ 42 - 112
services/websocket_msg.go

@@ -1,13 +1,10 @@
 package services
 
 import (
-	"context"
 	"eta/eta_api/models"
 	"eta/eta_api/services/data"
 	"eta/eta_api/utils"
 	"fmt"
-	"sync"
-	"time"
 
 	"github.com/gorilla/websocket"
 )
@@ -18,118 +15,51 @@ func DealWebSocketMsg(conn *websocket.Conn, adminId int) {
 
 // 处理巡检消息
 func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
-	// 创建上下文用于控制 goroutine 生命周期
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	// 创建互斥锁保护 WebSocket 写操作
-	var wsWriteMutex sync.Mutex
-
-	// 创建连接关闭标志
-	done := make(chan struct{})
-	defer close(done)
-
-	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId)
-
-	// 监听连接关闭
+	messageList, err := data.GetHistoryInspectionMessages(adminId)
+	if err != nil {
+		utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
+		return
+	}
+	success := make(chan int64, 10)
 	go func() {
-		<-done
-		cancel()
-	}()
-
-	// 设置连接关闭处理器
-	conn.SetCloseHandler(func(code int, text string) error {
-		close(done)
-		return nil
-	})
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			// 使用带超时的 Redis 操作
-			utils.Rc.BrpopWithTimeout(cacheKey, 30*time.Second, func(b []byte) {
-				messageList, err := data.GetHistoryInspectionMessages(adminId)
+		defer close(success)
+		for i, msg := range messageList {
+			if i == 0 {
+				// 多条消息仅发送最新一条
+				respData, err := data.SendInspectionMessages(adminId, msg)
 				if err != nil {
-					utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
-					return
-				}
-
-				success := make(chan int64, 10)
-				var wg sync.WaitGroup
-
-				// 消息发送 goroutine
-				wg.Add(1)
-				go func() {
-					defer wg.Done()
-					defer close(success)
-
-					for i, msg := range messageList {
-						select {
-						case <-ctx.Done():
-							return
-						default:
-							if i == 0 {
-								respData, err := data.SendInspectionMessages(adminId, msg)
-								if err != nil {
-									utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-									continue
-								}
-
-								resp := models.WebsocketMessageResponse{
-									MessageType: 1,
-									Data:       respData,
-								}
-
-								// 使用互斥锁保护 WebSocket 写操作
-								wsWriteMutex.Lock()
-								err = conn.WriteJSON(resp)
-								wsWriteMutex.Unlock()
-
-								if err != nil {
-									utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-									continue
-								}
-
-								utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
-								success <- msg.MessageId
-							} else {
-								success <- msg.MessageId
-							}
-						}
+					utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+				} else {
+					resp := models.WebsocketMessageResponse{
+						MessageType: 1,
+						Data: respData,
 					}
-				}()
-
-				// 消息已读处理 goroutine
-				wg.Add(1)
-				go func() {
-					defer wg.Done()
-					readList := make([]int64, 0)
-					
-					for {
-						select {
-						case <-ctx.Done():
-							return
-						case msgId, ok := <-success:
-							if !ok {
-								// 处理已收集的消息
-								if len(readList) > 0 {
-									_, err = data.ReadEdbInspectionMessageList(readList, adminId)
-									if err != nil {
-										utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
-									}
-								}
-								return
-							}
-							readList = append(readList, msgId)
-						}
+					err = conn.WriteJSON(resp)
+					if err != nil {
+						utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+					} else {
+						utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
+						success <- msg.MessageId
 					}
-				}()
-
-				// 等待所有 goroutine 完成
-				wg.Wait()
-			})
+				}
+			} else {
+				success <- msg.MessageId
+			}
 		}
-	}
+	}()
+	go func() {
+		readList := make([]int64, 0)
+		for {
+			msgId, ok := <-success
+			if !ok {
+				break
+			}
+			readList = append(readList, msgId)
+		}
+		_, err = data.ReadEdbInspectionMessageList(readList, adminId)
+		if err != nil {
+			utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
+		}
+	}()		
+	
 }