Преглед изворни кода

Merge branch 'feature/eta2.5.9_api_stat_websocket' into debug

# Conflicts:
#	services/task.go
#	services/websocket_msg.go
#	utils/constants.go
xyxie пре 1 дан
родитељ
комит
5330cd13f1

+ 61 - 3
controllers/edb_monitor/edb_monitor_message.go

@@ -9,6 +9,7 @@ import (
 	"eta/eta_api/services"
 	edbmonitor "eta/eta_api/services/edb_monitor"
 	"eta/eta_api/utils"
+	"fmt"
 	"net/http"
 	"strconv"
 	"time"
@@ -29,11 +30,11 @@ var upgrader = websocket.Upgrader{
 }
 
 // GetMonitorLevel
-// @Title 预警管理消息
+// @Title 预警管理消息 弃用
 // @Description 预警管理消息
 // @Param   request body request.EdbMonitorSaveRequest  true  "每页数据条数"
 // @Success 200 {object} models.EnglishReportEmailPageListResp
-// @router /message/connect [get]
+// @router /message/connectV1 [get]
 func (m *EdbMonitorMessageController) Connect() {
 	br := new(models.BaseResponse).Init()
 	defer func() {
@@ -143,6 +144,63 @@ func (m *EdbMonitorMessageController) Connect() {
 	}
 }
 
+
+// GetMonitorLevel
+// @Title 预警管理消息
+// @Description 预警管理消息
+// @Param   request body request.EdbMonitorSaveRequest  true  "每页数据条数"
+// @Success 200 {object} models.EnglishReportEmailPageListResp
+// @router /message/connect [get]
+func (m *EdbMonitorMessageController) ConnectV2() {
+	// 不要在WebSocket连接中使用defer m.ServeJSON(),因为连接已被劫持
+	sysUser := m.SysUser
+	if sysUser == nil {
+		// 在升级连接前处理错误
+		br := new(models.BaseResponse).Init()
+		br.Msg = "请登录"
+		br.ErrMsg = "请登录,SysUser Is Empty"
+		br.Ret = 408
+		m.Data["json"] = br
+		m.ServeJSON()
+		return
+	}
+	
+	oldConn := global.AdminWebSocketConnMap[sysUser.AdminId]
+	if oldConn != nil {
+		oldConn.Close()
+	}
+	
+    // 建立长连接
+	var conn *websocket.Conn
+	var err error
+	conn, err = upgrader.Upgrade(m.Ctx.ResponseWriter, m.Ctx.Request, nil)
+	if err != nil {
+		// 在升级连接失败时处理错误
+		br := new(models.BaseResponse).Init()
+		br.Msg = "连接失败"
+		br.ErrMsg = "连接失败,err:" + err.Error()
+		m.Data["json"] = br
+		m.ServeJSON()
+		return
+	}
+	
+	// 为新连接创建唯一ID
+	connID := fmt.Sprintf("%s-%d", conn.RemoteAddr().String(), time.Now().UnixNano())
+			
+	// 创建新的连接对象
+	handlers := make([]func(adminId int)(err error), 0)
+	handlers = append(handlers, edbmonitor.AutoCheckMonitorMessageListByAdminId)
+	connection := global.NewWebSocketConn(conn, connID, sysUser.AdminId, handlers)
+	
+	// 保存连接
+	global.WebSocketConnMap[connID] = connection
+	
+	// 启动连接的协程
+	connection.Start()
+	
+	// 注意:不要在这里使用defer conn.Close(),因为连接会在WebSocketConn.Close()中关闭
+	// 连接将保持开放状态,直到客户端断开连接或通过其他方法关闭
+}
 // Close
 // @Title 预警管理消息
 // @Description 预警管理消息
@@ -163,7 +221,7 @@ func (m *EdbMonitorMessageController) Close() {
 		return
 	}
 
-	conn := global.MonitorMessageConn[sysUser.AdminId]
+	conn := global.AdminWebSocketConnMap[sysUser.AdminId]
 	if conn != nil {
 		conn.Close()
 	}

+ 170 - 1
global/websocket.go

@@ -1,9 +1,11 @@
 package global
 
 import (
+	"context"
 	"errors"
 	"eta/eta_api/utils"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/gorilla/websocket"
@@ -11,6 +13,173 @@ import (
 
 var MonitorMessageConn = make(map[int]*websocket.Conn)
 
+var WebSocketConnMap = make(map[string]*WebSocketConn)
+
+var AdminWebSocketConnMap = make(map[int]*WebSocketConn)
+
+var WebSocketConnAdminIdMap = make(map[string]int)
+
+type WebSocketConn struct {
+	Conn *websocket.Conn
+	id   string
+	ctx  context.Context
+	cancel context.CancelFunc
+	wg        sync.WaitGroup
+	sendChan  chan []byte
+	closeChan chan struct{}
+	monitorHandlers []func(adminId int)(err error)
+	adminId int
+}
+
+// NewConnection 创建一个新的连接对象
+func NewWebSocketConn(conn *websocket.Conn, id string, adminId int, monitorHandlers []func(adminId int)(err error)) *WebSocketConn {
+	WebSocketConnAdminIdMap[id] = adminId
+	ctx, cancel := context.WithCancel(context.Background())
+	webSocketConn := &WebSocketConn{
+		Conn:      conn,
+		id:        id,
+		ctx:       ctx,
+		cancel:    cancel,
+		sendChan:  make(chan []byte, 100), // 发送消息的缓冲通道
+		closeChan: make(chan struct{}),
+		adminId:   adminId,
+	}
+	AdminWebSocketConnMap[adminId] = webSocketConn
+	webSocketConn.monitorHandlers = monitorHandlers
+	return webSocketConn
+}
+
+// Start 启动连接的读写协程
+func (c *WebSocketConn) Start() {
+	utils.FileLog.Info("客户端 %s 已连接", c.id)
+
+	// 启动读协程
+	c.wg.Add(1)
+	go c.readLoop()
+	
+	// 启动写协程
+	c.wg.Add(1)
+	go c.writeLoop()
+
+	if len(c.monitorHandlers) > 0 {
+		// 启动消息监听协程
+		c.wg.Add(1)
+		go c.MonitorMessageHandler(c.adminId)
+	}
+}
+
+// Close 关闭连接并清理资源
+func (c *WebSocketConn) Close() {
+	utils.FileLog.Info("关闭与客户端 %s 的连接", c.id)
+	c.cancel() // 通知所有协程退出
+	close(c.closeChan)
+	c.Conn.Close()
+	c.wg.Wait() // 等待所有协程退出
+	close(c.sendChan)
+	// 从adminWebSocketConnMap中删除
+	delete(AdminWebSocketConnMap, WebSocketConnAdminIdMap[c.id])
+	delete(WebSocketConnMap, c.id)
+	utils.FileLog.Info("客户端 %s 的所有协程已清理完毕", c.id)
+}
+
+// Send 发送消息到客户端
+func (c *WebSocketConn) Send(msg []byte) bool {
+	select {
+	case c.sendChan <- msg:
+		return true
+	case <-c.ctx.Done():
+		return false
+	default: // 通道已满,表示客户端处理速度慢
+		utils.FileLog.Info("客户端 %s 处理速度过慢,丢弃消息", c.id)
+		return false
+	}
+}
+
+// readLoop 处理来自客户端的消息
+func (c *WebSocketConn) readLoop() {
+	defer c.wg.Done()
+	defer utils.FileLog.Info("客户端 %s 的读协程已退出", c.id)
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		default:
+			// 设置读取超时
+			_, msg, err := c.Conn.ReadMessage()
+			if err != nil {
+				utils.FileLog.Error("从客户端 %s 读取失败: %v", c.id, err)
+				c.cancel()
+				return
+			}
+			message := string(msg)
+			// 处理消息
+			utils.FileLog.Info("收到客户端 %s 消息: %s", c.id, message)
+			
+			// 启动一个新的协程处理消息,避免阻塞读取循环
+			go c.handleMessage(message)
+		}
+	}
+}
+
+// handleMessage 处理收到的消息
+func (c *WebSocketConn) handleMessage(message string) {
+	defer utils.FileLog.Info("处理来自客户端 %s 的消息: %s, 结束", c.id, message)
+	// 模拟消息处理
+	utils.FileLog.Info("处理来自客户端 %s 的消息: %s", c.id, message)
+	
+	// 如果是PING消息,回复PONG
+	if message == "ping\n" {
+		//c.Send([]byte("PONG\n"))
+		return
+	}
+	
+	// 回复消息
+	//response := fmt.Sprintf("已收到消息: %s", message)
+	//c.Send([]byte(response))
+}
+
+// writeLoop 发送消息到客户端
+func (c *WebSocketConn) writeLoop() {
+	defer c.wg.Done()
+	defer utils.FileLog.Info("客户端 %s 的写协程已退出", c.id)
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case msg, ok := <-c.sendChan:
+			if !ok {
+				return
+			}
+			err := c.Conn.WriteJSON(msg)
+			if err != nil {
+				utils.FileLog.Error("向客户端 %s 写入失败: %v", c.id, err)
+				c.cancel()
+				return
+			}
+		}
+	}
+}
+
+func (c *WebSocketConn) MonitorMessageHandler(adminId int) {
+	defer c.wg.Done()
+	defer utils.FileLog.Info("客户端 %s 的消息监控协程已退出", c.id)
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		default:
+			time.Sleep(time.Second * 10)
+			for _, handler := range c.monitorHandlers {
+				utils.FileLog.Info("处理注册的消息监控函数")
+				handler(c.adminId)
+			}
+		}
+
+	}
+}
+
 var (
 	EDB_MONITOR_MESSAGE_CONNECT_CACHE = "edb_monitor_message_cache:"
 )
@@ -35,4 +204,4 @@ func EdbMonitorMessageHealth(adminId int) (isClose bool, err error) {
 		}
 	}
 	return
-}
+}

+ 15 - 1
models/data_manage/edb_inspection/edb_inspection_message.go

@@ -167,4 +167,18 @@ func GetEdbInspectionMessagePageByAdminId(adminId, startSize, pageSize int) (ite
 	sql := "SELECT * FROM edb_inspection_message WHERE admin_id =? ORDER BY is_read ASC, create_time DESC LIMIT ?,?"
 	err = o.Raw(sql, adminId, startSize, pageSize).Find(&items).Error
 	return
-} 
+} 
+
+func GetUnreadInspectionMessageList(adminIds []int) (items []*EdbInspectionMessage, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "SELECT * FROM edb_inspection_message WHERE admin_id IN (?) AND is_read = 0 ORDER BY create_time DESC, message_id DESC"
+	err = o.Raw(sql, adminIds).Find(&items).Error
+	return
+}
+
+func SetEdbInspectionMessageRead(messageId []int64) (err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "UPDATE edb_inspection_message SET is_read = 1, modify_time = ? WHERE message_id IN (?) AND is_read = 0"
+	err = o.Exec(sql, time.Now(), messageId).Error
+	return
+}

+ 28 - 0
models/edb_monitor/edb_monitor_message.go

@@ -80,3 +80,31 @@ func GetEdbMonitorMessagePageByAdminId(adminId, startSize, pageSize int) (items
 	err = o.Raw(sql, adminId, startSize, pageSize).Find(&items).Error
 	return
 }
+
+func GetEdbMonitorMessageUnreadListByAdminIds(adminIds []int) (items []*EdbMonitorMessage, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "SELECT * FROM edb_monitor_message WHERE is_read = 0 AND admin_id IN (?) ORDER BY create_time DESC, edb_monitor_message_id DESC"
+	err = o.Raw(sql, adminIds).Find(&items).Error
+	return
+}
+
+func SetEdbMonitorMessageReadByIds(msgIds []int) (err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "UPDATE edb_monitor_message SET is_read = 1 WHERE edb_monitor_message_id IN (?) and is_read=0"
+	err = o.Exec(sql, msgIds).Error
+	return
+}
+
+func GetEdbMonitorMessageUnreadByAdminId(adminId int) (items []*EdbMonitorMessage, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "SELECT * FROM edb_monitor_message WHERE admin_id =? AND is_read = 0 ORDER BY create_time DESC, edb_monitor_message_id DESC"
+	err = o.Raw(sql, adminId).Find(&items).Error
+	return
+}
+
+func SetEdbMonitorMessageReadByAdminId(adminId int, msgIds []int) (err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "UPDATE edb_monitor_message SET is_read = 1 WHERE admin_id =? AND edb_monitor_message_id IN (?) and is_read=0"
+	err = o.Exec(sql, adminId, msgIds).Error
+	return
+}

+ 10 - 1
routers/commentsRouter.go

@@ -8145,13 +8145,22 @@ func init() {
 
     beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"],
         beego.ControllerComments{
-            Method: "Connect",
+            Method: "ConnectV2",
             Router: `/message/connect`,
             AllowHTTPMethods: []string{"get"},
             MethodParams: param.Make(),
             Filters: nil,
             Params: nil})
 
+    beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"],
+        beego.ControllerComments{
+            Method: "Connect",
+            Router: `/message/connectV1`,
+            AllowHTTPMethods: []string{"get"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
     beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"],
         beego.ControllerComments{
             Method: "List",

+ 123 - 3
services/edb_monitor/edb_monitor_message.go

@@ -1,13 +1,16 @@
 package edbmonitor
 
 import (
+	"encoding/json"
 	"errors"
 	"eta/eta_api/global"
 	"eta/eta_api/models"
 	edbmonitor "eta/eta_api/models/edb_monitor"
 	"eta/eta_api/models/edb_monitor/response"
 	"eta/eta_api/utils"
+	"fmt"
 	"time"
+
 	"github.com/rdlucklib/rdluck_tools/paging"
 )
 
@@ -84,9 +87,10 @@ func LogMessage(content, uniqueCode string, triggerTime time.Time, edbInfoId, ed
 }
 
 func SendMessages(adminId, edbInfoId, edbInfoType int, classifyId int, edbUniqueCode, message string, triggerTime string) (err error) {
-	conn := global.MonitorMessageConn[adminId]
+	conn := global.AdminWebSocketConnMap[adminId]
 	if conn == nil {
-		return errors.New("no connection")
+		err = errors.New("no connection")
+		return
 	}
 	resp := models.WebsocketMessageResponse{
 		MessageType: 0,
@@ -99,7 +103,17 @@ func SendMessages(adminId, edbInfoId, edbInfoType int, classifyId int, edbUnique
 			TriggerTime:   triggerTime,
 		},
 	}
-	return conn.WriteJSON(resp)
+	jsonData, err := json.Marshal(resp)
+	if err != nil {
+		err = fmt.Errorf("json marshal failed, err:%s", err.Error())
+		return
+	}
+	ok := conn.Send(jsonData)
+	if !ok {
+		err = fmt.Errorf("send message failed, err:%s", err.Error())
+		return
+	}
+	return
 }
 
 func GetHistoryMessages(adminId int) (items []*response.EdbMonitorMessageResp, err error) {
@@ -152,3 +166,109 @@ func toEdbMonitorMessageResp(items []*edbmonitor.EdbMonitorMessage) (list []*res
 	}
 	return
 }
+
+func AutoCheckMonitorMessageList() (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("指标预警信息发送失败,err:%s", err.Error())
+		}
+	}()
+	ticker := time.NewTicker(time.Second * 10)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			autoCheckMonitorMessageList()
+		}
+	}
+}
+
+func autoCheckMonitorMessageList() (err error){
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("指标预警信息检查失败,err:%s", err.Error())
+		}
+	}()
+	utils.FileLog.Info("检查是否有预警信息")
+	admins := make([]int, 0)
+	for adminId, conn := range global.AdminWebSocketConnMap {
+		if conn == nil {
+			continue
+		}
+		admins = append(admins, adminId)
+	}
+	if len(admins) == 0 {
+		return
+	}
+	messageList, er := edbmonitor.GetEdbMonitorMessageUnreadListByAdminIds(admins)
+	if er != nil {
+		err = fmt.Errorf("获取指标预警信息历史失败,err:%s", er.Error())
+		return
+	}
+	readList := make([]int, 0)
+	adminMsgMap := make(map[int]int)
+	for _, msg := range messageList {
+		if _, ok := adminMsgMap[msg.AdminId]; !ok {
+			adminMsgMap[msg.AdminId] = msg.EdbMonitorMessageId
+			triggerTime := utils.TimeTransferString(utils.FormatDateTime, msg.MonitorTriggerTime)
+			err := SendMessages(msg.AdminId, msg.EdbInfoId, msg.EdbInfoType, msg.EdbClassifyId, msg.EdbUniqueCode, msg.Message, triggerTime)
+			if err != nil {
+				utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), msg.AdminId)
+			} 
+		}else {
+			readList = append(readList, msg.EdbMonitorMessageId)
+		}
+	}
+
+	err = edbmonitor.SetEdbMonitorMessageReadByIds(readList)
+	if err != nil {
+		err = fmt.Errorf("指标预警信息已读失败,err:%s", err.Error())
+		return
+	}
+	return
+}
+
+func AutoCheckMonitorMessageListByAdminId(adminId int) (err error){
+	// 设置缓存防止重复发送
+	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_MONITOR_MESSAGE, adminId)
+	if !utils.Rc.SetNX(cacheKey, 1, 10*time.Minute) {
+		err = fmt.Errorf("系统处理中,请稍后重试!")
+		utils.FileLog.Error("指标预警信息检查失败,err:%s", err.Error())
+		return
+	}
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("指标预警信息检查失败,err:%s", err.Error())
+		}
+		_ = utils.Rc.Delete(cacheKey)
+	}()
+	utils.FileLog.Info("检查是否有预警信息")
+	messageList, er := edbmonitor.GetEdbMonitorMessageUnreadByAdminId(adminId)
+	if er != nil {
+		err = fmt.Errorf("获取指标预警信息历史失败,err:%s", er.Error())
+		return
+	}
+	if len(messageList) == 0 {
+		return
+	}
+	readList := make([]int, 0)
+	for k, msg := range messageList {
+		if k == 0 {
+			triggerTime := utils.TimeTransferString(utils.FormatDateTime, msg.MonitorTriggerTime)
+			err = SendMessages(msg.AdminId, msg.EdbInfoId, msg.EdbInfoType, msg.EdbClassifyId, msg.EdbUniqueCode, msg.Message, triggerTime)
+			if err != nil {
+				utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), msg.AdminId)
+				return
+			} 
+		}
+		readList = append(readList, msg.EdbMonitorMessageId)
+	}
+
+	err = edbmonitor.SetEdbMonitorMessageReadByAdminId(adminId, readList)
+	if err != nil {
+		err = fmt.Errorf("指标预警信息已读失败,err:%s", err.Error())
+		return
+	}
+	return
+}

+ 5 - 0
services/task.go

@@ -88,6 +88,11 @@ func Task() {
 	go AutoInsertRaiReport()
 
 	go llm.DealHistoryArticleDafTags()
+	// 指标预警信息发送
+	//go edbmonitor.AutoCheckMonitorMessageList()
+
+	// 巡检信息发送
+	go AutoCheckInspectionMessageList()
 
 	// TODO:数据修复
 	//FixNewEs()

+ 77 - 0
services/websocket_msg.go

@@ -1,8 +1,10 @@
 package services
 
 import (
+	"encoding/json"
 	"eta/eta_api/global"
 	"eta/eta_api/models"
+	"eta/eta_api/models/data_manage/edb_inspection"
 	"eta/eta_api/services/data"
 	"eta/eta_api/utils"
 	"fmt"
@@ -208,4 +210,79 @@ func DealEdbInspectionMessageTest(adminId int) {
 				return
 			}
 		}()
+}
+
+func AutoCheckInspectionMessageList() (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("巡检信息发送失败,err:%s", err.Error())
+		}
+	}()
+	ticker := time.NewTicker(time.Second * 10)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			err = autoCheckInspectionMessageList()
+		}
+	}
+}
+func autoCheckInspectionMessageList() (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("巡检信息发送失败,err:%s", err.Error())
+		}
+	}()
+	utils.FileLog.Info("检查是否有巡检信息")
+	admins := make([]int, 0)
+	for adminId, conn := range global.AdminWebSocketConnMap {
+		if conn == nil {
+			continue
+		}
+		admins = append(admins, adminId)
+	}
+	if len(admins) == 0 {
+		return
+	}
+	messageList, er := edb_inspection.GetUnreadInspectionMessageList(admins)
+	if er != nil {
+		err = fmt.Errorf("获取巡检信息历史失败,err:%s", er.Error())
+		return
+	}
+	readList := make([]int64, 0)
+	for _, msg := range messageList {
+		adminId := int(msg.AdminId)
+		respData, er := data.SendInspectionMessages(adminId, msg)
+		if er != nil {
+			utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", er.Error(), adminId)
+		} else {
+			resp := models.WebsocketMessageResponse{
+				MessageType: 1,
+				Data: respData,
+			}
+			conn := global.AdminWebSocketConnMap[int(msg.AdminId)]
+			if conn == nil {
+				utils.FileLog.Error("巡检信息发送失败,连接已断开, adminId:%d", adminId)
+				return
+			}
+			message, er := json.Marshal(resp)
+			if er != nil {
+				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", er.Error(), adminId)
+				return
+			}
+			ok := conn.Send(message)
+			if !ok {
+				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d",  adminId)
+			}
+		}
+		readList = append(readList, msg.MessageId)
+	}
+
+	err = edb_inspection.SetEdbInspectionMessageRead(readList)
+	if err != nil {
+		err = fmt.Errorf("巡检信息已读失败,err:%s", err.Error())
+		return
+	}
+	return
 }

+ 1 - 0
utils/constants.go

@@ -281,6 +281,7 @@ const (
 	CACHE_REPORT_AUTH                       = "eta:report:auth:"                      //报告图表数据授权
 	CACHE_EDB_INSPECTION_MESSAGE            = "eta:edb:inspection:message:"           //巡检消息队列
 	CACHE_INDEX_TASK                        = "eta:index:task:op:"                    // 指标库的任务调度缓存
+	CACHE_EDB_MONITOR_MESSAGE               = "eta:edb:monitor:message:"              //指标预警消息队列
 )
 
 // 模板消息推送类型