package services

import (
	"eta/eta_index_lib/models"
	"eta/eta_index_lib/models/data_stat"
	"eta/eta_index_lib/services/alarm_msg"
	"eta/eta_index_lib/utils"
	"fmt"
	"time"
)

// AddEdbInfoUpdateLog writes one edit/refresh log row for the indicator
// identified by edbInfoId.
//
// The row is a snapshot of the indicator as returned by
// models.GetEdbInfoById, combined with the outcome values supplied by the
// caller (updateResult, updateFailedReason, dataUpdateResult,
// dataUpdateFailedReason, isSourceRefresh, updateType).
//
// When edbInfoId is not positive nothing is written and nil is returned.
// Any lookup failure is reported as "指标不存在"; a failed insert is returned
// wrapped with a descriptive prefix.
func AddEdbInfoUpdateLog(edbInfoId int, updateResult int, updateFailedReason string, dataUpdateResult int, dataUpdateFailedReason string, isSourceRefresh int, updateType int) (err error) {
	if edbInfoId <= 0 {
		return
	}

	// Load the indicator whose current state is being snapshotted.
	edbInfo, err := models.GetEdbInfoById(edbInfoId)
	if err != nil {
		err = fmt.Errorf("指标不存在")
		return
	}

	entry := &data_stat.EdbInfoUpdateLog{
		EdbInfoId:              edbInfo.EdbInfoId,
		SourceName:             edbInfo.SourceName,
		Source:                 edbInfo.Source,
		EdbCode:                edbInfo.EdbCode,
		EdbName:                edbInfo.EdbName,
		EdbNameSource:          edbInfo.SourceIndexName,
		Frequency:              edbInfo.Frequency,
		Unit:                   edbInfo.Unit,
		StartDate:              edbInfo.StartDate,
		EndDate:                edbInfo.EndDate,
		SysUserId:              edbInfo.SysUserId,
		SysUserRealName:        edbInfo.SysUserRealName,
		UniqueCode:             edbInfo.UniqueCode,
		EdbCreateTime:          edbInfo.CreateTime,
		EdbModifyTime:          edbInfo.ModifyTime,
		CreateTime:             time.Now(),
		LatestDate:             edbInfo.LatestDate,
		LatestValue:            edbInfo.LatestValue,
		TerminalCode:           edbInfo.TerminalCode,
		UpdateResult:           updateResult,
		UpdateFailedReason:     updateFailedReason,
		DataUpdateTime:         edbInfo.DataUpdateTime,
		ErDataUpdateDate:       edbInfo.ErDataUpdateDate,
		DataUpdateResult:       dataUpdateResult,
		DataUpdateFailedReason: dataUpdateFailedReason,
		IsSourceRefresh:        isSourceRefresh,
		UpdateType:             updateType,
	}

	if _, err = data_stat.AddEdbUpdateLog(entry); err != nil {
		err = fmt.Errorf("新增指标更新日志失败,Err: %s", err)
		return
	}
	return
}

// SetMysteelChemicalEdbInfoUpdateStat builds or refreshes today's
// per-indicator statistics rows for the Mysteel Chemical data source
// (utils.DATA_SOURCE_MYSTEEL_CHEMICAL).
//
// For every indicator of that source with no_update=0 whose code is not
// flagged as stopped in the source index table, one EdbInfoUpdateStat row is
// produced. Rows that already exist for today are updated, the rest are
// inserted; both are written in batches of 500.
//
// needStat forces the statistics to be produced even when no source update
// log exists for today; when it is false and there are no logs, the function
// returns immediately.
//
// Any returned error is also written to utils.FileLog and sent through
// alarm_msg.SendAlarmMsg by the deferred handler.
func SetMysteelChemicalEdbInfoUpdateStat(needStat bool) (err error) {
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
			utils.FileLog.Info(tips)
			alarm_msg.SendAlarmMsg(tips, 3)
		}
	}()

	// Load every Mysteel indicator that is still being updated.
	condition := " and source = ? and no_update=0"
	var pars []interface{}
	pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL)
	edbList, err := models.GetEdbInfoByCondition(condition, pars, 0)
	if err != nil {
		err = fmt.Errorf("查询钢联化工指标信息出错,err: %s", err)
		return
	}

	// Derive every date value from a single clock reading so that a run that
	// straddles midnight still works on one consistent day.
	nowTime := time.Now()
	today := nowTime.Format(utils.FormatDate)
	nextDay := nowTime.AddDate(0, 0, 1).Format(utils.FormatDate)
	// The error is ignored on purpose: today was produced with this same layout.
	todayT, _ := time.ParseInLocation(utils.FormatDate, today, time.Local)

	// Load today's source update logs for Mysteel indicators.
	updateLogList, err := data_stat.GetEdbUpdateSourceLogByCreateDate(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, today, nextDay)
	if err != nil {
		err = fmt.Errorf("查询钢联化工指标终端更新日志报错,err: %s", err)
		return
	}
	if !needStat && len(updateLogList) == 0 {
		// Nothing changed today and the caller did not force a run.
		return
	}

	// Keep only the first log seen per indicator.
	// NOTE(review): which log that is depends on the ordering of the query
	// result — presumably the most recent one; verify against the model.
	updateLogMap := make(map[int]*data_stat.EdbInfoUpdateLog)
	for _, v := range updateLogList {
		if _, ok := updateLogMap[v.EdbInfoId]; !ok {
			updateLogMap[v.EdbInfoId] = v
		}
	}

	// Load the statistics rows already written today for this source.
	statCond := " and source = ? and create_time >= ? and create_time < ?"
	var statPars []interface{}
	statPars = append(statPars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, today, nextDay)
	updateStatList, err := data_stat.GetEdbUpdateStatByCondition(statCond, statPars)
	if err != nil {
		err = fmt.Errorf("查询钢联化工数据源明细记录统计报错,err: %s", err)
		return
	}
	updateStatMap := make(map[int]*data_stat.EdbInfoUpdateStat)
	for _, v := range updateStatList {
		updateStatMap[v.EdbInfoId] = v
	}

	indexObj := new(models.BaseFromMysteelChemicalIndex)

	// From Wednesday to Saturday, collect the weekly source indexes that are
	// not stopped and whose end_date is either before the value returned by
	// utils.GetNowWeekMonday or equal to today.
	week := int(nowTime.Weekday())
	weekNeedRefreshMap := make(map[string]struct{})
	if week >= 3 && week <= 6 {
		endDate := utils.GetNowWeekMonday().Format(utils.FormatDate)
		cond := ` AND frequency = ? AND (end_date < ? or end_date=?) AND is_stop = 0`
		var tmpPars []interface{}
		tmpPars = append(tmpPars, "周度", endDate, today)
		indexTotal, tErr := indexObj.GetIndexByCondition(cond, tmpPars)
		if tErr != nil {
			err = fmt.Errorf("查询钢联化工原始指标报错,err: %s", tErr)
			return
		}
		for _, v := range indexTotal {
			weekNeedRefreshMap[v.IndexCode] = struct{}{}
		}
	}

	// Collect the codes of every source index flagged as stopped; indicators
	// with these codes are excluded from the statistics.
	stopRefreshMap := make(map[string]struct{})
	tmpCond := ` AND is_stop = 1`
	indexStop, tErr := indexObj.GetIndexByCondition(tmpCond, []interface{}{})
	if tErr != nil {
		err = fmt.Errorf("查询钢联化工原始指标报错,err: %s", tErr)
		return
	}
	for _, v := range indexStop {
		stopRefreshMap[v.IndexCode] = struct{}{}
	}

	logStat := new(data_stat.EdbInfoUpdateStat)
	addList := make([]*data_stat.EdbInfoUpdateStat, 0)
	modifyList := make([]*data_stat.EdbInfoUpdateStat, 0)

	for _, v := range edbList {
		if _, stopped := stopRefreshMap[v.EdbCode]; stopped {
			continue
		}

		tmp := &data_stat.EdbInfoUpdateStat{
			EdbInfoId:        v.EdbInfoId,
			SourceName:       v.SourceName,
			Source:           v.Source,
			EdbCode:          v.EdbCode,
			EdbName:          v.EdbName,
			EdbNameSource:    v.EdbNameSource,
			Frequency:        v.Frequency,
			Unit:             v.Unit,
			StartDate:        v.StartDate,
			EndDate:          v.EndDate,
			SysUserId:        v.SysUserId,
			SysUserRealName:  v.SysUserRealName,
			UniqueCode:       v.UniqueCode,
			EdbCreateTime:    v.CreateTime,
			EdbModifyTime:    v.ModifyTime,
			LatestDate:       v.LatestDate,
			LatestValue:      v.LatestValue,
			TerminalCode:     v.TerminalCode,
			DataUpdateTime:   v.DataUpdateTime,
			ErDataUpdateDate: v.ErDataUpdateDate,
			ModifyTime:       nowTime,
		}

		exist, existOk := updateStatMap[v.EdbInfoId]
		if existOk {
			// A row already exists for today: keep the decision made earlier.
			tmp.NeedRefresh = exist.NeedRefresh
		} else {
			frequency := v.Frequency
			if v.Frequency == "旬度" {
				// The indicator library and the source index table may
				// disagree on the frequency; prefer the source's value when
				// it can be read.
				indexTmp, e := indexObj.GetIndexItem(v.EdbCode)
				if e == nil {
					frequency = indexTmp.Frequency
				}
			}
			// checkMySteelEdbInfoNeedRefresh never returns a non-nil error.
			needRefresh, _ := checkMySteelEdbInfoNeedRefresh(v.EdbCode, frequency, weekNeedRefreshMap)
			tmp.NeedRefresh = needRefresh
		}

		// IsAdd is 1 when the indicator was created today (at or after local
		// midnight), otherwise 2. Time values are compared with Before rather
		// than ==, which would also compare location and monotonic data.
		if !v.CreateTime.Before(todayT) {
			tmp.IsAdd = 1
		} else {
			tmp.IsAdd = 2
		}

		if up, ok := updateLogMap[v.EdbInfoId]; ok {
			// A source update log exists today: copy its outcome.
			tmp.DataUpdateTime = up.DataUpdateTime
			tmp.ErDataUpdateDate = up.ErDataUpdateDate
			tmp.DataUpdateResult = up.DataUpdateResult
			tmp.DataUpdateFailedReason = up.DataUpdateFailedReason
			tmp.HasRefresh = 1
			tmp.UpdateResult = up.UpdateResult
			tmp.UpdateFailedReason = up.UpdateFailedReason
			tmp.UpdateTime = up.CreateTime.Format(utils.FormatDateTime)
		} else if tmp.NeedRefresh == 1 {
			// A refresh was expected but no log was produced.
			tmp.HasRefresh = 0
			tmp.DataUpdateResult = 2
			tmp.DataUpdateFailedReason = "服务异常"
		}

		// Existing rows are updated, new rows are inserted.
		if existOk {
			tmp.Id = exist.Id
			modifyList = append(modifyList, tmp)
		} else {
			tmp.CreateTime = nowTime
			addList = append(addList, tmp)
		}

		// Write in batches of 500 to bound the size of each statement.
		if len(addList) >= 500 {
			if err = logStat.Add(addList); err != nil {
				err = fmt.Errorf("新增钢联化工明细记录报错,err: %s", err)
				return
			}
			addList = addList[:0]
		}
		if len(modifyList) >= 500 {
			if err = data_stat.UpdateEdbUpdateStatMulti(modifyList); err != nil {
				err = fmt.Errorf("更新钢联化工明细记录报错,err: %s", err)
				return
			}
			modifyList = modifyList[:0]
		}
	}

	// Flush the remainders. Each step returns on failure so that an insert
	// error cannot be overwritten by a subsequent successful update.
	if len(addList) > 0 {
		if err = logStat.Add(addList); err != nil {
			err = fmt.Errorf("新增钢联化工明细记录报错,err: %s", err)
			return
		}
	}
	if len(modifyList) > 0 {
		if err = data_stat.UpdateEdbUpdateStatMulti(modifyList); err != nil {
			err = fmt.Errorf("更新钢联化工明细记录报错,err: %s", err)
			return
		}
	}
	return
}

// checkMySteelEdbInfoNeedRefresh reports (as 1 or 0) whether an indicator
// with the given frequency is expected to be refreshed today, based on the
// current local weekday:
//
//   - "日度": Monday through Saturday.
//   - "周度": Wednesday through Saturday, and only when edbCode is present in
//     weekNeedRefreshMap.
//   - "旬度", "月度", "季度", "年度": Sunday only.
//
// Any other frequency yields 0. The returned error is always nil.
func checkMySteelEdbInfoNeedRefresh(edbCode, frequency string, weekNeedRefreshMap map[string]struct{}) (needRefresh int, err error) {
	// time.Weekday numbers Sunday as 0 and Saturday as 6.
	week := int(time.Now().Weekday())

	switch frequency {
	case "日度":
		if week >= 1 && week <= 6 {
			needRefresh = 1
		}
	case "周度":
		if week >= 3 && week <= 6 {
			if _, ok := weekNeedRefreshMap[edbCode]; ok {
				needRefresh = 1
			}
		}
	case "旬度", "月度", "季度", "年度":
		if week == 0 {
			needRefresh = 1
		}
	}
	return
}

// SetEdbSourceStat aggregates today's per-indicator statistics rows into one
// EdbSourceStat summary row per terminal code.
//
// Rows returned by data_stat.GetEdbUpdateStatByCondition for today are
// grouped by TerminalCode and counted (total, newly added, needing refresh,
// refreshed, update success/failure, refresh success/failure). The number of
// indicators deleted today is taken from
// data_stat.GetEdbDeleteLogNumByCreateTime. A summary row that already exists
// for today is updated, otherwise a new one is inserted; both are written in
// batches of 500.
//
// needStat forces the run even when there are no per-indicator rows for
// today; when it is false and there are none, the function returns
// immediately.
//
// Any returned error is also sent through alarm_msg.SendAlarmMsg by the
// deferred handler.
//
// NOTE(review): the queries here do not filter by source and the grouping key
// is the terminal code alone — confirm that terminal codes are unique across
// sources.
func SetEdbSourceStat(needStat bool) (err error) {
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
			alarm_msg.SendAlarmMsg(tips, 3)
		}
	}()

	// Derive every date value from a single clock reading.
	nowTime := time.Now()
	today := nowTime.Format(utils.FormatDate)
	nextDay := nowTime.AddDate(0, 0, 1).Format(utils.FormatDate)

	// Load today's per-indicator statistics rows.
	statCond := " and create_time >= ? and create_time < ?"
	var statPars []interface{}
	statPars = append(statPars, today, nextDay)
	updateStatList, err := data_stat.GetEdbUpdateStatByCondition(statCond, statPars)
	if err != nil {
		err = fmt.Errorf("查询钢联化工数据源明细记录统计报错,err: %s", err)
		return
	}
	if !needStat && len(updateStatList) == 0 {
		return
	}

	// Group the per-indicator rows by terminal code.
	updateStatMap := make(map[string][]*data_stat.EdbInfoUpdateStat)
	for _, v := range updateStatList {
		updateStatMap[v.TerminalCode] = append(updateStatMap[v.TerminalCode], v)
	}

	// Load the summary rows already written today.
	cond := " and create_time >= ? and create_time < ?"
	var pars []interface{}
	pars = append(pars, today, nextDay)
	statList, err := data_stat.GetEdbSourceStatByCondition(cond, pars)
	if err != nil {
		err = fmt.Errorf("查询钢联化工数据源统计报错,err: %s", err)
		return
	}
	statMap := make(map[string]*data_stat.EdbSourceStat)
	for _, v := range statList {
		statMap[v.TerminalCode] = v
	}

	// Load the number of indicators deleted today, per terminal code.
	delList, err := data_stat.GetEdbDeleteLogNumByCreateTime(today, nextDay)
	if err != nil {
		err = fmt.Errorf("查询今日被删除指标数目报错,err: %s", err)
		return
	}
	delMap := make(map[string]int)
	for _, v := range delList {
		delMap[v.TerminalCode] = v.Num
	}

	logStat := new(data_stat.EdbSourceStat)
	addList := make([]*data_stat.EdbSourceStat, 0)
	modifyList := make([]*data_stat.EdbSourceStat, 0)

	for terminalCode, list := range updateStatMap {
		tmp := new(data_stat.EdbSourceStat)

		// Every group holds at least one row because groups are created by
		// append; the first row supplies the descriptive fields.
		first := list[0]
		tmp.SourceName = first.SourceName
		tmp.Source = first.Source
		tmp.TerminalCode = first.TerminalCode
		tmp.ModifyTime = nowTime

		for _, v := range list {
			tmp.EdbNum++
			if v.IsAdd == 1 {
				tmp.EdbNewNum++
			}
			if v.NeedRefresh == 1 {
				tmp.NeedRefreshNum++
			}
			if v.HasRefresh == 1 {
				tmp.HasRefreshNum++
			}

			// Data update outcome: a failure is counted only for rows that
			// were expected to refresh.
			if v.DataUpdateResult == 1 {
				tmp.UpdateSuccessNum++
			} else if v.NeedRefresh == 1 {
				tmp.UpdateFailedNum++
			}

			// Refresh outcome: a failure is counted only for rows that were
			// actually refreshed.
			if v.UpdateResult == 1 {
				tmp.RefreshSuccessNum++
			} else if v.HasRefresh == 1 {
				tmp.RefreshFailedNum++
			}
		}

		// Number of indicators deleted today for this terminal.
		if dn, ok := delMap[terminalCode]; ok {
			tmp.EdbDelNum = dn
		}

		// Existing rows are updated, new rows are inserted.
		if exist, ok := statMap[terminalCode]; ok {
			tmp.Id = exist.Id
			modifyList = append(modifyList, tmp)
		} else {
			tmp.CreateTime = nowTime
			addList = append(addList, tmp)
		}

		// Write in batches of 500.
		if len(addList) >= 500 {
			if err = logStat.Add(addList); err != nil {
				err = fmt.Errorf("新增钢联化工统计表报错,err: %s", err)
				return
			}
			addList = addList[:0]
		}
		if len(modifyList) >= 500 {
			if err = data_stat.UpdateEdbSourceStatMulti(modifyList); err != nil {
				err = fmt.Errorf("更新钢联化工统计表报错,err: %s", err)
				return
			}
			modifyList = modifyList[:0]
		}
	}

	// Flush the remainders. Each step returns on failure so that an insert
	// error cannot be overwritten by a subsequent successful update.
	if len(addList) > 0 {
		if err = logStat.Add(addList); err != nil {
			err = fmt.Errorf("新增钢联化工统计表报错,err: %s", err)
			return
		}
	}
	if len(modifyList) > 0 {
		if err = data_stat.UpdateEdbSourceStatMulti(modifyList); err != nil {
			err = fmt.Errorf("更新钢联化工统计表报错,err: %s", err)
			return
		}
	}
	return
}