session.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package ws
  2. import (
  3. "errors"
  4. "eta/eta_api/utils"
  5. "fmt"
  6. "github.com/gorilla/websocket"
  7. "sync"
  8. "time"
  9. )
  10. // Session 会话结构
  11. type Session struct {
  12. Id string
  13. UserId int
  14. Conn *websocket.Conn
  15. LastActive time.Time
  16. Latency *LatencyMeasurer
  17. History []string
  18. CloseChan chan struct{}
  19. MessageChan chan *Message
  20. mu sync.RWMutex
  21. }
  22. type Message struct {
  23. MessageType string
  24. message []byte
  25. }
  26. // readPump 处理读操作
  27. func (s *Session) readPump() {
  28. defer manager.RemoveSession(s.Id)
  29. s.Conn.SetReadLimit(maxMessageSize)
  30. _ = s.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
  31. for {
  32. _, message, err := s.Conn.ReadMessage()
  33. if err != nil {
  34. handleCloseError(err)
  35. return
  36. }
  37. fmt.Printf("用户读取数据:%s", string(message))
  38. // 更新活跃时间
  39. s.mu.Lock()
  40. s.LastActive = time.Now()
  41. s.mu.Unlock()
  42. // 处理消息
  43. if err = manager.HandleMessage(s.UserId, s.Id, message); err != nil {
  44. //写应答
  45. s.Conn.WriteMessage(websocket.TextMessage, []byte(err.Error()))
  46. }
  47. }
  48. }
  49. // writePump 处理写操作
  50. func (s *Session) writePump() {
  51. fmt.Printf("用户写入数据")
  52. ticker := time.NewTicker(basePingInterval)
  53. defer ticker.Stop()
  54. for {
  55. select {
  56. case message, ok := <-s.MessageChan:
  57. if !ok {
  58. return
  59. }
  60. _ = s.Conn.WriteMessage(websocket.TextMessage, message.message)
  61. case <-ticker.C:
  62. _ = s.Latency.SendPing(s.Conn)
  63. ticker.Reset(s.Latency.lastLatency)
  64. case <-s.CloseChan:
  65. return
  66. }
  67. }
  68. }
  69. func handleCloseError(err error) {
  70. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
  71. var wsErr *websocket.CloseError
  72. if !errors.As(err, &wsErr) {
  73. utils.FileLog.Error("未知错误 %s", err.Error())
  74. } else {
  75. switch wsErr.Code {
  76. case websocket.CloseNormalClosure:
  77. utils.FileLog.Info("正常关闭连接")
  78. default:
  79. utils.FileLog.Error("关闭代码:%d:%s", wsErr.Code, wsErr.Text)
  80. }
  81. }
  82. }
  83. }
  84. // 强制关闭连接
  85. func (s *Session) forceClose() {
  86. close(s.CloseChan)
  87. // 发送关闭帧
  88. s.Conn.WriteControl(websocket.CloseMessage,
  89. websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "heartbeat failed"),
  90. time.Now().Add(5*time.Second))
  91. s.Conn.Close()
  92. utils.FileLog.Info("连接已强制关闭",
  93. "user", s.UserId,
  94. "session", s.Id)
  95. }
  96. func NewSession(userId int, sessionId string, conn *websocket.Conn) (session *Session) {
  97. session = &Session{
  98. UserId: userId,
  99. Id: sessionId,
  100. Conn: conn,
  101. History: []string{},
  102. CloseChan: make(chan struct{}),
  103. MessageChan: make(chan *Message),
  104. }
  105. session.Latency = SetupLatencyMeasurement(conn)
  106. go session.readPump()
  107. go session.writePump()
  108. return
  109. }