package services import ( "context" "encoding/json" "eta/eta_task/models" "eta/eta_task/models/data_manage" "eta/eta_task/models/data_manage/edb_refresh" "eta/eta_task/services/alarm_msg" "eta/eta_task/services/data" "eta/eta_task/utils" "fmt" "strings" "sync" "time" ) // ConfigRefreshData // @Description: 配置刷新数据 // @author: Roc // @datetime 2024-01-10 13:55:05 // @param cont context.Context // @return err error func ConfigRefreshData(cont context.Context) (err error) { errMsgList := make([]string, 0) defer func() { if err != nil { fmt.Println(err) } }() // 一期是只做wind、同花顺、钢联、有色 now := time.Now() //now = time.Date(2023, 12, 31, 19, 10, 59, 0, time.Local) //now = time.Date(2023, 12, 31, 16, 50, 59, 0, time.Local) defaultSourceEdbInfoListMap, err := getDefaultRefreshData(now) if err != nil { errMsgList = append(errMsgList, "获取默认刷新数据失败,Err:"+err.Error()) } sourceEdbInfoListMap, err := getConfigRefreshData(now) if err != nil { errMsgList = append(errMsgList, "获取指标配置刷新数据失败,Err:"+err.Error()) } // 将两个合并 allSourceEdbInfoListMap := mergeMaps(defaultSourceEdbInfoListMap, sourceEdbInfoListMap) wgNum := len(allSourceEdbInfoListMap) if wgNum <= 0 { return } wg := sync.WaitGroup{} wg.Add(wgNum) for _, edbList := range allSourceEdbInfoListMap { if edbList == nil { wg.Done() continue } if len(edbList) != 0 { go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList) } } wg.Wait() fmt.Println("Refresh End") return } // Function to merge two maps func mergeMaps(dst map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, src map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) (newMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) { if dst == nil { return src } if src == nil { return dst } newMap = dst for k, v := range src { if newK, ok := newMap[k]; ok { newK = append(newK, v...) newMap[k] = newK } else { newMap[k] = v } } return newMap } // getDefaultRefreshData // @Description: 根据默认配置获取需要刷新的指标列表 // @author: Roc // @datetime 2024-01-10 13:55:38 // @param now time.Time // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig // @return err error func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) { errMsgList := make([]string, 0) defer func() { if err != nil { fmt.Println(err) } }() // 一期是只做wind、同花顺、钢联、有色 sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) currTimeStr := getPreviousHalfHour(now) fmt.Println(currTimeStr) // 所有默认配置刷新项 list := make([]*edb_refresh.EdbRefreshDefaultConfig, 0) //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年 refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"} conf, err := models.GetBusinessConf() if err != nil { fmt.Println(err) utils.FileLog.Info("获取业务配置失败,Err:" + err.Error()) return } // 获取钢联化工的数据获取方式 mySteelChemicalDataMethod := "excel" if v, ok := conf["MySteelDataMethod"]; ok { if v == "api" { mySteelChemicalDataMethod = v } } utils.FileLog.Info("获取业务配置,MySteelDataMethod:" + mySteelChemicalDataMethod) // 获取各个刷新频率的配置 for _, refreshFrequency := range refreshFrequencyList { // 获取刷新频率条件 condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency) if !isHandler { // 可能是非交易日,所以过滤不处理 continue } condition += ` AND refresh_frequency = ? AND refresh_time = ?` pars = append(pars, refreshFrequency, currTimeStr) // 这两个是excel的数据源,他是从公共机更新的,需要过滤掉 if mySteelChemicalDataMethod == "api" { // 钢联化工使用api的方式获取数据的,不需要过滤 condition += ` AND source not in (?)` pars = append(pars, utils.DATA_SOURCE_YS) } else { condition += ` AND source not in (?,?)` pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS) } tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars) if tmpErr != nil { err = tmpErr return } list = append(list, tmpList...) } // 更新的单元格数 refreshDataNumMap := make(map[string]*edb_refresh.EdbRefreshDefaultConfig) // 数据源刷新频度的列表数组 refreshDataFrequencyListMap := make(map[int]map[int][]string) wgNum := 0 // 处理待刷新的数据源,整理成数组,方便获取对应的指标 for _, item := range list { // 更新的单元格数 key := fmt.Sprintf("%d_%d_%s", item.Source, item.SubSource, item.Frequency) refreshDataNumMap[key] = item // 数据源刷新频度的列表数组 subSourceFrequencyList, ok := refreshDataFrequencyListMap[item.Source] if !ok { subSourceFrequencyList = make(map[int][]string) } frequencyList, ok := subSourceFrequencyList[item.SubSource] if !ok { wgNum++ frequencyList = make([]string, 0) } subSourceFrequencyList[item.SubSource] = append(frequencyList, item.Frequency) refreshDataFrequencyListMap[item.Source] = subSourceFrequencyList } for source, subSourceFrequencyListMap := range refreshDataFrequencyListMap { switch source { case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS: // 只处理钢联化工使用api方式获取数据的情况 if mySteelChemicalDataMethod == "api" { for subSource, frequencyList := range subSourceFrequencyListMap { items, tmpErr := data_manage.GetBaseFromMysteelChemicalIndexItems(frequencyList) if tmpErr != nil { errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error())) } indexList := make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0) for _, v := range items { tmpConf := new(edb_refresh.EdbInfoListAndRefreshConfig) // 数据刷新的期数 dataRefreshNum := utils.DATA_REFRESH key := fmt.Sprintf("%d_%d_%s", utils.DATA_SOURCE_MYSTEEL_CHEMICAL, 0, v.Frequency) if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok { if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数 dataRefreshNum = 0 } else if edbRefreshDefaultConfig.RefreshDataNum > 0 { // dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum } } tmpConf.EdbCode = v.IndexCode tmpConf.EdbName = v.IndexName tmpConf.Source = utils.DATA_SOURCE_MYSTEEL_CHEMICAL tmpConf.Frequency = v.Frequency tmpConf.Unit = v.Unit tmpConf.StartDate, err = time.Parse(utils.FormatDate, v.StartDate) tmpConf.EndDate, err = time.Parse(utils.FormatDate, v.EndDate) tmpConf.ClassifyId = int(v.BaseFromMysteelChemicalClassifyId) tmpConf.DataRefreshNum = dataRefreshNum tmpConf.EdbInfoId = v.EdbInfoId indexList = append(indexList, tmpConf) } key := fmt.Sprint(source, "_", subSource) sourceEdbInfoListMap[key] = indexList } } // 其他情况不处理 default: for subSource, frequencyList := range subSourceFrequencyListMap { edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList) if tmpErr != nil { errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error())) } for _, v := range edbList { // 数据刷新的期数 dataRefreshNum := utils.DATA_REFRESH key := fmt.Sprintf("%d_%d_%s", v.Source, v.SubSource, v.Frequency) if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok { if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数 dataRefreshNum = 0 } else if edbRefreshDefaultConfig.RefreshDataNum > 0 { // dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum } } v.DataRefreshNum = dataRefreshNum } key := fmt.Sprint(source, "_", subSource) sourceEdbInfoListMap[key] = edbList } } } fmt.Println("Get Refresh End") return } // getConfigRefreshData // @Description: 根据指标配置获取需要刷新的指标列表 // @author: Roc // @datetime 2024-01-10 13:55:59 // @param now time.Time // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig // @return err error func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) { defer func() { if err != nil { fmt.Println(err) } }() // 一期是只做wind、同花顺、钢联、有色 sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) currTimeStr := getPreviousHalfHour(now) // 所有默认配置刷新项 list := make([]*edb_refresh.EdbRefreshConfig, 0) //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年 refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"} // 获取各个刷新频率的配置 for _, refreshFrequency := range refreshFrequencyList { // 获取刷新频率条件 condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency) if !isHandler { // 可能是非交易日,所以过滤不处理 continue } condition += ` AND refresh_frequency = ? AND refresh_time = ?` pars = append(pars, refreshFrequency, currTimeStr) tmpList, tmpErr := edb_refresh.GetEdbRefreshConfigListByCondition(condition, pars) if tmpErr != nil { err = tmpErr return } list = append(list, tmpList...) } // 配置列表 configIdEdbRefreshConfigMap := make(map[int]*edb_refresh.EdbRefreshConfig) configIdList := make([]int, 0) for _, v := range list { configIdList = append(configIdList, v.EdbRefreshConfigId) configIdEdbRefreshConfigMap[v.EdbRefreshConfigId] = v } conf, err := models.GetBusinessConf() if err != nil { fmt.Println(err) return } // 获取钢联化工的数据获取方式 mySteelChemicalDataMethod := "excel" if v, ok := conf["MySteelDataMethod"]; ok { if v == "api" { mySteelChemicalDataMethod = v } } // 当钢联的数据获取方式是api时,不用过滤 var sourceList []int if mySteelChemicalDataMethod == "api" { sourceList = []int{utils.DATA_SOURCE_YS} } else { sourceList = []int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS} } edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource(sourceList, configIdList) if err != nil { return } for _, v := range edbInfoList { key := fmt.Sprint(v.Source, "_", v.SubSource) tmpList, ok := sourceEdbInfoListMap[key] if !ok { tmpList = make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0) } // 数据刷新的期数 dataRefreshNum := utils.DATA_REFRESH if edbRefreshConfig, ok2 := configIdEdbRefreshConfigMap[v.EdbRefreshConfigId]; ok2 { if edbRefreshConfig.RefreshAllData == 1 { // 刷新所有数据期数 dataRefreshNum = 0 } else if edbRefreshConfig.RefreshDataNum > 0 { // dataRefreshNum = edbRefreshConfig.RefreshDataNum } } v.DataRefreshNum = dataRefreshNum sourceEdbInfoListMap[key] = append(tmpList, v) } fmt.Println("Get ConfigRefreshData End") return } // BaseRefreshData // @Description: 基础数据刷新 // @author: Roc // @datetime 2024-01-09 16:27:45 // @param wg *sync.WaitGroup // @return err error func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_refresh.EdbInfoListAndRefreshConfig) (err error) { errMsgList := make([]string, 0) defer func() { if err != nil { fmt.Println("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+err.Error()) go alarm_msg.SendAlarmMsg(fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData ErrMsg:"+err.Error()), 3) } if len(errMsgList) > 0 { errMsg := fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+strings.Join(errMsgList, "\n")) fmt.Println(errMsg) go alarm_msg.SendAlarmMsg(errMsg, 3) } wg.Done() }() // 数据刷新的期数 dataRefreshNum := utils.DATA_REFRESH // 是否从最开始的日期更新 var isRefreshByStartDate bool if source != utils.DATA_SOURCE_THS { for _, v := range items { // 如果暂停更新,那就过滤 if v.NoUpdate == 1 { continue } if v.DataRefreshNum > 0 { dataRefreshNum = v.DataRefreshNum } startDate := "" if isRefreshByStartDate { startDate = v.StartDate.Format(utils.FormatDate) } else { if v.Frequency == "日度" { startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate) } else if v.Frequency == "周度" { startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate) } else if v.Frequency == "旬度" { startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate) } else if v.Frequency == "月度" { startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate) } else if v.Frequency == "季度" { startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate) } else if v.Frequency == "半年度" { startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate) } else if v.Frequency == "年度" { startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate) } else { startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate) } } fmt.Println(startDate) // 数据更新 resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate) if tmpErr != nil { errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error()) continue } if resp.Ret != 200 { errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg) continue } } } // 同花顺控制速率, 每秒最多4次请求(同花顺EDB函数限制为5, 考虑到可能存在用户同时在使用, 这里批量刷新设置为4, 话说5确实是有点少了吧=_=!) if source == utils.DATA_SOURCE_THS { ticker := time.NewTicker(250 * time.Millisecond) defer ticker.Stop() for _, v := range items { <-ticker.C // 如果暂停更新,那就过滤 if v.NoUpdate == 1 { continue } if v.DataRefreshNum > 0 { dataRefreshNum = v.DataRefreshNum } startDate := "" if isRefreshByStartDate { startDate = v.StartDate.Format(utils.FormatDate) } else { if v.Frequency == "日度" { startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate) } else if v.Frequency == "周度" { startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate) } else if v.Frequency == "旬度" { startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate) } else if v.Frequency == "月度" { startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate) } else if v.Frequency == "季度" { startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate) } else if v.Frequency == "半年度" { startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate) } else if v.Frequency == "年度" { startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate) } else { startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate) } } fmt.Println(startDate) // 数据更新 resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate) if tmpErr != nil { errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error()) continue } if resp.Ret != 200 { errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg) continue } } } fmt.Println("来源:", source, ";子来源:", subSource, "刷新结束") return err } // getRefreshFrequencyCondition // @Description: 根据时间和刷新频率获取条件 // @author: Roc // @datetime 2024-01-09 16:27:11 // @param now time.Time // @param refreshFrequency string // @return condition string // @return pars []interface{} // @return isHandler bool func getRefreshFrequencyCondition(now time.Time, refreshFrequency string) (condition string, pars []interface{}, isHandler bool) { isHandler = true var dayNum int var isLastDay bool //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年 switch refreshFrequency { case "每自然日": // 自然日不需要额外条件 return case "每交易日": // 周六日不处理 if now.Weekday() == time.Saturday || now.Weekday() == time.Sunday { isHandler = false } return case "每周": currWeekDay := now.Weekday() if currWeekDay == time.Sunday { currWeekDay = 7 isLastDay = true } dayNum = int(currWeekDay) case "每旬": currDay := now.Day() if currDay <= 10 { dayNum = currDay // 如果是这旬的最后一天 if currDay == 10 { isLastDay = true } } else if currDay <= 20 { dayNum = currDay - 10 // 如果是这旬的最后一天 if currDay == 20 { isLastDay = true } } else { dayNum = currDay - 20 // 当月的最后一天 monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) // 如果是这旬的最后一天 if currDay == monthLastDay.Day() { isLastDay = true } } case "每月": // 当前日期 currDay := now.Day() dayNum = currDay // 当期的最后一天 monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) // 如果是这期的最后一天 if currDay == monthLastDay.Day() { isLastDay = true } case "每季": // 当期的第一天 ; 当期的最后一天 var startDay, endDay time.Time currMonth := now.Month() currDay := now.Day() if currMonth <= 3 { // 当季的第一天 startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local) // 当季的最后一天 endDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } else if currMonth <= 6 { // 当期的第一天 startDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } else if currMonth <= 9 { // 当期的第一天 startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } else { // 当期的第一天 startDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } // 计算这期的第一天和当日的天数 dayNum = utils.GetTimeSubDay(startDay, now) + 1 // 如果是这期的最后一天 if currMonth == endDay.Month() && currDay == endDay.Day() { isLastDay = true } case "每半年": // 当期的第一天 ; 当期的最后一天 var startDay, endDay time.Time currMonth := now.Month() currDay := now.Day() if currMonth <= 6 { // 当期的第一天 startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } else { // 当期的第一天 startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } // 计算这期的第一天和当日的天数 dayNum = utils.GetTimeSubDay(startDay, now) + 1 // 如果是这期的最后一天 if currMonth == endDay.Month() && currDay == endDay.Day() { isLastDay = true } case "每年": currMonth := now.Month() currDay := now.Day() // 当期的第一天 startDay := time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) // 计算这期的第一天和当日的天数 dayNum = utils.GetTimeSubDay(startDay, now) + 1 // 如果是这期的最后一天 if currMonth == endDay.Month() && currDay == endDay.Day() { isLastDay = true } } // 如果是这期的最后一天,那么就是判断refresh_frequency_day是否配置为0,或者配置的天数大于这期的最大天数 if isLastDay { condition += ` AND ( refresh_frequency_day = ? OR refresh_frequency_day >= ? )` pars = append(pars, 0, dayNum) } else { // 如果不是这期的最后一天,那么就是判断refresh_frequency_day是否等于配置的天数 condition += ` AND refresh_frequency_day = ? ` pars = append(pars, dayNum) } return } // getPreviousHalfHour // @Description: 根据传入的时间获取该时间的前整半小时的时间字符串 // @author: Roc // @datetime 2024-01-09 14:27:34 // @param now time.Time // @return string func getPreviousHalfHour(now time.Time) string { minute := now.Minute() if minute >= 30 { return fmt.Sprintf("%02d:%02d", now.Hour(), 30) } return fmt.Sprintf("%02d:%02d", now.Hour(), 0) } // 根据配置把钢联化工和wind指标设置成禁止刷新 func DisableEdbRefresh(cont context.Context) (err error) { //设置刷新key,如果没有执行完 报错提示 cacheKey := "eta_task:DisableEdbRefresh" deleteCache := true defer func() { if deleteCache { utils.Rc.Delete(cacheKey) } if err != nil { tips := "DisableEdbRefresh-钢联化工和wind指标设置成禁止刷新失败, ErrMsg:\n" + err.Error() utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } }() if !utils.Rc.SetNX(cacheKey, 1, 2*time.Minute) { deleteCache = false err = fmt.Errorf("系统处理中,请稍后重试!") return } //查询配置,如果未开启自动设置禁止刷新,则无需处理 obj := new(models.BusinessConf) conf, err := obj.GetItemByConfKey("EdbStopRefreshRule") if err != nil { if err.Error() == utils.ErrNoRow() { err = fmt.Errorf("未找到配置项,无需处理") return } return } //将json转为结构体 rule := new(models.EdbStopRefreshRule) err = json.Unmarshal([]byte(conf.ConfVal), rule) if err != nil { return } //判断是否开启自动设置禁止刷新 if rule.IsOpen == 0 { return } //获取当前时间 now := time.Now() if rule.BaseIndexStopDays > 0 { //设置数据源钢联化工指标禁止更新 baseIndexEndDate := now.AddDate(0, 0, -rule.BaseIndexStopDays+1).Format(utils.FormatDate) // 查询钢联化工指标,查询创建时间在baseIndexStartDate前,的所有钢联化工指标,分批查询,先查总数,再查列表 totalCount, e := data_manage.GetCountRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate) if e != nil { err = fmt.Errorf("查询钢联化工指标总数失败:%v", e) return } //分页查询 pageSize := 100 pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数 stopRefreshIds := make([]int32, 0) for i := 0; i < pageNum; i++ { start := i * pageSize indexItems, e := data_manage.GetRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate, start, pageSize) if e != nil { err = fmt.Errorf("分页查询钢联化工指标失败:%v", e) return } if len(indexItems) == 0 { continue } indexCodeList := make([]string, 0) for _, indexItem := range indexItems { indexCodeList = append(indexCodeList, indexItem.IndexCode) } condition := ` AND source=? AND edb_code in (` + utils.GetOrmInReplace(len(indexCodeList)) + `)` var pars []interface{} pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexCodeList) // 查询指标库里这些指标是否已创建 edbList, e := data_manage.GetEdbInfoByCondition(condition, pars, 0) if e != nil { err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e) return } edbMap := make(map[string]bool) for _, edb := range edbList { edbMap[edb.EdbCode] = true } for _, indexItem := range indexItems { // 判断指标是否被创建 if _, ok := edbMap[indexItem.IndexCode]; !ok { stopRefreshIds = append(stopRefreshIds, indexItem.BaseFromMysteelChemicalIndexId) if len(stopRefreshIds) > 100 { err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds) if err != nil { err = fmt.Errorf("设置禁止刷新失败:%v", err) return } stopRefreshIds = make([]int32, 0) } } } } // 未被创建,则设置禁止刷新 if len(stopRefreshIds) > 0 { err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds) if err != nil { err = fmt.Errorf("设置禁止刷新失败:%v", err) return } } } if rule.EdbStopDays > 0 { // 查询钢联和wind来源的指标 edbEndDate := now.AddDate(0, 0, -rule.EdbStopDays+1).Format(utils.FormatDate) condition := ` AND no_update=0 AND source in (?,?) AND ((create_time < ? and set_update_time is null) or set_update_time < ? )` var pars []interface{} pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, edbEndDate, edbEndDate) // 查询钢联化工指标和wind指标 分批查询,先查总数,再查列表 totalCount, e := data_manage.GetEdbInfoCountByCondition(condition, pars) if e != nil { err = fmt.Errorf("查询钢联化工指标总数失败:%v", e) return } //分页查询 pageSize := 100 pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数 stopRefreshIds := make([]int, 0) stopRefreshMysteelCode := make([]string, 0) fromEdbIdList := make([]int, 0) for i := 0; i < pageNum; i++ { start := i * pageSize edbItems, e := data_manage.GetEdbInfoPageByCondition(condition, pars, start, pageSize) if e != nil { err = fmt.Errorf("分页查询钢联化工指标失败:%v", e) return } if len(edbItems) == 0 { continue } edbInfoIds := make([]int, 0) fromEdbIdList = make([]int, 0) for _, item := range edbItems { edbInfoIds = append(edbInfoIds, item.EdbInfoId) } // 查询指标库里这些指标 引用情况 relationList, e := data_manage.GetEdbInfoRelationByEdbInfoIds(edbInfoIds) if e != nil { err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e) return } edbMap := make(map[int]struct{}) for _, item := range relationList { edbMap[item] = struct{}{} } for _, item := range edbItems { if _, ok := edbMap[item.EdbInfoId]; !ok { stopRefreshIds = append(stopRefreshIds, item.EdbInfoId) if item.Source == utils.DATA_SOURCE_MYSTEEL_CHEMICAL { stopRefreshMysteelCode = append(stopRefreshMysteelCode, item.EdbCode) } if item.EdbInfoType == 0 && item.EdbType == 1 { fromEdbIdList = append(fromEdbIdList, item.EdbInfoId) } // 更新指标禁止刷新状态 if len(stopRefreshIds) > 100 { // 查询相关的计算指标 calculateEdbIdList := make([]int, 0) if len(fromEdbIdList) > 0 { hasFind := make(map[int]struct{}) calculateEdbIdList, err = GetCalculateEdbByFromEdbInfo(fromEdbIdList, calculateEdbIdList, hasFind) if err != nil { err = fmt.Errorf("查询计算指标信息失败:%v", err) return } } err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIdList) if err != nil { err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err) return } stopRefreshIds = []int{} stopRefreshMysteelCode = []string{} } } } } // 更新指标禁止刷新状态 if len(stopRefreshIds) > 0 { // 查询相关的计算指标 calculateEdbIdList := make([]int, 0) if len(fromEdbIdList) > 0 { hasFind := make(map[int]struct{}) calculateEdbIdList, err = GetCalculateEdbByFromEdbInfo(fromEdbIdList, calculateEdbIdList, hasFind) if err != nil { err = fmt.Errorf("查询计算指标信息失败:%v", err) return } } err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIdList) if err != nil { err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err) return } } } return }