Selaa lähdekoodia

Merge branch 'feature/eta2.5.9_api_stat' into debug

xyxie 3 päivää sitten
vanhempi
commit
e30423d196

+ 4 - 4
controllers/data_stat/edb_terminal.go

@@ -52,10 +52,10 @@ func (this *EdbTerminalController) Save() {
 		br.Msg = "请输入终端地址或者token"
 		return
 	}*/
-	if req.Num <= 0 {
-		br.Msg = "请输入指标数据量"
-		return
-	}
+	// if req.Num <= 0 {
+	// 	br.Msg = "请输入指标数据量"
+	// 	return
+	// }
 	if req.Source == 0 {
 		br.Msg = "请输入终端类型"
 		return

+ 103 - 42
services/websocket_msg.go

@@ -4,61 +4,122 @@ import (
 	"eta/eta_api/models"
 	"eta/eta_api/services/data"
 	"eta/eta_api/utils"
+	"fmt"
+	"sync"
+	"time"
+
+	"context"	
 
 	"github.com/gorilla/websocket"
 )
 
 func DealWebSocketMsg(conn *websocket.Conn, adminId int) {
-	DealEdbInspectionMessage(conn, adminId)
+	go DealEdbInspectionMessage(conn, adminId)
 }
 
 // 处理巡检消息
 func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
-	messageList, err := data.GetHistoryInspectionMessages(adminId)
-	if err != nil {
-		utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
-		return
-	}
-	success := make(chan int64, 10)
-	go func() {
-		defer close(success)
-		for i, msg := range messageList {
-			if i == 0 {
-				// 多条消息仅发送最新一条
-				respData, err := data.SendInspectionMessages(adminId, msg)
+	// 创建上下文用于控制 goroutine 生命周期
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// 创建互斥锁保护 WebSocket 写操作
+	var wsWriteMutex sync.Mutex
+
+	// // 监控 goroutine 数量
+	// go func() {
+	// 	ticker := time.NewTicker(time.Minute)
+	// 	defer ticker.Stop()
+	// 	for {
+	// 		select {
+	// 		case <-ticker.C:
+	// 			utils.FileLog.Info("Current goroutine count: %d", runtime.NumGoroutine())
+	// 		case <-ctx.Done():
+	// 			return
+	// 		}
+	// 	}
+	// }()
+
+	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId)
+
+	// 设置连接关闭处理器
+	conn.SetCloseHandler(func(code int, text string) error {
+		cancel()
+		return nil
+	})
+
+	// 添加错误恢复机制
+	defer func() {
+		if r := recover(); r != nil {
+			utils.FileLog.Error("WebSocket handler recovered from panic: %v", r)
+			// 清理资源
+			cancel()
+		}
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			// 使用带超时的 Redis 操作
+			err := utils.Rc.BrpopWithTimeout(cacheKey, 30*time.Second, func(b []byte) {
+				messageList, err := data.GetHistoryInspectionMessages(adminId)
+
 				if err != nil {
-					utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-				} else {
-					resp := models.WebsocketMessageResponse{
-						MessageType: 1,
-						Data: respData,
+					utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
+					return
+				}
+				readList := make([]int64, 0)
+				// 消息发送 goroutine
+
+				for i, msg := range messageList {
+					select {
+					case <-ctx.Done():
+							return
+					default:
+						if i == 0 {
+							respData, err := data.SendInspectionMessages(adminId, msg)
+							if err != nil {
+								utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+								continue
+							}
+
+							resp := models.WebsocketMessageResponse{
+								MessageType: 1,
+								Data:       respData,
+							}
+
+							// 使用互斥锁保护 WebSocket 写操作
+							wsWriteMutex.Lock()
+							err = conn.WriteJSON(resp)
+							wsWriteMutex.Unlock()
+
+							if err != nil {
+								utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+								continue
+							}
+
+							utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
+							readList = append(readList, msg.MessageId)
+						} else {
+							readList = append(readList, msg.MessageId)
+						}
 					}
-					err = conn.WriteJSON(resp)
+				}
+
+				if len(readList) > 0 {
+					_, err = data.ReadEdbInspectionMessageList(readList, adminId)
 					if err != nil {
-						utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-					} else {
-						utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
-						success <- msg.MessageId
+						utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
 					}
 				}
-			} else {
-				success <- msg.MessageId
-			}
-		}
-	}()
-	go func() {
-		readList := make([]int64, 0)
-		for {
-			msgId, ok := <-success
-			if !ok {
-				break
+			})
+
+			if err != nil {
+				utils.FileLog.Error("Redis operation failed: %v", err)
+				return
 			}
-			readList = append(readList, msgId)
 		}
-		_, err = data.ReadEdbInspectionMessageList(readList, adminId)
-		if err != nil {
-			utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
-		}
-	}()		
-	
-}
+	}
+}

+ 1 - 1
utils/redis.go

@@ -19,7 +19,7 @@ type RedisClient interface {
 	IsExist(key string) bool
 	LPush(key string, val interface{}) error
 	Brpop(key string, callback func([]byte))
-	BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte))
+	BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte)) error
 	GetRedisTTL(key string) time.Duration
 	Incrby(key string, num int) (interface{}, error)
 	Do(commandName string, args ...interface{}) (reply interface{}, err error)

+ 6 - 1
utils/redis/cluster_redis.go

@@ -255,13 +255,18 @@ func (rc *ClusterRedisClient) Brpop(key string, callback func([]byte)) {
 // @param key
 // @param timeout
 // @param callback
-func (rc *ClusterRedisClient) BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte)) {
+func (rc *ClusterRedisClient) BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte)) (err error) {
 	values, err := rc.redisClient.BRPop(context.TODO(), timeout, key).Result()
 	if err != nil {
 		return
 	}
+	if len(values) < 2 {
+		err = errors.New("redis brpop timeout")
+		return
+	}
 
 	callback([]byte(values[1]))
+	return
 }
 
 // GetRedisTTL

+ 6 - 2
utils/redis/standalone_redis.go

@@ -243,13 +243,17 @@ func (rc *StandaloneRedisClient) Brpop(key string, callback func([]byte)) {
 // @param key
 // @param timeout
 // @param callback
-func (rc *StandaloneRedisClient) BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte)) {
+func (rc *StandaloneRedisClient) BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte)) (err error) {
 	values, err := rc.redisClient.BRPop(context.TODO(), timeout, key).Result()
 	if err != nil {
+		return err
+	}
+	if len(values) < 2 {
+		err = errors.New("redis brpop timeout")
 		return
 	}
-
 	callback([]byte(values[1]))
+	return
 }
 
 // GetRedisTTL