Browse Source

fix:定时刷新

zqbao 8 months ago
parent
commit
febfcbe360
5 changed files with 113 additions and 14 deletions
  1. 0 2
      go.mod
  2. 0 4
      go.sum
  3. 20 1
      models/data_manage/base_from_mysteel_chemical_index.go
  4. 87 6
      services/edb_refresh.go
  5. 6 1
      services/task.go

+ 0 - 2
go.mod

@@ -14,14 +14,12 @@ require (
 	github.com/olivere/elastic/v7 v7.0.32
 	github.com/rdlucklib/rdluck_tools v1.0.3
 	github.com/shopspring/decimal v1.3.1
-	github.com/yidane/formula v0.0.0-20220322063702-c9da84ba3476
 	golang.org/x/net v0.21.0
 	gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
 )
 
 require (
 	github.com/andybalholm/cascadia v1.3.2 // indirect
-	github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211218165449-dd623ecc2f02 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect

+ 0 - 4
go.sum

@@ -12,8 +12,6 @@ github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F
 github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
 github.com/andybalholm/cascadia v1.3.2 h1:3Xi6Dw5lHF15JtdcmAHD3i1+T8plmv7BQ/nsViSLyss=
 github.com/andybalholm/cascadia v1.3.2/go.mod h1:7gtRlve5FxPPgIgX36uWBX58OdBsSS6lUvCFb+h7KvU=
-github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211218165449-dd623ecc2f02 h1:o2oaBQGTzO+xNh12e7xWkphNe7H2DTiWv1ml9a2P9PQ=
-github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211218165449-dd623ecc2f02/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
 github.com/astaxie/beego v1.12.3/go.mod h1:p3qIm0Ryx7zeBHLljmd7omloyca1s4yu1a8kM1FkpIA=
 github.com/beego/beego/v2 v2.1.0 h1:Lk0FtQGvDQCx5V5yEu4XwDsIgt+QOlNjt5emUa3/ZmA=
 github.com/beego/beego/v2 v2.1.0/go.mod h1:6h36ISpaxNrrpJ27siTpXBG8d/Icjzsc7pU1bWpp0EE=
@@ -195,8 +193,6 @@ github.com/syndtr/goleveldb v0.0.0-20160425020131-cfa635847112/go.mod h1:Z4AUp2K
 github.com/syndtr/goleveldb v0.0.0-20181127023241-353a9fca669c/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
 github.com/ugorji/go v0.0.0-20171122102828-84cb69a8af83/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
 github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc=
-github.com/yidane/formula v0.0.0-20220322063702-c9da84ba3476 h1:66fLxv8xlhSr42ZhVAYjUY/sEF0olUUAESVlsxVduuw=
-github.com/yidane/formula v0.0.0-20220322063702-c9da84ba3476/go.mod h1:9/dQiKiN04yPMdgsuFmKGuI2Hdp6OmFV9gSWS1col6g=
 github.com/ylywyn/jpush-api-go-client v0.0.0-20190906031852-8c4466c6e369/go.mod h1:Nv7wKD2/bCdKUFNKcJRa99a+1+aSLlCRJFriFYdjz/I=
 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
 github.com/yuin/gopher-lua v0.0.0-20171031051903-609c9cd26973/go.mod h1:aEV29XrmTYFr3CiRxZeGHpkvbwq+prZduBqMaascyCU=

+ 20 - 1
models/data_manage/base_from_mysteel_chemical_index.go

@@ -1,6 +1,10 @@
 package data_manage
 
-import "github.com/beego/beego/v2/client/orm"
+import (
+	"eta/eta_task/utils"
+
+	"github.com/beego/beego/v2/client/orm"
+)
 
 type BaseFromMysteelChemicalIndexItem struct {
 	BaseFromMysteelChemicalIndexId    int32   `json:"base_from_mysteel_chemical_index_id"`
@@ -31,6 +35,7 @@ type BaseFromMysteelChemicalIndexItem struct {
 	IsStop                            int32   `json:"is_stop"`              // 是否停更:1:停更,0:未停更
 	TerminalCode                      string  `json:"terminal_code"`        // 所属终端编码
 	EndValue                          float64 `json:"end_value"`            // 指标的最新值
+	EdbInfoId                         int     `json:"edb_info_id"`
 }
 
 // GetBaseFromMysteelChemicalIndexItemByCode
@@ -47,3 +52,17 @@ func GetBaseFromMysteelChemicalIndexItemByCode(edbCode string) (item *BaseFromMy
 
 	return
 }
+
+func GetBaseFromMysteelChemicalIndexItems(frequencyList []string) (items []*BaseFromMysteelChemicalIndexItem, err error) {
+	num := len(frequencyList)
+	if num == 0 {
+		return
+	}
+	o := orm.NewOrmUsingDB("data")
+	sql := ` SELECT b.*, e.edb_info_id FROM base_from_mysteel_chemical_index AS b
+	LEFT JOIN edb_info AS e
+	ON b.index_code = e.edb_code
+	WHERE b.source='api' AND b.frequency IN (` + utils.GetOrmInReplace(len(frequencyList)) + `)`
+	_, err = o.Raw(sql, frequencyList).QueryRows(&items)
+	return
+}

+ 87 - 6
services/edb_refresh.go

@@ -2,6 +2,8 @@ package services
 
 import (
 	"context"
+	"eta/eta_task/models"
+	"eta/eta_task/models/data_manage"
 	"eta/eta_task/models/data_manage/edb_refresh"
 	"eta/eta_task/services/alarm_msg"
 	"eta/eta_task/services/data"
@@ -53,7 +55,9 @@ func ConfigRefreshData(cont context.Context) (err error) {
 			wg.Done()
 			continue
 		}
-		go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
+		if len(edbList) != 0 {
+			go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
+		}
 	}
 
 	wg.Wait()
@@ -111,6 +115,21 @@ func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*ed
 	//刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
 	refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
 
+	conf, err := models.GetBusinessConf()
+	if err != nil {
+		fmt.Println(err)
+		utils.FileLog.Info("获取业务配置失败,Err:" + err.Error())
+		return
+	}
+
+	// 获取钢联化工的数据获取方式
+	mySteelChemicalDataMethod := "excel"
+	if v, ok := conf["MySteelDataMethod"]; ok {
+		if v == "api" {
+			mySteelChemicalDataMethod = v
+		}
+	}
+	utils.FileLog.Info("获取业务配置,MySteelDataMethod:" + mySteelChemicalDataMethod)
 	// 获取各个刷新频率的配置
 	for _, refreshFrequency := range refreshFrequencyList {
 		// 获取刷新频率条件
@@ -124,9 +143,14 @@ func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*ed
 		pars = append(pars, refreshFrequency, currTimeStr)
 
 		// 这两个是excel的数据源,他是从公共机更新的,需要过滤掉
-		condition += ` AND source not in (?,?)`
-		pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
-
+		if mySteelChemicalDataMethod == "api" {
+			// 钢联化工使用api的方式获取数据的,不需要过滤
+			condition += ` AND source not in (?)`
+			pars = append(pars, utils.DATA_SOURCE_YS)
+		} else {
+			condition += ` AND source not in (?,?)`
+			pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
+		}
 		tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars)
 		if tmpErr != nil {
 			err = tmpErr
@@ -164,7 +188,45 @@ func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*ed
 	for source, subSourceFrequencyListMap := range refreshDataFrequencyListMap {
 		switch source {
 		case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS:
-			// 这种不处理
+			// 只处理钢联化工使用api方式获取数据的情况
+			if mySteelChemicalDataMethod == "api" {
+				for subSource, frequencyList := range subSourceFrequencyListMap {
+					items, tmpErr := data_manage.GetBaseFromMysteelChemicalIndexItems(frequencyList)
+					if tmpErr != nil {
+						errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
+					}
+					indexList := make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
+
+					for _, v := range items {
+						tmpConf := new(edb_refresh.EdbInfoListAndRefreshConfig)
+						// 数据刷新的期数
+						dataRefreshNum := utils.DATA_REFRESH
+						key := fmt.Sprintf("%d_%d_%s", utils.DATA_SOURCE_MYSTEEL_CHEMICAL, 0, v.Frequency)
+						if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
+							if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
+								dataRefreshNum = 0
+							} else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
+								dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
+							}
+						}
+						tmpConf.EdbCode = v.IndexCode
+						tmpConf.EdbName = v.IndexName
+						tmpConf.Source = utils.DATA_SOURCE_MYSTEEL_CHEMICAL
+						tmpConf.Frequency = v.Frequency
+						tmpConf.Unit = v.Unit
+						tmpConf.StartDate, err = time.Parse(utils.FormatDate, v.StartDate)
+						tmpConf.EndDate, err = time.Parse(utils.FormatDate, v.EndDate)
+						tmpConf.ClassifyId = int(v.BaseFromMysteelChemicalClassifyId)
+						tmpConf.DataRefreshNum = dataRefreshNum
+						tmpConf.EdbInfoId = v.EdbInfoId
+						indexList = append(indexList, tmpConf)
+					}
+
+					key := fmt.Sprint(source, "_", subSource)
+					sourceEdbInfoListMap[key] = indexList
+				}
+			}
+			// 其他情况不处理
 		default:
 			for subSource, frequencyList := range subSourceFrequencyListMap {
 				edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList)
@@ -248,8 +310,27 @@ func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb
 		configIdList = append(configIdList, v.EdbRefreshConfigId)
 		configIdEdbRefreshConfigMap[v.EdbRefreshConfigId] = v
 	}
+	conf, err := models.GetBusinessConf()
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
 
-	edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource([]int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}, configIdList)
+	// 获取钢联化工的数据获取方式
+	mySteelChemicalDataMethod := "excel"
+	if v, ok := conf["MySteelDataMethod"]; ok {
+		if v == "api" {
+			mySteelChemicalDataMethod = v
+		}
+	}
+	// 当钢联的数据获取方式是api时,不用过滤
+	var sourceList []int
+	if mySteelChemicalDataMethod == "api" {
+		sourceList = []int{utils.DATA_SOURCE_YS}
+	} else {
+		sourceList = []int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}
+	}
+	edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource(sourceList, configIdList)
 	if err != nil {
 		return
 	}

+ 6 - 1
services/task.go

@@ -7,9 +7,10 @@ import (
 	"eta/eta_task/services/data_stat"
 	"eta/eta_task/utils"
 	"fmt"
-	"github.com/beego/beego/v2/task"
 	"sync"
 	"time"
+
+	"github.com/beego/beego/v2/task"
 )
 
 func Task() {
@@ -17,6 +18,10 @@ func Task() {
 	//如果是生产环境,才需要走这些任务
 	if utils.RunMode == "release" {
 		releaseTask()
+	} else {
+		// 根据配置刷新指标数据
+		configRefreshData := task.NewTask("syncBaseDataExt", "0 */30 * * * * ", ConfigRefreshData)
+		task.AddTask("configRefreshData", configRefreshData)
 	}
 	// 定时发布智能研报
 	publishSmartReport := task.NewTask("publishSmartReport", "0 */1 * * * *", PublishSmartReport)