Prechádzať zdrojové kódy

优化长连接获取消息

xyxie 4 týždňov pred
rodič
commit
ca3f5f8876

+ 54 - 3
controllers/edb_monitor/edb_monitor_message.go

@@ -9,6 +9,7 @@ import (
 	"eta/eta_api/services"
 	edbmonitor "eta/eta_api/services/edb_monitor"
 	"eta/eta_api/utils"
+	"fmt"
 	"net/http"
 	"strconv"
 	"time"
@@ -29,11 +30,11 @@ var upgrader = websocket.Upgrader{
 }
 
 // GetMonitorLevel
-// @Title 预警管理消息
+// @Title 预警管理消息 弃用
 // @Description 预警管理消息
 // @Param   request body request.EdbMonitorSaveRequest  true  "每页数据条数"
 // @Success 200 {object} models.EnglishReportEmailPageListResp
-// @router /message/connect [get]
+// @router /message/connectOld [get]
 func (m *EdbMonitorMessageController) Connect() {
 	br := new(models.BaseResponse).Init()
 	defer func() {
@@ -143,6 +144,56 @@ func (m *EdbMonitorMessageController) Connect() {
 	}
 }
 
+
+// GetMonitorLevel
+// @Title 预警管理消息
+// @Description 预警管理消息
+// @Param   request body request.EdbMonitorSaveRequest  true  "每页数据条数"
+// @Success 200 {object} models.EnglishReportEmailPageListResp
+// @router /message/connect [get]
+func (m *EdbMonitorMessageController) ConnectV2() {
+	br := new(models.BaseResponse).Init()
+	defer func() {
+		m.Data["json"] = br
+		m.ServeJSON()
+	}()
+	sysUser := m.SysUser
+	if sysUser == nil {
+		br.Msg = "请登录"
+		br.ErrMsg = "请登录,SysUser Is Empty"
+		br.Ret = 408
+		return
+	}
+	oldConn := global.AdminWebSocketConnMap[sysUser.AdminId]
+	if oldConn != nil {
+		oldConn.Close()
+	}
+    // 建立长连接
+	// 开启心跳检测
+	// 发送消息
+	// 关闭连接
+	var conn *websocket.Conn
+	var err error
+	conn, err = upgrader.Upgrade(m.Ctx.ResponseWriter, m.Ctx.Request, nil)
+	if err != nil {
+		br.Msg = "连接失败"
+		br.ErrMsg = "连接失败,err:" + err.Error()
+		return
+	}
+	defer conn.Close()
+
+	// 为新连接创建唯一ID
+	connID := fmt.Sprintf("%s-%d", conn.RemoteAddr().String(), time.Now().UnixNano())
+			
+	// 创建新的连接对象
+	connection := global.NewWebSocketConn(conn, connID, sysUser.AdminId)
+	
+	// 保存连接
+	global.WebSocketConnMap[connID] = connection
+	
+	// 启动连接的协程
+	connection.Start()
+}
 // Close
 // @Title 预警管理消息
 // @Description 预警管理消息
@@ -163,7 +214,7 @@ func (m *EdbMonitorMessageController) Close() {
 		return
 	}
 
-	conn := global.MonitorMessageConn[sysUser.AdminId]
+	conn := global.AdminWebSocketConnMap[sysUser.AdminId]
 	if conn != nil {
 		conn.Close()
 	}

+ 142 - 1
global/websocket.go

@@ -1,9 +1,11 @@
 package global
 
 import (
+	"context"
 	"errors"
 	"eta/eta_api/utils"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/gorilla/websocket"
@@ -11,6 +13,145 @@ import (
 
 var MonitorMessageConn = make(map[int]*websocket.Conn)
 
+var WebSocketConnMap = make(map[string]*WebSocketConn)
+
+var AdminWebSocketConnMap = make(map[int]*WebSocketConn)
+
+var WebSocketConnAdminIdMap = make(map[string]int)
+
+type WebSocketConn struct {
+	Conn *websocket.Conn
+	id   string
+	ctx  context.Context
+	cancel context.CancelFunc
+	wg        sync.WaitGroup
+	sendChan  chan []byte
+	closeChan chan struct{}
+}
+
+// NewConnection 创建一个新的连接对象
+func NewWebSocketConn(conn *websocket.Conn, id string, adminId int) *WebSocketConn {
+	WebSocketConnAdminIdMap[id] = adminId
+	ctx, cancel := context.WithCancel(context.Background())
+	webSocketConn := &WebSocketConn{
+		Conn:      conn,
+		id:        id,
+		ctx:       ctx,
+		cancel:    cancel,
+		sendChan:  make(chan []byte, 100), // 发送消息的缓冲通道
+		closeChan: make(chan struct{}),
+	}
+	AdminWebSocketConnMap[adminId] = webSocketConn
+	return webSocketConn
+}
+
+// Start 启动连接的读写协程
+func (c *WebSocketConn) Start() {
+	utils.FileLog.Info("客户端 %s 已连接", c.id)
+
+	// 启动读协程
+	c.wg.Add(1)
+	go c.readLoop()
+	
+	// 启动写协程
+	c.wg.Add(1)
+	go c.writeLoop()
+}
+
+// Close 关闭连接并清理资源
+func (c *WebSocketConn) Close() {
+	utils.FileLog.Info("关闭与客户端 %s 的连接", c.id)
+	c.cancel() // 通知所有协程退出
+	close(c.closeChan)
+	c.Conn.Close()
+	c.wg.Wait() // 等待所有协程退出
+	close(c.sendChan)
+	// 从adminWebSocketConnMap中删除
+	delete(AdminWebSocketConnMap, WebSocketConnAdminIdMap[c.id])
+	delete(WebSocketConnMap, c.id)
+	utils.FileLog.Info("客户端 %s 的所有协程已清理完毕", c.id)
+}
+
+// Send 发送消息到客户端
+func (c *WebSocketConn) Send(msg []byte) bool {
+	select {
+	case c.sendChan <- msg:
+		return true
+	case <-c.ctx.Done():
+		return false
+	default: // 通道已满,表示客户端处理速度慢
+		utils.FileLog.Info("客户端 %s 处理速度过慢,丢弃消息", c.id)
+		return false
+	}
+}
+
+// readLoop 处理来自客户端的消息
+func (c *WebSocketConn) readLoop() {
+	defer c.wg.Done()
+	defer utils.FileLog.Info("客户端 %s 的读协程已退出", c.id)
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		default:
+			// 设置读取超时
+			_, msg, err := c.Conn.ReadMessage()
+			if err != nil {
+				utils.FileLog.Error("从客户端 %s 读取失败: %v", c.id, err)
+				c.cancel()
+				return
+			}
+			message := string(msg)
+			// 处理消息
+			utils.FileLog.Info("收到客户端 %s 消息: %s", c.id, message)
+			
+			// 启动一个新的协程处理消息,避免阻塞读取循环
+			go c.handleMessage(message)
+		}
+	}
+}
+
+// handleMessage 处理收到的消息
+func (c *WebSocketConn) handleMessage(message string) {
+	defer utils.FileLog.Info("处理来自客户端 %s 的消息: %s, 结束", c.id, message)
+	// 模拟消息处理
+	utils.FileLog.Info("处理来自客户端 %s 的消息: %s", c.id, message)
+	
+	// 如果是PING消息,回复PONG
+	if message == "ping\n" {
+		//c.Send([]byte("PONG\n"))
+		return
+	}
+	
+	// 回复消息
+	//response := fmt.Sprintf("已收到消息: %s", message)
+	//c.Send([]byte(response))
+}
+
+// writeLoop 发送消息到客户端
+func (c *WebSocketConn) writeLoop() {
+	defer c.wg.Done()
+	defer utils.FileLog.Info("客户端 %s 的写协程已退出", c.id)
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case msg, ok := <-c.sendChan:
+			if !ok {
+				return
+			}
+			err := c.Conn.WriteJSON(msg)
+			if err != nil {
+				utils.FileLog.Error("向客户端 %s 写入失败: %v", c.id, err)
+				c.cancel()
+				return
+			}
+		}
+	}
+}
+
 var (
 	EDB_MONITOR_MESSAGE_CONNECT_CACHE = "edb_monitor_message_cache:"
 )
@@ -35,4 +176,4 @@ func EdbMonitorMessageHealth(adminId int) (isClose bool, err error) {
 		}
 	}
 	return
-}
+}

+ 15 - 1
models/data_manage/edb_inspection/edb_inspection_message.go

@@ -167,4 +167,18 @@ func GetEdbInspectionMessagePageByAdminId(adminId, startSize, pageSize int) (ite
 	sql := "SELECT * FROM edb_inspection_message WHERE admin_id =? ORDER BY is_read ASC, create_time DESC LIMIT ?,?"
 	err = o.Raw(sql, adminId, startSize, pageSize).Find(&items).Error
 	return
-} 
+} 
+
+func GetUnreadInspectionMessageList(adminIds []int) (items []*EdbInspectionMessage, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "SELECT * FROM edb_inspection_message WHERE admin_id IN (?) AND is_read = 0 ORDER BY create_time DESC, message_id DESC"
+	err = o.Raw(sql, adminIds).Find(&items).Error
+	return
+}
+
+func SetEdbInspectionMessageRead(messageId []int64) (err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "UPDATE edb_inspection_message SET is_read = 1, modify_time = ? WHERE message_id IN (?) AND is_read = 0"
+	err = o.Exec(sql, time.Now(), messageId).Error
+	return
+}

+ 14 - 0
models/edb_monitor/edb_monitor_message.go

@@ -80,3 +80,17 @@ func GetEdbMonitorMessagePageByAdminId(adminId, startSize, pageSize int) (items
 	err = o.Raw(sql, adminId, startSize, pageSize).Find(&items).Error
 	return
 }
+
+func GetEdbMonitorMessageUnreadListByAdminIds(adminIds []int) (items []*EdbMonitorMessage, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "SELECT * FROM edb_monitor_message WHERE is_read = 0 AND admin_id IN (?) ORDER BY create_time DESC, edb_monitor_message_id DESC"
+	err = o.Raw(sql, adminIds).Find(&items).Error
+	return
+}
+
+func SetEdbMonitorMessageReadByIds(msgIds []int) (err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := "UPDATE edb_monitor_message SET is_read = 1 WHERE edb_monitor_message_id IN (?) and is_read=0"
+	err = o.Exec(sql, msgIds).Error
+	return
+}

+ 10 - 1
routers/commentsRouter.go

@@ -7902,13 +7902,22 @@ func init() {
 
     beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"],
         beego.ControllerComments{
-            Method: "Connect",
+            Method: "ConnectV2",
             Router: `/message/connect`,
             AllowHTTPMethods: []string{"get"},
             MethodParams: param.Make(),
             Filters: nil,
             Params: nil})
 
+    beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"],
+        beego.ControllerComments{
+            Method: "Connect",
+            Router: `/message/connectOld`,
+            AllowHTTPMethods: []string{"get"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
     beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/edb_monitor:EdbMonitorMessageController"],
         beego.ControllerComments{
             Method: "List",

+ 79 - 3
services/edb_monitor/edb_monitor_message.go

@@ -1,13 +1,16 @@
 package edbmonitor
 
 import (
+	"encoding/json"
 	"errors"
 	"eta/eta_api/global"
 	"eta/eta_api/models"
 	edbmonitor "eta/eta_api/models/edb_monitor"
 	"eta/eta_api/models/edb_monitor/response"
 	"eta/eta_api/utils"
+	"fmt"
 	"time"
+
 	"github.com/rdlucklib/rdluck_tools/paging"
 )
 
@@ -84,9 +87,10 @@ func LogMessage(content, uniqueCode string, triggerTime time.Time, edbInfoId, ed
 }
 
 func SendMessages(adminId, edbInfoId, edbInfoType int, classifyId int, edbUniqueCode, message string, triggerTime string) (err error) {
-	conn := global.MonitorMessageConn[adminId]
+	conn := global.AdminWebSocketConnMap[adminId]
 	if conn == nil {
-		return errors.New("no connection")
+		err = errors.New("no connection")
+		return
 	}
 	resp := models.WebsocketMessageResponse{
 		MessageType: 0,
@@ -99,7 +103,17 @@ func SendMessages(adminId, edbInfoId, edbInfoType int, classifyId int, edbUnique
 			TriggerTime:   triggerTime,
 		},
 	}
-	return conn.WriteJSON(resp)
+	jsonData, err := json.Marshal(resp)
+	if err != nil {
+		err = fmt.Errorf("json marshal failed, err:%s", err.Error())
+		return
+	}
+	ok := conn.Send(jsonData)
+	if !ok {
+		err = fmt.Errorf("send message failed, err:%s", err.Error())
+		return
+	}
+	return
 }
 
 func GetHistoryMessages(adminId int) (items []*response.EdbMonitorMessageResp, err error) {
@@ -152,3 +166,65 @@ func toEdbMonitorMessageResp(items []*edbmonitor.EdbMonitorMessage) (list []*res
 	}
 	return
 }
+
+func AutoCheckMonitorMessageList() (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("指标预警信息发送失败,err:%s", err.Error())
+		}
+	}()
+	ticker := time.NewTicker(time.Second * 10)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			autoCheckMonitorMessageList()
+		}
+	}
+}
+
+func autoCheckMonitorMessageList() (err error){
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("指标预警信息检查失败,err:%s", err.Error())
+		}
+	}()
+	utils.FileLog.Info("检查是否有预警信息")
+	admins := make([]int, 0)
+	for adminId, conn := range global.AdminWebSocketConnMap {
+		if conn == nil {
+			continue
+		}
+		admins = append(admins, adminId)
+	}
+	if len(admins) == 0 {
+		return
+	}
+	messageList, er := edbmonitor.GetEdbMonitorMessageUnreadListByAdminIds(admins)
+	if er != nil {
+		err = fmt.Errorf("获取指标预警信息历史失败,err:%s", er.Error())
+		return
+	}
+	readList := make([]int, 0)
+	adminMsgMap := make(map[int]int)
+	for _, msg := range messageList {
+		if _, ok := adminMsgMap[msg.AdminId]; !ok {
+			adminMsgMap[msg.AdminId] = msg.EdbMonitorMessageId
+			triggerTime := utils.TimeTransferString(utils.FormatDateTime, msg.MonitorTriggerTime)
+			err := SendMessages(msg.AdminId, msg.EdbInfoId, msg.EdbInfoType, msg.EdbClassifyId, msg.EdbUniqueCode, msg.Message, triggerTime)
+			if err != nil {
+				utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), msg.AdminId)
+			} 
+		}else {
+			readList = append(readList, msg.EdbMonitorMessageId)
+		}
+	}
+
+	err = edbmonitor.SetEdbMonitorMessageReadByIds(readList)
+	if err != nil {
+		err = fmt.Errorf("指标预警信息已读失败,err:%s", err.Error())
+		return
+	}
+	return
+}

+ 6 - 0
services/task.go

@@ -80,6 +80,12 @@ func Task() {
 	// 权益报告监听入库
 	go AutoInsertRaiReport()
 
+	// 指标预警信息发送
+	go edbmonitor.AutoCheckMonitorMessageList()
+
+	// 巡检信息发送
+	go AutoCheckInspectionMessageList()
+
 	// TODO:数据修复
 	//FixNewEs()
 	fmt.Println("task end")

+ 77 - 0
services/websocket_msg.go

@@ -1,8 +1,10 @@
 package services
 
 import (
+	"encoding/json"
 	"eta/eta_api/global"
 	"eta/eta_api/models"
+	"eta/eta_api/models/data_manage/edb_inspection"
 	"eta/eta_api/services/data"
 	"eta/eta_api/utils"
 	"fmt"
@@ -217,4 +219,79 @@ func DealEdbInspectionMessageTest(adminId int) {
 			utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
 		}
 	}()
+}
+
+func AutoCheckInspectionMessageList() (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("巡检信息发送失败,err:%s", err.Error())
+		}
+	}()
+	ticker := time.NewTicker(time.Second * 10)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			err = autoCheckInspectionMessageList()
+		}
+	}
+}
+func autoCheckInspectionMessageList() (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("巡检信息发送失败,err:%s", err.Error())
+		}
+	}()
+	utils.FileLog.Info("检查是否有巡检信息")
+	admins := make([]int, 0)
+	for adminId, conn := range global.AdminWebSocketConnMap {
+		if conn == nil {
+			continue
+		}
+		admins = append(admins, adminId)
+	}
+	if len(admins) == 0 {
+		return
+	}
+	messageList, er := edb_inspection.GetUnreadInspectionMessageList(admins)
+	if er != nil {
+		err = fmt.Errorf("获取巡检信息历史失败,err:%s", er.Error())
+		return
+	}
+	readList := make([]int64, 0)
+	for _, msg := range messageList {
+		adminId := int(msg.AdminId)
+		respData, er := data.SendInspectionMessages(adminId, msg)
+		if er != nil {
+			utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", er.Error(), adminId)
+		} else {
+			resp := models.WebsocketMessageResponse{
+				MessageType: 1,
+				Data: respData,
+			}
+			conn := global.AdminWebSocketConnMap[int(msg.AdminId)]
+			if conn == nil {
+				utils.FileLog.Error("巡检信息发送失败,连接已断开, adminId:%d", adminId)
+				return
+			}
+			message, er := json.Marshal(resp)
+			if er != nil {
+				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", er.Error(), adminId)
+				return
+			}
+			ok := conn.Send(message)
+			if !ok {
+				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d",  adminId)
+			}
+		}
+		readList = append(readList, msg.MessageId)
+	}
+
+	err = edb_inspection.SetEdbInspectionMessageRead(readList)
+	if err != nil {
+		err = fmt.Errorf("巡检信息已读失败,err:%s", err.Error())
+		return
+	}
+	return
 }