websocket_msg.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package services
  2. import (
  3. "eta/eta_api/models"
  4. "eta/eta_api/services/data"
  5. "eta/eta_api/utils"
  6. "fmt"
  7. "github.com/gorilla/websocket"
  8. )
  9. func DealWebSocketMsg(conn *websocket.Conn, adminId int) {
  10. DealEdbInspectionMessage(conn, adminId)
  11. }
  12. // 处理巡检消息
  13. func DealEdbInspectionMessage(conn *websocket.Conn, adminId int) {
  14. cacheKey := fmt.Sprintf("%s%d", utils.CACHE_EDB_INSPECTION_MESSAGE, adminId)
  15. for {
  16. utils.Rc.Brpop(cacheKey, func(b []byte) {
  17. messageList, err := data.GetHistoryInspectionMessages(adminId)
  18. if err != nil {
  19. utils.FileLog.Error("获取巡检信息历史失败,err:%s, adminId:%d", err.Error(), adminId)
  20. return
  21. }
  22. success := make(chan int64, 10)
  23. defer close(success)
  24. go func() {
  25. defer close(success)
  26. for i, msg := range messageList {
  27. if i == 0 {
  28. // 多条消息仅发送最新一条
  29. respData, err := data.SendInspectionMessages(adminId, msg)
  30. if err != nil {
  31. utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
  32. } else {
  33. resp := models.WebsocketMessageResponse{
  34. MessageType: 1,
  35. Data: respData,
  36. }
  37. err = conn.WriteJSON(resp)
  38. if err != nil {
  39. utils.FileLog.Error("巡检信息发送失败,err:%s, adminId:%d", err.Error(), adminId)
  40. } else {
  41. utils.FileLog.Info("巡检信息发送成功,adminId:%d, messageId:%d", adminId, msg.MessageId)
  42. success <- msg.MessageId
  43. }
  44. }
  45. } else {
  46. success <- msg.MessageId
  47. }
  48. }
  49. }()
  50. go func() {
  51. readList := make([]int64, 0)
  52. for {
  53. msgId, ok := <-success
  54. if !ok {
  55. break
  56. }
  57. readList = append(readList, msgId)
  58. }
  59. _, err = data.ReadEdbInspectionMessageList(readList, adminId)
  60. if err != nil {
  61. utils.FileLog.Error("巡检信息已读失败,err:%s, adminId:%d", err.Error(), adminId)
  62. }
  63. }()
  64. })
  65. }
  66. }