package ws

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"eta/eta_api/utils"

	"github.com/gorilla/websocket"
)

// Session holds the state of a single WebSocket connection owned by a user.
type Session struct {
	Id          string
	UserId      int
	Conn        *websocket.Conn
	LastActive  time.Time // time of the last message read; guarded by mu
	Latency     *LatencyMeasurer
	History     []string
	CloseChan   chan struct{} // closed by forceClose to stop writePump
	MessageChan chan *Message // outbound messages consumed by writePump
	mu          sync.RWMutex

	// writeMu serialises writes issued from readPump and writePump; the
	// underlying connection supports only one concurrent writer.
	writeMu sync.Mutex
	// closeOnce makes forceClose safe to call more than once.
	closeOnce sync.Once
}

// Message is an outbound payload queued on Session.MessageChan.
type Message struct {
	MessageType string
	message     []byte
}

// writeText sends payload as one text frame while holding writeMu, so the
// read and write pumps never write to the connection at the same time.
func (s *Session) writeText(payload []byte) error {
	s.writeMu.Lock()
	defer s.writeMu.Unlock()
	return s.Conn.WriteMessage(websocket.TextMessage, payload)
}

// readPump reads messages from the connection until a read fails, refreshing
// LastActive and dispatching every message to the manager. When the manager
// returns an error, its text is sent back to the client as a reply. The
// session is removed from the manager when the pump exits.
func (s *Session) readPump() {
	defer manager.RemoveSession(s.Id)

	s.Conn.SetReadLimit(maxMessageSize)
	// Best effort: a failure to set the deadline surfaces as a read error below.
	// NOTE(review): the deadline is set once and not refreshed in this function;
	// confirm that a pong handler installed elsewhere extends it.
	_ = s.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))

	for {
		_, message, err := s.Conn.ReadMessage()
		if err != nil {
			handleCloseError(err)
			return
		}
		fmt.Printf("用户读取数据:%s", string(message))

		// Record activity.
		s.mu.Lock()
		s.LastActive = time.Now()
		s.mu.Unlock()

		// Dispatch the message; on failure report the error text to the client.
		if err = manager.HandleMessage(s.UserId, s.Id, message); err != nil {
			if werr := s.writeText([]byte(err.Error())); werr != nil {
				utils.FileLog.Error("写应答失败 %s", werr.Error())
			}
		}
	}
}

// writePump forwards queued messages to the connection and sends a ping on
// every ticker tick. It returns when MessageChan or CloseChan is closed.
func (s *Session) writePump() {
	fmt.Printf("用户写入数据")
	ticker := time.NewTicker(basePingInterval)
	defer ticker.Stop()

	for {
		select {
		case message, ok := <-s.MessageChan:
			if !ok {
				return
			}
			if message == nil {
				// Nothing to send; dereferencing a nil message would panic.
				continue
			}
			if err := s.writeText(message.message); err != nil {
				utils.FileLog.Error("写消息失败 %s", err.Error())
			}
		case <-ticker.C:
			// Best effort: the result of the ping is intentionally ignored,
			// as in the original flow.
			s.writeMu.Lock()
			_ = s.Latency.SendPing(s.Conn)
			s.writeMu.Unlock()
			// Ticker.Reset panics on a non-positive duration, so keep the
			// current interval until a positive latency is available.
			if next := s.Latency.lastLatency; next > 0 {
				ticker.Reset(next)
			}
		case <-s.CloseChan:
			return
		}
	}
}

// handleCloseError logs the reason a read loop ended. Errors that are not
// WebSocket close errors are logged as unknown errors; a "going away" close
// is expected and not logged; a normal closure is logged at info level; any
// other close code is logged as an error.
func handleCloseError(err error) {
	if err == nil {
		return
	}

	var wsErr *websocket.CloseError
	if !errors.As(err, &wsErr) {
		utils.FileLog.Error("未知错误 %s", err.Error())
		return
	}

	switch wsErr.Code {
	case websocket.CloseGoingAway:
		// Expected when the peer navigates away or shuts down.
	case websocket.CloseNormalClosure:
		utils.FileLog.Info("正常关闭连接")
	default:
		utils.FileLog.Error("关闭代码:%d:%s", wsErr.Code, wsErr.Text)
	}
}

// forceClose forcibly terminates the session: it closes CloseChan, sends a
// policy-violation close frame, closes the connection and logs the event.
// Repeated calls are no-ops.
func (s *Session) forceClose() {
	s.closeOnce.Do(func() {
		close(s.CloseChan)
		// Send the close frame. Best effort: the peer may already be gone.
		_ = s.Conn.WriteControl(
			websocket.CloseMessage,
			websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "heartbeat failed"),
			time.Now().Add(5*time.Second),
		)
		// Best effort: the connection is being discarded either way.
		_ = s.Conn.Close()
		utils.FileLog.Info("连接已强制关闭 user:%d session:%s", s.UserId, s.Id)
	})
}

// NewSession creates a session for conn, attaches latency measurement and
// starts the read and write pumps in their own goroutines.
func NewSession(userId int, sessionId string, conn *websocket.Conn) (session *Session) {
	session = &Session{
		UserId:      userId,
		Id:          sessionId,
		Conn:        conn,
		LastActive:  time.Now(), // a new session counts as active from creation
		History:     []string{},
		CloseChan:   make(chan struct{}),
		MessageChan: make(chan *Message),
	}
	session.Latency = SetupLatencyMeasurement(conn)

	go session.readPump()
	go session.writePump()
	return session
}