package services import ( "eta/eta_api/global" "eta/eta_api/models" "eta/eta_api/services/data" "eta/eta_api/utils" "fmt" "runtime" "sync" "time" "context" ) func DealWebSocketMsg(adminId int) { go DealEdbInspectionMessage(adminId) } // 处理巡检消息 func DealEdbInspectionMessage(adminId int) { utils.FileLog.Info("创建协程, adminId:%d", adminId) // 创建上下文用于控制 goroutine 生命周期 ctx, cancel := context.WithCancel(context.Background()) defer cancel() cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId) // 添加错误恢复机制 defer func() { if r := recover(); r != nil { utils.FileLog.Error("WebSocket handler recovered from panic: %v", r) // 清理资源 cancel() } }() go func() { ticker := time.NewTicker(time.Minute) defer ticker.Stop() for { select { case <-ticker.C: utils.FileLog.Info("Current goroutine count: %d", runtime.NumGoroutine()) case <-ctx.Done(): return } } }() for { select { case <-ctx.Done(): utils.FileLog.Info("DealEdbInspectionMessage 巡检消息处理协程结束, adminId:%d", adminId) return default: // 检查连接状态 conn := global.MonitorMessageConn[adminId] if conn == nil { utils.FileLog.Error("检查连接状态 发送消息时发现连接已断开, adminId:%d", adminId) cancel() return } // 使用带超时的 Redis 操作 val := utils.Rc.Get(cacheKey) if val == nil { utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId) continue } messageList, err := data.GetHistoryInspectionMessages(adminId) if err != nil { utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId) continue } if len(messageList) == 0 { utils.FileLog.Info("巡检信息历史为空, adminId:%d", adminId) continue } readList := make([]int64, 0) // 只处理第一条消息的发送,其他消息只标记为已读 for i, msg := range messageList { if i == 0 { respData, err := data.SendInspectionMessages(adminId, msg) if err != nil { utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId) continue } resp := models.WebsocketMessageResponse{ MessageType: 1, Data: respData, } err, isClose := WriteWebSocketMessageAsync(ctx, adminId, resp) if err != nil { utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId) continue } if isClose { utils.FileLog.Error("检查连接状态 发送消息时发现连接已断开, adminId:%d", adminId) cancel() return } utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId) } readList = append(readList, msg.MessageId) } if len(readList) > 0 { _, err = data.ReadEdbInspectionMessageList(readList, adminId) if err != nil { utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId) } } utils.FileLog.Info("巡检信息处理完成, adminId:%d", adminId) } } } func WriteWebSocketMessageAsync(ctx context.Context, adminId int, resp interface{}) (error, bool) { errChan := make(chan error, 1) var wsWriteMutex sync.Mutex isClose := false go func() { wsWriteMutex.Lock() defer wsWriteMutex.Unlock() conn := global.MonitorMessageConn[adminId] if conn == nil { isClose = true errChan <- fmt.Errorf("connection closed for adminId: %d", adminId) return } // 设置写超时 //conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) errChan <- conn.WriteJSON(resp) }() select { case err := <-errChan: utils.FileLog.Error("WriteWebSocketMessageAsync errChan: %v", err) return err, isClose case <-ctx.Done(): utils.FileLog.Error("WriteWebSocketMessageAsync ctx.Done(): %v", ctx.Err()) return ctx.Err(), isClose } }