package services import ( "context" "eta_gn/eta_task/models/data_manage/edb_refresh" "eta_gn/eta_task/services/alarm_msg" "eta_gn/eta_task/services/data" "eta_gn/eta_task/utils" "fmt" "strings" "sync" "time" ) // Function to merge two maps func mergeMaps(dst map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, src map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) (newMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) { if dst == nil { return src } if src == nil { return dst } newMap = dst for k, v := range src { if newK, ok := newMap[k]; ok { newK = append(newK, v...) newMap[k] = newK } else { newMap[k] = v } } return newMap } // getDefaultRefreshData // @Description: 根据默认配置获取需要刷新的指标列表 // @author: Roc // @datetime 2024-01-10 13:55:38 // @param now time.Time // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig // @return err error func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) { errMsgList := make([]string, 0) defer func() { if err != nil { fmt.Println(err) } }() sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) currTimeStr := getPreviousHalfHour(now) fmt.Println(currTimeStr) // 所有默认配置刷新项 list := make([]*edb_refresh.EdbRefreshDefaultConfig, 0) //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年 refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"} //conf, err := models.GetBusinessConf() //if err != nil { // fmt.Println(err) // utils.FileLog.Info("获取业务配置失败,Err:" + err.Error()) // return //} // 获取钢联化工的数据获取方式 //mySteelChemicalDataMethod := "excel" //if v, ok := conf["MySteelDataMethod"]; ok { // if v == "api" { // mySteelChemicalDataMethod = v // } //} //utils.FileLog.Info("获取业务配置,MySteelDataMethod:" + mySteelChemicalDataMethod) // 获取各个刷新频率的配置 for _, refreshFrequency := range refreshFrequencyList { // 获取刷新频率条件 condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency) if !isHandler { // 可能是非交易日,所以过滤不处理 continue } condition += ` AND refresh_frequency = ? AND refresh_time = ?` pars = append(pars, refreshFrequency, currTimeStr) // 这两个是excel的数据源,他是从公共机更新的,需要过滤掉 //if mySteelChemicalDataMethod == "api" { // // 钢联化工使用api的方式获取数据的,不需要过滤 // condition += ` AND source not in (?)` // pars = append(pars, utils.DATA_SOURCE_YS) //} else { // condition += ` AND source not in (?,?)` // pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS) //} tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars) if tmpErr != nil { err = tmpErr return } list = append(list, tmpList...) } // 更新的单元格数 refreshDataNumMap := make(map[string]*edb_refresh.EdbRefreshDefaultConfig) // 数据源刷新频度的列表数组 refreshDataFrequencyListMap := make(map[int]map[int][]string) wgNum := 0 // 处理待刷新的数据源,整理成数组,方便获取对应的指标 for _, item := range list { // 更新的单元格数 key := fmt.Sprintf("%d_%d_%s", item.Source, item.SubSource, item.Frequency) refreshDataNumMap[key] = item // 数据源刷新频度的列表数组 subSourceFrequencyList, ok := refreshDataFrequencyListMap[item.Source] if !ok { subSourceFrequencyList = make(map[int][]string) } frequencyList, ok := subSourceFrequencyList[item.SubSource] if !ok { wgNum++ frequencyList = make([]string, 0) } subSourceFrequencyList[item.SubSource] = append(frequencyList, item.Frequency) refreshDataFrequencyListMap[item.Source] = subSourceFrequencyList } for source, subSourceFrequencyListMap := range refreshDataFrequencyListMap { for subSource, frequencyList := range subSourceFrequencyListMap { edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList) if tmpErr != nil { errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error())) } for _, v := range edbList { // 数据刷新的期数 dataRefreshNum := utils.DATA_REFRESH key := fmt.Sprintf("%d_%d_%s", v.Source, v.SubSource, v.Frequency) if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok { if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数 dataRefreshNum = 0 } else if edbRefreshDefaultConfig.RefreshDataNum > 0 { // dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum } } v.DataRefreshNum = dataRefreshNum } key := fmt.Sprint(source, "_", subSource) sourceEdbInfoListMap[key] = edbList } } fmt.Println("Get Refresh End") return } // getConfigRefreshData // @Description: 根据指标配置获取需要刷新的指标列表 // @author: Roc // @datetime 2024-01-10 13:55:59 // @param now time.Time // @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig // @return err error func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) { defer func() { if err != nil { fmt.Println(err) } }() sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) currTimeStr := getPreviousHalfHour(now) // 所有默认配置刷新项 list := make([]*edb_refresh.EdbRefreshConfig, 0) //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年 refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"} // 获取各个刷新频率的配置 for _, refreshFrequency := range refreshFrequencyList { // 获取刷新频率条件 condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency) if !isHandler { // 可能是非交易日,所以过滤不处理 continue } condition += ` AND refresh_frequency = ? AND refresh_time = ?` pars = append(pars, refreshFrequency, currTimeStr) tmpList, tmpErr := edb_refresh.GetEdbRefreshConfigListByCondition(condition, pars) if tmpErr != nil { err = tmpErr return } list = append(list, tmpList...) } // 配置列表 configIdEdbRefreshConfigMap := make(map[int]*edb_refresh.EdbRefreshConfig) configIdList := make([]int, 0) for _, v := range list { configIdList = append(configIdList, v.EdbRefreshConfigId) configIdEdbRefreshConfigMap[v.EdbRefreshConfigId] = v } //conf, err := models.GetBusinessConf() //if err != nil { // fmt.Println(err) // return //} // 获取钢联化工的数据获取方式 //mySteelChemicalDataMethod := "excel" //if v, ok := conf["MySteelDataMethod"]; ok { // if v == "api" { // mySteelChemicalDataMethod = v // } //} // 当钢联的数据获取方式是api时,不用过滤 var sourceList []int //if mySteelChemicalDataMethod == "api" { // sourceList = []int{utils.DATA_SOURCE_YS} //} else { // sourceList = []int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS} //} edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource(sourceList, configIdList) if err != nil { return } for _, v := range edbInfoList { key := fmt.Sprint(v.Source, "_", v.SubSource) tmpList, ok := sourceEdbInfoListMap[key] if !ok { tmpList = make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0) } // 数据刷新的期数 dataRefreshNum := utils.DATA_REFRESH if edbRefreshConfig, ok2 := configIdEdbRefreshConfigMap[v.EdbRefreshConfigId]; ok2 { if edbRefreshConfig.RefreshAllData == 1 { // 刷新所有数据期数 dataRefreshNum = 0 } else if edbRefreshConfig.RefreshDataNum > 0 { // dataRefreshNum = edbRefreshConfig.RefreshDataNum } } v.DataRefreshNum = dataRefreshNum sourceEdbInfoListMap[key] = append(tmpList, v) } fmt.Println("Get ConfigRefreshData End") return } // BaseRefreshData // @Description: 基础数据刷新 // @author: Roc // @datetime 2024-01-09 16:27:45 // @param wg *sync.WaitGroup // @return err error func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_refresh.EdbInfoListAndRefreshConfig) (err error) { errMsgList := make([]string, 0) defer func() { if err != nil { fmt.Println("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+err.Error()) go alarm_msg.SendAlarmMsg(fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData ErrMsg:"+err.Error()), 3) } if len(errMsgList) > 0 { errMsg := fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+strings.Join(errMsgList, "\n")) fmt.Println(errMsg) go alarm_msg.SendAlarmMsg(errMsg, 3) } wg.Done() }() // 数据刷新的期数 dataRefreshNum := utils.DATA_REFRESH // 是否从最开始的日期更新 //var isRefreshByStartDate bool for _, v := range items { // 如果暂停更新,那就过滤 if v.NoUpdate == 1 { continue } if v.DataRefreshNum > 0 { dataRefreshNum = v.DataRefreshNum } startDate := "" //if isRefreshByStartDate { // startDate = v.StartDate.Format(utils.FormatDate) //} else { if v.EndDate.IsZero() { startDate = utils.BaseEdbRefreshStartDate } else { if v.Frequency == "日度" { startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate) } else if v.Frequency == "周度" { startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate) } else if v.Frequency == "旬度" { startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate) } else if v.Frequency == "月度" { startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate) } else if v.Frequency == "季度" { startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate) } else if v.Frequency == "半年度" { startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate) } else if v.Frequency == "年度" { startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate) } else { startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate) } } //} fmt.Println(startDate) // 数据更新 resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate) if tmpErr != nil { errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error()) continue } if resp.Ret != 200 { errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg) continue } } fmt.Println("来源:", source, ";子来源:", subSource, "刷新结束") return err } // getRefreshFrequencyCondition // @Description: 根据时间和刷新频率获取条件 // @author: Roc // @datetime 2024-01-09 16:27:11 // @param now time.Time // @param refreshFrequency string // @return condition string // @return pars []interface{} // @return isHandler bool func getRefreshFrequencyCondition(now time.Time, refreshFrequency string) (condition string, pars []interface{}, isHandler bool) { isHandler = true var dayNum int var isLastDay bool //刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年 switch refreshFrequency { case "每自然日": // 自然日不需要额外条件 return case "每交易日": // 周六日不处理 if now.Weekday() == time.Saturday || now.Weekday() == time.Sunday { isHandler = false } return case "每周": currWeekDay := now.Weekday() if currWeekDay == time.Sunday { currWeekDay = 7 isLastDay = true } dayNum = int(currWeekDay) case "每旬": currDay := now.Day() if currDay <= 10 { dayNum = currDay // 如果是这旬的最后一天 if currDay == 10 { isLastDay = true } } else if currDay <= 20 { dayNum = currDay - 10 // 如果是这旬的最后一天 if currDay == 20 { isLastDay = true } } else { dayNum = currDay - 20 // 当月的最后一天 monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) // 如果是这旬的最后一天 if currDay == monthLastDay.Day() { isLastDay = true } } case "每月": // 当前日期 currDay := now.Day() dayNum = currDay // 当期的最后一天 monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) // 如果是这期的最后一天 if currDay == monthLastDay.Day() { isLastDay = true } case "每季": // 当期的第一天 ; 当期的最后一天 var startDay, endDay time.Time currMonth := now.Month() currDay := now.Day() if currMonth <= 3 { // 当季的第一天 startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local) // 当季的最后一天 endDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } else if currMonth <= 6 { // 当期的第一天 startDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } else if currMonth <= 9 { // 当期的第一天 startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } else { // 当期的第一天 startDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } // 计算这期的第一天和当日的天数 dayNum = utils.GetTimeSubDay(startDay, now) + 1 // 如果是这期的最后一天 if currMonth == endDay.Month() && currDay == endDay.Day() { isLastDay = true } case "每半年": // 当期的第一天 ; 当期的最后一天 var startDay, endDay time.Time currMonth := now.Month() currDay := now.Day() if currMonth <= 6 { // 当期的第一天 startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } else { // 当期的第一天 startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) } // 计算这期的第一天和当日的天数 dayNum = utils.GetTimeSubDay(startDay, now) + 1 // 如果是这期的最后一天 if currMonth == endDay.Month() && currDay == endDay.Day() { isLastDay = true } case "每年": currMonth := now.Month() currDay := now.Day() // 当期的第一天 startDay := time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local) // 当期的最后一天 endDay := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1) // 计算这期的第一天和当日的天数 dayNum = utils.GetTimeSubDay(startDay, now) + 1 // 如果是这期的最后一天 if currMonth == endDay.Month() && currDay == endDay.Day() { isLastDay = true } } // 如果是这期的最后一天,那么就是判断refresh_frequency_day是否配置为0,或者配置的天数大于这期的最大天数 if isLastDay { condition += ` AND ( refresh_frequency_day = ? OR refresh_frequency_day >= ? )` pars = append(pars, 0, dayNum) } else { // 如果不是这期的最后一天,那么就是判断refresh_frequency_day是否等于配置的天数 condition += ` AND refresh_frequency_day = ? ` pars = append(pars, dayNum) } return } // getPreviousHalfHour // @Description: 根据传入的时间获取该时间的前整半小时的时间字符串 // @author: Roc // @datetime 2024-01-09 14:27:34 // @param now time.Time // @return string func getPreviousHalfHour(now time.Time) string { minute := now.Minute() if minute >= 30 { return fmt.Sprintf("%02d:%02d", now.Hour(), 30) } return fmt.Sprintf("%02d:%02d", now.Hour(), 0) } // ConfigRefreshDataGn 根据配置刷新指标数据 func ConfigRefreshDataGn(cont context.Context) (err error) { errMsgList := make([]string, 0) defer func() { if err != nil { fmt.Println(err) } }() now := time.Now() //now = time.Date(2023, 12, 31, 16, 50, 59, 0, time.Local) defaultSourceEdbInfoListMap, err := getDefaultRefreshData(now) if err != nil { errMsgList = append(errMsgList, "获取默认刷新数据失败,Err:"+err.Error()) } sourceEdbInfoListMap, err := getConfigRefreshData(now) if err != nil { errMsgList = append(errMsgList, "获取指标配置刷新数据失败,Err:"+err.Error()) } // 将两个合并 allSourceEdbInfoListMap := mergeMaps(defaultSourceEdbInfoListMap, sourceEdbInfoListMap) wgNum := len(allSourceEdbInfoListMap) if wgNum <= 0 { return } wg := sync.WaitGroup{} wg.Add(wgNum) for _, edbList := range allSourceEdbInfoListMap { if edbList == nil { wg.Done() continue } if len(edbList) != 0 { go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList) } } wg.Wait() fmt.Println("Refresh End") return }