ソースを参照

数据源刷新

hsun 6 ヶ月 前
コミット
69c45acc90

+ 6 - 0
models/data_manage/edb_refresh/edb_refresh_default_config.go

@@ -137,3 +137,9 @@ WHERE a.source = ? AND a.sub_source = ? AND a.frequency IN (` + utils.GetOrmInRe
 	err = global.DmSQL["data"].Raw(sql, source, subSource, frequencyList).Find(&list).Error
 	return
 }
+
+func GetDistinctDefaultRefreshSourceIds() (sourceIds []int, err error) {
+	sql := `SELECT DISTINCT source FROM edb_refresh_default_config`
+	err = global.DmSQL["data"].Raw(sql).Scan(&sourceIds).Error
+	return
+}

+ 89 - 0
services/data/edb_info.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"eta_gn/eta_task/models/data_manage"
+	"eta_gn/eta_task/models/data_manage/edb_refresh"
 	"eta_gn/eta_task/services/alarm_msg"
 	"eta_gn/eta_task/utils"
 	"fmt"
@@ -1367,3 +1368,91 @@ func RefreshDataFromTradeAnalysis(wg *sync.WaitGroup) (err error) {
 	utils.FileLog.Info(fmt.Sprintf("持仓分析指标刷新结束: %s", time.Now().Format(utils.FormatDateTime)))
 	return err
 }
+
+// NoneConfigRefreshDataGn 刷新未配置的所有指标来源
+func NoneConfigRefreshDataGn(wg *sync.WaitGroup) (err error) {
+	errMsgList := make([]string, 0)
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("NoneConfigRefreshDataGn ErrMsg: %v", err)
+			utils.FileLog.Info(tips)
+		}
+		if len(errMsgList) > 0 {
+			errMsg := "NoneConfigRefreshDataGn Err:" + strings.Join(errMsgList, "\n")
+			utils.FileLog.Info(errMsg)
+		}
+		wg.Done()
+	}()
+
+	// 过滤出无刷新配置的指标数据来源
+	sourceRefreshIds := make([]int, 0)
+	{
+		cond := fmt.Sprintf(" AND is_base = ?")
+		pars := make([]interface{}, 0)
+		pars = append(pars, 1)
+		list, e := data_manage.GetEdbSourceItemsByCondition(cond, pars, []string{}, "")
+		if e != nil {
+			err = fmt.Errorf("获取需要刷新的基础指标来源失败, %v", e)
+			return
+		}
+		sourceBaseIds := make([]int, 0)
+		for _, v := range list {
+			sourceBaseIds = append(sourceBaseIds, v.EdbSourceId)
+		}
+
+		// 获取默认刷新中配置过的数据来源
+		hasConfigIds, e := edb_refresh.GetDistinctDefaultRefreshSourceIds()
+		if e != nil {
+			err = fmt.Errorf("获取已配置过刷新的指标来源失败, %v", e)
+			return
+		}
+
+		// 取两个[]int的差集
+		sourceRefreshIds = utils.MinusInt(sourceBaseIds, hasConfigIds)
+	}
+	if len(sourceRefreshIds) == 0 {
+		utils.FileLog.Info("无未配置刷新时间的指标来源需要刷新")
+		return
+	}
+
+	// 根据数据源依次刷新
+	for _, sourceId := range sourceRefreshIds {
+		cond := ` AND source = ? `
+		pars := make([]interface{}, 0)
+		pars = append(pars, sourceId)
+
+		// 获取指标
+		items, e := data_manage.GetEdbInfoByCondition(cond, pars, 0)
+		if e != nil {
+			err = fmt.Errorf("获取指标失败, Source: %d, Err: %v", sourceId, e)
+			return
+		}
+
+		for _, v := range items {
+			startDate := ""
+			if v.Frequency == "日度" {
+				startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
+			} else if v.Frequency == "周度" {
+				startDate = v.EndDate.AddDate(0, 0, -(utils.DATA_REFRESH * 7)).Format(utils.FormatDate)
+			} else if v.Frequency == "月度" {
+				startDate = v.EndDate.AddDate(0, -utils.DATA_REFRESH, 0).Format(utils.FormatDate)
+			} else if v.Frequency == "季度" {
+				startDate = v.EndDate.AddDate(0, -utils.DATA_REFRESH*3, 0).Format(utils.FormatDate)
+			} else if v.Frequency == "年度" {
+				startDate = v.EndDate.AddDate(-utils.DATA_REFRESH, 0, 0).Format(utils.FormatDate)
+			} else {
+				startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
+			}
+			resp, e := RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
+			if e != nil {
+				errMsgList = append(errMsgList, fmt.Sprintf("RefreshEdbData Code: %s Err: %v\n", v.EdbCode, e))
+				continue
+			}
+			if resp.Ret != 200 {
+				errMsgList = append(errMsgList, fmt.Sprintf("RefreshEdbData Code: %s Err: %s, ErrMsg: %s\n", v.EdbCode, resp.Msg, resp.ErrMsg))
+				continue
+			}
+		}
+	}
+	return
+}

+ 186 - 242
services/edb_refresh.go

@@ -21,52 +21,52 @@ import (
 // @datetime 2024-01-10 13:55:05
 // @param cont context.Context
 // @return err error
-func ConfigRefreshData(cont context.Context) (err error) {
-	errMsgList := make([]string, 0)
-	defer func() {
-		if err != nil {
-			fmt.Println(err)
-		}
-	}()
-	// 一期是只做wind、同花顺、钢联、有色
-
-	now := time.Now()
-	//now = time.Date(2023, 12, 31, 19, 10, 59, 0, time.Local)
-	//now = time.Date(2023, 12, 31, 16, 50, 59, 0, time.Local)
-	defaultSourceEdbInfoListMap, err := getDefaultRefreshData(now)
-	if err != nil {
-		errMsgList = append(errMsgList, "获取默认刷新数据失败,Err:"+err.Error())
-	}
-	sourceEdbInfoListMap, err := getConfigRefreshData(now)
-	if err != nil {
-		errMsgList = append(errMsgList, "获取指标配置刷新数据失败,Err:"+err.Error())
-	}
-
-	// 将两个合并
-	allSourceEdbInfoListMap := mergeMaps(defaultSourceEdbInfoListMap, sourceEdbInfoListMap)
-	wgNum := len(allSourceEdbInfoListMap)
-	if wgNum <= 0 {
-		return
-	}
-	wg := sync.WaitGroup{}
-	wg.Add(wgNum)
-
-	for _, edbList := range allSourceEdbInfoListMap {
-		if edbList == nil {
-			wg.Done()
-			continue
-		}
-		if len(edbList) != 0 {
-			go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
-		}
-	}
-
-	wg.Wait()
-
-	fmt.Println("Refresh End")
-
-	return
-}
+//func ConfigRefreshData(cont context.Context) (err error) {
+//	errMsgList := make([]string, 0)
+//	defer func() {
+//		if err != nil {
+//			fmt.Println(err)
+//		}
+//	}()
+//	// 一期是只做wind、同花顺、钢联、有色
+//
+//	now := time.Now()
+//	//now = time.Date(2023, 12, 31, 19, 10, 59, 0, time.Local)
+//	//now = time.Date(2023, 12, 31, 16, 50, 59, 0, time.Local)
+//	defaultSourceEdbInfoListMap, err := getDefaultRefreshData(now)
+//	if err != nil {
+//		errMsgList = append(errMsgList, "获取默认刷新数据失败,Err:"+err.Error())
+//	}
+//	sourceEdbInfoListMap, err := getConfigRefreshData(now)
+//	if err != nil {
+//		errMsgList = append(errMsgList, "获取指标配置刷新数据失败,Err:"+err.Error())
+//	}
+//
+//	// 将两个合并
+//	allSourceEdbInfoListMap := mergeMaps(defaultSourceEdbInfoListMap, sourceEdbInfoListMap)
+//	wgNum := len(allSourceEdbInfoListMap)
+//	if wgNum <= 0 {
+//		return
+//	}
+//	wg := sync.WaitGroup{}
+//	wg.Add(wgNum)
+//
+//	for _, edbList := range allSourceEdbInfoListMap {
+//		if edbList == nil {
+//			wg.Done()
+//			continue
+//		}
+//		if len(edbList) != 0 {
+//			go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
+//		}
+//	}
+//
+//	wg.Wait()
+//
+//	fmt.Println("Refresh End")
+//
+//	return
+//}
 
 // Function to merge two maps
 func mergeMaps(dst map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, src map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) (newMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) {
@@ -103,7 +103,6 @@ func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*ed
 			fmt.Println(err)
 		}
 	}()
-	// 一期是只做wind、同花顺、钢联、有色
 
 	sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
 
@@ -116,21 +115,21 @@ func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*ed
 	//刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
 	refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
 
-	conf, err := models.GetBusinessConf()
-	if err != nil {
-		fmt.Println(err)
-		utils.FileLog.Info("获取业务配置失败,Err:" + err.Error())
-		return
-	}
+	//conf, err := models.GetBusinessConf()
+	//if err != nil {
+	//	fmt.Println(err)
+	//	utils.FileLog.Info("获取业务配置失败,Err:" + err.Error())
+	//	return
+	//}
 
 	// 获取钢联化工的数据获取方式
-	mySteelChemicalDataMethod := "excel"
-	if v, ok := conf["MySteelDataMethod"]; ok {
-		if v == "api" {
-			mySteelChemicalDataMethod = v
-		}
-	}
-	utils.FileLog.Info("获取业务配置,MySteelDataMethod:" + mySteelChemicalDataMethod)
+	//mySteelChemicalDataMethod := "excel"
+	//if v, ok := conf["MySteelDataMethod"]; ok {
+	//	if v == "api" {
+	//		mySteelChemicalDataMethod = v
+	//	}
+	//}
+	//utils.FileLog.Info("获取业务配置,MySteelDataMethod:" + mySteelChemicalDataMethod)
 	// 获取各个刷新频率的配置
 	for _, refreshFrequency := range refreshFrequencyList {
 		// 获取刷新频率条件
@@ -144,14 +143,14 @@ func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*ed
 		pars = append(pars, refreshFrequency, currTimeStr)
 
 		// 这两个是excel的数据源,他是从公共机更新的,需要过滤掉
-		if mySteelChemicalDataMethod == "api" {
-			// 钢联化工使用api的方式获取数据的,不需要过滤
-			condition += ` AND source not in (?)`
-			pars = append(pars, utils.DATA_SOURCE_YS)
-		} else {
-			condition += ` AND source not in (?,?)`
-			pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
-		}
+		//if mySteelChemicalDataMethod == "api" {
+		//	// 钢联化工使用api的方式获取数据的,不需要过滤
+		//	condition += ` AND source not in (?)`
+		//	pars = append(pars, utils.DATA_SOURCE_YS)
+		//} else {
+		//	condition += ` AND source not in (?,?)`
+		//	pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
+		//}
 		tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars)
 		if tmpErr != nil {
 			err = tmpErr
@@ -187,71 +186,28 @@ func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*ed
 	}
 
 	for source, subSourceFrequencyListMap := range refreshDataFrequencyListMap {
-		switch source {
-		case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS:
-			// 只处理钢联化工使用api方式获取数据的情况
-			if mySteelChemicalDataMethod == "api" {
-				for subSource, frequencyList := range subSourceFrequencyListMap {
-					items, tmpErr := data_manage.GetBaseFromMysteelChemicalIndexItems(frequencyList)
-					if tmpErr != nil {
-						errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
-					}
-					indexList := make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
-
-					for _, v := range items {
-						tmpConf := new(edb_refresh.EdbInfoListAndRefreshConfig)
-						// 数据刷新的期数
-						dataRefreshNum := utils.DATA_REFRESH
-						key := fmt.Sprintf("%d_%d_%s", utils.DATA_SOURCE_MYSTEEL_CHEMICAL, 0, v.Frequency)
-						if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
-							if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
-								dataRefreshNum = 0
-							} else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
-								dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
-							}
-						}
-						tmpConf.EdbCode = v.IndexCode
-						tmpConf.EdbName = v.IndexName
-						tmpConf.Source = utils.DATA_SOURCE_MYSTEEL_CHEMICAL
-						tmpConf.Frequency = v.Frequency
-						tmpConf.Unit = v.Unit
-						tmpConf.StartDate, err = time.Parse(utils.FormatDate, v.StartDate)
-						tmpConf.EndDate, err = time.Parse(utils.FormatDate, v.EndDate)
-						tmpConf.ClassifyId = int(v.BaseFromMysteelChemicalClassifyId)
-						tmpConf.DataRefreshNum = dataRefreshNum
-						tmpConf.EdbInfoId = v.EdbInfoId
-						indexList = append(indexList, tmpConf)
-					}
-
-					key := fmt.Sprint(source, "_", subSource)
-					sourceEdbInfoListMap[key] = indexList
-				}
+		for subSource, frequencyList := range subSourceFrequencyListMap {
+			edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList)
+			if tmpErr != nil {
+				errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
 			}
-			// 其他情况不处理
-		default:
-			for subSource, frequencyList := range subSourceFrequencyListMap {
-				edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList)
-				if tmpErr != nil {
-					errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
-				}
 
-				for _, v := range edbList {
-					// 数据刷新的期数
-					dataRefreshNum := utils.DATA_REFRESH
-					key := fmt.Sprintf("%d_%d_%s", v.Source, v.SubSource, v.Frequency)
-					if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
-						if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
-							dataRefreshNum = 0
-						} else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
-							dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
-						}
+			for _, v := range edbList {
+				// 数据刷新的期数
+				dataRefreshNum := utils.DATA_REFRESH
+				key := fmt.Sprintf("%d_%d_%s", v.Source, v.SubSource, v.Frequency)
+				if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
+					if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
+						dataRefreshNum = 0
+					} else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
+						dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
 					}
-					v.DataRefreshNum = dataRefreshNum
 				}
-
-				key := fmt.Sprint(source, "_", subSource)
-				sourceEdbInfoListMap[key] = edbList
+				v.DataRefreshNum = dataRefreshNum
 			}
+
+			key := fmt.Sprint(source, "_", subSource)
+			sourceEdbInfoListMap[key] = edbList
 		}
 	}
 
@@ -272,7 +228,6 @@ func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb
 			fmt.Println(err)
 		}
 	}()
-	// 一期是只做wind、同花顺、钢联、有色
 
 	sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
 
@@ -311,26 +266,26 @@ func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb
 		configIdList = append(configIdList, v.EdbRefreshConfigId)
 		configIdEdbRefreshConfigMap[v.EdbRefreshConfigId] = v
 	}
-	conf, err := models.GetBusinessConf()
-	if err != nil {
-		fmt.Println(err)
-		return
-	}
+	//conf, err := models.GetBusinessConf()
+	//if err != nil {
+	//	fmt.Println(err)
+	//	return
+	//}
 
 	// 获取钢联化工的数据获取方式
-	mySteelChemicalDataMethod := "excel"
-	if v, ok := conf["MySteelDataMethod"]; ok {
-		if v == "api" {
-			mySteelChemicalDataMethod = v
-		}
-	}
+	//mySteelChemicalDataMethod := "excel"
+	//if v, ok := conf["MySteelDataMethod"]; ok {
+	//	if v == "api" {
+	//		mySteelChemicalDataMethod = v
+	//	}
+	//}
 	// 当钢联的数据获取方式是api时,不用过滤
 	var sourceList []int
-	if mySteelChemicalDataMethod == "api" {
-		sourceList = []int{utils.DATA_SOURCE_YS}
-	} else {
-		sourceList = []int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}
-	}
+	//if mySteelChemicalDataMethod == "api" {
+	//	sourceList = []int{utils.DATA_SOURCE_YS}
+	//} else {
+	//	sourceList = []int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}
+	//}
 	edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource(sourceList, configIdList)
 	if err != nil {
 		return
@@ -383,110 +338,54 @@ func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_ref
 	// 数据刷新的期数
 	dataRefreshNum := utils.DATA_REFRESH
 	// 是否从最开始的日期更新
-	var isRefreshByStartDate bool
-
-	if source != utils.DATA_SOURCE_THS {
-		for _, v := range items {
-			// 如果暂停更新,那就过滤
-			if v.NoUpdate == 1 {
-				continue
-			}
-			if v.DataRefreshNum > 0 {
-				dataRefreshNum = v.DataRefreshNum
-			}
-
-			startDate := ""
-			if isRefreshByStartDate {
-				startDate = v.StartDate.Format(utils.FormatDate)
-			} else {
-				if v.Frequency == "日度" {
-					startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
-				} else if v.Frequency == "周度" {
-					startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
-				} else if v.Frequency == "旬度" {
-					startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
-				} else if v.Frequency == "月度" {
-					startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
-				} else if v.Frequency == "季度" {
-					startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
-				} else if v.Frequency == "半年度" {
-					startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
-				} else if v.Frequency == "年度" {
-					startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
-				} else {
-					startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
-				}
-			}
-			fmt.Println(startDate)
+	//var isRefreshByStartDate bool
 
-			// 数据更新
-			resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
-			if tmpErr != nil {
-				errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
-				continue
-			}
-			if resp.Ret != 200 {
-				errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
-				continue
-			}
+	for _, v := range items {
+		// 如果暂停更新,那就过滤
+		if v.NoUpdate == 1 {
+			continue
 		}
-	}
-
-	// 同花顺控制速率, 每秒最多4次请求(同花顺EDB函数限制为5, 考虑到可能存在用户同时在使用, 这里批量刷新设置为4, 话说5确实是有点少了吧=_=!)
-	if source == utils.DATA_SOURCE_THS {
-		ticker := time.NewTicker(250 * time.Millisecond)
-		defer ticker.Stop()
-
-		for _, v := range items {
-			<-ticker.C
-
-			// 如果暂停更新,那就过滤
-			if v.NoUpdate == 1 {
-				continue
-			}
-			if v.DataRefreshNum > 0 {
-				dataRefreshNum = v.DataRefreshNum
-			}
-
-			startDate := ""
-			if isRefreshByStartDate {
-				startDate = v.StartDate.Format(utils.FormatDate)
-			} else {
-				if v.Frequency == "日度" {
-					startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
-				} else if v.Frequency == "周度" {
-					startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
-				} else if v.Frequency == "旬度" {
-					startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
-				} else if v.Frequency == "月度" {
-					startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
-				} else if v.Frequency == "季度" {
-					startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
-				} else if v.Frequency == "半年度" {
-					startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
-				} else if v.Frequency == "年度" {
-					startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
-				} else {
-					startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
-				}
-			}
-			fmt.Println(startDate)
+		if v.DataRefreshNum > 0 {
+			dataRefreshNum = v.DataRefreshNum
+		}
+
+		startDate := ""
+		//if isRefreshByStartDate {
+		//	startDate = v.StartDate.Format(utils.FormatDate)
+		//} else {
+		if v.Frequency == "日度" {
+			startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
+		} else if v.Frequency == "周度" {
+			startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
+		} else if v.Frequency == "旬度" {
+			startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
+		} else if v.Frequency == "月度" {
+			startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
+		} else if v.Frequency == "季度" {
+			startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
+		} else if v.Frequency == "半年度" {
+			startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
+		} else if v.Frequency == "年度" {
+			startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
+		} else {
+			startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
+		}
+		//}
+		fmt.Println(startDate)
 
-			// 数据更新
-			resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
-			if tmpErr != nil {
-				errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
-				continue
-			}
-			if resp.Ret != 200 {
-				errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
-				continue
-			}
+		// 数据更新
+		resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
+		if tmpErr != nil {
+			errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
+			continue
+		}
+		if resp.Ret != 200 {
+			errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
+			continue
 		}
 	}
 
 	fmt.Println("来源:", source, ";子来源:", subSource, "刷新结束")
-
 	return err
 }
 
@@ -873,3 +772,48 @@ func DisableEdbRefresh(cont context.Context) (err error) {
 	}
 	return
 }
+
+// ConfigRefreshDataGn 根据配置刷新指标数据
+func ConfigRefreshDataGn(cont context.Context) (err error) {
+	errMsgList := make([]string, 0)
+	defer func() {
+		if err != nil {
+			fmt.Println(err)
+		}
+	}()
+
+	now := time.Now()
+	//now = time.Date(2023, 12, 31, 16, 50, 59, 0, time.Local)
+	defaultSourceEdbInfoListMap, err := getDefaultRefreshData(now)
+	if err != nil {
+		errMsgList = append(errMsgList, "获取默认刷新数据失败,Err:"+err.Error())
+	}
+	sourceEdbInfoListMap, err := getConfigRefreshData(now)
+	if err != nil {
+		errMsgList = append(errMsgList, "获取指标配置刷新数据失败,Err:"+err.Error())
+	}
+
+	// 将两个合并
+	allSourceEdbInfoListMap := mergeMaps(defaultSourceEdbInfoListMap, sourceEdbInfoListMap)
+	wgNum := len(allSourceEdbInfoListMap)
+	if wgNum <= 0 {
+		return
+	}
+	wg := sync.WaitGroup{}
+	wg.Add(wgNum)
+
+	for _, edbList := range allSourceEdbInfoListMap {
+		if edbList == nil {
+			wg.Done()
+			continue
+		}
+		if len(edbList) != 0 {
+			go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
+		}
+	}
+
+	wg.Wait()
+
+	fmt.Println("Refresh End")
+	return
+}

+ 9 - 11
services/init_base.go

@@ -1,14 +1,12 @@
 package services
 
-import "fmt"
-
 // InitEs 数据初始化的时候,用来更新es信息
-func InitEs() {
-	err := RefreshData(nil)
-	if err != nil {
-		fmt.Println("更新指标信息失败,err:", err)
-		return
-	}
-
-	fmt.Println("更新成功")
-}
+//func InitEs() {
+//	err := RefreshData(nil)
+//	if err != nil {
+//		fmt.Println("更新指标信息失败,err:", err)
+//		return
+//	}
+//
+//	fmt.Println("更新成功")
+//}

+ 14 - 71
services/task.go

@@ -62,16 +62,16 @@ func releaseTask() {
 	task.AddTask("refreshData", refreshData)
 
 	// 根据配置刷新指标数据
-	configRefreshData := task.NewTask("syncBaseDataExt", "0 */30 * * * * ", ConfigRefreshData)
+	configRefreshData := task.NewTask("syncBaseDataExt", "0 */30 * * * * ", ConfigRefreshDataGn)
 	task.AddTask("configRefreshData", configRefreshData)
 
 	// 定时禁用钢联化工和wind指标的刷新状态
-	disableEdbRefresh := task.NewTask("disableEdbRefresh", "0 0 10 * * *", DisableEdbRefresh)
-	task.AddTask("disableEdbRefresh", disableEdbRefresh)
+	//disableEdbRefresh := task.NewTask("disableEdbRefresh", "0 0 10 * * *", DisableEdbRefresh)
+	//task.AddTask("disableEdbRefresh", disableEdbRefresh)
 
 	//同步弘则数据库中来自,钢联,隆众,有色,人工等基础数据--每隔五分钟,同步一次最新数据
-	syncBaseData := task.NewTask("syncBaseData", "0 */5 * * * * ", SyncBaseData)
-	task.AddTask("syncBaseData", syncBaseData)
+	//syncBaseData := task.NewTask("syncBaseData", "0 */5 * * * * ", SyncBaseData)
+	//task.AddTask("syncBaseData", syncBaseData)
 
 	syncBaseDataExt := task.NewTask("syncBaseDataExt", "0 */30 * * * * ", SyncBaseDataExt)
 	task.AddTask("syncBaseDataExt", syncBaseDataExt)
@@ -112,29 +112,6 @@ func releaseTask() {
 	clearAdminOperateLog := task.NewTask("clearAdminOperateLog", "0 20 23 * * *", ClearAdminOperateLog)
 	task.AddTask("定时清理用户操作日志", clearAdminOperateLog)
 
-	// 嘉悦物产
-	if utils.BusinessCode == utils.BusinessCodeJiaYue {
-		// 每10分钟定时同步增量指标
-		syncJiaYueNewIndex := task.NewTask("syncJiaYueNewIndex", "0 */10 * * * *", data.SyncJiaYueNewIndex)
-		task.AddTask("定时同步嘉悦物产增量指标", syncJiaYueNewIndex)
-
-		// 每30分钟同步一次数据宝指标数据
-		syncJiaYueDataBaby := task.NewTask("syncJiaYueDataBaby", "0 */30 * * * * ", data.RefreshJiaYueDataFromBridge)
-		task.AddTask("syncJiaYueDataBaby", syncJiaYueDataBaby)
-	}
-
-	if utils.BusinessCode == utils.BusinessCodeZhongJi {
-		// 每天同步一次指标列表
-		syncZhongJiIndexList := task.NewTask("syncZhongJiIndexList", "0 0 17 * * *", data.SyncZhongJiIndexList)
-		task.AddTask("定时同步中基宁波SMM指标列表", syncZhongJiIndexList)
-	}
-
-	// 中石油新加坡
-	if utils.IsPCSG == "1" {
-		refreshPCSGBloomberg := task.NewTask("refreshPCSGBloombergDaily", "0 */30 * * * *", data.RefreshPCSGBloomberg)
-		task.AddTask("中石油新加坡-每日Bloomberg指标刷新", refreshPCSGBloomberg)
-	}
-
 	// 刷新同花顺高频
 	refreshThsHfBase := task.NewTask("refreshThsHfBase", "0 0 0,6,9,12,15,18,21 * * *", data.RefreshBaseFromThsHfIndex)
 	task.AddTask("refreshThsHfBase", refreshThsHfBase)
@@ -142,57 +119,23 @@ func releaseTask() {
 
 func RefreshData(cont context.Context) (err error) {
 	wg := sync.WaitGroup{}
-	wg.Add(14)
-	//hour := time.Now().Hour()
-	//if hour != 0 {
-	//}
-	//彭博
-	go data.RefreshDataFromPb(&wg)
-	//彭博财务
-	go data.RefreshDataFromPbFinance(&wg)
-	//手工数据
-	go data.RefreshDataFromManual(&wg)
-	//隆众数据
-	//go data.RefreshDataFromLz(&wg)
-	//有色
-	go data.RefreshDataFromYs(&wg)
-	//钢联
-	go data.RefreshDataFromGl(&wg)
-	//路透
-	go data.RefreshDataFromLt(&wg)
-	//煤炭
-	go data.RefreshDataFromCoal(&wg)
-	//谷歌出行数据
-	go data.RefreshDataFromGoogleTravel(&wg)
-	//钢联化工
-	go data.RefreshDataFromMysteelChemical(&wg)
-	//eia steo报告指标
-	go data.RefreshDataFromEiaSteo(&wg)
-	//UN报告指标
-	go data.RefreshDataFromComTrade(&wg)
-	//卓创报告指标
-	go data.RefreshDataFromSci(&wg)
-	//国家统计局指标
-	go data.RefreshDataFromNationalStatistics(&wg)
-	//富宝指标刷新
-	go data.RefreshDataFromFubao(&wg)
-
-	// Bloomberg
-	go func() {
-		wg.Add(1)
-		_ = data.RefreshDataFromBloomberg(&wg)
-	}()
 
-	// CCF化纤信息
+	//手工数据
 	go func() {
 		wg.Add(1)
-		_ = data.RefreshDataFromCCF(&wg)
+		data.RefreshDataFromManual(&wg)
 	}()
 
 	// 持仓分析
+	//go func() {
+	//	wg.Add(1)
+	//	_ = data.RefreshDataFromTradeAnalysis(&wg)
+	//}()
+
+	// 刷新未配置刷新时间点的来源(可能含未知的来源, 统一在这个时间刷新)
 	go func() {
 		wg.Add(1)
-		_ = data.RefreshDataFromTradeAnalysis(&wg)
+		data.NoneConfigRefreshDataGn(&wg)
 	}()
 
 	wg.Wait()

+ 24 - 0
utils/common.go

@@ -951,3 +951,27 @@ func HmacSha256(key string, data string) []byte {
 func HmacSha256ToBase64(key string, data string) string {
 	return base64.URLEncoding.EncodeToString(HmacSha256(key, data))
 }
+
+// MinusInt 获取两个[]int差集
+func MinusInt(a []int, b []int) []int {
+	var diff []int
+	mpA, mpB := make(map[int]bool), make(map[int]bool)
+
+	for _, v := range a {
+		mpA[v] = true
+	}
+	for _, v := range b {
+		mpB[v] = true
+	}
+	for _, s := range a {
+		if !mpB[s] {
+			diff = append(diff, s)
+		}
+	}
+	for _, s := range b {
+		if !mpA[s] {
+			diff = append(diff, s)
+		}
+	}
+	return diff
+}