Эх сурвалжийг харах

Merge branch 'bzq/mysteel_custom_debug' of eta_server/eta_task into debug

鲍自强 8 сар өмнө
parent
commit
279b61af11

+ 0 - 2
go.mod

@@ -14,14 +14,12 @@ require (
 	github.com/olivere/elastic/v7 v7.0.32
 	github.com/rdlucklib/rdluck_tools v1.0.3
 	github.com/shopspring/decimal v1.3.1
-	github.com/yidane/formula v0.0.0-20220322063702-c9da84ba3476
 	golang.org/x/net v0.21.0
 	gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
 )
 
 require (
 	github.com/andybalholm/cascadia v1.3.2 // indirect
-	github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211218165449-dd623ecc2f02 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect

+ 0 - 4
go.sum

@@ -12,8 +12,6 @@ github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F
 github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
 github.com/andybalholm/cascadia v1.3.2 h1:3Xi6Dw5lHF15JtdcmAHD3i1+T8plmv7BQ/nsViSLyss=
 github.com/andybalholm/cascadia v1.3.2/go.mod h1:7gtRlve5FxPPgIgX36uWBX58OdBsSS6lUvCFb+h7KvU=
-github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211218165449-dd623ecc2f02 h1:o2oaBQGTzO+xNh12e7xWkphNe7H2DTiWv1ml9a2P9PQ=
-github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20211218165449-dd623ecc2f02/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
 github.com/astaxie/beego v1.12.3/go.mod h1:p3qIm0Ryx7zeBHLljmd7omloyca1s4yu1a8kM1FkpIA=
 github.com/beego/beego/v2 v2.1.0 h1:Lk0FtQGvDQCx5V5yEu4XwDsIgt+QOlNjt5emUa3/ZmA=
 github.com/beego/beego/v2 v2.1.0/go.mod h1:6h36ISpaxNrrpJ27siTpXBG8d/Icjzsc7pU1bWpp0EE=
@@ -195,8 +193,6 @@ github.com/syndtr/goleveldb v0.0.0-20160425020131-cfa635847112/go.mod h1:Z4AUp2K
 github.com/syndtr/goleveldb v0.0.0-20181127023241-353a9fca669c/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
 github.com/ugorji/go v0.0.0-20171122102828-84cb69a8af83/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
 github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc=
-github.com/yidane/formula v0.0.0-20220322063702-c9da84ba3476 h1:66fLxv8xlhSr42ZhVAYjUY/sEF0olUUAESVlsxVduuw=
-github.com/yidane/formula v0.0.0-20220322063702-c9da84ba3476/go.mod h1:9/dQiKiN04yPMdgsuFmKGuI2Hdp6OmFV9gSWS1col6g=
 github.com/ylywyn/jpush-api-go-client v0.0.0-20190906031852-8c4466c6e369/go.mod h1:Nv7wKD2/bCdKUFNKcJRa99a+1+aSLlCRJFriFYdjz/I=
 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
 github.com/yuin/gopher-lua v0.0.0-20171031051903-609c9cd26973/go.mod h1:aEV29XrmTYFr3CiRxZeGHpkvbwq+prZduBqMaascyCU=

+ 16 - 0
models/data_manage/base_from_mysteel_chemical_index.go

@@ -2,6 +2,7 @@ package data_manage
 
 import (
 	"eta/eta_task/utils"
+
 	"github.com/beego/beego/v2/client/orm"
 )
 
@@ -34,6 +35,7 @@ type BaseFromMysteelChemicalIndexItem struct {
 	IsStop                            int32   `json:"is_stop"`              // 是否停更:1:停更,0:未停更
 	TerminalCode                      string  `json:"terminal_code"`        // 所属终端编码
 	EndValue                          float64 `json:"end_value"`            // 指标的最新值
+	EdbInfoId                         int     `json:"edb_info_id"`
 }
 
 // GetBaseFromMysteelChemicalIndexItemByCode
@@ -51,6 +53,20 @@ func GetBaseFromMysteelChemicalIndexItemByCode(edbCode string) (item *BaseFromMy
 	return
 }
 
+func GetBaseFromMysteelChemicalIndexItems(frequencyList []string) (items []*BaseFromMysteelChemicalIndexItem, err error) {
+	num := len(frequencyList)
+	if num == 0 {
+		return
+	}
+	o := orm.NewOrmUsingDB("data")
+	sql := ` SELECT b.*, e.edb_info_id FROM base_from_mysteel_chemical_index AS b
+	LEFT JOIN edb_info AS e
+	ON b.index_code = e.edb_code
+	WHERE b.source='api' AND b.frequency IN (` + utils.GetOrmInReplace(len(frequencyList)) + `)`
+	_, err = o.Raw(sql, frequencyList).QueryRows(&items)
+	return
+}
+
 // GetRefreshBaseFromMysteelChemicalIndexItemByCreateTime 获取正常刷新的钢联化工指标
 func GetRefreshBaseFromMysteelChemicalIndexItemByCreateTime(endDate string, startPage, pageSize int) (items []*BaseFromMysteelChemicalIndexItem, err error) {
 	o := orm.NewOrmUsingDB("data")

+ 3 - 75
services/data/base_from_pcsg.go

@@ -7,8 +7,8 @@ import (
 	"fmt"
 )
 
-// RefreshPCSGBloombergDaily 中石油新加坡-刷新彭博日度指标
-func RefreshPCSGBloombergDaily(cont context.Context) (err error) {
+// RefreshPCSGBloomberg 中石油新加坡-每日指标刷新
+func RefreshPCSGBloomberg(cont context.Context) (err error) {
 	defer func() {
 		if err != nil {
 			tips := "RefreshPCSGBloombergDaily-中石油新加坡-刷新彭博日度指标失败, ErrMsg:\n" + err.Error()
@@ -18,79 +18,7 @@ func RefreshPCSGBloombergDaily(cont context.Context) (err error) {
 	}()
 
 	param := make(map[string]interface{})
-	uri := "bloomberg/pcsg/refresh_daily"
-	res, e := postRefreshEdbData(param, uri)
-	if e != nil {
-		err = fmt.Errorf("postRefreshEdbData err: %s", e.Error())
-		return
-	}
-	if res != nil && res.Ret != 200 {
-		err = fmt.Errorf("postRefreshEdbData fail")
-		return
-	}
-	return
-}
-
-// RefreshPCSGBloombergWeekly 中石油新加坡-刷新彭博周度指标
-func RefreshPCSGBloombergWeekly(cont context.Context) (err error) {
-	defer func() {
-		if err != nil {
-			tips := "RefreshPCSGBloombergWeekly-中石油新加坡-刷新彭博周度指标失败, ErrMsg:\n" + err.Error()
-			utils.FileLog.Info(tips)
-			go alarm_msg.SendAlarmMsg(tips, 3)
-		}
-	}()
-
-	param := make(map[string]interface{})
-	uri := "bloomberg/pcsg/refresh_weekly"
-	res, e := postRefreshEdbData(param, uri)
-	if e != nil {
-		err = fmt.Errorf("postRefreshEdbData err: %s", e.Error())
-		return
-	}
-	if res != nil && res.Ret != 200 {
-		err = fmt.Errorf("postRefreshEdbData fail")
-		return
-	}
-	return
-}
-
-// RefreshPCSGBloombergMonthly 中石油新加坡-刷新彭博月度指标
-func RefreshPCSGBloombergMonthly(cont context.Context) (err error) {
-	defer func() {
-		if err != nil {
-			tips := "RefreshPCSGBloombergMonthly-中石油新加坡-刷新彭博月度指标失败, ErrMsg:\n" + err.Error()
-			utils.FileLog.Info(tips)
-			go alarm_msg.SendAlarmMsg(tips, 3)
-		}
-	}()
-
-	param := make(map[string]interface{})
-	uri := "bloomberg/pcsg/refresh_monthly"
-	res, e := postRefreshEdbData(param, uri)
-	if e != nil {
-		err = fmt.Errorf("postRefreshEdbData err: %s", e.Error())
-		return
-	}
-	if res != nil && res.Ret != 200 {
-		err = fmt.Errorf("postRefreshEdbData fail")
-		return
-	}
-	return
-}
-
-// RefreshPCSGBloombergDailyRun3 中石油新加坡-刷新彭博日度指标Run3
-func RefreshPCSGBloombergDailyRun3(cont context.Context) (err error) {
-	defer func() {
-		if err != nil {
-			tips := "RefreshPCSGBloombergDailyRun3-中石油新加坡-刷新彭博日度指标V2失败, ErrMsg:\n" + err.Error()
-			utils.FileLog.Info(tips)
-			go alarm_msg.SendAlarmMsg(tips, 3)
-		}
-	}()
-
-	param := make(map[string]interface{})
-	uri := "bloomberg/pcsg/refresh_daily_run3"
+	uri := "bloomberg/pcsg/refresh_task"
 	res, e := postRefreshEdbData(param, uri)
 	if e != nil {
 		err = fmt.Errorf("postRefreshEdbData err: %s", e.Error())

+ 88 - 6
services/edb_refresh.go

@@ -56,7 +56,12 @@ func ConfigRefreshData(cont context.Context) (err error) {
 			wg.Done()
 			continue
 		}
-		go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
+		if len(edbList) != 0 {
+			// debug环境仅测试刷新钢联
+			if edbList[0].Source == utils.DATA_SOURCE_MYSTEEL_CHEMICAL && utils.RunMode == "debug" {
+				go BaseRefreshData(&wg, edbList[0].Source, edbList[0].SubSource, edbList)
+			}
+		}
 	}
 
 	wg.Wait()
@@ -114,6 +119,21 @@ func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*ed
 	//刷新频率,枚举值:每自然日、每交易日、每周、每旬、每月、每季、每半年、每年
 	refreshFrequencyList := []string{"每自然日", "每交易日", "每周", "每旬", "每月", "每季", "每半年", "每年"}
 
+	conf, err := models.GetBusinessConf()
+	if err != nil {
+		fmt.Println(err)
+		utils.FileLog.Info("获取业务配置失败,Err:" + err.Error())
+		return
+	}
+
+	// 获取钢联化工的数据获取方式
+	mySteelChemicalDataMethod := "excel"
+	if v, ok := conf["MySteelDataMethod"]; ok {
+		if v == "api" {
+			mySteelChemicalDataMethod = v
+		}
+	}
+	utils.FileLog.Info("获取业务配置,MySteelDataMethod:" + mySteelChemicalDataMethod)
 	// 获取各个刷新频率的配置
 	for _, refreshFrequency := range refreshFrequencyList {
 		// 获取刷新频率条件
@@ -127,9 +147,14 @@ func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*ed
 		pars = append(pars, refreshFrequency, currTimeStr)
 
 		// 这两个是excel的数据源,他是从公共机更新的,需要过滤掉
-		condition += ` AND source not in (?,?)`
-		pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
-
+		if mySteelChemicalDataMethod == "api" {
+			// 钢联化工使用api的方式获取数据的,不需要过滤
+			condition += ` AND source not in (?)`
+			pars = append(pars, utils.DATA_SOURCE_YS)
+		} else {
+			condition += ` AND source not in (?,?)`
+			pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS)
+		}
 		tmpList, tmpErr := edb_refresh.GetListByCondition(condition, pars)
 		if tmpErr != nil {
 			err = tmpErr
@@ -167,7 +192,45 @@ func getDefaultRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*ed
 	for source, subSourceFrequencyListMap := range refreshDataFrequencyListMap {
 		switch source {
 		case utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS:
-			// 这种不处理
+			// 只处理钢联化工使用api方式获取数据的情况
+			if mySteelChemicalDataMethod == "api" {
+				for subSource, frequencyList := range subSourceFrequencyListMap {
+					items, tmpErr := data_manage.GetBaseFromMysteelChemicalIndexItems(frequencyList)
+					if tmpErr != nil {
+						errMsgList = append(errMsgList, fmt.Sprint("source:", source, "subSource:", subSource, "frequencyList:", strings.Join(frequencyList, ","), "err:", tmpErr.Error()))
+					}
+					indexList := make([]*edb_refresh.EdbInfoListAndRefreshConfig, 0)
+
+					for _, v := range items {
+						tmpConf := new(edb_refresh.EdbInfoListAndRefreshConfig)
+						// 数据刷新的期数
+						dataRefreshNum := utils.DATA_REFRESH
+						key := fmt.Sprintf("%d_%d_%s", utils.DATA_SOURCE_MYSTEEL_CHEMICAL, 0, v.Frequency)
+						if edbRefreshDefaultConfig, ok := refreshDataNumMap[key]; ok {
+							if edbRefreshDefaultConfig.RefreshAllData == 1 { // 刷新所有数据期数
+								dataRefreshNum = 0
+							} else if edbRefreshDefaultConfig.RefreshDataNum > 0 { //
+								dataRefreshNum = edbRefreshDefaultConfig.RefreshDataNum
+							}
+						}
+						tmpConf.EdbCode = v.IndexCode
+						tmpConf.EdbName = v.IndexName
+						tmpConf.Source = utils.DATA_SOURCE_MYSTEEL_CHEMICAL
+						tmpConf.Frequency = v.Frequency
+						tmpConf.Unit = v.Unit
+						tmpConf.StartDate, err = time.Parse(utils.FormatDate, v.StartDate)
+						tmpConf.EndDate, err = time.Parse(utils.FormatDate, v.EndDate)
+						tmpConf.ClassifyId = int(v.BaseFromMysteelChemicalClassifyId)
+						tmpConf.DataRefreshNum = dataRefreshNum
+						tmpConf.EdbInfoId = v.EdbInfoId
+						indexList = append(indexList, tmpConf)
+					}
+
+					key := fmt.Sprint(source, "_", subSource)
+					sourceEdbInfoListMap[key] = indexList
+				}
+			}
+			// 其他情况不处理
 		default:
 			for subSource, frequencyList := range subSourceFrequencyListMap {
 				edbList, tmpErr := edb_refresh.GetDefaultRefreshEdbInfoListBySourceAndSubSource(source, subSource, frequencyList)
@@ -251,8 +314,27 @@ func getConfigRefreshData(now time.Time) (sourceEdbInfoListMap map[string][]*edb
 		configIdList = append(configIdList, v.EdbRefreshConfigId)
 		configIdEdbRefreshConfigMap[v.EdbRefreshConfigId] = v
 	}
+	conf, err := models.GetBusinessConf()
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
 
-	edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource([]int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}, configIdList)
+	// 获取钢联化工的数据获取方式
+	mySteelChemicalDataMethod := "excel"
+	if v, ok := conf["MySteelDataMethod"]; ok {
+		if v == "api" {
+			mySteelChemicalDataMethod = v
+		}
+	}
+	// 当钢联的数据获取方式是api时,不用过滤
+	var sourceList []int
+	if mySteelChemicalDataMethod == "api" {
+		sourceList = []int{utils.DATA_SOURCE_YS}
+	} else {
+		sourceList = []int{utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_YS}
+	}
+	edbInfoList, err := edb_refresh.GetConfigRefreshEdbInfoListBySourceAndSubSource(sourceList, configIdList)
 	if err != nil {
 		return
 	}

+ 8 - 12
services/task.go

@@ -7,9 +7,10 @@ import (
 	"eta/eta_task/services/data_stat"
 	"eta/eta_task/utils"
 	"fmt"
-	"github.com/beego/beego/v2/task"
 	"sync"
 	"time"
+
+	"github.com/beego/beego/v2/task"
 )
 
 func Task() {
@@ -17,6 +18,10 @@ func Task() {
 	//如果是生产环境,才需要走这些任务
 	if utils.RunMode == "release" {
 		releaseTask()
+	} else {
+		// 根据配置刷新指标数据
+		configRefreshData := task.NewTask("syncBaseDataExt", "0 */30 * * * * ", ConfigRefreshData)
+		task.AddTask("configRefreshData", configRefreshData)
 	}
 	// 定时发布智能研报
 	publishSmartReport := task.NewTask("publishSmartReport", "0 */1 * * * *", PublishSmartReport)
@@ -123,17 +128,8 @@ func releaseTask() {
 
 	// 中石油新加坡
 	if utils.IsPCSG == "1" {
-		refreshPCSGBloombergDaily := task.NewTask("refreshPCSGBloombergDaily", "0 30 9 * * *", data.RefreshPCSGBloombergDaily)
-		task.AddTask("刷新中石油新加坡-Bloomberg日度指标", refreshPCSGBloombergDaily)
-
-		refreshPCSGBloombergDailyRun3 := task.NewTask("refreshPCSGBloombergDailyRun3", "0 32 9 * * *", data.RefreshPCSGBloombergDailyRun3)
-		task.AddTask("刷新中石油新加坡-Bloomberg日度指标Run3", refreshPCSGBloombergDailyRun3)
-
-		//refreshPCSGBloombergWeekly := task.NewTask("refreshPCSGBloombergWeekly", "0 30 9 * * *", data.RefreshPCSGBloombergWeekly)
-		//task.AddTask("刷新中石油新加坡-Bloomberg周度指标", refreshPCSGBloombergWeekly)
-
-		refreshPCSGBloombergMonthly := task.NewTask("refreshPCSGBloombergMonthly", "0 34 9 * * *", data.RefreshPCSGBloombergMonthly)
-		task.AddTask("刷新中石油新加坡-Bloomberg月度指标", refreshPCSGBloombergMonthly)
+		refreshPCSGBloomberg := task.NewTask("refreshPCSGBloombergDaily", "0 30 9 * * *", data.RefreshPCSGBloomberg)
+		task.AddTask("中石油新加坡-每日Bloomberg指标刷新", refreshPCSGBloomberg)
 	}
 }