Browse Source

Merge branch 'feature/eta2.5.9_api_stat' into debug

xyxie 4 days ago
parent
commit
b56a997699

+ 12 - 11
controllers/edb_monitor/edb_monitor_message.go

@@ -3,10 +3,11 @@ package edb_monitor
 import (
 	"encoding/json"
 	"eta/eta_api/controllers"
+	"eta/eta_api/global"
 	"eta/eta_api/models"
 	"eta/eta_api/models/edb_monitor/request"
-	edbmonitor "eta/eta_api/services/edb_monitor"
 	"eta/eta_api/services"
+	edbmonitor "eta/eta_api/services/edb_monitor"
 	"eta/eta_api/utils"
 	"net/http"
 	"strconv"
@@ -48,10 +49,10 @@ func (m *EdbMonitorMessageController) Connect() {
 	}
 
 	var conn *websocket.Conn
-	connKey := edbmonitor.EDB_MONITOR_MESSAGE_CONNECT_CACHE + strconv.Itoa(sysUser.AdminId)
+	connKey := global.EDB_MONITOR_MESSAGE_CONNECT_CACHE + strconv.Itoa(sysUser.AdminId)
 	ok := utils.Rc.IsExist(connKey)
 	if !ok {
-		conn = edbmonitor.MonitorMessageConn[sysUser.AdminId]
+		conn = global.MonitorMessageConn[sysUser.AdminId]
 		if conn != nil {
 			conn.Close()
 		}
@@ -70,9 +71,10 @@ func (m *EdbMonitorMessageController) Connect() {
 	}
 	defer conn.Close()
 
-	edbmonitor.MonitorMessageConn[sysUser.AdminId] = conn
+	global.MonitorMessageConn[sysUser.AdminId] = conn
 	conn.SetCloseHandler(func(code int, text string) error {
-		delete(edbmonitor.MonitorMessageConn, sysUser.AdminId)
+		utils.FileLog.Info("连接关闭SetCloseHandler, adminId:%d", sysUser.AdminId)
+		delete(global.MonitorMessageConn, sysUser.AdminId)
 		utils.Rc.Delete(connKey)
 		return nil
 	})
@@ -80,7 +82,7 @@ func (m *EdbMonitorMessageController) Connect() {
 	go func() {
 		// 心跳检测
 		for {
-			isClose, err := edbmonitor.EdbMonitorMessageHealth(sysUser.AdminId)
+			isClose, err := global.EdbMonitorMessageHealth(sysUser.AdminId)
 			if err != nil {
 				utils.FileLog.Error("指标预警信息健康检查失败,err:%s, adminId:%d", err.Error(), sysUser.AdminId)
 				return
@@ -100,8 +102,7 @@ func (m *EdbMonitorMessageController) Connect() {
 		defer close(success)
 		for i, msg := range messageList {
 			if i == 0 {
-				// 多条消息仅发送最新一条
-				err = edbmonitor.SendMessages(sysUser.AdminId, msg.EdbInfoId, msg.EdbInfoType, msg.EdbClassifyId, msg.EdbUniqueCode, msg.Message, msg.TriggerTime)
+				err := edbmonitor.SendMessages(sysUser.AdminId, msg.EdbInfoId, msg.EdbInfoType, msg.EdbClassifyId, msg.EdbUniqueCode, msg.Message, msg.TriggerTime)
 				if err != nil {
 					utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), sysUser.AdminId)
 				} else {
@@ -126,9 +127,9 @@ func (m *EdbMonitorMessageController) Connect() {
 			utils.FileLog.Error("指标预警信息已读失败,err:%s, adminId:%d", err.Error(), sysUser.AdminId)
 		}
 	}()
-	
+
 	// 其他消息处理
-	services.DealWebSocketMsg(connKey, conn, sysUser.AdminId)
+	services.DealWebSocketMsg(sysUser.AdminId)
 	for {
 		ok = utils.Rc.IsExist(connKey)
 		if !ok {
@@ -161,7 +162,7 @@ func (m *EdbMonitorMessageController) Close() {
 		return
 	}
 
-	conn := edbmonitor.MonitorMessageConn[sysUser.AdminId]
+	conn := global.MonitorMessageConn[sysUser.AdminId]
 	if conn != nil {
 		conn.Close()
 	}

+ 38 - 0
global/websocket.go

@@ -0,0 +1,38 @@
+package global
+
+import (
+	"errors"
+	"eta/eta_api/utils"
+	"strconv"
+	"time"
+
+	"github.com/gorilla/websocket"
+)
+
+var MonitorMessageConn = make(map[int]*websocket.Conn)
+
+var (
+	EDB_MONITOR_MESSAGE_CONNECT_CACHE = "edb_monitor_message_cache:"
+)
+
+func EdbMonitorMessageHealth(adminId int) (isClose bool, err error) {
+	conn := MonitorMessageConn[adminId]
+	if conn == nil {
+		err = errors.New("no connection")
+		isClose = true
+		return
+	}
+	_, msg, err := conn.ReadMessage()
+	if err != nil {
+		isClose = true
+		return
+	}
+	if string(msg) == "ping" {
+		healthKey := EDB_MONITOR_MESSAGE_CONNECT_CACHE + strconv.Itoa(adminId)
+		err = utils.Rc.Put(healthKey, "1", time.Minute*1)
+		if err != nil {
+			return
+		}
+	}
+	return
+}

+ 23 - 31
services/edb_monitor/edb_monitor_message.go

@@ -2,23 +2,15 @@ package edbmonitor
 
 import (
 	"errors"
+	"eta/eta_api/global"
 	"eta/eta_api/models"
 	edbmonitor "eta/eta_api/models/edb_monitor"
 	"eta/eta_api/models/edb_monitor/response"
 	"eta/eta_api/utils"
-	"strconv"
 	"time"
-
-	"github.com/gorilla/websocket"
 	"github.com/rdlucklib/rdluck_tools/paging"
 )
 
-var (
-	EDB_MONITOR_MESSAGE_CONNECT_CACHE = "edb_monitor_message_cache:"
-)
-
-var MonitorMessageConn = make(map[int]*websocket.Conn)
-
 func ReadEdbMonitorMessage(messageId, adminId int) (msg string, err error) {
 	message, err := edbmonitor.GetEdbMonitorMessageById(messageId)
 	if err != nil {
@@ -52,27 +44,27 @@ func ReadEdbMonitorMessageList(messageId []int, adminId int) (msg string, err er
 	return
 }
 
-func EdbMonitorMessageHealth(adminId int) (isClose bool, err error) {
-	conn := MonitorMessageConn[adminId]
-	if conn == nil {
-		err = errors.New("no connection")
-		isClose = true
-		return
-	}
-	_, msg, err := conn.ReadMessage()
-	if err != nil {
-		isClose = true
-		return
-	}
-	if string(msg) == "ping" {
-		healthKey := EDB_MONITOR_MESSAGE_CONNECT_CACHE + strconv.Itoa(adminId)
-		err = utils.Rc.Put(healthKey, "1", time.Minute*1)
-		if err != nil {
-			return
-		}
-	}
-	return
-}
+// func EdbMonitorMessageHealth(adminId int) (isClose bool, err error) {
+// 	conn := MonitorMessageConn[adminId]
+// 	if conn == nil {
+// 		err = errors.New("no connection")
+// 		isClose = true
+// 		return
+// 	}
+// 	_, msg, err := conn.ReadMessage()
+// 	if err != nil {
+// 		isClose = true
+// 		return
+// 	}
+// 	if string(msg) == "ping" {
+// 		healthKey := EDB_MONITOR_MESSAGE_CONNECT_CACHE + strconv.Itoa(adminId)
+// 		err = utils.Rc.Put(healthKey, "1", time.Minute*1)
+// 		if err != nil {
+// 			return
+// 		}
+// 	}
+// 	return
+// }
 
 func LogMessage(content, uniqueCode string, triggerTime time.Time, edbInfoId, edbInfoType, adminId, isRead, classifyId int) (err error) {
 	message := &edbmonitor.EdbMonitorMessage{
@@ -92,7 +84,7 @@ func LogMessage(content, uniqueCode string, triggerTime time.Time, edbInfoId, ed
 }
 
 func SendMessages(adminId, edbInfoId, edbInfoType int, classifyId int, edbUniqueCode, message string, triggerTime string) (err error) {
-	conn := MonitorMessageConn[adminId]
+	conn := global.MonitorMessageConn[adminId]
 	if conn == nil {
 		return errors.New("no connection")
 	}

+ 63 - 79
services/websocket_msg.go

@@ -1,53 +1,30 @@
 package services
 
 import (
-	"encoding/json"
+	"eta/eta_api/global"
 	"eta/eta_api/models"
 	"eta/eta_api/services/data"
 	"eta/eta_api/utils"
 	"fmt"
-	"runtime"
 	"sync"
 	"time"
 
 	"context"
-
-	"github.com/gorilla/websocket"
 )
 
-func DealWebSocketMsg(connKey string, conn *websocket.Conn, adminId int) {
-	go DealEdbInspectionMessage(connKey, conn, adminId)
+func DealWebSocketMsg(adminId int) {
+	go DealEdbInspectionMessage(adminId)
 }
 
 // 处理巡检消息
-func DealEdbInspectionMessage(connKey string, conn *websocket.Conn, adminId int) {
+func DealEdbInspectionMessage(adminId int) {
+	utils.FileLog.Info("创建协程, adminId:%d", adminId)
 	// 创建上下文用于控制 goroutine 生命周期
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	// 监控 goroutine 数量
-	go func() {
-		ticker := time.NewTicker(time.Minute)
-		defer ticker.Stop()
-		for {
-			select {
-			case <-ticker.C:
-				utils.FileLog.Info("Current goroutine count: %d", runtime.NumGoroutine())
-			case <-ctx.Done():
-				return
-			}
-		}
-	}()
-
 	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId)
 
-	// // 设置连接关闭处理器
-	// conn.SetCloseHandler(func(code int, text string) error {
-	// 	utils.FileLog.Info("长连接关闭, adminId:%d", adminId)
-	// 	cancel()
-	// 	return nil
-	// })
-
 	// 添加错误恢复机制
 	defer func() {
 		if r := recover(); r != nil {
@@ -56,18 +33,20 @@ func DealEdbInspectionMessage(connKey string, conn *websocket.Conn, adminId int)
 			cancel()
 		}
 	}()
-
+	
 	for {
-		ok := utils.Rc.IsExist(connKey)
-		if !ok {
-			utils.FileLog.Info("长连接关闭, adminId:%d", adminId)
-			cancel()
-			return
-		}
 		select {
 		case <-ctx.Done():
+			utils.FileLog.Info("DealEdbInspectionMessage 巡检消息处理协程结束, adminId:%d", adminId)
 			return
 		default:
+			// 检查连接状态
+			conn := global.MonitorMessageConn[adminId]
+			if conn == nil {
+				utils.FileLog.Error("检查连接状态 发送消息时发现连接已断开, adminId:%d", adminId)
+				cancel()
+				return
+			}
 			// 使用带超时的 Redis 操作
 			err := utils.Rc.BrpopWithTimeout(cacheKey, 30*time.Second, func(b []byte) {
 				utils.FileLog.Info("收到巡检信息开始处理, adminId:%d", adminId)
@@ -81,39 +60,38 @@ func DealEdbInspectionMessage(connKey string, conn *websocket.Conn, adminId int)
 					utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId)
 					return
 				}
-				readList := make([]int64, 0)
-				// 消息发送 goroutine
 
+				readList := make([]int64, 0)
+				// 检查连接状态
+				conn := global.MonitorMessageConn[adminId]
+				if conn == nil {
+					utils.FileLog.Error("发送消息时发现连接已断开, adminId:%d", adminId)
+					cancel()
+					return
+				}
+				// 只处理第一条消息的发送,其他消息只标记为已读
 				for i, msg := range messageList {
-					select {
-					case <-ctx.Done():
-							return
-					default:
-						if i == 0 {
-							respData, err := data.SendInspectionMessages(adminId, msg)
-							if err != nil {
-								utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-								continue
-							}
-
-							resp := models.WebsocketMessageResponse{
-								MessageType: 1,
-								Data:       respData,
-							}
-
-							err = WriteWebSocketMessage(conn, resp)
+					if i == 0 {
+						respData, err := data.SendInspectionMessages(adminId, msg)
+						if err != nil {
+							utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+							continue
+						}
 
-							if err != nil {
-								utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-								continue
-							}
+						resp := models.WebsocketMessageResponse{
+							MessageType: 1,
+							Data:       respData,
+						}
 
-							utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
-							readList = append(readList, msg.MessageId)
-						} else {
-							readList = append(readList, msg.MessageId)
+						err = WriteWebSocketMessageAsync(ctx, adminId, resp)
+						if err != nil {
+							utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+							continue
 						}
+
+						utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
 					}
+					readList = append(readList, msg.MessageId)
 				}
 
 				if len(readList) > 0 {
@@ -127,32 +105,38 @@ func DealEdbInspectionMessage(connKey string, conn *websocket.Conn, adminId int)
 			if err != nil && err.Error() != "redis: nil" {
 				utils.FileLog.Error("Redis operation failed: %v", err)
 				continue
-			}else{
+			}else {
 				utils.FileLog.Info("巡检信息处理完成, adminId:%d", adminId)
 			}
 		}
 	}
 }
 
-func WriteWebSocketMessage(conn *websocket.Conn, resp models.WebsocketMessageResponse) error {
-	data, err := json.Marshal(resp)
-	if err != nil {
-		utils.FileLog.Error("Failed to marshal response: %v", err)
-		return err
-	}
+func WriteWebSocketMessageAsync(ctx context.Context, adminId int, resp interface{}) error {
+	errChan := make(chan error, 1)
 	var wsWriteMutex sync.Mutex
-	done := make(chan error, 1)
 	
-    go func() {
-           wsWriteMutex.Lock()
-           defer wsWriteMutex.Unlock()
-           done <- conn.WriteMessage(websocket.TextMessage, data)
-    }()
-       
+	go func() {
+		wsWriteMutex.Lock()
+		defer wsWriteMutex.Unlock()
+		
+		conn := global.MonitorMessageConn[adminId]
+		if conn == nil {
+			errChan <- fmt.Errorf("connection closed for adminId: %d", adminId)
+			return
+		}
+		
+		// 设置写超时
+		//conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
+		errChan <- conn.WriteJSON(resp)
+	}()
+	
 	select {
-	case err := <-done:
+	case err := <-errChan:
+		utils.FileLog.Error("WriteWebSocketMessageAsync errChan: %v", err)
 		return err
-	case <-time.After(5 * time.Second):
-		return fmt.Errorf("write timeout")
+	case <-ctx.Done():
+		utils.FileLog.Error("WriteWebSocketMessageAsync ctx.Done(): %v", ctx.Err())
+		return ctx.Err()
 	}
 }