Browse Source

优化消息发送

xyxie 8 giờ trước cách đây
mục cha
commit
302b4abe7c

+ 2 - 1
controllers/edb_monitor/edb_monitor_message.go

@@ -189,7 +189,8 @@ func (m *EdbMonitorMessageController) ConnectV2() {
 			
 	// 创建新的连接对象
 	handlers := make([]func(adminId int)(err error), 0)
-	handlers = append(handlers, edbmonitor.AutoCheckMonitorMessageListByAdminId)
+	//handlers = append(handlers, edbmonitor.AutoCheckMonitorMessageListByAdminId, services.AutoCheckInspectionMessageListByAdminId)
+	//每次建立连接都启动一个协程长时间的监听,容易导致数据库连接被占满,或者超时,所以系统启动时,单独开启一个协程去监听消息
 	connection := global.NewWebSocketConn(conn, connID, sysUser.AdminId, handlers)
 	
 	// 保存连接

+ 2 - 2
global/websocket.go

@@ -152,7 +152,7 @@ func (c *WebSocketConn) writeLoop() {
 			if !ok {
 				return
 			}
-			err := c.Conn.WriteJSON(msg)
+			err := c.Conn.WriteMessage(websocket.TextMessage, msg)
 			if err != nil {
 				utils.FileLog.Error("向客户端 %s 写入失败: %v", c.id, err)
 				c.cancel()
@@ -172,7 +172,7 @@ func (c *WebSocketConn) MonitorMessageHandler(adminId int) {
 		default:
 			time.Sleep(time.Second * 10)
 			for _, handler := range c.monitorHandlers {
-				utils.FileLog.Info("处理注册的消息监控函数")
+				//utils.FileLog.Info("处理注册的消息监控函数")
 				handler(c.adminId)
 			}
 		}

+ 7 - 0
models/data_manage/edb_inspection/edb_inspection_message.go

@@ -176,6 +176,13 @@ func GetUnreadInspectionMessageList(adminIds []int) (items []*EdbInspectionMessa
 	return
 }
 
+func GetUnreadInspectionMessageListByAdminId(adminId int) (items []*EdbInspectionMessage, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "SELECT * FROM edb_inspection_message WHERE admin_id =? AND is_read = 0 ORDER BY create_time DESC, message_id DESC"
+	err = o.Raw(sql, adminId).Find(&items).Error
+	return
+}
+
 func SetEdbInspectionMessageRead(messageId []int64) (err error) {
 	o := global.DbMap[utils.DbNameIndex]
 	sql := "UPDATE edb_inspection_message SET is_read = 1, modify_time = ? WHERE message_id IN (?) AND is_read = 0"

+ 13 - 32
services/edb_monitor/edb_monitor_message.go

@@ -167,40 +167,22 @@ func toEdbMonitorMessageResp(items []*edbmonitor.EdbMonitorMessage) (list []*res
 	return
 }
 
-func AutoCheckMonitorMessageList() (err error) {
-	defer func() {
-		if err != nil {
-			utils.FileLog.Error("指标预警信息发送失败,err:%s", err.Error())
-		}
-	}()
-	ticker := time.NewTicker(time.Second * 10)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ticker.C:
-			autoCheckMonitorMessageList()
-		}
+func AutoCheckMonitorMessageList(admins []int) (err error) {
+	if len(admins) == 0 {
+		return nil
 	}
-}
 
-func autoCheckMonitorMessageList() (err error){
+	//utils.FileLog.Info("检查是否有预警信息,活跃用户数: %d", len(admins))
+	
+	// 设置缓存防止重复发送,使用较短的锁定时间
+	cacheKey := fmt.Sprintf("%s", utils.CACHE_EDB_MONITOR_MESSAGE)
+	if !utils.Rc.SetNX(cacheKey, 1, 30*time.Second) {
+		utils.FileLog.Info("其他进程正在处理预警信息,跳过本次检查")
+		return nil
+	}
 	defer func() {
-		if err != nil {
-			utils.FileLog.Error("指标预警信息检查失败,err:%s", err.Error())
-		}
+		_ = utils.Rc.Delete(cacheKey)
 	}()
-	utils.FileLog.Info("检查是否有预警信息")
-	admins := make([]int, 0)
-	for adminId, conn := range global.AdminWebSocketConnMap {
-		if conn == nil {
-			continue
-		}
-		admins = append(admins, adminId)
-	}
-	if len(admins) == 0 {
-		return
-	}
 	messageList, er := edbmonitor.GetEdbMonitorMessageUnreadListByAdminIds(admins)
 	if er != nil {
 		err = fmt.Errorf("获取指标预警信息历史失败,err:%s", er.Error())
@@ -216,9 +198,8 @@ func autoCheckMonitorMessageList() (err error){
 			if err != nil {
 				utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), msg.AdminId)
 			} 
-		}else {
-			readList = append(readList, msg.EdbMonitorMessageId)
 		}
+		readList = append(readList, msg.EdbMonitorMessageId)
 	}
 
 	err = edbmonitor.SetEdbMonitorMessageReadByIds(readList)

+ 1 - 4
services/task.go

@@ -80,11 +80,8 @@ func Task() {
 	// 权益报告监听入库
 	go AutoInsertRaiReport()
 
-	// 指标预警信息发送
-	//go edbmonitor.AutoCheckMonitorMessageList()
-
 	// 巡检信息发送
-	go AutoCheckInspectionMessageList()
+	go AutoCheckWebsocketMessageList()
 
 	// TODO:数据修复
 	//FixNewEs()

+ 100 - 21
services/websocket_msg.go

@@ -6,6 +6,7 @@ import (
 	"eta/eta_api/models"
 	"eta/eta_api/models/data_manage/edb_inspection"
 	"eta/eta_api/services/data"
+	edb_monitor "eta/eta_api/services/edb_monitor"
 	"eta/eta_api/utils"
 	"fmt"
 	"runtime"
@@ -221,44 +222,123 @@ func DealEdbInspectionMessageTest(adminId int) {
 	}()
 }
 
-func AutoCheckInspectionMessageList() (err error) {
-	defer func() {
-		if err != nil {
-			utils.FileLog.Error("巡检信息发送失败,err:%s", err.Error())
-		}
-	}()
+func AutoCheckWebsocketMessageList() {
 	ticker := time.NewTicker(time.Second * 10)
 	defer ticker.Stop()
 
 	for {
 		select {
 		case <-ticker.C:
-			err = autoCheckInspectionMessageList()
+			// 获取活跃连接
+			admins := make([]int, 0)
+			for adminId, conn := range global.AdminWebSocketConnMap {
+				if conn == nil {
+					continue
+				}
+				admins = append(admins, adminId)
+			}
+			
+			// 如果没有活跃连接,继续等待下一个tick
+			if len(admins) == 0 {
+				//utils.FileLog.Info("当前没有活跃的WebSocket连接")
+				continue
+			}
+
+			// 并发处理不同类型的消息
+			var wg sync.WaitGroup
+			wg.Add(2)
+
+			// 处理巡检消息
+			go func() {
+				defer wg.Done()
+				if err := AutoCheckInspectionMessageList(admins); err != nil {
+					utils.FileLog.Error("处理巡检消息失败: %v", err)
+				}
+			}()
+
+			// 处理监控消息
+			go func() {
+				defer wg.Done()
+				if err := edb_monitor.AutoCheckMonitorMessageList(admins); err != nil {
+					utils.FileLog.Error("处理监控消息失败: %v", err)
+				}
+			}()
+
+			// 等待所有消息处理完成
+			wg.Wait()
 		}
 	}
 }
-func autoCheckInspectionMessageList() (err error) {
+
+func AutoCheckInspectionMessageList(admins []int) (err error) {
+	//utils.FileLog.Info("检查是否有巡检信息")
+	// 设置redis缓存,防止消息重复处理
+	cacheKey := fmt.Sprintf("%s", utils.CACHE_EDB_INSPECTION_MESSAGE)
+	if !utils.Rc.SetNX(cacheKey, 1, 30*time.Second) {
+		err = fmt.Errorf("系统处理中,请稍后重试!")
+		utils.FileLog.Error("巡检信息检查失败,err:%s", err.Error())
+		return
+	}
 	defer func() {
-		if err != nil {
-			utils.FileLog.Error("巡检信息发送失败,err:%s", err.Error())
-		}
+		_ = utils.Rc.Delete(cacheKey)
 	}()
-	utils.FileLog.Info("检查是否有巡检信息")
-	admins := make([]int, 0)
-	for adminId, conn := range global.AdminWebSocketConnMap {
-		if conn == nil {
-			continue
+	messageList, er := edb_inspection.GetUnreadInspectionMessageList(admins)
+	if er != nil {
+		err = fmt.Errorf("获取巡检信息历史失败,err:%s", er.Error())
+		return
+	}
+	readList := make([]int64, 0)
+	for _, msg := range messageList {
+		adminId := int(msg.AdminId)
+		respData, er := data.SendInspectionMessages(adminId, msg)
+		if er != nil {
+			utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", er.Error(), adminId)
+		} else {
+			resp := models.WebsocketMessageResponse{
+				MessageType: 1,
+				Data: respData,
+			}
+			conn := global.AdminWebSocketConnMap[int(msg.AdminId)]
+			if conn == nil {
+				utils.FileLog.Error("巡检信息发送失败,连接已断开, adminId:%d", adminId)
+				return
+			}
+			message, er := json.Marshal(resp)
+			if er != nil {
+				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", er.Error(), adminId)
+				return
+			}
+			ok := conn.Send(message)
+			if !ok {
+				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d",  adminId)
+			}
 		}
-		admins = append(admins, adminId)
+		readList = append(readList, msg.MessageId)
 	}
-	if len(admins) == 0 {
+
+	err = edb_inspection.SetEdbInspectionMessageRead(readList)
+	if err != nil {
+		err = fmt.Errorf("巡检信息已读失败,err:%s", err.Error())
 		return
 	}
-	messageList, er := edb_inspection.GetUnreadInspectionMessageList(admins)
+	return
+}
+
+// 弃用
+func AutoCheckInspectionMessageListByAdminId(adminId int) (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("巡检信息发送失败,err:%s", err.Error())
+		}
+	}()
+	messageList, er := edb_inspection.GetUnreadInspectionMessageListByAdminId(adminId)
 	if er != nil {
 		err = fmt.Errorf("获取巡检信息历史失败,err:%s", er.Error())
 		return
 	}
+	if len(messageList) == 0 {
+		return
+	}
 	readList := make([]int64, 0)
 	for _, msg := range messageList {
 		adminId := int(msg.AdminId)
@@ -270,7 +350,7 @@ func autoCheckInspectionMessageList() (err error) {
 				MessageType: 1,
 				Data: respData,
 			}
-			conn := global.AdminWebSocketConnMap[int(msg.AdminId)]
+			conn := global.AdminWebSocketConnMap[adminId]
 			if conn == nil {
 				utils.FileLog.Error("巡检信息发送失败,连接已断开, adminId:%d", adminId)
 				return
@@ -287,7 +367,6 @@ func autoCheckInspectionMessageList() (err error) {
 		}
 		readList = append(readList, msg.MessageId)
 	}
-
 	err = edb_inspection.SetEdbInspectionMessageRead(readList)
 	if err != nil {
 		err = fmt.Errorf("巡检信息已读失败,err:%s", err.Error())