123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- package services
- import (
- "encoding/json"
- "fmt"
- "time"
- "eta/eta_forum_task/models"
- "eta/eta_forum_task/services/alarm_msg"
- "eta/eta_forum_task/services/eta_forum"
- "eta/eta_forum_task/utils"
- )
- type EdbDataBinlogDataReq struct {
- Item string `description:"指标数据列表"`
- OpType string `description:"操作类型"`
- }
- type EdbDataBinlogReq struct {
- List []*EdbDataBinlogDataReq `description:"指标数据列表"`
- }
- func HandleBinlogEdbUpdateLog() (err error) {
- utils.FileLog.Info("HandleBinlogEdbUpdateLog 开始")
- // 设置缓存防止重复调用
- cacheKey := utils.CACHE_KEY_EDB_DATA_UPDATE_LOG
- if utils.Rc.Get(cacheKey) != nil {
- utils.FileLog.Info("HandleBinlogEdbUpdateLog cacheKey exists")
- return nil
- }
- utils.Rc.SetNX(cacheKey, "1", 10*time.Minute)
- defer func() {
- utils.Rc.Delete(cacheKey)
- if err := recover(); err != nil {
- utils.FileLog.Error("[HandleBinlogEdbUpdateLog]", err)
- }
- }()
- //查询记录总数
- dateTime := "2024-10-14 00:00:00"
- total, err := models.GetEdbUpdateLogListCount(dateTime)
- if err != nil {
- return err
- }
- //分页获取待处理的日志列表
- if total == 0 {
- return nil
- }
- offset := 0
- pageSize := 1000
- // 计算需要分多少页
- pageNum := total / pageSize
- if total % pageSize != 0 {
- pageNum += 1
- }
- // 循环更新1000个图表数据
- for i := 0; i < pageNum; i++ {
- // 查询需要更新的图表信息
- logs, err := models.GetEdbUpdateLogList(dateTime, offset, pageSize)
- if err != nil {
- return err
- }
- if len(logs) == 0 {
- break
- }
- //处理日志
- list := make([]*EdbDataBinlogDataReq, 0)
- logIds := make([]int, 0)
- for _, log := range logs {
- logIds = append(logIds, int(log.Id))
- //根据op_table_name查询表结构
- if log.OpType == "insert" || log.OpType == "update" {
- tmp := &EdbDataBinlogDataReq{
- Item: log.NewData,
- OpType: log.OpType,
- }
- list = append(list, tmp)
- }else if log.OpType == "delete" {
- tmp := &EdbDataBinlogDataReq{
- Item: log.OldData,
- OpType: log.OpType,
- }
- list = append(list, tmp)
- }
- }
- if len(list) == 0 {
- continue
- }
- req := &EdbDataBinlogReq{
- List: list,
- }
- //批量上传
- reqJson, err := json.Marshal(req)
- if err != nil {
- return err
- }
- resp, err := eta_forum.EdbDataBatchSaveLib(string(reqJson))
- if err != nil {
- return err
- }
- if resp.Ret != 200 {
- return fmt.Errorf("上传失败,Err:%s", resp.ErrMsg)
- }
- // 批量更新日志状态
- err = models.UpdateEdbUpdateLogStatus(logIds)
- if err != nil {
- return err
- }
- }
- utils.FileLog.Info("HandleBinlogEdbUpdateLog 结束")
- return nil
- }
- // the service for log
- func AutoUpdateEdbDataToEtaForum() {
- for {
- utils.Rc.Brpop(utils.CACHE_KEY_EDB_DATA_UPDATE_LOG, func(b []byte) {
- err := HandleBinlogEdbUpdateLog()
- if err != nil {
- utils.FileLog.Info("[AutoUpdateEdbDataToEtaForum]", err)
- go alarm_msg.SendAlarmMsg(fmt.Sprintf("[AutoUpdateEdbDataToEtaForum] 更新指标数据失败,Err:%s", err), 3)
- }
- })
- }
- }
|