package services

import (
	"context"
	"eta/eta_api/models"
	"eta/eta_api/services/data"
	"eta/eta_api/utils"
	"fmt"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

// DealWebSocketMsg is the entry point for server-side pushes on one WebSocket
// connection. It currently only delegates to DealEdbInspectionMessage and
// therefore blocks for as long as that loop runs.
func DealWebSocketMsg(conn *websocket.Conn, adminId int) {
	DealEdbInspectionMessage(conn, adminId)
}

// DealEdbInspectionMessage pushes EDB inspection messages for adminId over conn.
//
// It loops until its internal context is cancelled. Each iteration calls
// utils.Rc.BrpopWithTimeout on the per-admin cache key with a 30-second
// timeout; inside the callback it loads the admin's inspection message history,
// writes the first message to the WebSocket, and then marks the collected
// message ids as read in one batch.
//
// The context is cancelled when the connection's close handler runs or when
// the function returns.
//
// NOTE(review): gorilla/websocket invokes the close handler from the
// connection's read path, and this function never reads from conn — confirm
// that some other goroutine is reading, otherwise the loop has no exit.
func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
	// The context bounds the lifetime of every goroutine started below.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Serialises writes to the WebSocket connection issued by this function.
	var wsWriteMutex sync.Mutex

	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId)

	// Cancel directly from the close handler. cancel is safe to call more than
	// once, unlike closing a channel both here and in a defer, which panics
	// with "close of closed channel".
	conn.SetCloseHandler(func(code int, text string) error {
		cancel()
		return nil
	})

	for {
		select {
		case <-ctx.Done():
			return
		default:
			// Redis pop with a timeout so the loop re-checks ctx periodically.
			utils.Rc.BrpopWithTimeout(cacheKey, 30*time.Second, func(b []byte) {
				messageList, err := data.GetHistoryInspectionMessages(adminId)
				if err != nil {
					utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
					return
				}

				// Carries the ids of messages that should be marked as read.
				success := make(chan int64, 10)
				var wg sync.WaitGroup

				// Sender: pushes the first message to the client and forwards ids.
				wg.Add(1)
				go func() {
					defer wg.Done()
					defer close(success)
					for i, msg := range messageList {
						select {
						case <-ctx.Done():
							return
						default:
						}

						// Only the first message is written to the socket; the
						// remaining ids are forwarded without being sent.
						if i == 0 {
							respData, err := data.SendInspectionMessages(adminId, msg)
							if err != nil {
								utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
								continue
							}
							resp := models.WebsocketMessageResponse{
								MessageType: 1,
								Data:        respData,
							}
							wsWriteMutex.Lock()
							err = conn.WriteJSON(resp)
							wsWriteMutex.Unlock()
							if err != nil {
								utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
								continue
							}
							utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
						}

						// Watch ctx while sending: if the reader has already left
						// on cancellation and the buffer is full, a bare send
						// would block forever and stall wg.Wait below.
						select {
						case success <- msg.MessageId:
						case <-ctx.Done():
							return
						}
					}
				}()

				// Reader: collects ids and marks them as read once the sender is done.
				wg.Add(1)
				go func() {
					defer wg.Done()
					readList := make([]int64, 0, len(messageList))
					for {
						select {
						case <-ctx.Done():
							return
						case msgId, ok := <-success:
							if !ok {
								// Channel closed: flush whatever was collected.
								if len(readList) > 0 {
									if _, err := data.ReadEdbInspectionMessageList(readList, adminId); err != nil {
										utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
									}
								}
								return
							}
							readList = append(readList, msgId)
						}
					}
				}()

				// Wait for both goroutines before the next pop.
				wg.Wait()
			})
		}
	}
}