123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- package global
- import (
- "context"
- "errors"
- "eta/eta_api/utils"
- "strconv"
- "sync"
- "time"
- "github.com/gorilla/websocket"
- )
- var MonitorMessageConn = make(map[int]*websocket.Conn)
- var WebSocketConnMap = make(map[string]*WebSocketConn)
- var AdminWebSocketConnMap = make(map[int]*WebSocketConn)
- var WebSocketConnAdminIdMap = make(map[string]int)
- type WebSocketConn struct {
- Conn *websocket.Conn
- id string
- ctx context.Context
- cancel context.CancelFunc
- wg sync.WaitGroup
- sendChan chan []byte
- closeChan chan struct{}
- monitorHandlers []func(adminId int)(err error)
- adminId int
- }
- // NewConnection 创建一个新的连接对象
- func NewWebSocketConn(conn *websocket.Conn, id string, adminId int, monitorHandlers []func(adminId int)(err error)) *WebSocketConn {
- WebSocketConnAdminIdMap[id] = adminId
- ctx, cancel := context.WithCancel(context.Background())
- webSocketConn := &WebSocketConn{
- Conn: conn,
- id: id,
- ctx: ctx,
- cancel: cancel,
- sendChan: make(chan []byte, 100), // 发送消息的缓冲通道
- closeChan: make(chan struct{}),
- adminId: adminId,
- }
- AdminWebSocketConnMap[adminId] = webSocketConn
- webSocketConn.monitorHandlers = monitorHandlers
- return webSocketConn
- }
- // Start 启动连接的读写协程
- func (c *WebSocketConn) Start() {
- utils.FileLog.Info("客户端 %s 已连接", c.id)
- // 启动读协程
- c.wg.Add(1)
- go c.readLoop()
-
- // 启动写协程
- c.wg.Add(1)
- go c.writeLoop()
- if len(c.monitorHandlers) > 0 {
- // 启动消息监听协程
- c.wg.Add(1)
- go c.MonitorMessageHandler(c.adminId)
- }
- }
- // Close 关闭连接并清理资源
- func (c *WebSocketConn) Close() {
- utils.FileLog.Info("关闭与客户端 %s 的连接", c.id)
- c.cancel() // 通知所有协程退出
- close(c.closeChan)
- c.Conn.Close()
- c.wg.Wait() // 等待所有协程退出
- close(c.sendChan)
- // 从adminWebSocketConnMap中删除
- delete(AdminWebSocketConnMap, WebSocketConnAdminIdMap[c.id])
- delete(WebSocketConnMap, c.id)
- utils.FileLog.Info("客户端 %s 的所有协程已清理完毕", c.id)
- }
- // Send 发送消息到客户端
- func (c *WebSocketConn) Send(msg []byte) bool {
- select {
- case c.sendChan <- msg:
- return true
- case <-c.ctx.Done():
- return false
- default: // 通道已满,表示客户端处理速度慢
- utils.FileLog.Info("客户端 %s 处理速度过慢,丢弃消息", c.id)
- return false
- }
- }
- // readLoop 处理来自客户端的消息
- func (c *WebSocketConn) readLoop() {
- defer c.wg.Done()
- defer utils.FileLog.Info("客户端 %s 的读协程已退出", c.id)
- for {
- select {
- case <-c.ctx.Done():
- return
- default:
- // 设置读取超时
- _, msg, err := c.Conn.ReadMessage()
- if err != nil {
- utils.FileLog.Error("从客户端 %s 读取失败: %v", c.id, err)
- c.cancel()
- return
- }
- message := string(msg)
- // 处理消息
- //utils.FileLog.Info("收到客户端 %s 消息: %s", c.id, message)
-
- // 启动一个新的协程处理消息,避免阻塞读取循环
- go c.handleMessage(message)
- }
- }
- }
- // handleMessage 处理收到的消息
- func (c *WebSocketConn) handleMessage(message string) {
- defer utils.FileLog.Info("处理来自客户端 %s 的消息: %s, 结束", c.id, message)
- // 模拟消息处理
- //utils.FileLog.Info("处理来自客户端 %s 的消息: %s", c.id, message)
-
- // 如果是PING消息,回复PONG
- if message == "ping\n" {
- //c.Send([]byte("PONG\n"))
- return
- }
-
- // 回复消息
- //response := fmt.Sprintf("已收到消息: %s", message)
- //c.Send([]byte(response))
- }
- // writeLoop 发送消息到客户端
- func (c *WebSocketConn) writeLoop() {
- defer c.wg.Done()
- defer utils.FileLog.Info("客户端 %s 的写协程已退出", c.id)
- for {
- select {
- case <-c.ctx.Done():
- return
- case msg, ok := <-c.sendChan:
- if !ok {
- return
- }
- err := c.Conn.WriteMessage(websocket.TextMessage, msg)
- if err != nil {
- utils.FileLog.Error("向客户端 %s 写入失败: %v", c.id, err)
- c.cancel()
- return
- }
- }
- }
- }
- func (c *WebSocketConn) MonitorMessageHandler(adminId int) {
- defer c.wg.Done()
- defer utils.FileLog.Info("客户端 %s 的消息监控协程已退出", c.id)
- for {
- select {
- case <-c.ctx.Done():
- return
- default:
- time.Sleep(time.Second * 10)
- for _, handler := range c.monitorHandlers {
- //utils.FileLog.Info("处理注册的消息监控函数")
- handler(c.adminId)
- }
- }
- }
- }
- var (
- EDB_MONITOR_MESSAGE_CONNECT_CACHE = "edb_monitor_message_cache:"
- )
- func EdbMonitorMessageHealth(adminId int) (isClose bool, err error) {
- conn := MonitorMessageConn[adminId]
- if conn == nil {
- err = errors.New("no connection")
- isClose = true
- return
- }
- _, msg, err := conn.ReadMessage()
- if err != nil {
- isClose = true
- return
- }
- if string(msg) == "ping" {
- healthKey := EDB_MONITOR_MESSAGE_CONNECT_CACHE + strconv.Itoa(adminId)
- err = utils.Rc.Put(healthKey, "1", time.Minute*1)
- if err != nil {
- return
- }
- }
- return
- }
|