package global import ( "context" "errors" "eta/eta_api/utils" "strconv" "sync" "time" "github.com/gorilla/websocket" ) var MonitorMessageConn = make(map[int]*websocket.Conn) var WebSocketConnMap = make(map[string]*WebSocketConn) var AdminWebSocketConnMap = make(map[int]*WebSocketConn) var WebSocketConnAdminIdMap = make(map[string]int) type WebSocketConn struct { Conn *websocket.Conn id string ctx context.Context cancel context.CancelFunc wg sync.WaitGroup sendChan chan []byte closeChan chan struct{} monitorHandlers []func(adminId int)(err error) adminId int } // NewConnection 创建一个新的连接对象 func NewWebSocketConn(conn *websocket.Conn, id string, adminId int, monitorHandlers []func(adminId int)(err error)) *WebSocketConn { WebSocketConnAdminIdMap[id] = adminId ctx, cancel := context.WithCancel(context.Background()) webSocketConn := &WebSocketConn{ Conn: conn, id: id, ctx: ctx, cancel: cancel, sendChan: make(chan []byte, 100), // 发送消息的缓冲通道 closeChan: make(chan struct{}), adminId: adminId, } AdminWebSocketConnMap[adminId] = webSocketConn webSocketConn.monitorHandlers = monitorHandlers return webSocketConn } // Start 启动连接的读写协程 func (c *WebSocketConn) Start() { utils.FileLog.Info("客户端 %s 已连接", c.id) // 启动读协程 c.wg.Add(1) go c.readLoop() // 启动写协程 c.wg.Add(1) go c.writeLoop() if len(c.monitorHandlers) > 0 { // 启动消息监听协程 c.wg.Add(1) go c.MonitorMessageHandler(c.adminId) } } // Close 关闭连接并清理资源 func (c *WebSocketConn) Close() { utils.FileLog.Info("关闭与客户端 %s 的连接", c.id) c.cancel() // 通知所有协程退出 close(c.closeChan) c.Conn.Close() c.wg.Wait() // 等待所有协程退出 close(c.sendChan) // 从adminWebSocketConnMap中删除 delete(AdminWebSocketConnMap, WebSocketConnAdminIdMap[c.id]) delete(WebSocketConnMap, c.id) utils.FileLog.Info("客户端 %s 的所有协程已清理完毕", c.id) } // Send 发送消息到客户端 func (c *WebSocketConn) Send(msg []byte) bool { select { case c.sendChan <- msg: return true case <-c.ctx.Done(): return false default: // 通道已满,表示客户端处理速度慢 utils.FileLog.Info("客户端 %s 处理速度过慢,丢弃消息", c.id) return false } } // readLoop 处理来自客户端的消息 func (c *WebSocketConn) readLoop() { defer c.wg.Done() defer utils.FileLog.Info("客户端 %s 的读协程已退出", c.id) for { select { case <-c.ctx.Done(): return default: // 设置读取超时 _, msg, err := c.Conn.ReadMessage() if err != nil { utils.FileLog.Error("从客户端 %s 读取失败: %v", c.id, err) c.cancel() return } message := string(msg) // 处理消息 utils.FileLog.Info("收到客户端 %s 消息: %s", c.id, message) // 启动一个新的协程处理消息,避免阻塞读取循环 go c.handleMessage(message) } } } // handleMessage 处理收到的消息 func (c *WebSocketConn) handleMessage(message string) { defer utils.FileLog.Info("处理来自客户端 %s 的消息: %s, 结束", c.id, message) // 模拟消息处理 utils.FileLog.Info("处理来自客户端 %s 的消息: %s", c.id, message) // 如果是PING消息,回复PONG if message == "ping\n" { //c.Send([]byte("PONG\n")) return } // 回复消息 //response := fmt.Sprintf("已收到消息: %s", message) //c.Send([]byte(response)) } // writeLoop 发送消息到客户端 func (c *WebSocketConn) writeLoop() { defer c.wg.Done() defer utils.FileLog.Info("客户端 %s 的写协程已退出", c.id) for { select { case <-c.ctx.Done(): return case msg, ok := <-c.sendChan: if !ok { return } err := c.Conn.WriteJSON(msg) if err != nil { utils.FileLog.Error("向客户端 %s 写入失败: %v", c.id, err) c.cancel() return } } } } func (c *WebSocketConn) MonitorMessageHandler(adminId int) { defer c.wg.Done() defer utils.FileLog.Info("客户端 %s 的消息监控协程已退出", c.id) for { select { case <-c.ctx.Done(): return default: time.Sleep(time.Second * 10) for _, handler := range c.monitorHandlers { utils.FileLog.Info("处理注册的消息监控函数") handler(c.adminId) } } } } var ( EDB_MONITOR_MESSAGE_CONNECT_CACHE = "edb_monitor_message_cache:" ) func EdbMonitorMessageHealth(adminId int) (isClose bool, err error) { conn := MonitorMessageConn[adminId] if conn == nil { err = errors.New("no connection") isClose = true return } _, msg, err := conn.ReadMessage() if err != nil { isClose = true return } if string(msg) == "ping" { healthKey := EDB_MONITOR_MESSAGE_CONNECT_CACHE + strconv.Itoa(adminId) err = utils.Rc.Put(healthKey, "1", time.Minute*1) if err != nil { return } } return }