123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- package ws
- import (
- "errors"
- "eta/eta_api/utils"
- "fmt"
- "github.com/gorilla/websocket"
- "sync"
- "time"
- )
- // Session 会话结构
- type Session struct {
- Id string
- UserId int
- Conn *websocket.Conn
- LastActive time.Time
- Latency *LatencyMeasurer
- History []string
- CloseChan chan struct{}
- MessageChan chan *Message
- mu sync.RWMutex
- }
// Message is an outbound item queued on Session.MessageChan.
type Message struct {
	// MessageType is a label for the message.
	MessageType string

	// message is the raw payload; writePump sends it as a text frame.
	message []byte
}
- // readPump 处理读操作
- func (s *Session) readPump() {
- defer manager.RemoveSession(s.Id)
- s.Conn.SetReadLimit(maxMessageSize)
- _ = s.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
- for {
- _, message, err := s.Conn.ReadMessage()
- if err != nil {
- handleCloseError(err)
- return
- }
- fmt.Printf("用户读取数据:%s", string(message))
- // 更新活跃时间
- s.mu.Lock()
- s.LastActive = time.Now()
- s.mu.Unlock()
- // 处理消息
- if err = manager.HandleMessage(s.UserId, s.Id, message); err != nil {
- //写应答
- s.Conn.WriteMessage(websocket.TextMessage, []byte(err.Error()))
- }
- }
- }
- // writePump 处理写操作
- func (s *Session) writePump() {
- fmt.Printf("用户写入数据")
- ticker := time.NewTicker(basePingInterval)
- defer ticker.Stop()
- for {
- select {
- case message, ok := <-s.MessageChan:
- if !ok {
- return
- }
- _ = s.Conn.WriteMessage(websocket.TextMessage, message.message)
- case <-ticker.C:
- _ = s.Latency.SendPing(s.Conn)
- ticker.Reset(s.Latency.lastLatency)
- case <-s.CloseChan:
- return
- }
- }
- }
- func handleCloseError(err error) {
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
- var wsErr *websocket.CloseError
- if !errors.As(err, &wsErr) {
- utils.FileLog.Error("未知错误 %s", err.Error())
- } else {
- switch wsErr.Code {
- case websocket.CloseNormalClosure:
- utils.FileLog.Info("正常关闭连接")
- default:
- utils.FileLog.Error("关闭代码:%d:%s", wsErr.Code, wsErr.Text)
- }
- }
- }
- }
- // 强制关闭连接
- func (s *Session) forceClose() {
- close(s.CloseChan)
- // 发送关闭帧
- s.Conn.WriteControl(websocket.CloseMessage,
- websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "heartbeat failed"),
- time.Now().Add(5*time.Second))
- s.Conn.Close()
- utils.FileLog.Info("连接已强制关闭",
- "user", s.UserId,
- "session", s.Id)
- }
- func NewSession(userId int, sessionId string, conn *websocket.Conn) (session *Session) {
- session = &Session{
- UserId: userId,
- Id: sessionId,
- Conn: conn,
- History: []string{},
- CloseChan: make(chan struct{}),
- MessageChan: make(chan *Message),
- }
- session.Latency = SetupLatencyMeasurement(conn)
- go session.readPump()
- go session.writePump()
- return
- }
|