xyxie 3 veckor sedan
förälder
incheckning
171e686806

+ 5 - 3
controllers/edb_monitor/edb_monitor_message.go

@@ -34,7 +34,7 @@ var upgrader = websocket.Upgrader{
 // @Description 预警管理消息
 // @Param   request body request.EdbMonitorSaveRequest  true  "每页数据条数"
 // @Success 200 {object} models.EnglishReportEmailPageListResp
-// @router /message/connectOld [get]
+// @router /message/connect [get]
 func (m *EdbMonitorMessageController) Connect() {
 	br := new(models.BaseResponse).Init()
 	defer func() {
@@ -150,7 +150,7 @@ func (m *EdbMonitorMessageController) Connect() {
 // @Description 预警管理消息
 // @Param   request body request.EdbMonitorSaveRequest  true  "每页数据条数"
 // @Success 200 {object} models.EnglishReportEmailPageListResp
-// @router /message/connect [get]
+// @router /message/connectV2 [get]
 func (m *EdbMonitorMessageController) ConnectV2() {
 	br := new(models.BaseResponse).Init()
 	defer func() {
@@ -186,7 +186,9 @@ func (m *EdbMonitorMessageController) ConnectV2() {
 	connID := fmt.Sprintf("%s-%d", conn.RemoteAddr().String(), time.Now().UnixNano())
 			
 	// 创建新的连接对象
-	connection := global.NewWebSocketConn(conn, connID, sysUser.AdminId)
+	handlers := make([]func(adminId int)(err error), 0)
+	handlers = append(handlers, edbmonitor.AutoCheckMonitorMessageListByAdminId)
+	connection := global.NewWebSocketConn(conn, connID, sysUser.AdminId, handlers)
 	
 	// 保存连接
 	global.WebSocketConnMap[connID] = connection

+ 29 - 1
global/websocket.go

@@ -27,10 +27,12 @@ type WebSocketConn struct {
 	wg        sync.WaitGroup
 	sendChan  chan []byte
 	closeChan chan struct{}
+	monitorHandlers []func(adminId int)(err error)
+	adminId int
 }
 
 // NewConnection 创建一个新的连接对象
-func NewWebSocketConn(conn *websocket.Conn, id string, adminId int) *WebSocketConn {
+func NewWebSocketConn(conn *websocket.Conn, id string, adminId int, monitorHandlers []func(adminId int)(err error)) *WebSocketConn {
 	WebSocketConnAdminIdMap[id] = adminId
 	ctx, cancel := context.WithCancel(context.Background())
 	webSocketConn := &WebSocketConn{
@@ -40,8 +42,10 @@ func NewWebSocketConn(conn *websocket.Conn, id string, adminId int) *WebSocketCo
 		cancel:    cancel,
 		sendChan:  make(chan []byte, 100), // 发送消息的缓冲通道
 		closeChan: make(chan struct{}),
+		adminId:   adminId,
 	}
 	AdminWebSocketConnMap[adminId] = webSocketConn
+	webSocketConn.monitorHandlers = monitorHandlers
 	return webSocketConn
 }
 
@@ -56,6 +60,12 @@ func (c *WebSocketConn) Start() {
 	// 启动写协程
 	c.wg.Add(1)
 	go c.writeLoop()
+
+	if len(c.monitorHandlers) > 0 {
+		// 启动消息监听协程
+		c.wg.Add(1)
+		go c.MonitorMessageHandler(c.adminId)
+	}
 }
 
 // Close 关闭连接并清理资源
@@ -152,6 +162,24 @@ func (c *WebSocketConn) writeLoop() {
 	}
 }
 
+func (c *WebSocketConn) MonitorMessageHandler(adminId int) {
+	defer c.wg.Done()
+	defer utils.FileLog.Info("客户端 %s 的消息监控协程已退出", c.id)
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		default:
+			time.Sleep(time.Second * 10)
+			for _, handler := range c.monitorHandlers {
+				utils.FileLog.Info("处理注册的消息监控函数")
+				handler(c.adminId)
+			}
+		}
+
+	}
+}
+
 var (
 	EDB_MONITOR_MESSAGE_CONNECT_CACHE = "edb_monitor_message_cache:"
 )

+ 15 - 1
models/edb_monitor/edb_monitor_message.go

@@ -93,4 +93,18 @@ func SetEdbMonitorMessageReadByIds(msgIds []int) (err error) {
 	sql := "UPDATE edb_monitor_message SET is_read = 1 WHERE edb_monitor_message_id IN (?) and is_read=0"
 	err = o.Exec(sql, msgIds).Error
 	return
-}
+}
+
+func GetEdbMonitorMessageUnreadByAdminId(adminId int) (items []*EdbMonitorMessage, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "SELECT * FROM edb_monitor_message WHERE admin_id =? AND is_read = 0 ORDER BY create_time DESC, edb_monitor_message_id DESC"
+	err = o.Raw(sql, adminId).Find(&items).Error
+	return
+}
+
+func SetEdbMonitorMessageReadByAdminId(adminId int, msgIds []int) (err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "UPDATE edb_monitor_message SET is_read = 1 WHERE admin_id =? AND edb_monitor_message_id IN (?) and is_read=0"
+	err = o.Exec(sql, adminId, msgIds).Error
+	return
+}

+ 3 - 3
routers/commentsRouter.go

@@ -7902,7 +7902,7 @@ func init() {
 
     beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"],
         beego.ControllerComments{
-            Method: "ConnectV2",
+            Method: "Connect",
             Router: `/message/connect`,
             AllowHTTPMethods: []string{"get"},
             MethodParams: param.Make(),
@@ -7911,8 +7911,8 @@ func init() {
 
     beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"],
         beego.ControllerComments{
-            Method: "Connect",
-            Router: `/message/connectOld`,
+            Method: "ConnectV2",
+            Router: `/message/connectV2`,
             AllowHTTPMethods: []string{"get"},
             MethodParams: param.Make(),
             Filters: nil,

+ 43 - 0
services/edb_monitor/edb_monitor_message.go

@@ -227,4 +227,47 @@ func autoCheckMonitorMessageList() (err error){
 		return
 	}
 	return
+}
+
+func AutoCheckMonitorMessageListByAdminId(adminId int) (err error){
+	// 设置缓存防止重复发送
+	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_MONITOR_MESSAGE, adminId)
+	if !utils.Rc.SetNX(cacheKey, 1, 10*time.Minute) {
+		err = fmt.Errorf("系统处理中,请稍后重试!")
+		utils.FileLog.Error("指标预警信息检查失败,err:%s", err.Error())
+		return
+	}
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("指标预警信息检查失败,err:%s", err.Error())
+		}
+		_ = utils.Rc.Delete(cacheKey)
+	}()
+	utils.FileLog.Info("检查是否有预警信息")
+	messageList, er := edbmonitor.GetEdbMonitorMessageUnreadByAdminId(adminId)
+	if er != nil {
+		err = fmt.Errorf("获取指标预警信息历史失败,err:%s", er.Error())
+		return
+	}
+	if len(messageList) == 0 {
+		return
+	}
+	readList := make([]int, 0)
+	for k, msg := range messageList {
+		if k == 0 {
+			triggerTime := utils.TimeTransferString(utils.FormatDateTime, msg.MonitorTriggerTime)
+			err := SendMessages(msg.AdminId, msg.EdbInfoId, msg.EdbInfoType, msg.EdbClassifyId, msg.EdbUniqueCode, msg.Message, triggerTime)
+			if err != nil {
+				utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), msg.AdminId)
+			} 
+		}
+		readList = append(readList, msg.EdbMonitorMessageId)
+	}
+
+	err = edbmonitor.SetEdbMonitorMessageReadByAdminId(adminId, readList)
+	if err != nil {
+		err = fmt.Errorf("指标预警信息已读失败,err:%s", err.Error())
+		return
+	}
+	return
 }

+ 1 - 1
services/task.go

@@ -81,7 +81,7 @@ func Task() {
 	go AutoInsertRaiReport()
 
 	// 指标预警信息发送
-	go edbmonitor.AutoCheckMonitorMessageList()
+	//go edbmonitor.AutoCheckMonitorMessageList()
 
 	// 巡检信息发送
 	go AutoCheckInspectionMessageList()

+ 1 - 0
utils/constants.go

@@ -272,6 +272,7 @@ const (
 	CACHE_REPORT_SHARE_AUTH                 = "eta:report:auth:share:"                //报告短链与报告图表授权映射key
 	CACHE_REPORT_AUTH                       = "eta:report:auth:"                      //报告图表数据授权
 	CACHE_EDB_INSPECTION_MESSAGE            = "eta:edb:inspection:message:"          //巡检消息队列
+	CACHE_EDB_MONITOR_MESSAGE               = "eta:edb:monitor:message:"              //指标预警消息队列
 )
 
 // 模板消息推送类型