Selaa lähdekoodia

Merge branch 'feature/eta2.5.9_api_stat' into debug

xyxie 5 päivää sitten
vanhempi
commit
a3583e7387
2 muutettua tiedostoa jossa 51 lisäystä ja 41 poistoa
  1. 50 41
      services/websocket_msg.go
  2. 1 0
      utils/constants.go

+ 50 - 41
services/websocket_msg.go

@@ -4,6 +4,7 @@ import (
 	"eta/eta_api/models"
 	"eta/eta_api/services/data"
 	"eta/eta_api/utils"
+	"fmt"
 
 	"github.com/gorilla/websocket"
 )
@@ -14,49 +15,57 @@ func DealWebSocketMsg(conn *websocket.Conn, adminId int) {
 
 // 处理巡检消息
 func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
-	messageList, err := data.GetHistoryInspectionMessages(adminId)
-	if err != nil {
-		utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
-	}
-	success := make(chan int64, 10)
-	go func() {
-		defer close(success)
-		for i, msg := range messageList {
-			if i == 0 {
-				// 多条消息仅发送最新一条
-				respData, err := data.SendInspectionMessages(adminId, msg)
-				if err != nil {
-					utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
-				} else {
-					resp := models.WebsocketMessageResponse{
-						MessageType: 1,
-						Data: respData,
-					}
-					err = conn.WriteJSON(resp)
-					if err != nil {
-						utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId)
+	for {
+		utils.Rc.Brpop(cacheKey, func(b []byte) {
+			messageList, err := data.GetHistoryInspectionMessages(adminId)
+			if err != nil {
+				utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
+				return
+			}
+			success := make(chan int64, 10)
+			defer close(success)
+			go func() {
+				defer close(success)
+				for i, msg := range messageList {
+					if i == 0 {
+						// 多条消息仅发送最新一条
+						respData, err := data.SendInspectionMessages(adminId, msg)
+						if err != nil {
+							utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+						} else {
+							resp := models.WebsocketMessageResponse{
+								MessageType: 1,
+								Data: respData,
+							}
+							err = conn.WriteJSON(resp)
+							if err != nil {
+								utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
+							} else {
+								utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
+								success <- msg.MessageId
+							}
+						}
 					} else {
-						utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
 						success <- msg.MessageId
 					}
 				}
-			} else {
-				success <- msg.MessageId
-			}
-		}
-	}()
-	go func() {
-		readList := make([]int64, 0)
-		for {
-			msgId, ok := <-success
-			if !ok {
-				break
-			}
-			readList = append(readList, msgId)
-		}
-		_, err = data.ReadEdbInspectionMessageList(readList, adminId)
-		if err != nil {
-			utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
-		}
-	}()
+			}()
+			go func() {
+				readList := make([]int64, 0)
+				for {
+					msgId, ok := <-success
+					if !ok {
+						break
+					}
+					readList = append(readList, msgId)
+				}
+				_, err = data.ReadEdbInspectionMessageList(readList, adminId)
+				if err != nil {
+					utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
+				}
+			}()
+			})
+	}
+	
 }

+ 1 - 0
utils/constants.go

@@ -275,6 +275,7 @@ const (
 	CACHE_CHART_AUTH                        = "eta:chart:auth:"                       //图表数据授权
 	CACHE_REPORT_SHARE_AUTH                 = "eta:report:auth:share:"                //报告短链与报告图表授权映射key
 	CACHE_REPORT_AUTH                       = "eta:report:auth:"                      //报告图表数据授权
+	CACHE_EDB_INSPECTION_MESSAGE            = "eta:edb:inspection:message:"          //巡检消息队列
 )
 
 // 模板消息推送类型