瀏覽代碼

Merge branch 'feature/eta2.5.9_api_stat_websocket' of eta_server/eta_api into master

xyxie 5 天之前
父節點
當前提交
1c572ce73d

+ 62 - 3
controllers/edb_monitor/edb_monitor_message.go

@@ -9,6 +9,7 @@ import (
 	"eta/eta_api/services"
 	edbmonitor "eta/eta_api/services/edb_monitor"
 	"eta/eta_api/utils"
+	"fmt"
 	"net/http"
 	"strconv"
 	"time"
@@ -29,11 +30,11 @@ var upgrader = websocket.Upgrader{
 }
 
 // GetMonitorLevel
-// @Title 预警管理消息
+// @Title 预警管理消息 弃用
 // @Description 预警管理消息
 // @Param   request body request.EdbMonitorSaveRequest  true  "每页数据条数"
 // @Success 200 {object} models.EnglishReportEmailPageListResp
-// @router /message/connect [get]
+// @router /message/connectV1 [get]
 func (m *EdbMonitorMessageController) Connect() {
 	br := new(models.BaseResponse).Init()
 	defer func() {
@@ -143,6 +144,64 @@ func (m *EdbMonitorMessageController) Connect() {
 	}
 }
 
+
+// GetMonitorLevel
+// @Title 预警管理消息
+// @Description 预警管理消息
+// @Param   request body request.EdbMonitorSaveRequest  true  "每页数据条数"
+// @Success 200 {object} models.EnglishReportEmailPageListResp
+// @router /message/connect [get]
+func (m *EdbMonitorMessageController) ConnectV2() {
+	// 不要在WebSocket连接中使用defer m.ServeJSON(),因为连接已被劫持
+	sysUser := m.SysUser
+	if sysUser == nil {
+		// 在升级连接前处理错误
+		br := new(models.BaseResponse).Init()
+		br.Msg = "请登录"
+		br.ErrMsg = "请登录,SysUser Is Empty"
+		br.Ret = 408
+		m.Data["json"] = br
+		m.ServeJSON()
+		return
+	}
+	
+	oldConn := global.AdminWebSocketConnMap[sysUser.AdminId]
+	if oldConn != nil {
+		oldConn.Close()
+	}
+	
+    // 建立长连接
+	var conn *websocket.Conn
+	var err error
+	conn, err = upgrader.Upgrade(m.Ctx.ResponseWriter, m.Ctx.Request, nil)
+	if err != nil {
+		// 在升级连接失败时处理错误
+		br := new(models.BaseResponse).Init()
+		br.Msg = "连接失败"
+		br.ErrMsg = "连接失败,err:" + err.Error()
+		m.Data["json"] = br
+		m.ServeJSON()
+		return
+	}
+	
+	// 为新连接创建唯一ID
+	connID := fmt.Sprintf("%s-%d", conn.RemoteAddr().String(), time.Now().UnixNano())
+			
+	// 创建新的连接对象
+	handlers := make([]func(adminId int)(err error), 0)
+	//handlers = append(handlers, edbmonitor.AutoCheckMonitorMessageListByAdminId, services.AutoCheckInspectionMessageListByAdminId)
+	//每次建立连接都启动一个协程长时间的监听,容易导致数据库连接被占满,或者超时,所以系统启动时,单独开启一个协程去监听消息
+	connection := global.NewWebSocketConn(conn, connID, sysUser.AdminId, handlers)
+	
+	// 保存连接
+	global.WebSocketConnMap[connID] = connection
+	
+	// 启动连接的协程
+	connection.Start()
+	
+	// 注意:不要在这里使用defer conn.Close(),因为连接会在WebSocketConn.Close()中关闭
+	// 连接将保持开放状态,直到客户端断开连接或通过其他方法关闭
+}
 // Close
 // @Title 预警管理消息
 // @Description 预警管理消息
@@ -163,7 +222,7 @@ func (m *EdbMonitorMessageController) Close() {
 		return
 	}
 
-	conn := global.MonitorMessageConn[sysUser.AdminId]
+	conn := global.AdminWebSocketConnMap[sysUser.AdminId]
 	if conn != nil {
 		conn.Close()
 	}

+ 170 - 1
global/websocket.go

@@ -1,9 +1,11 @@
 package global
 
 import (
+	"context"
 	"errors"
 	"eta/eta_api/utils"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/gorilla/websocket"
@@ -11,6 +13,173 @@ import (
 
 var MonitorMessageConn = make(map[int]*websocket.Conn)
 
+var WebSocketConnMap = make(map[string]*WebSocketConn)
+
+var AdminWebSocketConnMap = make(map[int]*WebSocketConn)
+
+var WebSocketConnAdminIdMap = make(map[string]int)
+
+type WebSocketConn struct {
+	Conn *websocket.Conn
+	id   string
+	ctx  context.Context
+	cancel context.CancelFunc
+	wg        sync.WaitGroup
+	sendChan  chan []byte
+	closeChan chan struct{}
+	monitorHandlers []func(adminId int)(err error)
+	adminId int
+}
+
+// NewConnection 创建一个新的连接对象
+func NewWebSocketConn(conn *websocket.Conn, id string, adminId int, monitorHandlers []func(adminId int)(err error)) *WebSocketConn {
+	WebSocketConnAdminIdMap[id] = adminId
+	ctx, cancel := context.WithCancel(context.Background())
+	webSocketConn := &WebSocketConn{
+		Conn:      conn,
+		id:        id,
+		ctx:       ctx,
+		cancel:    cancel,
+		sendChan:  make(chan []byte, 100), // 发送消息的缓冲通道
+		closeChan: make(chan struct{}),
+		adminId:   adminId,
+	}
+	AdminWebSocketConnMap[adminId] = webSocketConn
+	webSocketConn.monitorHandlers = monitorHandlers
+	return webSocketConn
+}
+
+// Start 启动连接的读写协程
+func (c *WebSocketConn) Start() {
+	utils.FileLog.Info("客户端 %s 已连接", c.id)
+
+	// 启动读协程
+	c.wg.Add(1)
+	go c.readLoop()
+	
+	// 启动写协程
+	c.wg.Add(1)
+	go c.writeLoop()
+
+	if len(c.monitorHandlers) > 0 {
+		// 启动消息监听协程
+		c.wg.Add(1)
+		go c.MonitorMessageHandler(c.adminId)
+	}
+}
+
+// Close 关闭连接并清理资源
+func (c *WebSocketConn) Close() {
+	utils.FileLog.Info("关闭与客户端 %s 的连接", c.id)
+	c.cancel() // 通知所有协程退出
+	close(c.closeChan)
+	c.Conn.Close()
+	c.wg.Wait() // 等待所有协程退出
+	close(c.sendChan)
+	// 从adminWebSocketConnMap中删除
+	delete(AdminWebSocketConnMap, WebSocketConnAdminIdMap[c.id])
+	delete(WebSocketConnMap, c.id)
+	utils.FileLog.Info("客户端 %s 的所有协程已清理完毕", c.id)
+}
+
+// Send 发送消息到客户端
+func (c *WebSocketConn) Send(msg []byte) bool {
+	select {
+	case c.sendChan <- msg:
+		return true
+	case <-c.ctx.Done():
+		return false
+	default: // 通道已满,表示客户端处理速度慢
+		utils.FileLog.Info("客户端 %s 处理速度过慢,丢弃消息", c.id)
+		return false
+	}
+}
+
+// readLoop 处理来自客户端的消息
+func (c *WebSocketConn) readLoop() {
+	defer c.wg.Done()
+	defer utils.FileLog.Info("客户端 %s 的读协程已退出", c.id)
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		default:
+			// 设置读取超时
+			_, msg, err := c.Conn.ReadMessage()
+			if err != nil {
+				utils.FileLog.Error("从客户端 %s 读取失败: %v", c.id, err)
+				c.cancel()
+				return
+			}
+			message := string(msg)
+			// 处理消息
+			//utils.FileLog.Info("收到客户端 %s 消息: %s", c.id, message)
+			
+			// 启动一个新的协程处理消息,避免阻塞读取循环
+			go c.handleMessage(message)
+		}
+	}
+}
+
+// handleMessage 处理收到的消息
+func (c *WebSocketConn) handleMessage(message string) {
+	defer utils.FileLog.Info("处理来自客户端 %s 的消息: %s, 结束", c.id, message)
+	// 模拟消息处理
+	//utils.FileLog.Info("处理来自客户端 %s 的消息: %s", c.id, message)
+	
+	// 如果是PING消息,回复PONG
+	if message == "ping\n" {
+		//c.Send([]byte("PONG\n"))
+		return
+	}
+	
+	// 回复消息
+	//response := fmt.Sprintf("已收到消息: %s", message)
+	//c.Send([]byte(response))
+}
+
+// writeLoop 发送消息到客户端
+func (c *WebSocketConn) writeLoop() {
+	defer c.wg.Done()
+	defer utils.FileLog.Info("客户端 %s 的写协程已退出", c.id)
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case msg, ok := <-c.sendChan:
+			if !ok {
+				return
+			}
+			err := c.Conn.WriteMessage(websocket.TextMessage, msg)
+			if err != nil {
+				utils.FileLog.Error("向客户端 %s 写入失败: %v", c.id, err)
+				c.cancel()
+				return
+			}
+		}
+	}
+}
+
+func (c *WebSocketConn) MonitorMessageHandler(adminId int) {
+	defer c.wg.Done()
+	defer utils.FileLog.Info("客户端 %s 的消息监控协程已退出", c.id)
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		default:
+			time.Sleep(time.Second * 10)
+			for _, handler := range c.monitorHandlers {
+				//utils.FileLog.Info("处理注册的消息监控函数")
+				handler(c.adminId)
+			}
+		}
+
+	}
+}
+
 var (
 	EDB_MONITOR_MESSAGE_CONNECT_CACHE = "edb_monitor_message_cache:"
 )
@@ -35,4 +204,4 @@ func EdbMonitorMessageHealth(adminId int) (isClose bool, err error) {
 		}
 	}
 	return
-}
+}

+ 22 - 1
models/data_manage/edb_inspection/edb_inspection_message.go

@@ -167,4 +167,25 @@ func GetEdbInspectionMessagePageByAdminId(adminId, startSize, pageSize int) (ite
 	sql := "SELECT * FROM edb_inspection_message WHERE admin_id =? ORDER BY is_read ASC, create_time DESC LIMIT ?,?"
 	err = o.Raw(sql, adminId, startSize, pageSize).Find(&items).Error
 	return
-} 
+} 
+
+func GetUnreadInspectionMessageList(adminIds []int) (items []*EdbInspectionMessage, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "SELECT * FROM edb_inspection_message WHERE admin_id IN (?) AND is_read = 0 ORDER BY create_time DESC, message_id DESC"
+	err = o.Raw(sql, adminIds).Find(&items).Error
+	return
+}
+
+func GetUnreadInspectionMessageListByAdminId(adminId int) (items []*EdbInspectionMessage, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "SELECT * FROM edb_inspection_message WHERE admin_id =? AND is_read = 0 ORDER BY create_time DESC, message_id DESC"
+	err = o.Raw(sql, adminId).Find(&items).Error
+	return
+}
+
+func SetEdbInspectionMessageRead(messageId []int64) (err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "UPDATE edb_inspection_message SET is_read = 1, modify_time = ? WHERE message_id IN (?) AND is_read = 0"
+	err = o.Exec(sql, time.Now(), messageId).Error
+	return
+}

+ 28 - 0
models/edb_monitor/edb_monitor_message.go

@@ -80,3 +80,31 @@ func GetEdbMonitorMessagePageByAdminId(adminId, startSize, pageSize int) (items
 	err = o.Raw(sql, adminId, startSize, pageSize).Find(&items).Error
 	return
 }
+
+func GetEdbMonitorMessageUnreadListByAdminIds(adminIds []int) (items []*EdbMonitorMessage, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "SELECT * FROM edb_monitor_message WHERE is_read = 0 AND admin_id IN (?) ORDER BY create_time DESC, edb_monitor_message_id DESC"
+	err = o.Raw(sql, adminIds).Find(&items).Error
+	return
+}
+
+func SetEdbMonitorMessageReadByIds(msgIds []int) (err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "UPDATE edb_monitor_message SET is_read = 1 WHERE edb_monitor_message_id IN (?) and is_read=0"
+	err = o.Exec(sql, msgIds).Error
+	return
+}
+
+func GetEdbMonitorMessageUnreadByAdminId(adminId int) (items []*EdbMonitorMessage, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "SELECT * FROM edb_monitor_message WHERE admin_id =? AND is_read = 0 ORDER BY create_time DESC, edb_monitor_message_id DESC"
+	err = o.Raw(sql, adminId).Find(&items).Error
+	return
+}
+
+func SetEdbMonitorMessageReadByAdminId(adminId int, msgIds []int) (err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "UPDATE edb_monitor_message SET is_read = 1 WHERE admin_id =? AND edb_monitor_message_id IN (?) and is_read=0"
+	err = o.Exec(sql, adminId, msgIds).Error
+	return
+}

+ 10 - 1
routers/commentsRouter.go

@@ -7911,13 +7911,22 @@ func init() {
 
     beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"],
         beego.ControllerComments{
-            Method: "Connect",
+            Method: "ConnectV2",
             Router: `/message/connect`,
             AllowHTTPMethods: []string{"get"},
             MethodParams: param.Make(),
             Filters: nil,
             Params: nil})
 
+    beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"],
+        beego.ControllerComments{
+            Method: "Connect",
+            Router: `/message/connectV1`,
+            AllowHTTPMethods: []string{"get"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
     beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"],
         beego.ControllerComments{
             Method: "List",

+ 104 - 3
services/edb_monitor/edb_monitor_message.go

@@ -1,13 +1,16 @@
 package edbmonitor
 
 import (
+	"encoding/json"
 	"errors"
 	"eta/eta_api/global"
 	"eta/eta_api/models"
 	edbmonitor "eta/eta_api/models/edb_monitor"
 	"eta/eta_api/models/edb_monitor/response"
 	"eta/eta_api/utils"
+	"fmt"
 	"time"
+
 	"github.com/rdlucklib/rdluck_tools/paging"
 )
 
@@ -84,9 +87,10 @@ func LogMessage(content, uniqueCode string, triggerTime time.Time, edbInfoId, ed
 }
 
 func SendMessages(adminId, edbInfoId, edbInfoType int, classifyId int, edbUniqueCode, message string, triggerTime string) (err error) {
-	conn := global.MonitorMessageConn[adminId]
+	conn := global.AdminWebSocketConnMap[adminId]
 	if conn == nil {
-		return errors.New("no connection")
+		err = errors.New("no connection")
+		return
 	}
 	resp := models.WebsocketMessageResponse{
 		MessageType: 0,
@@ -99,7 +103,17 @@ func SendMessages(adminId, edbInfoId, edbInfoType int, classifyId int, edbUnique
 			TriggerTime:   triggerTime,
 		},
 	}
-	return conn.WriteJSON(resp)
+	jsonData, err := json.Marshal(resp)
+	if err != nil {
+		err = fmt.Errorf("json marshal failed, err:%s", err.Error())
+		return
+	}
+	ok := conn.Send(jsonData)
+	if !ok {
+		err = fmt.Errorf("send message failed, err:%s", err.Error())
+		return
+	}
+	return
 }
 
 func GetHistoryMessages(adminId int) (items []*response.EdbMonitorMessageResp, err error) {
@@ -152,3 +166,90 @@ func toEdbMonitorMessageResp(items []*edbmonitor.EdbMonitorMessage) (list []*res
 	}
 	return
 }
+
+func AutoCheckMonitorMessageList(admins []int) (err error) {
+	if len(admins) == 0 {
+		return nil
+	}
+
+	//utils.FileLog.Info("检查是否有预警信息,活跃用户数: %d", len(admins))
+	
+	// 设置缓存防止重复发送,使用较短的锁定时间
+	cacheKey := fmt.Sprintf("%s", utils.CACHE_EDB_MONITOR_MESSAGE)
+	if !utils.Rc.SetNX(cacheKey, 1, 30*time.Second) {
+		utils.FileLog.Info("其他进程正在处理预警信息,跳过本次检查")
+		return nil
+	}
+	defer func() {
+		_ = utils.Rc.Delete(cacheKey)
+	}()
+	messageList, er := edbmonitor.GetEdbMonitorMessageUnreadListByAdminIds(admins)
+	if er != nil {
+		err = fmt.Errorf("获取指标预警信息历史失败,err:%s", er.Error())
+		return
+	}
+	readList := make([]int, 0)
+	adminMsgMap := make(map[int]int)
+	for _, msg := range messageList {
+		if _, ok := adminMsgMap[msg.AdminId]; !ok {
+			adminMsgMap[msg.AdminId] = msg.EdbMonitorMessageId
+			triggerTime := utils.TimeTransferString(utils.FormatDateTime, msg.MonitorTriggerTime)
+			err := SendMessages(msg.AdminId, msg.EdbInfoId, msg.EdbInfoType, msg.EdbClassifyId, msg.EdbUniqueCode, msg.Message, triggerTime)
+			if err != nil {
+				utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), msg.AdminId)
+			} 
+		}
+		readList = append(readList, msg.EdbMonitorMessageId)
+	}
+
+	err = edbmonitor.SetEdbMonitorMessageReadByIds(readList)
+	if err != nil {
+		err = fmt.Errorf("指标预警信息已读失败,err:%s", err.Error())
+		return
+	}
+	return
+}
+
+func AutoCheckMonitorMessageListByAdminId(adminId int) (err error){
+	// 设置缓存防止重复发送
+	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_MONITOR_MESSAGE, adminId)
+	if !utils.Rc.SetNX(cacheKey, 1, 10*time.Minute) {
+		err = fmt.Errorf("系统处理中,请稍后重试!")
+		utils.FileLog.Error("指标预警信息检查失败,err:%s", err.Error())
+		return
+	}
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("指标预警信息检查失败,err:%s", err.Error())
+		}
+		_ = utils.Rc.Delete(cacheKey)
+	}()
+	utils.FileLog.Info("检查是否有预警信息")
+	messageList, er := edbmonitor.GetEdbMonitorMessageUnreadByAdminId(adminId)
+	if er != nil {
+		err = fmt.Errorf("获取指标预警信息历史失败,err:%s", er.Error())
+		return
+	}
+	if len(messageList) == 0 {
+		return
+	}
+	readList := make([]int, 0)
+	for k, msg := range messageList {
+		if k == 0 {
+			triggerTime := utils.TimeTransferString(utils.FormatDateTime, msg.MonitorTriggerTime)
+			err = SendMessages(msg.AdminId, msg.EdbInfoId, msg.EdbInfoType, msg.EdbClassifyId, msg.EdbUniqueCode, msg.Message, triggerTime)
+			if err != nil {
+				utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), msg.AdminId)
+				return
+			} 
+		}
+		readList = append(readList, msg.EdbMonitorMessageId)
+	}
+
+	err = edbmonitor.SetEdbMonitorMessageReadByAdminId(adminId, readList)
+	if err != nil {
+		err = fmt.Errorf("指标预警信息已读失败,err:%s", err.Error())
+		return
+	}
+	return
+}

+ 3 - 0
services/task.go

@@ -86,6 +86,9 @@ func Task() {
 	// 权益报告监听入库
 	go AutoInsertRaiReport()
 
+	// 巡检信息发送
+	go AutoCheckWebsocketMessageList()
+
 	// TODO:数据修复
 	//FixNewEs()
 	fmt.Println("task end")

+ 161 - 1
services/websocket_msg.go

@@ -1,9 +1,12 @@
 package services
 
 import (
+	"encoding/json"
 	"eta/eta_api/global"
 	"eta/eta_api/models"
+	"eta/eta_api/models/data_manage/edb_inspection"
 	"eta/eta_api/services/data"
+	edb_monitor "eta/eta_api/services/edb_monitor"
 	"eta/eta_api/utils"
 	"fmt"
 	"runtime"
@@ -205,7 +208,164 @@ func DealEdbInspectionMessageTest(adminId int) {
 			_, err = data.ReadEdbInspectionMessageList(readList, adminId)
 			if err != nil {
 				utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
+			return
+		}
+		_, err = data.ReadEdbInspectionMessageList(readList, adminId)
+		if err != nil {
+			utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
+		}
+	}()
+}
+
+func AutoCheckWebsocketMessageList() {
+	ticker := time.NewTicker(time.Second * 10)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			// 获取活跃连接
+			admins := make([]int, 0)
+			for adminId, conn := range global.AdminWebSocketConnMap {
+				if conn == nil {
+					continue
+				}
+				admins = append(admins, adminId)
+			}
+			
+			// 如果没有活跃连接,继续等待下一个tick
+			if len(admins) == 0 {
+				//utils.FileLog.Info("当前没有活跃的WebSocket连接")
+				continue
+			}
+
+			// 并发处理不同类型的消息
+			var wg sync.WaitGroup
+			wg.Add(2)
+
+			// 处理巡检消息
+			go func() {
+				defer wg.Done()
+				if err := AutoCheckInspectionMessageList(admins); err != nil {
+					utils.FileLog.Error("处理巡检消息失败: %v", err)
+				}
+			}()
+
+			// 处理监控消息
+			go func() {
+				defer wg.Done()
+				if err := edb_monitor.AutoCheckMonitorMessageList(admins); err != nil {
+					utils.FileLog.Error("处理监控消息失败: %v", err)
+				}
+			}()
+
+			// 等待所有消息处理完成
+			wg.Wait()
+		}
+	}
+}
+
+func AutoCheckInspectionMessageList(admins []int) (err error) {
+	//utils.FileLog.Info("检查是否有巡检信息")
+	// 设置redis缓存,防止消息重复处理
+	cacheKey := fmt.Sprintf("%s", utils.CACHE_EDB_INSPECTION_MESSAGE)
+	if !utils.Rc.SetNX(cacheKey, 1, 30*time.Second) {
+		err = fmt.Errorf("系统处理中,请稍后重试!")
+		utils.FileLog.Error("巡检信息检查失败,err:%s", err.Error())
+		return
+	}
+	defer func() {
+		_ = utils.Rc.Delete(cacheKey)
+	}()
+	messageList, er := edb_inspection.GetUnreadInspectionMessageList(admins)
+	if er != nil {
+		err = fmt.Errorf("获取巡检信息历史失败,err:%s", er.Error())
+		return
+	}
+	readList := make([]int64, 0)
+	for _, msg := range messageList {
+		adminId := int(msg.AdminId)
+		respData, er := data.SendInspectionMessages(adminId, msg)
+		if er != nil {
+			utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", er.Error(), adminId)
+		} else {
+			resp := models.WebsocketMessageResponse{
+				MessageType: 1,
+				Data: respData,
+			}
+			conn := global.AdminWebSocketConnMap[int(msg.AdminId)]
+			if conn == nil {
+				utils.FileLog.Error("巡检信息发送失败,连接已断开, adminId:%d", adminId)
 				return
 			}
-		}()
+			message, er := json.Marshal(resp)
+			if er != nil {
+				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", er.Error(), adminId)
+				return
+			}
+			ok := conn.Send(message)
+			if !ok {
+				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d",  adminId)
+			}
+		}
+		readList = append(readList, msg.MessageId)
+	}
+
+	err = edb_inspection.SetEdbInspectionMessageRead(readList)
+	if err != nil {
+		err = fmt.Errorf("巡检信息已读失败,err:%s", err.Error())
+		return
+	}
+	return
+}
+
+// 弃用
+func AutoCheckInspectionMessageListByAdminId(adminId int) (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("巡检信息发送失败,err:%s", err.Error())
+		}
+	}()
+	messageList, er := edb_inspection.GetUnreadInspectionMessageListByAdminId(adminId)
+	if er != nil {
+		err = fmt.Errorf("获取巡检信息历史失败,err:%s", er.Error())
+		return
+	}
+	if len(messageList) == 0 {
+		return
+	}
+	readList := make([]int64, 0)
+	for _, msg := range messageList {
+		adminId := int(msg.AdminId)
+		respData, er := data.SendInspectionMessages(adminId, msg)
+		if er != nil {
+			utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", er.Error(), adminId)
+		} else {
+			resp := models.WebsocketMessageResponse{
+				MessageType: 1,
+				Data: respData,
+			}
+			conn := global.AdminWebSocketConnMap[adminId]
+			if conn == nil {
+				utils.FileLog.Error("巡检信息发送失败,连接已断开, adminId:%d", adminId)
+				return
+			}
+			message, er := json.Marshal(resp)
+			if er != nil {
+				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", er.Error(), adminId)
+				return
+			}
+			ok := conn.Send(message)
+			if !ok {
+				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d",  adminId)
+			}
+		}
+		readList = append(readList, msg.MessageId)
+	}
+	err = edb_inspection.SetEdbInspectionMessageRead(readList)
+	if err != nil {
+		err = fmt.Errorf("巡检信息已读失败,err:%s", err.Error())
+		return
+	}
+	return
 }

+ 1 - 0
utils/constants.go

@@ -278,6 +278,7 @@ const (
 	CACHE_REPORT_SHARE_AUTH                 = "eta:report:auth:share:"                //报告短链与报告图表授权映射key
 	CACHE_REPORT_AUTH                       = "eta:report:auth:"                      //报告图表数据授权
 	CACHE_EDB_INSPECTION_MESSAGE            = "eta:edb:inspection:message:"          //巡检消息队列
+	CACHE_EDB_MONITOR_MESSAGE               = "eta:edb:monitor:message:"              //指标预警消息队列
 )
 
 // 模板消息推送类型