xyxie 2 天之前
父节点
当前提交
7db78f4c6c
共有 2 个文件被更改,包括 43 次插入24 次删除
  1. 1 1
      controllers/edb_monitor/edb_monitor_message.go
  2. 42 23
      services/websocket_msg.go

+ 1 - 1
controllers/edb_monitor/edb_monitor_message.go

@@ -128,7 +128,7 @@ func (m *EdbMonitorMessageController) Connect() {
 	}()
 	
 	// 其他消息处理
-	services.DealWebSocketMsg(conn, sysUser.AdminId)
+	services.DealWebSocketMsg(connKey, conn, sysUser.AdminId)
 	for {
 		ok = utils.Rc.IsExist(connKey)
 		if !ok {

+ 42 - 23
services/websocket_msg.go

@@ -1,6 +1,7 @@
 package services
 
 import (
+	"encoding/json"
 	"eta/eta_api/models"
 	"eta/eta_api/services/data"
 	"eta/eta_api/utils"
@@ -8,24 +9,21 @@ import (
 	"sync"
 	"time"
 
-	"context"	
+	"context"
 
 	"github.com/gorilla/websocket"
 )
 
-func DealWebSocketMsg(conn *websocket.Conn, adminId int) {
-	go DealEdbInspectionMessage(conn, adminId)
+func DealWebSocketMsg(connKey string, conn *websocket.Conn, adminId int) {
+	go DealEdbInspectionMessage(connKey, conn, adminId)
 }
 
 // 处理巡检消息
-func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
+func DealEdbInspectionMessage(connKey string, conn *websocket.Conn, adminId int) {
 	// 创建上下文用于控制 goroutine 生命周期
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	// 创建互斥锁保护 WebSocket 写操作
-	var wsWriteMutex sync.Mutex
-
 	// // 监控 goroutine 数量
 	// go func() {
 	// 	ticker := time.NewTicker(time.Minute)
@@ -42,12 +40,12 @@ func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
 
 	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId)
 
-	// 设置连接关闭处理器
-	conn.SetCloseHandler(func(code int, text string) error {
-		utils.FileLog.Info("长连接关闭, adminId:%d", adminId)
-		cancel()
-		return nil
-	})
+	// // 设置连接关闭处理器
+	// conn.SetCloseHandler(func(code int, text string) error {
+	// 	utils.FileLog.Info("长连接关闭, adminId:%d", adminId)
+	// 	cancel()
+	// 	return nil
+	// })
 
 	// 添加错误恢复机制
 	defer func() {
@@ -59,6 +57,12 @@ func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
 	}()
 
 	for {
+		ok := utils.Rc.IsExist(connKey)
+		if !ok {
+			utils.FileLog.Info("长连接关闭, adminId:%d", adminId)
+			cancel()
+			return
+		}
 		select {
 		case <-ctx.Done():
 			return
@@ -96,15 +100,7 @@ func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
 								Data:       respData,
 							}
 
-							// 使用互斥锁保护 WebSocket 写操作
-							wsWriteMutex.Lock()
-							err = conn.WriteJSON(resp)
-							if err != nil {
-								utils.FileLog.Error("巡检信息写通道被锁,消息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-								wsWriteMutex.Unlock()
-								continue
-							}
-							wsWriteMutex.Unlock()
+							err = WriteWebSocketMessage(conn, resp)
 
 							if err != nil {
 								utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
@@ -127,7 +123,7 @@ func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
 				}
 			})
 
-			if err != nil {
+			if err != nil && err.Error() != "redis: nil" {
 				utils.FileLog.Error("Redis operation failed: %v", err)
 				continue
 			}else{
@@ -135,4 +131,27 @@ func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
 			}
 		}
 	}
+}
+
+func WriteWebSocketMessage(conn *websocket.Conn, resp models.WebsocketMessageResponse) error {
+	data, err := json.Marshal(resp)
+	if err != nil {
+		utils.FileLog.Error("Failed to marshal response: %v", err)
+		return err
+	}
+	var wsWriteMutex sync.Mutex
+	done := make(chan error, 1)
+	
+    go func() {
+           wsWriteMutex.Lock()
+           defer wsWriteMutex.Unlock()
+           done <- conn.WriteMessage(websocket.TextMessage, data)
+    }()
+       
+	select {
+	case err := <-done:
+		return err
+	case <-time.After(5 * time.Second):
+		return fmt.Errorf("write timeout")
+	}
 }