package services

import (
	"context"
	"encoding/json"
	"fmt"
	"runtime"
	"sync"
	"time"

	"eta/eta_api/models"
	"eta/eta_api/services/data"
	"eta/eta_api/utils"

	"github.com/gorilla/websocket"
)

// edbInspectionWsWriteMu serializes the writes issued by WriteWebSocketMessage.
// gorilla/websocket connections support only one concurrent writer, so the
// lock has to outlive a single call; a mutex declared inside the function
// would be a fresh one on every call and would protect nothing.
var edbInspectionWsWriteMu sync.Mutex

// DealWebSocketMsg starts the EDB inspection message handler for a websocket
// connection in its own goroutine and returns immediately.
func DealWebSocketMsg(connKey string, conn *websocket.Conn, adminId int) {
	go DealEdbInspectionMessage(connKey, conn, adminId)
}

// DealEdbInspectionMessage pushes EDB inspection messages for adminId over
// conn until connKey no longer exists in utils.Rc.
//
// Each iteration blocks for up to 30 seconds popping from the admin's
// inspection queue (utils.CACHE_EDB_INSPECTION_MESSAGE + adminId). When an
// entry is popped, the admin's inspection message history is loaded, the
// first message is sent to the client, and the messages are marked as read.
//
// A panic raised in this goroutine is recovered and logged. While the handler
// runs, a helper goroutine logs the process-wide goroutine count once a
// minute; it stops when the handler returns.
func DealEdbInspectionMessage(connKey string, conn *websocket.Conn, adminId int) {
	const (
		popTimeout = 30 * time.Second
		retryDelay = time.Second
	)

	// Recover first so that any panic below is logged instead of crashing the
	// process.
	defer func() {
		if r := recover(); r != nil {
			utils.FileLog.Error("WebSocket handler recovered from panic: %v", r)
		}
	}()

	// ctx bounds the lifetime of the monitoring goroutine; the deferred cancel
	// runs on every exit path, including a recovered panic.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go logGoroutineCount(ctx)

	cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId)

	// No websocket close handler is installed; a closed connection is detected
	// by connKey disappearing from utils.Rc.
	for {
		if !utils.Rc.IsExist(connKey) {
			utils.FileLog.Info("长连接关闭, adminId:%d", adminId)
			return
		}
		if ctx.Err() != nil {
			return
		}

		err := utils.Rc.BrpopWithTimeout(cacheKey, popTimeout, func(_ []byte) {
			pushPendingInspectionMessages(ctx, conn, adminId)
		})
		// "redis: nil" is treated as a non-error, exactly like a nil result.
		if err != nil && err.Error() != "redis: nil" {
			utils.FileLog.Error("Redis operation failed: %v", err)
			// Pause briefly so that a persistent Redis failure does not turn
			// this loop into a busy spin.
			select {
			case <-ctx.Done():
				return
			case <-time.After(retryDelay):
			}
			continue
		}
		utils.FileLog.Info("巡检信息处理完成, adminId:%d", adminId)
	}
}

// logGoroutineCount logs runtime.NumGoroutine once a minute until ctx is
// cancelled.
func logGoroutineCount(ctx context.Context) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			utils.FileLog.Info("Current goroutine count: %d", runtime.NumGoroutine())
		case <-ctx.Done():
			return
		}
	}
}

// pushPendingInspectionMessages loads the inspection message history for
// adminId, sends the first message to the client over conn and marks messages
// as read. If ctx is cancelled part-way through, it returns without marking
// anything as read.
func pushPendingInspectionMessages(ctx context.Context, conn *websocket.Conn, adminId int) {
	utils.FileLog.Info("收到巡检信息开始处理, adminId:%d", adminId)

	messageList, err := data.GetHistoryInspectionMessages(adminId)
	if err != nil {
		utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
		return
	}
	if len(messageList) == 0 {
		utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId)
		return
	}

	readList := make([]int64, 0, len(messageList))
	for i, msg := range messageList {
		if ctx.Err() != nil {
			return
		}

		// Only the first message is pushed to the client; a failed push leaves
		// that message unread so it can be picked up again.
		// NOTE(review): the remaining messages are marked as read even when
		// the first one could not be delivered; confirm this is intended.
		if i == 0 {
			respData, err := data.SendInspectionMessages(adminId, msg)
			if err != nil {
				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
				continue
			}
			resp := models.WebsocketMessageResponse{
				MessageType: 1,
				Data:        respData,
			}
			if err = WriteWebSocketMessage(conn, resp); err != nil {
				utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
				continue
			}
			utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
		}
		readList = append(readList, msg.MessageId)
	}

	if len(readList) == 0 {
		return
	}
	if _, err = data.ReadEdbInspectionMessageList(readList, adminId); err != nil {
		utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
	}
}

// WriteWebSocketMessage marshals resp to JSON and writes it to conn as a text
// message, allowing at most 5 seconds for the write.
//
// Writes made through this function are serialized. The time limit is
// enforced with the connection's write deadline rather than a background
// goroutine, so no writer is left running on conn after the function returns.
// Per the gorilla/websocket documentation, a connection whose write timed out
// is corrupt and all further writes on it fail.
//
// It returns the marshalling error, an error with the text "write timeout"
// when the deadline was exceeded, or any other error reported by the write.
func WriteWebSocketMessage(conn *websocket.Conn, resp models.WebsocketMessageResponse) error {
	const writeTimeout = 5 * time.Second

	payload, err := json.Marshal(resp)
	if err != nil {
		utils.FileLog.Error("Failed to marshal response: %v", err)
		return err
	}

	edbInspectionWsWriteMu.Lock()
	defer edbInspectionWsWriteMu.Unlock()

	if err := conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
		return err
	}
	defer func() {
		// Clear the deadline again so that writers which never set one are not
		// affected by a stale deadline. The error is ignored on purpose: there
		// is nothing useful to do with it once the write has finished.
		_ = conn.SetWriteDeadline(time.Time{})
	}()

	if err := conn.WriteMessage(websocket.TextMessage, payload); err != nil {
		// Keep the historical error text for an exceeded time limit.
		if te, ok := err.(interface{ Timeout() bool }); ok && te.Timeout() {
			return fmt.Errorf("write timeout")
		}
		return err
	}
	return nil
}