package services import ( "encoding/json" "fmt" "time" "eta/eta_forum_task/models" "eta/eta_forum_task/services/alarm_msg" "eta/eta_forum_task/services/eta_forum" "eta/eta_forum_task/utils" ) type EdbDataBinlogDataReq struct { Item string `description:"指标数据列表"` OpType string `description:"操作类型"` } type EdbDataBinlogReq struct { List []*EdbDataBinlogDataReq `description:"指标数据列表"` } func HandleBinlogEdbUpdateLog() (err error) { utils.FileLog.Info("HandleBinlogEdbUpdateLog 开始") // 设置缓存防止重复调用 cacheKey := utils.CACHE_KEY_EDB_DATA_UPDATE_LOG if utils.Rc.Get(cacheKey) != nil { utils.FileLog.Info("HandleBinlogEdbUpdateLog cacheKey exists") return nil } utils.Rc.SetNX(cacheKey, "1", 10*time.Minute) defer func() { utils.Rc.Delete(cacheKey) if err := recover(); err != nil { utils.FileLog.Error("[HandleBinlogEdbUpdateLog]", err) } }() //查询记录总数 dateTime := "2024-10-14 00:00:00" total, err := models.GetEdbUpdateLogListCount(dateTime) if err != nil { return err } //分页获取待处理的日志列表 if total == 0 { return nil } offset := 0 pageSize := 1000 // 计算需要分多少页 pageNum := total / pageSize if total % pageSize != 0 { pageNum += 1 } // 循环更新1000个图表数据 for i := 0; i < pageNum; i++ { // 查询需要更新的图表信息 logs, err := models.GetEdbUpdateLogList(dateTime, offset, pageSize) if err != nil { return err } if len(logs) == 0 { break } //处理日志 list := make([]*EdbDataBinlogDataReq, 0) logIds := make([]int, 0) for _, log := range logs { logIds = append(logIds, int(log.Id)) //根据op_table_name查询表结构 if log.OpType == "insert" || log.OpType == "update" { tmp := &EdbDataBinlogDataReq{ Item: log.NewData, OpType: log.OpType, } list = append(list, tmp) }else if log.OpType == "delete" { tmp := &EdbDataBinlogDataReq{ Item: log.OldData, OpType: log.OpType, } list = append(list, tmp) } } if len(list) == 0 { continue } req := &EdbDataBinlogReq{ List: list, } //批量上传 reqJson, err := json.Marshal(req) if err != nil { return err } resp, err := eta_forum.EdbDataBatchSaveLib(string(reqJson)) if err != nil { return err } if resp.Ret != 200 { return fmt.Errorf("上传失败,Err:%s", resp.ErrMsg) } // 批量更新日志状态 err = models.UpdateEdbUpdateLogStatus(logIds) if err != nil { return err } } utils.FileLog.Info("HandleBinlogEdbUpdateLog 结束") return nil } // the service for log func AutoUpdateEdbDataToEtaForum() { for { utils.Rc.Brpop(utils.CACHE_KEY_EDB_DATA_UPDATE_LOG, func(b []byte) { err := HandleBinlogEdbUpdateLog() if err != nil { utils.FileLog.Info("[AutoUpdateEdbDataToEtaForum]", err) go alarm_msg.SendAlarmMsg(fmt.Sprintf("[AutoUpdateEdbDataToEtaForum] 更新指标数据失败,Err:%s", err), 3) } }) } }