Browse Source

定时刷新

xyxie 2 weeks ago
parent
commit
e691b98af5
3 changed files with 104 additions and 0 deletions
  1. 40 0
      models/data_manage/base_from_kpler.go
  2. 60 0
      services/data/base_from_kpler.go
  3. 4 0
      services/task.go

+ 40 - 0
models/data_manage/base_from_kpler.go

@@ -0,0 +1,40 @@
+package data_manage
+
+import (
+	"eta/eta_task/global"
+	"eta/eta_task/utils"
+	"time"
+)
+
+type BaseFromKplerIndex struct {
+	BaseFromKplerIndexId int `gorm:"column:base_from_kpler_index_id;primaryKey"`
+	ClassifyId    int
+	IndexCode     string
+	IndexName     string
+	Frequency     string
+	Unit          string
+	Sort          int
+	StartDate     string `description:"开始日期"`
+	EndDate       string `description:"结束日期"`
+	EndValue      float64
+	CreateTime    time.Time
+	ModifyTime    time.Time
+	BaseFileName  string `description:"文件目录"`
+	TerminalCode  string `description:"所属终端编码"`
+	ApiQueryUrl   string `description:"API查询URL"`
+	ExcelQueryUrl string `description:"Excel查询URL"`
+	ProductNames  string `description:"产品名称"`
+	FromZoneId    int    `description:"区域ID"`
+	FromZoneName  string `description:"区域名称"`
+	ToZoneId      int    `description:"区域ID"`
+	ToZoneName    string `description:"区域名称"`
+	FlowDirection string `description:"流向"`
+	Granularity   string `description:"粒度"`
+	Split         string `description:"拆分类型"`
+}
+
+func (m *BaseFromKplerIndex) GetApiNum() (num int64, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	err = o.Model(m).Where("api_query_url != ''").Count(&num).Error
+	return
+}

+ 60 - 0
services/data/base_from_kpler.go

@@ -0,0 +1,60 @@
+package data
+
+import (
+	"context"
+	"eta/eta_task/models/data_manage"
+	"eta/eta_task/services/alarm_msg"
+	"eta/eta_task/utils"
+	"fmt"
+	"time"
+)
+
+// RefreshBaseFromKplerIndex 刷新通过api方式对接的Kpler数据
+func RefreshBaseFromKplerIndex(cont context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("RefreshBaseFromKplerIndex-刷新Kpler数据失败, %v", err)
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	kplerObj := new(data_manage.BaseFromKplerIndex)
+	num, err := kplerObj.GetApiNum()
+	if err != nil {
+		utils.FileLog.Info(fmt.Sprintf("RefreshBaseFromKplerIndex-获取需要刷新的数据失败, %v", err))
+		return
+	}
+	if num == 0 {
+		utils.FileLog.Info("RefreshBaseFromKplerIndex-没有需要刷新的数据")
+		return
+	}
+	frequencys := []string{"日度", "周度", "月度", "季度", "年度"}
+	for _, frequency := range frequencys {
+		startDate := time.Now().Format(utils.FormatDate)
+		if frequency == "周度" {
+			startDate = time.Now().AddDate(0, 0, -5).Format(utils.FormatDate)
+		} else if frequency == "月度" {
+			startDate = time.Now().AddDate(0, -1, 0).Format(utils.FormatDate)
+		} else if frequency == "季度" {
+			startDate = time.Now().AddDate(0, -3, 0).Format(utils.FormatDate)
+		} else if frequency == "年度" {
+			startDate = time.Now().AddDate(-1, 0, 0).Format(utils.FormatDate)
+		}
+
+		refreshUrl := "kpler/index/refresh_by_api"
+		param := make(map[string]interface{})
+		param["Frequency"] = frequency
+		param["StartDate"] = startDate
+		res, e := postRefreshEdbData(param, refreshUrl)
+		if e != nil {
+			utils.FileLog.Info(fmt.Sprintf("RefreshBaseFromKplerIndex-postRefreshEdbData, frequency: %s, err: %v", frequency, e))
+			continue
+		}
+		if res != nil && res.Ret != 200 {
+			utils.FileLog.Info(fmt.Sprintf("RefreshBaseFromKplerIndex-postRefreshEdbData, frequency: %s, Ret: %d, ErrMsg: %s", frequency, res.Ret, res.ErrMsg))
+			continue
+		}
+	}
+	return
+}

+ 4 - 0
services/task.go

@@ -178,6 +178,10 @@ func releaseTask() {
 	// 每日同步数据源ES(business_conf可开关)
 	syncDataSourceEs := task.NewTask("syncDataSourceEs", "0 30 4 * * *", data.SyncDataSourceEs)
 	task.AddTask("syncDataSourceEs", syncDataSourceEs)
+
+	// 刷新kpler指标
+	refreshKplerIndex := task.NewTask("refreshKplerIndex", "0 10 18,21 * * *", data.RefreshBaseFromKplerIndex)
+	task.AddTask("refreshKplerIndex", refreshKplerIndex)
 }
 
 func RefreshData(cont context.Context) (err error) {