소스 검색

Merge remote-tracking branch 'origin/debug' into debug

Roc 1 일 전
부모
커밋
14bcd60383

+ 2 - 0
controllers/bi_dashboard.go

@@ -103,6 +103,7 @@ func (this *BIDaShboardController) AddDashboard() {
 			BiDashboardId: int(id),
 			Type:          v.Type,
 			UniqueCode:    v.UniqueCode,
+			Conf:          v.Conf,
 			Sort:          i + 1,
 			CreateTime:    time.Now(),
 			ModifyTime:    time.Now(),
@@ -176,6 +177,7 @@ func (this *BIDaShboardController) EditDashboard() {
 			BiDashboardId: req.BiDashboardId,
 			Type:          v.Type,
 			UniqueCode:    v.UniqueCode,
+			Conf:          v.Conf,
 			Sort:          v.Sort,
 			CreateTime:    time.Now(),
 			ModifyTime:    time.Now(),

+ 4 - 4
controllers/data_stat/edb_terminal.go

@@ -52,10 +52,10 @@ func (this *EdbTerminalController) Save() {
 		br.Msg = "请输入终端地址或者token"
 		return
 	}*/
-	if req.Num <= 0 {
-		br.Msg = "请输入指标数据量"
-		return
-	}
+	// if req.Num <= 0 {
+	// 	br.Msg = "请输入指标数据量"
+	// 	return
+	// }
 	if req.Source == 0 {
 		br.Msg = "请输入终端类型"
 		return

+ 1 - 1
controllers/edb_monitor/edb_monitor_message.go

@@ -128,7 +128,7 @@ func (m *EdbMonitorMessageController) Connect() {
 	}()
 	
 	// 其他消息处理
-	services.DealWebSocketMsg(conn, sysUser.AdminId)
+	services.DealWebSocketMsg(connKey, conn, sysUser.AdminId)
 	for {
 		ok = utils.Rc.IsExist(connKey)
 		if !ok {

+ 1 - 0
models/bi_dashboard/bi_dashboard.go

@@ -108,6 +108,7 @@ type AddDashboardListReq struct {
 	Type       int
 	UniqueCode string
 	Sort       int
+	Conf       string
 }
 
 type EditDashboardReq struct {

+ 1 - 0
models/bi_dashboard/bi_dashboard_detail.go

@@ -10,6 +10,7 @@ type BiDashboardDetail struct {
 	BiDashboardDetailId int       `orm:"column(bi_dashboard_detail_id);pk" gorm:"primaryKey" ` // bi看板id
 	BiDashboardId       int       `gorm:"column:bi_dashboard_id" `                             // 看板id
 	Type                int       `gorm:"column:type" `                                        // 1图表 2表格
+	Conf                string    `gorm:"column:conf" `                                        // 配置信息
 	UniqueCode          string    `gorm:"column:unique_code;size:32;not null" `                // 报告唯一编码
 	Sort                int       `gorm:"column:sort" `                                        // 排序字段
 	CreateTime          time.Time `gorm:"column:create_time" `                                 // 创建时间

+ 136 - 42
services/websocket_msg.go

@@ -1,64 +1,158 @@
 package services
 
 import (
+	"encoding/json"
 	"eta/eta_api/models"
 	"eta/eta_api/services/data"
 	"eta/eta_api/utils"
+	"fmt"
+	"runtime"
+	"sync"
+	"time"
+
+	"context"
 
 	"github.com/gorilla/websocket"
 )
 
-func DealWebSocketMsg(conn *websocket.Conn, adminId int) {
-	DealEdbInspectionMessage(conn, adminId)
+func DealWebSocketMsg(connKey string, conn *websocket.Conn, adminId int) {
+	go DealEdbInspectionMessage(connKey, conn, adminId)
 }
 
 // 处理巡检消息
-func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
-	messageList, err := data.GetHistoryInspectionMessages(adminId)
-	if err != nil {
-		utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
-		return
-	}
-	success := make(chan int64, 10)
+func DealEdbInspectionMessage(connKey string, conn *websocket.Conn, adminId int) {
+	// 创建上下文用于控制 goroutine 生命周期
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// 监控 goroutine 数量
 	go func() {
-		defer close(success)
-		for i, msg := range messageList {
-			if i == 0 {
-				// 多条消息仅发送最新一条
-				respData, err := data.SendInspectionMessages(adminId, msg)
+		ticker := time.NewTicker(time.Minute)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				utils.FileLog.Info("Current goroutine count: %d", runtime.NumGoroutine())
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId)
+
+	// // 设置连接关闭处理器
+	// conn.SetCloseHandler(func(code int, text string) error {
+	// 	utils.FileLog.Info("长连接关闭, adminId:%d", adminId)
+	// 	cancel()
+	// 	return nil
+	// })
+
+	// 添加错误恢复机制
+	defer func() {
+		if r := recover(); r != nil {
+			utils.FileLog.Error("WebSocket handler recovered from panic: %v", r)
+			// 清理资源
+			cancel()
+		}
+	}()
+
+	for {
+		ok := utils.Rc.IsExist(connKey)
+		if !ok {
+			utils.FileLog.Info("长连接关闭, adminId:%d", adminId)
+			cancel()
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			// 使用带超时的 Redis 操作
+			err := utils.Rc.BrpopWithTimeout(cacheKey, 30*time.Second, func(b []byte) {
+				utils.FileLog.Info("收到巡检信息开始处理, adminId:%d", adminId)
+
+				messageList, err := data.GetHistoryInspectionMessages(adminId)
 				if err != nil {
-					utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-				} else {
-					resp := models.WebsocketMessageResponse{
-						MessageType: 1,
-						Data: respData,
+					utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
+					return
+				}
+				if len(messageList) == 0 {
+					utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId)
+					return
+				}
+				readList := make([]int64, 0)
+				// 消息发送 goroutine
+
+				for i, msg := range messageList {
+					select {
+					case <-ctx.Done():
+							return
+					default:
+						if i == 0 {
+							respData, err := data.SendInspectionMessages(adminId, msg)
+							if err != nil {
+								utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+								continue
+							}
+
+							resp := models.WebsocketMessageResponse{
+								MessageType: 1,
+								Data:       respData,
+							}
+
+							err = WriteWebSocketMessage(conn, resp)
+
+							if err != nil {
+								utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+								continue
+							}
+
+							utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
+							readList = append(readList, msg.MessageId)
+						} else {
+							readList = append(readList, msg.MessageId)
+						}
 					}
-					err = conn.WriteJSON(resp)
+				}
+
+				if len(readList) > 0 {
+					_, err = data.ReadEdbInspectionMessageList(readList, adminId)
 					if err != nil {
-						utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-					} else {
-						utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
-						success <- msg.MessageId
+						utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
 					}
 				}
-			} else {
-				success <- msg.MessageId
-			}
-		}
-	}()
-	go func() {
-		readList := make([]int64, 0)
-		for {
-			msgId, ok := <-success
-			if !ok {
-				break
+			})
+
+			if err != nil && err.Error() != "redis: nil" {
+				utils.FileLog.Error("Redis operation failed: %v", err)
+				continue
+			}else{
+				utils.FileLog.Info("巡检信息处理完成, adminId:%d", adminId)
 			}
-			readList = append(readList, msgId)
-		}
-		_, err = data.ReadEdbInspectionMessageList(readList, adminId)
-		if err != nil {
-			utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
 		}
-	}()		
-	
+	}
 }
+
+func WriteWebSocketMessage(conn *websocket.Conn, resp models.WebsocketMessageResponse) error {
+	data, err := json.Marshal(resp)
+	if err != nil {
+		utils.FileLog.Error("Failed to marshal response: %v", err)
+		return err
+	}
+	var wsWriteMutex sync.Mutex
+	done := make(chan error, 1)
+	
+    go func() {
+           wsWriteMutex.Lock()
+           defer wsWriteMutex.Unlock()
+           done <- conn.WriteMessage(websocket.TextMessage, data)
+    }()
+       
+	select {
+	case err := <-done:
+		return err
+	case <-time.After(5 * time.Second):
+		return fmt.Errorf("write timeout")
+	}
+}

+ 1 - 1
utils/redis.go

@@ -19,7 +19,7 @@ type RedisClient interface {
 	IsExist(key string) bool
 	LPush(key string, val interface{}) error
 	Brpop(key string, callback func([]byte))
-	BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte))
+	BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte)) error
 	GetRedisTTL(key string) time.Duration
 	Incrby(key string, num int) (interface{}, error)
 	Do(commandName string, args ...interface{}) (reply interface{}, err error)

+ 6 - 1
utils/redis/cluster_redis.go

@@ -255,13 +255,18 @@ func (rc *ClusterRedisClient) Brpop(key string, callback func([]byte)) {
 // @param key
 // @param timeout
 // @param callback
-func (rc *ClusterRedisClient) BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte)) {
+func (rc *ClusterRedisClient) BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte)) (err error) {
 	values, err := rc.redisClient.BRPop(context.TODO(), timeout, key).Result()
 	if err != nil {
 		return
 	}
+	if len(values) < 2 {
+		err = errors.New("redis brpop timeout")
+		return
+	}
 
 	callback([]byte(values[1]))
+	return
 }
 
 // GetRedisTTL

+ 6 - 2
utils/redis/standalone_redis.go

@@ -243,13 +243,17 @@ func (rc *StandaloneRedisClient) Brpop(key string, callback func([]byte)) {
 // @param key
 // @param timeout
 // @param callback
-func (rc *StandaloneRedisClient) BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte)) {
+func (rc *StandaloneRedisClient) BrpopWithTimeout(key string, timeout time.Duration, callback func([]byte)) (err error) {
 	values, err := rc.redisClient.BRPop(context.TODO(), timeout, key).Result()
 	if err != nil {
+		return err
+	}
+	if len(values) < 2 {
+		err = errors.New("redis brpop timeout")
 		return
 	}
-
 	callback([]byte(values[1]))
+	return
 }
 
 // GetRedisTTL