123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422 |
- package services
- import (
- "eta/eta_index_lib/models"
- "eta/eta_index_lib/models/data_stat"
- "eta/eta_index_lib/services/alarm_msg"
- "eta/eta_index_lib/utils"
- "fmt"
- "time"
- )
- // AddEdbInfoUpdateLog 添加指标编辑/刷新日志
- func AddEdbInfoUpdateLog(edbInfoId int, updateResult int, updateFailedReason string, dataUpdateResult int, dataUpdateFailedReason string, isSourceRefresh int, updateType int) (err error) {
- var edbInfo *models.EdbInfo
- if edbInfoId > 0 {
- // 获取指标详情
- edbInfo, err = models.GetEdbInfoById(edbInfoId)
- if err != nil {
- err = fmt.Errorf("指标不存在")
- return
- }
- log := new(data_stat.EdbInfoUpdateLog)
- log.EdbInfoId = edbInfo.EdbInfoId
- log.SourceName = edbInfo.SourceName
- log.Source = edbInfo.Source
- log.EdbCode = edbInfo.EdbCode
- log.EdbName = edbInfo.EdbName
- log.EdbNameSource = edbInfo.SourceIndexName
- log.Frequency = edbInfo.Frequency
- log.Unit = edbInfo.Unit
- log.StartDate = edbInfo.StartDate
- log.EndDate = edbInfo.EndDate
- log.SysUserId = edbInfo.SysUserId
- log.SysUserRealName = edbInfo.SysUserRealName
- log.UniqueCode = edbInfo.UniqueCode
- log.EdbCreateTime = edbInfo.CreateTime
- log.EdbModifyTime = edbInfo.ModifyTime
- log.CreateTime = time.Now()
- log.LatestDate = edbInfo.LatestDate
- log.LatestValue = edbInfo.LatestValue
- log.TerminalCode = edbInfo.TerminalCode
- log.UpdateResult = updateResult
- log.UpdateFailedReason = updateFailedReason
- log.DataUpdateTime = edbInfo.DataUpdateTime
- log.ErDataUpdateDate = edbInfo.ErDataUpdateDate
- log.DataUpdateResult = dataUpdateResult
- log.DataUpdateFailedReason = dataUpdateFailedReason
- log.IsSourceRefresh = isSourceRefresh
- log.UpdateType = updateType
- _, err = data_stat.AddEdbUpdateLog(log)
- if err != nil {
- err = fmt.Errorf("新增指标更新日志失败,Err: %s", err)
- return
- }
- }
- return
- }
- // SetMysteelChemicalEdbInfoUpdateStat 定时统计上海钢联的数据源明细表
- func SetMysteelChemicalEdbInfoUpdateStat(needStat bool) (err error) {
- defer func() {
- if err != nil {
- tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
- utils.FileLog.Info(tips)
- alarm_msg.SendAlarmMsg(tips, 3)
- }
- }()
- //查询钢联的所有在更新的指标信息
- condition := " and source = ? and no_update=0"
- var pars []interface{}
- pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL)
- edbList, err := models.GetEdbInfoByCondition(condition, pars, 0)
- if err != nil {
- err = fmt.Errorf("查询上海钢联指标信息出错,err: %s", err)
- return
- }
- nowTime := time.Now()
- today := time.Now().Format(utils.FormatDate)
- todayT, _ := time.ParseInLocation(utils.FormatDate, today, time.Local)
- nextDay := time.Now().AddDate(0, 0, 1).Format(utils.FormatDate)
- //查询当日所有钢联指标的终端更新记录
- updateLogList, err := data_stat.GetEdbUpdateSourceLogByCreateDate(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, today, nextDay)
- if err != nil {
- err = fmt.Errorf("查询上海钢联指标终端更新日志报错,err: %s", err)
- return
- }
- fmt.Println(len(updateLogList))
- if !needStat && len(updateLogList) == 0 { //如果不存在变更记录 则不进行汇总
- return
- }
- updateLogMap := make(map[int]*data_stat.EdbInfoUpdateLog)
- if len(updateLogList) > 0 {
- for _, v := range updateLogList {
- if _, ok := updateLogMap[v.EdbInfoId]; !ok {
- updateLogMap[v.EdbInfoId] = v
- }
- }
- }
- statCond := " and source = ? and create_time >= ? and create_time < ?"
- var statPars []interface{}
- statPars = append(statPars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, today, nextDay)
- //查询当日钢联所有的刷新记录
- updateStatList, err := data_stat.GetEdbUpdateStatByCondition(statCond, statPars)
- if err != nil {
- err = fmt.Errorf("查询上海钢联数据源明细记录统计报错,err: %s", err)
- return
- }
- updateStatMap := make(map[int]*data_stat.EdbInfoUpdateStat)
- if len(updateStatList) > 0 {
- for _, v := range updateStatList {
- updateStatMap[v.EdbInfoId] = v
- }
- }
- indexObj := new(models.BaseFromMysteelChemicalIndex)
- week := int(nowTime.Weekday())
- weekNeedRefreshMap := make(map[string]struct{})
- if week >= 3 && week <= 6 {
- endDate := utils.GetNowWeekMonday().Format(utils.FormatDate)
- nowDate := time.Now().Format(utils.FormatDate)
- cond := ` AND frequency = ? AND (end_date < ? or end_date=?) AND is_stop = 0`
- var tmpPars []interface{}
- tmpPars = append(tmpPars, "周度", endDate, nowDate)
- //查询所有需要当日刷新的周度指标
- indexTotal, tErr := indexObj.GetIndexByCondition(cond, tmpPars)
- if tErr != nil {
- err = fmt.Errorf("查询上海钢联原始指标报错,err: %s", tErr)
- return
- }
- for _, v := range indexTotal {
- weekNeedRefreshMap[v.IndexCode] = struct{}{}
- }
- }
- //查询所有停更指标
- stopRefreshMap := make(map[string]struct{})
- tmpCond := ` AND is_stop = 1`
- //查询所有需要当日刷新的周度指标
- indexStop, tErr := indexObj.GetIndexByCondition(tmpCond, []interface{}{})
- if tErr != nil {
- err = fmt.Errorf("查询上海钢联原始指标报错,err: %s", tErr)
- return
- }
- for _, v := range indexStop {
- stopRefreshMap[v.IndexCode] = struct{}{}
- }
- logStat := new(data_stat.EdbInfoUpdateStat)
- //组装新增数据
- addList := make([]*data_stat.EdbInfoUpdateStat, 0)
- modifyList := make([]*data_stat.EdbInfoUpdateStat, 0)
- if len(edbList) > 0 {
- for _, v := range edbList {
- if _, ok := stopRefreshMap[v.EdbCode]; ok {
- continue
- }
- tmp := &data_stat.EdbInfoUpdateStat{
- EdbInfoId: v.EdbInfoId,
- SourceName: v.SourceName,
- Source: v.Source,
- EdbCode: v.EdbCode,
- EdbName: v.EdbName,
- EdbNameSource: v.EdbNameSource,
- Frequency: v.Frequency,
- Unit: v.Unit,
- StartDate: v.StartDate,
- EndDate: v.EndDate,
- SysUserId: v.SysUserId,
- SysUserRealName: v.SysUserRealName,
- UniqueCode: v.UniqueCode,
- EdbCreateTime: v.CreateTime,
- EdbModifyTime: v.ModifyTime,
- LatestDate: v.LatestDate,
- LatestValue: v.LatestValue,
- TerminalCode: v.TerminalCode,
- DataUpdateTime: v.DataUpdateTime,
- ErDataUpdateDate: v.ErDataUpdateDate,
- ModifyTime: nowTime,
- }
- exist, existOk := updateStatMap[v.EdbInfoId]
- frequency := v.Frequency
- if v.Frequency == "旬度" { //特殊处理指标库里和数据源里频度不一致的情况
- //查询源指标库的频度
- indexTmp, e := indexObj.GetIndexItem(v.EdbCode)
- if e == nil {
- frequency = indexTmp.Frequency
- }
- }
- if existOk {
- tmp.NeedRefresh = exist.NeedRefresh
- } else {
- needRefresh, _ := checkMySteelEdbInfoNeedRefresh(v.EdbCode, frequency, weekNeedRefreshMap)
- tmp.NeedRefresh = needRefresh
- }
- // 判断是否当日新增
- if v.CreateTime.After(todayT) || v.CreateTime == todayT {
- tmp.IsAdd = 1
- } else {
- tmp.IsAdd = 2
- }
- if up, ok := updateLogMap[v.EdbInfoId]; ok {
- tmp.DataUpdateTime = up.DataUpdateTime
- tmp.ErDataUpdateDate = up.ErDataUpdateDate
- tmp.DataUpdateResult = up.DataUpdateResult
- tmp.DataUpdateFailedReason = up.DataUpdateFailedReason
- tmp.HasRefresh = 1
- tmp.UpdateResult = up.UpdateResult
- tmp.UpdateFailedReason = up.UpdateFailedReason
- tmp.UpdateTime = up.CreateTime.Format(utils.FormatDateTime)
- } else if tmp.NeedRefresh == 1 {
- tmp.HasRefresh = 0
- tmp.DataUpdateResult = 2
- tmp.DataUpdateFailedReason = "服务异常"
- }
- // 判断是否需要新增还是更新
- if existOk {
- tmp.Id = exist.Id
- modifyList = append(modifyList, tmp)
- } else {
- tmp.CreateTime = nowTime
- addList = append(addList, tmp)
- }
- if len(addList) >= 500 {
- err = logStat.Add(addList)
- if err != nil {
- err = fmt.Errorf("新增上海钢联明细记录报错,err: %s", err)
- return
- }
- addList = addList[:0]
- }
- if len(modifyList) >= 500 {
- err = data_stat.UpdateEdbUpdateStatMulti(modifyList)
- if err != nil {
- err = fmt.Errorf("更新上海钢联明细记录报错,err: %s", err)
- return
- }
- modifyList = modifyList[:0]
- }
- }
- }
- //判断当日指标统计数据是否存在,如果存在则更新,不存在则新增
- if len(addList) > 0 {
- err = logStat.Add(addList)
- }
- if len(modifyList) > 0 {
- err = data_stat.UpdateEdbUpdateStatMulti(modifyList)
- }
- return
- }
- func checkMySteelEdbInfoNeedRefresh(edbCode, frequency string, weekNeedRefreshMap map[string]struct{}) (needRefresh int, err error) {
- now := time.Now()
- week := int(now.Weekday())
- //日度
- if week >= 1 && week <= 6 {
- if frequency == "日度" {
- needRefresh = 1
- return
- }
- }
- //周度
- if week >= 3 && week <= 6 {
- _, ok := weekNeedRefreshMap[edbCode]
- if frequency == "周度" && ok {
- needRefresh = 1
- return
- }
- }
- //季度,月度,年度都是每个周末刷新
- if week == 0 {
- if frequency == "旬度" || frequency == "月度" || frequency == "季度" || frequency == "年度" {
- needRefresh = 1
- return
- }
- }
- return
- }
- // SetEdbSourceStat 定时统计数据源汇总表
- func SetEdbSourceStat(needStat bool) (err error) {
- defer func() {
- if err != nil {
- tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
- alarm_msg.SendAlarmMsg(tips, 3)
- }
- }()
- //查询钢联的所有指标信息
- nowTime := time.Now()
- today := time.Now().Format(utils.FormatDate)
- nextDay := time.Now().AddDate(0, 0, 1).Format(utils.FormatDate)
- statCond := " and create_time >= ? and create_time < ?"
- var statPars []interface{}
- statPars = append(statPars, today, nextDay)
- //查询当日钢联所有的统计数据
- updateStatList, err := data_stat.GetEdbUpdateStatByCondition(statCond, statPars)
- if err != nil {
- err = fmt.Errorf("查询上海钢联数据源明细记录统计报错,err: %s", err)
- return
- }
- if !needStat && len(updateStatList) == 0 {
- return
- }
- updateStatMap := make(map[string][]*data_stat.EdbInfoUpdateStat)
- if len(updateStatList) > 0 {
- for _, v := range updateStatList {
- updateStatMap[v.TerminalCode] = append(updateStatMap[v.TerminalCode], v)
- }
- }
- cond := " and create_time >= ? and create_time < ?"
- var pars []interface{}
- pars = append(pars, today, nextDay)
- //查询当日钢联所有的统计数据
- statList, err := data_stat.GetEdbSourceStatByCondition(cond, pars)
- if err != nil {
- err = fmt.Errorf("查询上海钢联数据源统计报错,err: %s", err)
- return
- }
- statMap := make(map[string]*data_stat.EdbSourceStat)
- if len(statList) > 0 {
- for _, v := range statList {
- statMap[v.TerminalCode] = v
- }
- }
- // 查询今日被删除的指标数
- delList, err := data_stat.GetEdbDeleteLogNumByCreateTime(today, nextDay)
- if err != nil {
- err = fmt.Errorf("查询今日被删除指标数目报错,err: %s", err)
- return
- }
- delMap := make(map[string]int)
- if len(delList) > 0 {
- for _, v := range delList {
- delMap[v.TerminalCode] = v.Num
- }
- }
- logStat := new(data_stat.EdbSourceStat)
- //组装新增数据
- addList := make([]*data_stat.EdbSourceStat, 0)
- modifyList := make([]*data_stat.EdbSourceStat, 0)
- for terminalCode, list := range updateStatMap {
- tmp := new(data_stat.EdbSourceStat)
- for k, v := range list {
- if k == 0 {
- tmp.SourceName = v.SourceName
- tmp.Source = v.Source
- tmp.TerminalCode = v.TerminalCode
- tmp.ModifyTime = nowTime
- }
- tmp.EdbNum = tmp.EdbNum + 1
- if v.IsAdd == 1 {
- tmp.EdbNewNum = tmp.EdbNewNum + 1
- }
- if v.NeedRefresh == 1 {
- tmp.NeedRefreshNum = tmp.NeedRefreshNum + 1
- }
- if v.HasRefresh == 1 {
- tmp.HasRefreshNum = tmp.HasRefreshNum + 1
- }
- // 区分刷新成功和更新成功
- if v.DataUpdateResult == 1 { //处理更新结果
- tmp.UpdateSuccessNum = tmp.UpdateSuccessNum + 1
- } else if v.NeedRefresh == 1 {
- tmp.UpdateFailedNum = tmp.UpdateFailedNum + 1
- }
- if v.UpdateResult == 1 { //刷新结果
- tmp.RefreshSuccessNum = tmp.RefreshSuccessNum + 1
- } else if v.HasRefresh == 1 {
- tmp.RefreshFailedNum = tmp.RefreshFailedNum + 1
- }
- }
- // 处理今天删除的指标数量
- if dn, ok := delMap[terminalCode]; ok {
- tmp.EdbDelNum = dn
- }
- // 判断是否需要新增还是更新
- if exist, ok := statMap[terminalCode]; ok {
- tmp.Id = exist.Id
- modifyList = append(modifyList, tmp)
- } else {
- tmp.CreateTime = nowTime
- addList = append(addList, tmp)
- }
- if len(addList) >= 500 {
- err = logStat.Add(addList)
- if err != nil {
- err = fmt.Errorf("新增上海钢联统计表报错,err: %s", err)
- return
- }
- addList = addList[:0]
- }
- if len(modifyList) >= 500 {
- err = data_stat.UpdateEdbSourceStatMulti(modifyList)
- if err != nil {
- err = fmt.Errorf("更新上海钢联统计表报错,err: %s", err)
- return
- }
- modifyList = modifyList[:0]
- }
- }
- //判断当日指标统计数据是否存在,如果存在则更新,不存在则新增
- if len(addList) > 0 {
- err = logStat.Add(addList)
- }
- if len(modifyList) > 0 {
- err = data_stat.UpdateEdbSourceStatMulti(modifyList)
- }
- return
- }
|