Explorar o código

Merge branch 'eta/1.5.1'

Roc hai 1 ano
pai
achega
66707ca753

+ 89 - 0
models/data_manage/edb_refresh/edb_refresh_config.go

@@ -0,0 +1,89 @@
+package edb_refresh
+
+import (
+	"errors"
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+// EdbRefreshConfig
+// @Description: 指标的刷新时间配置表
+type EdbRefreshConfig struct {
+	EdbRefreshConfigId  int       `orm:"column(edb_refresh_config_id);pk"`
+	RefreshFrequency    string    `description:"刷新频率"`
+	RefreshFrequencyDay int       `description:"具体刷新的日期"`
+	RefreshTime         string    `description:"刷新时间"`
+	RefreshAllData      int       `description:"是否刷新所有数据,0:否,1:刷新所有数据"`
+	RefreshDataNum      int       `description:"刷新单元格数"`
+	ModifyTime          time.Time `description:"最晚一次的更新时间"`
+	CreateTime          time.Time `description:"添加时间"`
+}
+
+// Add
+// @Description: 添加
+// @author: Roc
+// @receiver m
+// @datetime 2023-12-14 16:11:10
+// @param cols []string
+// @return err error
+func (m *EdbRefreshConfig) Add() (err error) {
+	if m.EdbRefreshConfigId > 0 {
+		err = errors.New("该配置已存在")
+		return
+	}
+	o := orm.NewOrmUsingDB("data")
+	lastId, err := o.Insert(m)
+	if err != nil {
+		return
+	}
+	m.EdbRefreshConfigId = int(lastId)
+
+	return
+}
+
+// Update
+// @Description: 更新
+// @author: Roc
+// @receiver m
+// @datetime 2023-12-14 16:11:10
+// @param cols []string
+// @return err error
+func (m *EdbRefreshConfig) Update(cols []string) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.Update(m, cols...)
+	return
+}
+
+// Delete
+// @Description: 删除
+// @author: Roc
+// @receiver m
+// @datetime 2023-12-14 16:11:10
+// @return err error
+func (m *EdbRefreshConfig) Delete() (err error) {
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.Delete(m)
+	return
+}
+
+// GetEdbRefreshConfigListByCondition
+// @Description: 根据条条件获取刷新配置列表
+// @author: Roc
+// @datetime 2024-01-09 13:28:49
+// @param condition string
+// @param pars []interface{}
+// @return list []*EdbRefreshDefaultConfig
+// @return err error
+func GetEdbRefreshConfigListByCondition(condition string, pars []interface{}) (list []*EdbRefreshConfig, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT * FROM edb_refresh_config
+         WHERE 1 = 1 `
+
+	if condition != "" {
+		sql += condition
+	}
+	sql += ` ORDER BY edb_refresh_config_id ASC `
+	_, err = o.Raw(sql, pars).QueryRows(&list)
+
+	return
+}

+ 134 - 0
models/data_manage/edb_refresh/edb_refresh_default_config.go

@@ -0,0 +1,134 @@
+package edb_refresh
+
+import (
+	"errors"
+	"eta/eta_task/utils"
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+// EdbRefreshDefaultConfig
+// @Description: 指标的默认刷新时间配置表
+type EdbRefreshDefaultConfig struct {
+	Id                  int       `orm:"column(id);pk"`
+	Source              int       `description:"来源"`
+	SubSource           int       `description:"来源名称"`
+	Frequency           string    `description:"频度"`
+	RefreshFrequency    string    `description:"刷新频率"`
+	RefreshFrequencyDay int       `description:"具体刷新的日期"`
+	RefreshTime         string    `description:"刷新时间"`
+	RefreshAllData      int       `description:"是否刷新所有数据,0:否,1:刷新所有数据"`
+	RefreshDataNum      int       `description:"刷新单元格数"`
+	ModifyTime          time.Time `description:"最晚一次的更新时间"`
+	CreateTime          time.Time `description:"添加时间"`
+}
+
+// Add
+// @Description: 添加
+// @author: Roc
+// @receiver m
+// @datetime 2023-12-14 16:11:10
+// @param cols []string
+// @return err error
+func (m *EdbRefreshDefaultConfig) Add() (err error) {
+	if m.Id > 0 {
+		err = errors.New("该配置已存在")
+		return
+	}
+	o := orm.NewOrmUsingDB("data")
+	lastId, err := o.Insert(m)
+	if err != nil {
+		return
+	}
+	m.Id = int(lastId)
+
+	return
+}
+
+// Update
+// @Description: 更新
+// @author: Roc
+// @receiver m
+// @datetime 2023-12-14 16:11:10
+// @param cols []string
+// @return err error
+func (m *EdbRefreshDefaultConfig) Update(cols []string) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.Update(m, cols...)
+	return
+}
+
+// Delete
+// @Description: 删除
+// @author: Roc
+// @receiver m
+// @datetime 2023-12-14 16:11:10
+// @return err error
+func (m *EdbRefreshDefaultConfig) Delete() (err error) {
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.Delete(m)
+	return
+}
+
+// GetListBySourceAndFrequency
+// @Description: 根据来源和频度获取列表
+// @author: Roc
+// @datetime 2024-01-04 17:39:47
+// @param source int
+// @param subSource int
+// @param frequency string
+// @return list []*EdbRefreshDefaultConfig
+// @return err error
+func GetListBySourceAndFrequency(source, subSource int, frequency string) (list []*EdbRefreshDefaultConfig, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT * FROM edb_refresh_default_config
+         WHERE source = ? AND sub_source = ? AND frequency = ? ORDER BY id ASC `
+	_, err = o.Raw(sql, source, subSource, frequency).QueryRows(&list)
+
+	return
+}
+
+// GetListByCondition
+// @Description: 根据条条件获取默认配置列表
+// @author: Roc
+// @datetime 2024-01-09 13:28:49
+// @param condition string
+// @param pars []interface{}
+// @return list []*EdbRefreshDefaultConfig
+// @return err error
+func GetListByCondition(condition string, pars []interface{}) (list []*EdbRefreshDefaultConfig, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT * FROM edb_refresh_default_config
+         WHERE 1 = 1 `
+
+	if condition != "" {
+		sql += condition
+	}
+	sql += ` ORDER BY id ASC `
+	_, err = o.Raw(sql, pars).QueryRows(&list)
+
+	return
+}
+
+// GetDefaultRefreshEdbInfoListBySourceAndSubSource
+// @Description: 根据来源获取默认刷新的指标列表
+// @author: Roc
+// @datetime 2024-01-09 16:22:49
+// @param source int
+// @param subSource int
+// @return list []*data_manage.EdbInfo
+// @return err error
+func GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource int, frequencyList []string) (list []*EdbInfoListAndRefreshConfig, err error) {
+	num := len(frequencyList)
+	if num <= 0 {
+		return
+	}
+
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT a.* FROM edb_info a 
+LEFT JOIN edb_refresh_mapping b ON a.edb_info_id = b.edb_info_id
+WHERE a.source = ? AND a.sub_source = ? AND a.frequency IN (` + utils.GetOrmInReplace(num) + `)  AND b.edb_info_id is null`
+	_, err = o.Raw(sql, source, subSource, frequencyList).QueryRows(&list)
+
+	return
+}

+ 122 - 0
models/data_manage/edb_refresh/edb_refresh_mapping.go

@@ -0,0 +1,122 @@
+package edb_refresh
+
+import (
+	"errors"
+	"eta/eta_task/utils"
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+// EdbRefreshMapping
+// @Description: 指标刷新时间配置关系表
+type EdbRefreshMapping struct {
+	EdbRefreshMappingId int       `orm:"column(edb_refresh_mapping_id);pk"`
+	Source              int       `description:"来源"`
+	SubSource           int       `description:"来源名称"`
+	EdbInfoId           int       `description:"指标id,如果是数据源(钢联、有色)的,那么就是数据源里面的id"`
+	EdbRefreshConfigId  int       `description:"刷新配置id"`
+	SysUserId           int       `description:"操作人id"`
+	SysUserRealName     string    `description:"操作人真实姓名"`
+	ModifyTime          time.Time `description:"最晚一次的更新时间"`
+	CreateTime          time.Time `description:"添加时间"`
+}
+
+// Add
+// @Description: 添加
+// @author: Roc
+// @receiver m
+// @datetime 2023-12-14 16:11:10
+// @param cols []string
+// @return err error
+func (m *EdbRefreshMapping) Add() (err error) {
+	if m.EdbRefreshMappingId > 0 {
+		err = errors.New("该配置已存在")
+		return
+	}
+	o := orm.NewOrmUsingDB("data")
+	lastId, err := o.Insert(m)
+	if err != nil {
+		return
+	}
+	m.EdbRefreshMappingId = int(lastId)
+
+	return
+}
+
+// Update
+// @Description: 更新
+// @author: Roc
+// @receiver m
+// @datetime 2023-12-14 16:11:10
+// @param cols []string
+// @return err error
+func (m *EdbRefreshMapping) Update(cols []string) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.Update(m, cols...)
+	return
+}
+
+// Delete
+// @Description: 删除
+// @author: Roc
+// @receiver m
+// @datetime 2023-12-14 16:11:10
+// @return err error
+func (m *EdbRefreshMapping) Delete() (err error) {
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.Delete(m)
+	return
+}
+
+type EdbInfoListAndRefreshConfig struct {
+	EdbInfoId          int       `orm:"column(edb_info_id);pk"`
+	SourceName         string    `description:"来源名称"`
+	Source             int       `description:"来源id"`
+	SubSource          int       `description:"子数据来源:0:经济数据库,1:日期序列"`
+	SubSourceName      string    `description:"子数据来源名称"`
+	EdbCode            string    `description:"指标编码"`
+	EdbName            string    `description:"指标名称"`
+	Frequency          string    `description:"频率"`
+	Unit               string    `description:"单位"`
+	StartDate          time.Time `description:"起始日期"`
+	EndDate            time.Time `description:"终止日期"`
+	ClassifyId         int       `description:"分类id"`
+	UniqueCode         string    `description:"指标唯一编码"`
+	CalculateFormula   string    `description:"计算公式"`
+	ModifyTime         string    `description:"更新时间"`
+	NoUpdate           int8      `description:"是否停止更新,0:继续更新;1:停止更新"`
+	EdbRefreshConfigId int       `description:"刷新配置id"`
+	DataRefreshNum     int       `description:"刷新的期数"`
+}
+
+// GetConfigRefreshEdbInfoListBySourceAndSubSource
+// @Description:  根据来源和配置id列表获取指标列表
+// @author: Roc
+// @datetime 2024-01-09 17:28:06
+// @param sourceList []int
+// @param configIdList []int
+// @return list []*data_manage.EdbInfoList
+// @return err error
+func GetConfigRefreshEdbInfoListBySourceAndSubSource(sourceList, configIdList []int) (list []*EdbInfoListAndRefreshConfig, err error) {
+	num := len(configIdList)
+	if num <= 0 {
+		return
+	}
+
+	var pars []interface{}
+
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT a.*,b.edb_refresh_config_id FROM edb_info a 
+ JOIN edb_refresh_mapping b ON a.edb_info_id = b.edb_info_id
+WHERE b.edb_refresh_config_id IN (` + utils.GetOrmInReplace(num) + `) `
+	pars = append(pars, configIdList)
+
+	sourceNum := len(sourceList)
+	if sourceNum > 0 {
+		sql += ` AND b.source not in (` + utils.GetOrmInReplace(sourceNum) + `) `
+		pars = append(pars, sourceList)
+	}
+	_, err = o.Raw(sql, pars).QueryRows(&list)
+
+	return
+}

+ 51 - 0
models/data_manage/edb_refresh/edb_refresh_source.go

@@ -0,0 +1,51 @@
+package edb_refresh
+
+import (
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+// EdbRefreshSource
+// @Description: 刷新的数据源表
+type EdbRefreshSource struct {
+	Id            int       `orm:"column(id);pk"`
+	Source        int       `orm:"column(source)" description:"来源"`
+	SourceName    string    `description:"来源名称"`
+	SubSource     int       `description:"子数据来源:0:经济数据库,1:日期序列"`
+	SubSourceName string    `description:"子来源名称"`
+	HasChild      int       `description:"是否有子来源,0:否,1:是"`
+	ModifyTime    time.Time `description:"修改时间"`
+	CreateTime    time.Time `description:"创建时间"`
+}
+
+// EdbRefreshSourceList
+// @Description: 获取刷新的数据源列表
+type EdbRefreshSourceList struct {
+	Source        int                    `orm:"column(source)" description:"来源"`
+	SourceName    string                 `description:"来源名称"`
+	SubSource     int                    `description:"子数据来源:0:经济数据库,1:日期序列"`
+	SubSourceName string                 `description:"子来源名称"`
+	Child         []EdbRefreshSourceList `description:"子来源"`
+	HasChild      int                    `description:"是否有子来源,0:否,1:是"`
+}
+
+// GetAllList
+// @Description: 获取刷新数据源列表
+// @author: Roc
+// @datetime 2024-01-03 15:03:24
+// @return items []*EdbRefreshSource
+// @return err error
+func (m EdbRefreshSource) GetAllList() (items []*EdbRefreshSource, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := ` SELECT *  FROM edb_refresh_source ORDER BY id ASC `
+	_, err = o.Raw(sql).QueryRows(&items)
+	return
+}
+
+type BaseClassifyItems struct {
+	ClassifyId   int                  `description:"分类id"`
+	ClassifyName string               `description:"分类名称"`
+	ParentId     int                  `description:"父级id"`
+	UniqueCode   string               `description:"唯一编码"`
+	Children     []*BaseClassifyItems `description:"下级"`
+}

+ 52 - 0
models/data_manage/edb_refresh/request/edb_info_refresh.go

@@ -0,0 +1,52 @@
+package request
+
+// SaveEdbRefreshDefaultConfigReq
+// @Description: 设置默认刷新时间配置
+type SaveEdbRefreshDefaultConfigReq struct {
+	Source    int                `description:"来源"`
+	SubSource int                `description:"子来源"`
+	Frequency string             `description:"频度"`
+	List      []RefreshConfigReq `description:"刷新配置项"`
+}
+
+// RefreshConfigReq
+// @Description: 刷新时间配置项
+type RefreshConfigReq struct {
+	RefreshFrequency    string `description:"刷新频率"`
+	RefreshFrequencyDay int    `description:"具体刷新的日期"`
+	RefreshTime         string `description:"刷新时间"`
+	RefreshAllData      int    `description:"是否刷新所有数据,0:否,1:刷新所有数据"`
+	RefreshDataNum      int    `description:"刷新单元格数"`
+}
+
+// SaveEdbRefreshConfigReq
+// @Description: 设置指标的刷新时间配置
+type SaveEdbRefreshConfigReq struct {
+	Source          int                `description:"来源"`
+	SubSource       int                `description:"子来源"`
+	ClassifyId      int                `description:"分类id"`
+	TerminalCode    string             `description:"终端编码"`
+	SysUserId       string             `description:"操作人id"`
+	Frequency       string             `description:"频度"`
+	Keyword         string             `description:"关键字"`
+	Status          string             `description:"状态,枚举值:启用、暂停"`
+	IsSelectAll     bool               `description:"是否选择所有指标"`
+	EdbSelectIdList []int              `description:"选择的指标id列表"`
+	List            []RefreshConfigReq `description:"刷新配置项"`
+}
+
+// SaveEdbRefreshStatusReq
+// @Description: 设置指标的刷新状态
+type SaveEdbRefreshStatusReq struct {
+	Source          int    `description:"来源"`
+	SubSource       int    `description:"子来源"`
+	ClassifyId      int    `description:"分类id"`
+	TerminalCode    string `description:"终端编码"`
+	SysUserId       string `description:"操作人id"`
+	Frequency       string `description:"频度"`
+	Keyword         string `description:"关键字"`
+	Status          string `description:"状态,枚举值:启用、暂停"`
+	IsSelectAll     bool   `description:"是否选择所有指标"`
+	EdbSelectIdList []int  `description:"选择的指标id列表"`
+	ModifyStatus    string `description:"需要更改的状态,枚举值:启用、暂停"`
+}

+ 14 - 0
models/db.go

@@ -2,6 +2,7 @@ package models
 
 import (
 	"eta/eta_task/models/data_manage"
+	"eta/eta_task/models/data_manage/edb_refresh"
 	"eta/eta_task/models/data_manage/future_good"
 	"eta/eta_task/utils"
 	_ "github.com/go-sql-driver/mysql"
@@ -80,6 +81,9 @@ func init() {
 	// 智能研报数据表
 	initSmartReport()
 
+	// 初始化指标刷新
+	initEdbRefresh()
+
 	// 初始化部分数据表变量(直接init会有顺序问题=_=!)
 	data_manage.InitEdbSourceVar()
 }
@@ -152,3 +156,13 @@ func initSmartReport() {
 		new(ReportStateRecord),
 	)
 }
+
+// initEdbRefresh 初始化指标刷新
+func initEdbRefresh() {
+	orm.RegisterModel(
+		new(edb_refresh.EdbRefreshSource),        // 刷新的数据源表
+		new(edb_refresh.EdbRefreshDefaultConfig), // 指标的默认刷新时间配置表
+		new(edb_refresh.EdbRefreshConfig),        // 指标的刷新时间配置表
+		new(edb_refresh.EdbRefreshMapping),       // 指标刷新时间配置关系表
+	)
+}

+ 518 - 0
services/edb_refresh.go

@@ -0,0 +1,518 @@
+package services
+
+import (
+	"context"
+	"eta/eta_task/models/data_manage/edb_refresh"
+	"eta/eta_task/services/alarm_msg"
+	"eta/eta_task/services/data"
+	"eta/eta_task/utils"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+)
+
+// ConfigRefreshData
+// @Description:  配置刷新数据
+// @author: Roc
+// @datetime 2024-01-10 13:55:05
+// @param cont context.Context
+// @return err error
+func ConfigRefreshData(cont context.Context) (err error) {
+	errMsgList := make([]string, 0)
+	defer func() {
+		if err != nil {
+			fmt.Println(err)
+		}
+	}()
+	// 一期是只做wind、同花顺、钢联、有色
+
+	now := time.Now()
+	//now = time.Date(2023, 12, 31, 19, 10, 59, 0, time.Local)
+	//now = time.Date(2023, 12, 31, 16, 50, 59, 0, time.Local)
+	defaultSourceEdbInfoListMap, err := getDefaultRefreshData(now)
+	if err != nil {
+		errMsgList = append(errMsgList, "获取默认刷新数据失败,Err:"+err.Error())
+	}
+	sourceEdbInfoListMap, err := getConfigRefreshData(now)
+	if err != nil {
+		errMsgList = append(errMsgList, "获取指标配置刷新数据失败,Err:"+err.Error())
+	}
+
+	// 将两个合并
+	allSourceEdbInfoListMap := mergeMaps(defaultSourceEdbInfoListMap, sourceEdbInfoListMap)
+	wgNum := len(allSourceEdbInfoListMap)
+	if wgNum <= 0 {
+		return
+	}
+	wg := sync.WaitGroup{}
+	wg.Add(wgNum)
+
+	for _, edbList := range allSourceEdbInfoListMap {
+		go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
+	}
+
+	wg.Wait()
+
+	fmt.Println("Refresh End")
+
+	return
+}
+
+// Function to merge two maps
+func mergeMaps(dst map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, src map[string][]*edb_refresh.EdbInfoListAndRefreshConfig) map[string][]*edb_refresh.EdbInfoListAndRefreshConfig {
+	if dst == nil {
+		return src
+	}
+	if src == nil {
+		return dst
+	}
+	for k, v := range src {
+		if dstk, ok := dst[k]; ok {
+			dstk = append(dstk, v...)
+			dst[k] = dstk
+		} else {
+			dst[k] = v
+		}
+	}
+	return dst
+}
+
+// getDefaultRefreshData
+// @Description: 根据默认配置获取需要刷新的指标列表
+// @author: Roc
+// @datetime 2024-01-10 13:55:38
+// @param now time.Time
+// @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
+// @return err error
+func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
+	errMsgList := make([]string, 0)
+	defer func() {
+		if err != nil {
+			fmt.Println(err)
+		}
+	}()
+	// 一期是只做wind、同花顺、钢联、有色
+
+	sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
+
+	currTimeStr := getPreviousHalfHour(now)
+	fmt.Println(currTimeStr)
+
+	// 所有默认配置刷新项
+	list := make([]*edb_refresh.EdbRefreshDefaultConfig, 0)
+
+	//刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
+	refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
+
+	// 获取各个刷新频率的配置
+	for _, refreshFrequency := range refreshFrequencyList {
+		// 获取刷新频率条件
+		condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
+		if !isHandler {
+			// 可能是非交易日,所以过滤不处理
+			continue
+		}
+
+		condition += ` AND refresh_frequency = ? AND refresh_time = ?`
+		pars = append(pars, refreshFrequency, currTimeStr)
+
+		// 这两个是excel的数据源,他是从公共机更新的,需要过滤掉
+		condition += ` AND source not in (?,?)`
+		pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
+
+		tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars)
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+		list = append(list, tmpList...)
+	}
+
+	// 更新的单元格数
+	refreshDataNumMap := make(map[string]*edb_refresh.EdbRefreshDefaultConfig)
+	// 数据源刷新频度的列表数组
+	refreshDataFrequencyListMap := make(map[int]map[int][]string)
+
+	wgNum := 0
+	// 处理待刷新的数据源,整理成数组,方便获取对应的指标
+	for _, item := range list {
+		// 更新的单元格数
+		key := fmt.Sprintf("%d_%d_%s", item.Source, item.SubSource, item.Frequency)
+		refreshDataNumMap[key] = item
+
+		// 数据源刷新频度的列表数组
+		subSourceFrequencyList, ok := refreshDataFrequencyListMap[item.Source]
+		if !ok {
+			subSourceFrequencyList = make(map[int][]string)
+		}
+		frequencyList, ok := subSourceFrequencyList[item.SubSource]
+		if !ok {
+			wgNum++
+			frequencyList = make([]string, 0)
+		}
+		subSourceFrequencyList[item.SubSource] = append(frequencyList, item.Frequency)
+		refreshDataFrequencyListMap[item.Source] = subSourceFrequencyList
+	}
+
+	for source, subSourceFrequencyListMap := range refreshDataFrequencyListMap {
+		switch source {
+		case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS:
+			// 这种不处理
+		default:
+			for subSource, frequencyList := range subSourceFrequencyListMap {
+				edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList)
+				if tmpErr != nil {
+					errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
+				}
+
+				for _, v := range edbList {
+					// 数据刷新的期数
+					dataRefreshNum := utils.DATA_REFRESH
+					key := fmt.Sprintf("%d_%d_%s", v.Source, v.SubSource, v.Frequency)
+					if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
+						if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
+							dataRefreshNum = 0
+						} else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
+							dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
+						}
+					}
+					v.DataRefreshNum = dataRefreshNum
+				}
+
+				key := fmt.Sprint(source, "_", subSource)
+				sourceEdbInfoListMap[key] = edbList
+			}
+		}
+	}
+
+	fmt.Println("Get Refresh End")
+	return
+}
+
+// getConfigRefreshData
+// @Description: 根据指标配置获取需要刷新的指标列表
+// @author: Roc
+// @datetime 2024-01-10 13:55:59
+// @param now time.Time
+// @return sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig
+// @return err error
+func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb_refresh.EdbInfoListAndRefreshConfig, err error) {
+	defer func() {
+		if err != nil {
+			fmt.Println(err)
+		}
+	}()
+	// 一期是只做wind、同花顺、钢联、有色
+
+	sourceEdbInfoListMap = make(map[string][]*edb_refresh.EdbInfoListAndRefreshConfig)
+
+	currTimeStr := getPreviousHalfHour(now)
+
+	// 所有默认配置刷新项
+	list := make([]*edb_refresh.EdbRefreshConfig, 0)
+
+	//刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
+	refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
+
+	// 获取各个刷新频率的配置
+	for _, refreshFrequency := range refreshFrequencyList {
+		// 获取刷新频率条件
+		condition, pars, isHandler := getRefreshFrequencyCondition(now, refreshFrequency)
+		if !isHandler {
+			// 可能是非交易日,所以过滤不处理
+			continue
+		}
+
+		condition += ` AND refresh_frequency = ? AND refresh_time = ?`
+		pars = append(pars, refreshFrequency, currTimeStr)
+
+		tmpList, tmpErr := edb_refresh.GetEdbRefreshConfigListByCondition(condition, pars)
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+		list = append(list, tmpList...)
+	}
+
+	// 配置列表
+	configIdEdbRefreshConfigMap := make(map[int]*edb_refresh.EdbRefreshConfig)
+	configIdList := make([]int, 0)
+	for _, v := range list {
+		configIdList = append(configIdList, v.EdbRefreshConfigId)
+		configIdEdbRefreshConfigMap[v.EdbRefreshConfigId] = v
+	}
+
+	edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource([]int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}, configIdList)
+	if err != nil {
+		return
+	}
+
+	for _, v := range edbInfoList {
+		key := fmt.Sprint(v.Source, "_", v.SubSource)
+		tmpList, ok := sourceEdbInfoListMap[key]
+		if !ok {
+			tmpList = make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
+		}
+		// 数据刷新的期数
+		dataRefreshNum := utils.DATA_REFRESH
+		if edbRefreshConfig, ok2 := configIdEdbRefreshConfigMap[v.EdbRefreshConfigId]; ok2 {
+			if edbRefreshConfig.RefreshAllData == 1 { // 刷新所有数据期数
+				dataRefreshNum = 0
+			} else if edbRefreshConfig.RefreshDataNum > 0 { //
+				dataRefreshNum = edbRefreshConfig.RefreshDataNum
+			}
+		}
+		v.DataRefreshNum = dataRefreshNum
+		sourceEdbInfoListMap[key] = append(tmpList, v)
+	}
+
+	fmt.Println("Get ConfigRefreshData End")
+	return
+}
+
+// BaseRefreshData
+// @Description: 基础数据刷新
+// @author: Roc
+// @datetime 2024-01-09 16:27:45
+// @param wg *sync.WaitGroup
+// @return err error
+func BaseRefreshData(wg *sync.WaitGroup, source, subSource int, items []*edb_refresh.EdbInfoListAndRefreshConfig) (err error) {
+	errMsgList := make([]string, 0)
+	defer func() {
+		if err != nil {
+			fmt.Println("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+err.Error())
+			go alarm_msg.SendAlarmMsg(fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData ErrMsg:"+err.Error()), 3)
+		}
+		if len(errMsgList) > 0 {
+			errMsg := fmt.Sprint("来源:", source, ";子来源:", subSource, ";BaseRefreshData Err:"+strings.Join(errMsgList, "\n"))
+			fmt.Println(errMsg)
+			go alarm_msg.SendAlarmMsg(errMsg, 3)
+		}
+		wg.Done()
+	}()
+
+	// 数据刷新的期数
+	dataRefreshNum := utils.DATA_REFRESH
+	// 是否从最开始的日期更新
+	var isRefreshByStartDate bool
+
+	for _, v := range items {
+		if v.DataRefreshNum > 0 {
+			dataRefreshNum = v.DataRefreshNum
+		}
+
+		startDate := ""
+		if isRefreshByStartDate {
+			startDate = v.StartDate.Format(utils.FormatDate)
+		} else {
+			if v.Frequency == "日度" {
+				startDate = v.EndDate.AddDate(0, 0, -dataRefreshNum).Format(utils.FormatDate)
+			} else if v.Frequency == "周度" {
+				startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 7)).Format(utils.FormatDate)
+			} else if v.Frequency == "旬度" {
+				startDate = v.EndDate.AddDate(0, 0, -(dataRefreshNum * 10)).Format(utils.FormatDate)
+			} else if v.Frequency == "月度" {
+				startDate = v.EndDate.AddDate(0, -dataRefreshNum, 0).Format(utils.FormatDate)
+			} else if v.Frequency == "季度" {
+				startDate = v.EndDate.AddDate(0, -dataRefreshNum*3, 0).Format(utils.FormatDate)
+			} else if v.Frequency == "半年度" {
+				startDate = v.EndDate.AddDate(0, -dataRefreshNum*6, 0).Format(utils.FormatDate)
+			} else if v.Frequency == "年度" {
+				startDate = v.EndDate.AddDate(-dataRefreshNum, 0, 0).Format(utils.FormatDate)
+			} else {
+				startDate = v.EndDate.AddDate(0, 0, -utils.DATA_REFRESH).Format(utils.FormatDate)
+			}
+		}
+		fmt.Println(startDate)
+
+		// 数据更新
+		resp, tmpErr := data.RefreshEdbData(v.EdbInfoId, v.Source, v.SubSource, v.EdbCode, startDate)
+		if tmpErr != nil {
+			errMsgList = append(errMsgList, v.EdbCode+"RefreshEdbData Err:"+tmpErr.Error())
+			continue
+		}
+		if resp.Ret != 200 {
+			errMsgList = append(errMsgList, v.EdbCode+";RefreshEdbData Err:"+resp.Msg+";ErrMsg:"+resp.ErrMsg)
+			continue
+		}
+	}
+
+	fmt.Println("来源:", source, ";子来源:", subSource, "刷新结束")
+
+	return err
+}
+
+// getRefreshFrequencyCondition
+// @Description: 根据时间和刷新频率获取条件
+// @author: Roc
+// @datetime 2024-01-09 16:27:11
+// @param now time.Time
+// @param refreshFrequency string
+// @return condition string
+// @return pars []interface{}
+// @return isHandler bool
+func getRefreshFrequencyCondition(now time.Time, refreshFrequency string) (condition string, pars []interface{}, isHandler bool) {
+	isHandler = true
+
+	var dayNum int
+	var isLastDay bool
+
+	//刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
+	switch refreshFrequency {
+	case "每自然日":
+		// 自然日不需要额外条件
+		return
+	case "每交易日":
+		// 周六日不处理
+		if now.Weekday() == time.Saturday || now.Weekday() == time.Sunday {
+			isHandler = false
+		}
+		return
+	case "每周":
+		currWeekDay := now.Weekday()
+		if currWeekDay == time.Sunday {
+			currWeekDay = 7
+			isLastDay = true
+		}
+		dayNum = int(currWeekDay)
+	case "每旬":
+		currDay := now.Day()
+		if currDay <= 10 {
+			dayNum = currDay
+			// 如果是这旬的最后一天
+			if currDay == 10 {
+				isLastDay = true
+			}
+		} else if currDay <= 20 {
+			dayNum = currDay - 10
+			// 如果是这旬的最后一天
+			if currDay == 20 {
+				isLastDay = true
+			}
+		} else {
+			dayNum = currDay - 20
+
+			// 当月的最后一天
+			monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
+			// 如果是这旬的最后一天
+			if currDay == monthLastDay.Day() {
+				isLastDay = true
+			}
+		}
+	case "每月":
+		// 当前日期
+		currDay := now.Day()
+		dayNum = currDay
+
+		// 当期的最后一天
+		monthLastDay := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
+
+		// 如果是这期的最后一天
+		if currDay == monthLastDay.Day() {
+			isLastDay = true
+		}
+	case "每季":
+		// 当期的第一天  ;  当期的最后一天
+		var startDay, endDay time.Time
+		currMonth := now.Month()
+		currDay := now.Day()
+		if currMonth <= 3 {
+			// 当季的第一天
+			startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
+			// 当季的最后一天
+			endDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
+		} else if currMonth <= 6 {
+			// 当期的第一天
+			startDay = time.Date(now.Year(), 4, 1, 0, 0, 0, 0, time.Local)
+			// 当期的最后一天
+			endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
+		} else if currMonth <= 9 {
+			// 当期的第一天
+			startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
+			// 当期的最后一天
+			endDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
+		} else {
+			// 当期的第一天
+			startDay = time.Date(now.Year(), 10, 1, 0, 0, 0, 0, time.Local)
+			// 当期的最后一天
+			endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
+		}
+
+		// 计算这期的第一天和当日的天数
+		dayNum = utils.GetTimeSubDay(startDay, now) + 1
+
+		// 如果是这期的最后一天
+		if currMonth == endDay.Month() && currDay == endDay.Day() {
+			isLastDay = true
+		}
+	case "每半年":
+		// 当期的第一天  ;  当期的最后一天
+		var startDay, endDay time.Time
+		currMonth := now.Month()
+		currDay := now.Day()
+		if currMonth <= 6 {
+			// 当期的第一天
+			startDay = time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
+			// 当期的最后一天
+			endDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
+		} else {
+			// 当期的第一天
+			startDay = time.Date(now.Year(), 7, 1, 0, 0, 0, 0, time.Local)
+			// 当期的最后一天
+			endDay = time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
+		}
+
+		// 计算这期的第一天和当日的天数
+		dayNum = utils.GetTimeSubDay(startDay, now) + 1
+
+		// 如果是这期的最后一天
+		if currMonth == endDay.Month() && currDay == endDay.Day() {
+			isLastDay = true
+		}
+	case "每年":
+		currMonth := now.Month()
+		currDay := now.Day()
+
+		// 当期的第一天
+		startDay := time.Date(now.Year(), 1, 1, 0, 0, 0, 0, time.Local)
+		// 当期的最后一天
+		endDay := time.Date(now.Year()+1, 1, 1, 0, 0, 0, 0, time.Local).AddDate(0, 0, -1)
+
+		// 计算这期的第一天和当日的天数
+		dayNum = utils.GetTimeSubDay(startDay, now) + 1
+
+		// 如果是这期的最后一天
+		if currMonth == endDay.Month() && currDay == endDay.Day() {
+			isLastDay = true
+		}
+
+	}
+
+	// 如果是这期的最后一天,那么就是判断refresh_frequency_day是否配置为0,或者配置的天数大于这期的最大天数
+	if isLastDay {
+		condition += ` AND ( refresh_frequency_day = ? OR refresh_frequency_day >= ? )`
+		pars = append(pars, 0, dayNum)
+	} else {
+		// 如果不是这期的最后一天,那么就是判断refresh_frequency_day是否等于配置的天数
+		condition += ` AND refresh_frequency_day = ?  `
+		pars = append(pars, dayNum)
+	}
+
+	return
+}
+
+// getPreviousHalfHour
+// @Description: 根据传入的时间获取该时间的前整半小时的时间字符串
+// @author: Roc
+// @datetime 2024-01-09 14:27:34
+// @param now time.Time
+// @return string
+func getPreviousHalfHour(now time.Time) string {
+	minute := now.Minute()
+
+	if minute >= 30 {
+		return fmt.Sprintf("%02d:%02d", now.Hour(), 30)
+	}
+	return fmt.Sprintf("%02d:%02d", now.Hour(), 0)
+}

+ 5 - 4
services/task.go

@@ -46,6 +46,10 @@ func releaseTask() {
 	refreshData := task.NewTask("refreshData", "0 30 0,19 * * *", RefreshData)
 	task.AddTask("refreshData", refreshData)
 
+	// 根据配置刷新指标数据
+	configRefreshData := task.NewTask("syncBaseDataExt", "0 */30 * * * * ", ConfigRefreshData)
+	task.AddTask("configRefreshData", configRefreshData)
+
 	//同步弘则数据库中来自,钢联,隆众,有色,人工等基础数据--每隔五分钟,同步一次最新数据
 	syncBaseData := task.NewTask("syncBaseData", "0 */5 * * * * ", SyncBaseData)
 	task.AddTask("syncBaseData", syncBaseData)
@@ -113,13 +117,10 @@ func releaseTask() {
 
 func RefreshData(cont context.Context) (err error) {
 	wg := sync.WaitGroup{}
-	wg.Add(16)
+	wg.Add(14)
 	//hour := time.Now().Hour()
 	//if hour != 0 {
 	//}
-	go data.RefreshDataFromWind(&wg)
-	//同花顺
-	go data.RefreshDataFromThs(&wg)
 	//彭博
 	go data.RefreshDataFromPb(&wg)
 	//彭博财务

+ 23 - 0
utils/common.go

@@ -912,3 +912,26 @@ func GetVideoPlaySeconds(videoPath string) (playSeconds float64, err error) {
 	}
 	return
 }
+
+// GetTimeSubDay 计算两个时间的自然日期差(后面减去前面)
+func GetTimeSubDay(t1, t2 time.Time) int {
+	var day int
+	swap := false
+	if t1.Unix() > t2.Unix() {
+		t1, t2 = t2, t1
+		swap = true
+	}
+
+	t1_ := t1.Add(time.Duration(t2.Sub(t1).Milliseconds()%86400000) * time.Millisecond)
+	day = int(t2.Sub(t1).Hours() / 24)
+	// 计算在t1+两个时间的余数之后天数是否有变化
+	if t1_.Day() != t1.Day() {
+		day += 1
+	}
+
+	if swap {
+		day = -day
+	}
+
+	return day
+}