websocket.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package global
  2. import (
  3. "context"
  4. "errors"
  5. "eta/eta_api/utils"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "github.com/gorilla/websocket"
  10. )
  11. var MonitorMessageConn = make(map[int]*websocket.Conn)
  12. var WebSocketConnMap = make(map[string]*WebSocketConn)
  13. var AdminWebSocketConnMap = make(map[int]*WebSocketConn)
  14. var WebSocketConnAdminIdMap = make(map[string]int)
  15. type WebSocketConn struct {
  16. Conn *websocket.Conn
  17. id string
  18. ctx context.Context
  19. cancel context.CancelFunc
  20. wg sync.WaitGroup
  21. sendChan chan []byte
  22. closeChan chan struct{}
  23. monitorHandlers []func(adminId int)(err error)
  24. adminId int
  25. }
  26. // NewConnection 创建一个新的连接对象
  27. func NewWebSocketConn(conn *websocket.Conn, id string, adminId int, monitorHandlers []func(adminId int)(err error)) *WebSocketConn {
  28. WebSocketConnAdminIdMap[id] = adminId
  29. ctx, cancel := context.WithCancel(context.Background())
  30. webSocketConn := &WebSocketConn{
  31. Conn: conn,
  32. id: id,
  33. ctx: ctx,
  34. cancel: cancel,
  35. sendChan: make(chan []byte, 100), // 发送消息的缓冲通道
  36. closeChan: make(chan struct{}),
  37. adminId: adminId,
  38. }
  39. AdminWebSocketConnMap[adminId] = webSocketConn
  40. webSocketConn.monitorHandlers = monitorHandlers
  41. return webSocketConn
  42. }
  43. // Start 启动连接的读写协程
  44. func (c *WebSocketConn) Start() {
  45. utils.FileLog.Info("客户端 %s 已连接", c.id)
  46. // 启动读协程
  47. c.wg.Add(1)
  48. go c.readLoop()
  49. // 启动写协程
  50. c.wg.Add(1)
  51. go c.writeLoop()
  52. if len(c.monitorHandlers) > 0 {
  53. // 启动消息监听协程
  54. c.wg.Add(1)
  55. go c.MonitorMessageHandler(c.adminId)
  56. }
  57. }
  58. // Close 关闭连接并清理资源
  59. func (c *WebSocketConn) Close() {
  60. utils.FileLog.Info("关闭与客户端 %s 的连接", c.id)
  61. c.cancel() // 通知所有协程退出
  62. close(c.closeChan)
  63. c.Conn.Close()
  64. c.wg.Wait() // 等待所有协程退出
  65. close(c.sendChan)
  66. // 从adminWebSocketConnMap中删除
  67. delete(AdminWebSocketConnMap, WebSocketConnAdminIdMap[c.id])
  68. delete(WebSocketConnMap, c.id)
  69. utils.FileLog.Info("客户端 %s 的所有协程已清理完毕", c.id)
  70. }
  71. // Send 发送消息到客户端
  72. func (c *WebSocketConn) Send(msg []byte) bool {
  73. select {
  74. case c.sendChan <- msg:
  75. return true
  76. case <-c.ctx.Done():
  77. return false
  78. default: // 通道已满,表示客户端处理速度慢
  79. utils.FileLog.Info("客户端 %s 处理速度过慢,丢弃消息", c.id)
  80. return false
  81. }
  82. }
  83. // readLoop 处理来自客户端的消息
  84. func (c *WebSocketConn) readLoop() {
  85. defer c.wg.Done()
  86. defer utils.FileLog.Info("客户端 %s 的读协程已退出", c.id)
  87. for {
  88. select {
  89. case <-c.ctx.Done():
  90. return
  91. default:
  92. // 设置读取超时
  93. _, msg, err := c.Conn.ReadMessage()
  94. if err != nil {
  95. utils.FileLog.Error("从客户端 %s 读取失败: %v", c.id, err)
  96. c.cancel()
  97. return
  98. }
  99. message := string(msg)
  100. // 处理消息
  101. //utils.FileLog.Info("收到客户端 %s 消息: %s", c.id, message)
  102. // 启动一个新的协程处理消息,避免阻塞读取循环
  103. go c.handleMessage(message)
  104. }
  105. }
  106. }
  107. // handleMessage 处理收到的消息
  108. func (c *WebSocketConn) handleMessage(message string) {
  109. defer utils.FileLog.Info("处理来自客户端 %s 的消息: %s, 结束", c.id, message)
  110. // 模拟消息处理
  111. //utils.FileLog.Info("处理来自客户端 %s 的消息: %s", c.id, message)
  112. // 如果是PING消息,回复PONG
  113. if message == "ping\n" {
  114. //c.Send([]byte("PONG\n"))
  115. return
  116. }
  117. // 回复消息
  118. //response := fmt.Sprintf("已收到消息: %s", message)
  119. //c.Send([]byte(response))
  120. }
  121. // writeLoop 发送消息到客户端
  122. func (c *WebSocketConn) writeLoop() {
  123. defer c.wg.Done()
  124. defer utils.FileLog.Info("客户端 %s 的写协程已退出", c.id)
  125. for {
  126. select {
  127. case <-c.ctx.Done():
  128. return
  129. case msg, ok := <-c.sendChan:
  130. if !ok {
  131. return
  132. }
  133. err := c.Conn.WriteMessage(websocket.TextMessage, msg)
  134. if err != nil {
  135. utils.FileLog.Error("向客户端 %s 写入失败: %v", c.id, err)
  136. c.cancel()
  137. return
  138. }
  139. }
  140. }
  141. }
  142. func (c *WebSocketConn) MonitorMessageHandler(adminId int) {
  143. defer c.wg.Done()
  144. defer utils.FileLog.Info("客户端 %s 的消息监控协程已退出", c.id)
  145. for {
  146. select {
  147. case <-c.ctx.Done():
  148. return
  149. default:
  150. time.Sleep(time.Second * 10)
  151. for _, handler := range c.monitorHandlers {
  152. //utils.FileLog.Info("处理注册的消息监控函数")
  153. handler(c.adminId)
  154. }
  155. }
  156. }
  157. }
  158. var (
  159. EDB_MONITOR_MESSAGE_CONNECT_CACHE = "edb_monitor_message_cache:"
  160. )
  161. func EdbMonitorMessageHealth(adminId int) (isClose bool, err error) {
  162. conn := MonitorMessageConn[adminId]
  163. if conn == nil {
  164. err = errors.New("no connection")
  165. isClose = true
  166. return
  167. }
  168. _, msg, err := conn.ReadMessage()
  169. if err != nil {
  170. isClose = true
  171. return
  172. }
  173. if string(msg) == "ping" {
  174. healthKey := EDB_MONITOR_MESSAGE_CONNECT_CACHE + strconv.Itoa(adminId)
  175. err = utils.Rc.Put(healthKey, "1", time.Minute*1)
  176. if err != nil {
  177. return
  178. }
  179. }
  180. return
  181. }