edb_data.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package services
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "time"
  6. "eta/eta_forum_task/models"
  7. "eta/eta_forum_task/services/alarm_msg"
  8. "eta/eta_forum_task/services/eta_forum"
  9. "eta/eta_forum_task/utils"
  10. )
  // EdbDataBinlogDataReq is a single entry of a batch upload request: one
  // serialized record together with the operation that was applied to it.
  type EdbDataBinlogDataReq struct {
  	// Item is the record payload. HandleBinlogEdbUpdateLog fills it from the
  	// log's NewData for insert/update and from OldData for delete.
  	// (description tag: "indicator data list")
  	Item string `description:"指标数据列表"`

  	// OpType is the operation name copied from the log entry
  	// ("insert", "update" or "delete" as used in this file).
  	// (description tag: "operation type")
  	OpType string `description:"操作类型"`
  }
  15. type EdbDataBinlogReq struct {
  16. List []*EdbDataBinlogDataReq `description:"指标数据列表"`
  17. }
  18. func HandleBinlogEdbUpdateLog() (err error) {
  19. utils.FileLog.Info("HandleBinlogEdbUpdateLog 开始")
  20. // 设置缓存防止重复调用
  21. cacheKey := utils.CACHE_KEY_EDB_DATA_UPDATE_LOG
  22. if utils.Rc.Get(cacheKey) != nil {
  23. utils.FileLog.Info("HandleBinlogEdbUpdateLog cacheKey exists")
  24. return nil
  25. }
  26. utils.Rc.SetNX(cacheKey, "1", 10*time.Minute)
  27. defer func() {
  28. utils.Rc.Delete(cacheKey)
  29. if err := recover(); err != nil {
  30. utils.FileLog.Error("[HandleBinlogEdbUpdateLog]", err)
  31. }
  32. }()
  33. //查询记录总数
  34. dateTime := "2024-10-14 00:00:00"
  35. total, err := models.GetEdbUpdateLogListCount(dateTime)
  36. if err != nil {
  37. return err
  38. }
  39. //分页获取待处理的日志列表
  40. if total == 0 {
  41. return nil
  42. }
  43. offset := 0
  44. pageSize := 1000
  45. // 计算需要分多少页
  46. pageNum := total / pageSize
  47. if total % pageSize != 0 {
  48. pageNum += 1
  49. }
  50. // 循环更新1000个图表数据
  51. for i := 0; i < pageNum; i++ {
  52. // 查询需要更新的图表信息
  53. logs, err := models.GetEdbUpdateLogList(dateTime, offset, pageSize)
  54. if err != nil {
  55. return err
  56. }
  57. if len(logs) == 0 {
  58. break
  59. }
  60. //处理日志
  61. list := make([]*EdbDataBinlogDataReq, 0)
  62. logIds := make([]int, 0)
  63. for _, log := range logs {
  64. logIds = append(logIds, int(log.Id))
  65. //根据op_table_name查询表结构
  66. if log.OpType == "insert" || log.OpType == "update" {
  67. tmp := &EdbDataBinlogDataReq{
  68. Item: log.NewData,
  69. OpType: log.OpType,
  70. }
  71. list = append(list, tmp)
  72. }else if log.OpType == "delete" {
  73. tmp := &EdbDataBinlogDataReq{
  74. Item: log.OldData,
  75. OpType: log.OpType,
  76. }
  77. list = append(list, tmp)
  78. }
  79. }
  80. if len(list) == 0 {
  81. continue
  82. }
  83. req := &EdbDataBinlogReq{
  84. List: list,
  85. }
  86. //批量上传
  87. reqJson, err := json.Marshal(req)
  88. if err != nil {
  89. return err
  90. }
  91. resp, err := eta_forum.EdbDataBatchSaveLib(string(reqJson))
  92. if err != nil {
  93. return err
  94. }
  95. if resp.Ret != 200 {
  96. return fmt.Errorf("上传失败,Err:%s", resp.ErrMsg)
  97. }
  98. // 批量更新日志状态
  99. err = models.UpdateEdbUpdateLogStatus(logIds)
  100. if err != nil {
  101. return err
  102. }
  103. }
  104. utils.FileLog.Info("HandleBinlogEdbUpdateLog 结束")
  105. return nil
  106. }
  107. // the service for log
  108. func AutoUpdateEdbDataToEtaForum() {
  109. for {
  110. utils.Rc.Brpop(utils.CACHE_KEY_EDB_DATA_UPDATE_LOG, func(b []byte) {
  111. err := HandleBinlogEdbUpdateLog()
  112. if err != nil {
  113. utils.FileLog.Info("[AutoUpdateEdbDataToEtaForum]", err)
  114. go alarm_msg.SendAlarmMsg(fmt.Sprintf("[AutoUpdateEdbDataToEtaForum] 更新指标数据失败,Err:%s", err), 3)
  115. }
  116. })
  117. }
  118. }