websocket_msg.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package services
  2. import (
  3. "context"
  4. "eta/eta_api/models"
  5. "eta/eta_api/services/data"
  6. "eta/eta_api/utils"
  7. "fmt"
  8. "sync"
  9. "time"
  10. "github.com/gorilla/websocket"
  11. )
  12. func DealWebSocketMsg(conn *websocket.Conn, adminId int) {
  13. DealEdbInspectionMessage(conn, adminId)
  14. }
  15. // 处理巡检消息
  16. func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
  17. // 创建上下文用于控制 goroutine 生命周期
  18. ctx, cancel := context.WithCancel(context.Background())
  19. defer cancel()
  20. // 创建互斥锁保护 WebSocket 写操作
  21. var wsWriteMutex sync.Mutex
  22. // 创建连接关闭标志
  23. done := make(chan struct{})
  24. defer close(done)
  25. cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId)
  26. // 监听连接关闭
  27. go func() {
  28. <-done
  29. cancel()
  30. }()
  31. // 设置连接关闭处理器
  32. conn.SetCloseHandler(func(code int, text string) error {
  33. close(done)
  34. return nil
  35. })
  36. for {
  37. select {
  38. case <-ctx.Done():
  39. return
  40. default:
  41. // 使用带超时的 Redis 操作
  42. utils.Rc.BrpopWithTimeout(cacheKey, 30*time.Second, func(b []byte) {
  43. messageList, err := data.GetHistoryInspectionMessages(adminId)
  44. if err != nil {
  45. utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
  46. return
  47. }
  48. success := make(chan int64, 10)
  49. var wg sync.WaitGroup
  50. // 消息发送 goroutine
  51. wg.Add(1)
  52. go func() {
  53. defer wg.Done()
  54. defer close(success)
  55. for i, msg := range messageList {
  56. select {
  57. case <-ctx.Done():
  58. return
  59. default:
  60. if i == 0 {
  61. respData, err := data.SendInspectionMessages(adminId, msg)
  62. if err != nil {
  63. utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
  64. continue
  65. }
  66. resp := models.WebsocketMessageResponse{
  67. MessageType: 1,
  68. Data: respData,
  69. }
  70. // 使用互斥锁保护 WebSocket 写操作
  71. wsWriteMutex.Lock()
  72. err = conn.WriteJSON(resp)
  73. wsWriteMutex.Unlock()
  74. if err != nil {
  75. utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
  76. continue
  77. }
  78. utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
  79. success <- msg.MessageId
  80. } else {
  81. success <- msg.MessageId
  82. }
  83. }
  84. }
  85. }()
  86. // 消息已读处理 goroutine
  87. wg.Add(1)
  88. go func() {
  89. defer wg.Done()
  90. readList := make([]int64, 0)
  91. for {
  92. select {
  93. case <-ctx.Done():
  94. return
  95. case msgId, ok := <-success:
  96. if !ok {
  97. // 处理已收集的消息
  98. if len(readList) > 0 {
  99. _, err = data.ReadEdbInspectionMessageList(readList, adminId)
  100. if err != nil {
  101. utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
  102. }
  103. }
  104. return
  105. }
  106. readList = append(readList, msgId)
  107. }
  108. }
  109. }()
  110. // 等待所有 goroutine 完成
  111. wg.Wait()
  112. })
  113. }
  114. }
  115. }