edb_data.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package services
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "time"
  6. "eta/eta_forum_task/models"
  7. "eta/eta_forum_task/services/eta_forum"
  8. "eta/eta_forum_task/utils"
  9. )
  // Request payload types used when EDB update-log rows are uploaded in batches.
  type (
  	// EdbDataBinlogDataReq describes a single changed row: Item carries the
  	// row payload and OpType names the operation that was recorded for it.
  	EdbDataBinlogDataReq struct {
  		Item   string `description:"指标数据列表"`
  		OpType string `description:"操作类型"`
  	}

  	// EdbDataBinlogReq is the batch envelope: List holds the rows of one
  	// upload in the order they were collected.
  	EdbDataBinlogReq struct {
  		List []*EdbDataBinlogDataReq `description:"指标数据列表"`
  	}
  )
  17. func HandleBinlogEdbUpdateLog() error {
  18. // 设置缓存防止重复调用
  19. cacheKey := utils.CACHE_KEY_EDB_DATA_UPDATE_LOG
  20. if utils.Rc.Get(cacheKey) != nil {
  21. utils.FileLog.Info("HandleBinlogEdbUpdateLog cacheKey exists")
  22. return nil
  23. }
  24. utils.Rc.SetNX(cacheKey, "1", 10*time.Minute)
  25. defer func() {
  26. utils.Rc.Delete(cacheKey)
  27. }()
  28. //查询记录总数
  29. dateTime := "2024-10-14 00:00:00"
  30. total, err := models.GetEdbUpdateLogListCount(dateTime)
  31. if err != nil {
  32. return err
  33. }
  34. //分页获取待处理的日志列表
  35. if total == 0 {
  36. return nil
  37. }
  38. offset := 0
  39. pageSize := 10
  40. // 循环更新1000个图表数据
  41. for i := 0; offset < total; i++ {
  42. // 查询需要更新的图表信息
  43. logs, err := models.GetEdbUpdateLogList(dateTime, offset, pageSize)
  44. if err != nil {
  45. return err
  46. }
  47. if len(logs) == 0 {
  48. break
  49. }
  50. //处理日志
  51. list := make([]*EdbDataBinlogDataReq, 0)
  52. logIds := make([]int, 0)
  53. for _, log := range logs {
  54. logIds = append(logIds, int(log.Id))
  55. //根据op_table_name查询表结构
  56. if log.OpType == "insert" || log.OpType == "update" {
  57. tmp := &EdbDataBinlogDataReq{
  58. Item: log.NewData,
  59. OpType: log.OpType,
  60. }
  61. list = append(list, tmp)
  62. }else if log.OpType == "delete" {
  63. tmp := &EdbDataBinlogDataReq{
  64. Item: log.OldData,
  65. OpType: log.OpType,
  66. }
  67. list = append(list, tmp)
  68. }
  69. }
  70. if len(list) == 0 {
  71. continue
  72. }
  73. req := &EdbDataBinlogReq{
  74. List: list,
  75. }
  76. //批量上传
  77. reqJson, err := json.Marshal(req)
  78. if err != nil {
  79. return err
  80. }
  81. resp, err := eta_forum.EdbDataBatchSaveLib(string(reqJson))
  82. if err != nil {
  83. return err
  84. }
  85. if resp.Ret != 200 {
  86. return fmt.Errorf("上传失败,Err:%s", resp.ErrMsg)
  87. }
  88. // 批量更新日志状态
  89. err = models.UpdateEdbUpdateLogStatus(logIds)
  90. if err != nil {
  91. return err
  92. }
  93. }
  94. return nil
  95. }
  96. // the service for log
  97. func AutoUpdateEdbDataToEtaForum() {
  98. defer func() {
  99. if err := recover(); err != nil {
  100. fmt.Println("[AutoUpdateEdbDataToEtaForum]", err)
  101. }
  102. }()
  103. for {
  104. utils.Rc.Brpop(utils.CACHE_KEY_EDB_DATA_UPDATE_LOG, func(b []byte) {
  105. HandleBinlogEdbUpdateLog()
  106. })
  107. }
  108. }