kobe6258 2 сар өмнө
parent
commit
17571f7160

+ 0 - 1
utils/ws/limiter.go

@@ -52,7 +52,6 @@ func (qalm *LimiterManger) GetLimiter(token string) *RateLimiter {
 	return limiter
 }
 func (qalm *LimiterManger) Allow(token string) bool {
-
 	limiter := qalm.GetLimiter(token)
 	if limiter.LastRequest.IsZero() {
 		limiter.LastRequest = time.Now()

+ 13 - 7
utils/ws/session.go

@@ -3,7 +3,6 @@ package ws
 import (
 	"errors"
 	"eta/eta_api/utils"
-	"fmt"
 	"github.com/gorilla/websocket"
 	"sync"
 	"time"
@@ -38,19 +37,26 @@ func (s *Session) readPump() {
 			handleCloseError(err)
 			return
 		}
-		fmt.Printf("用户读取数据:%s", string(message))
 		// 更新活跃时间
-		s.mu.Lock()
-		s.LastActive = time.Now()
-		s.mu.Unlock()
+		s.UpdateActivity()
 		// 处理消息
 		if err = manager.HandleMessage(s.UserId, s.Id, message); err != nil {
 			//写应答
-			s.Conn.WriteMessage(websocket.TextMessage, []byte(err.Error()))
+			_ = s.writeWithTimeout(&Message{
+				MessageType: "error",
+				message:     []byte(err.Error()),
+			})
 		}
 	}
 }
 
+// UpdateActivity 跟新最近活跃时间
+func (s *Session) UpdateActivity() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.LastActive = time.Now()
+}
+
 func (s *Session) Close() {
 	s.sessionOnce.Do(func() {
 		// 控制关闭顺序
@@ -77,7 +83,6 @@ func (s *Session) writeWithTimeout(msg *Message) error {
 
 // writePump 处理写操作
 func (s *Session) writePump() {
-	fmt.Printf("用户写入数据")
 	ticker := time.NewTicker(basePingInterval)
 	defer ticker.Stop()
 	for {
@@ -91,6 +96,7 @@ func (s *Session) writePump() {
 			_ = s.Latency.SendPing(s.Conn)
 			ticker.Reset(s.Latency.lastLatency)
 		case <-s.CloseChan:
+			manager.RemoveSession(s.Id)
 			return
 		}
 	}

+ 3 - 14
utils/ws/session_manager.go

@@ -69,10 +69,7 @@ func (manager *ConnectionManager) GetSessionId(userId int, sessionId string) (se
 
 // RemoveSession Remove 移除一个会话
 func (manager *ConnectionManager) RemoveSession(sessionCode string) {
-	if data, ok := manager.Sessions.LoadAndDelete(sessionCode); ok {
-		session := data.(*Session)
-		session.Close()
-	}
+	manager.Sessions.Delete(sessionCode)
 }
 
 // GetSession 获取一个会话
@@ -95,8 +92,8 @@ func (manager *ConnectionManager) CheckAll() {
 			if time.Since(session.LastActive) > connectionTimeout {
 				fmt.Printf("连接超时关闭: SessionID=%s, UserID=%s", session.Id, session.UserId)
 				utils.FileLog.Warn("连接超时关闭: SessionID=%s, UserID=%s", session.Id, session.UserId)
-				_ = session.Conn.Close()
 				session.Close()
+				manager.RemoveSession(session.Id)
 				return true
 			}
 			// 发送心跳
@@ -108,9 +105,8 @@ func (manager *ConnectionManager) CheckAll() {
 					fmt.Printf("心跳发送失败: SessionID=%s, Error=%v", s.Id, err)
 					utils.FileLog.Warn("心跳发送失败: SessionID=%s, Error=%v",
 						s.Id, err)
-					_ = s.Conn.Close()
 					session.Close()
-					manager.Sessions.Delete(session.Id)
+					manager.RemoveSession(s.Id)
 				}
 			}(session)
 			return true
@@ -136,10 +132,3 @@ func (manager *ConnectionManager) Start() {
 func (manager *ConnectionManager) Stop() {
 	close(manager.stopChan)
 }
-
-// UpdateActivity 跟新最近活跃时间
-func (manager *ConnectionManager) UpdateActivity(sessionID string) {
-	if session, ok := manager.Sessions.Load(sessionID); ok {
-		session.(*Session).LastActive = time.Now()
-	}
-}