session_manager.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package ws
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/gorilla/websocket"
  6. "sync"
  7. "time"
  8. )
  9. const (
  10. maxMessageSize = 1024 * 1024 // 1MB
  11. basePingInterval = 30 * time.Second
  12. maxPingInterval = 120 * time.Second
  13. minPingInterval = 15 * time.Second
  14. )
  15. type ConnectionManager struct {
  16. Sessions sync.Map
  17. heartbeat *HeartbeatManager
  18. }
  19. var (
  20. manager = &ConnectionManager{
  21. heartbeat: GetHeartbeatManager(),
  22. }
  23. )
  24. func Manager() *ConnectionManager {
  25. return manager
  26. }
  27. // 消息处理核心逻辑
  28. func (manager *ConnectionManager) HandleMessage(userID int, sessionID string, message []byte) error {
  29. if !Allow(userID, QA_LIMITER) {
  30. return errors.New("request too frequent")
  31. }
  32. session, exists := manager.GetSession(sessionID)
  33. if !exists {
  34. return errors.New("session not found")
  35. }
  36. // 处理业务逻辑
  37. session.History = append(session.History, string(message))
  38. response := "Processed: " + string(message)
  39. // 更新最后活跃时间
  40. session.LastActive = time.Now()
  41. // 发送响应
  42. return session.Conn.WriteMessage(websocket.TextMessage, []byte(response))
  43. }
  44. // 心跳管理
  45. func (manager *ConnectionManager) StartHeartbeat() {
  46. ticker := time.NewTicker(basePingInterval)
  47. defer ticker.Stop()
  48. for range ticker.C {
  49. manager.checkSessions()
  50. }
  51. }
  52. func (manager *ConnectionManager) checkSessions() {
  53. manager.Sessions.Range(func(key, value interface{}) bool {
  54. session := value.(*Session)
  55. if time.Since(session.LastActive) > 2*basePingInterval {
  56. session.Conn.Close()
  57. manager.Sessions.Delete(key)
  58. } else {
  59. _ = session.Latency.SendPing(session.Conn)
  60. }
  61. return true
  62. })
  63. }
  64. // AddSession Add 添加一个新的会话
  65. func (manager *ConnectionManager) AddSession(session *Session) {
  66. manager.Sessions.Store(session.ID, session)
  67. manager.heartbeat.Register(session)
  68. }
  69. func (manager *ConnectionManager) GetSessionId(userId int, sessionId string) (sessionID string) {
  70. return fmt.Sprintf("%d_%s", userId, sessionId)
  71. }
  72. // RemoveSession Remove 移除一个会话
  73. func (manager *ConnectionManager) RemoveSession(sessionCode string) {
  74. if data, ok := manager.Sessions.LoadAndDelete(sessionCode); ok {
  75. session := data.(*Session)
  76. close(session.CloseChan)
  77. _ = session.Conn.Close()
  78. }
  79. }
  80. // GetSession 获取一个会话
  81. func (manager *ConnectionManager) GetSession(sessionCode string) (session *Session, ok bool) {
  82. if data, ok := manager.Sessions.Load(sessionCode); ok {
  83. session = data.(*Session)
  84. }
  85. return
  86. }