瀏覽代碼

Merge branch 'feature/jingbo_pb_refresh' of eta_server/eta_task into master

xyxie 1 年之前
父節點
當前提交
e4e59f6fb1
共有 6 個文件被更改,包括 67 次插入33 次删除
  1. 6 0
      models/data_manage/base_from_eia_steo.go
  2. 12 0
      models/data_manage/base_from_icpi.go
  3. 10 1
      services/data/edb_info.go
  4. 8 8
      services/eia_steo.go
  5. 22 24
      services/icpi.go
  6. 9 0
      utils/config.go

+ 6 - 0
models/data_manage/base_from_eia_steo.go

@@ -32,6 +32,12 @@ func AddBaseFromEiaSteoIndex(item *BaseFromEiaSteoIndex) (lastId int64, err erro
 	return
 }
 
+func InsertOrUpdateBaseFromEiaSteoIndex(item *BaseFromEiaSteoIndex) (lastId int64, err error) {
+	o := orm.NewOrm()
+	lastId, err = o.InsertOrUpdate(item)
+	return
+}
+
 // Add 新增指标
 func (item *BaseFromEiaSteoIndex) Add() (err error) {
 	o := orm.NewOrm()

+ 12 - 0
models/data_manage/base_from_icpi.go

@@ -46,6 +46,12 @@ func (obj *BaseFromIcpiIndex) AddBaseFromIcpiIndex(item *BaseFromIcpiIndex) (las
 	return
 }
 
+func (obj *BaseFromIcpiIndex) InsertOrUpdateBaseFromIcpiIndex(item *BaseFromIcpiIndex) (lastId int64, err error) {
+	o := orm.NewOrm()
+	lastId, err = o.InsertOrUpdate(item)
+	return
+}
+
 type BaseFromIcpiClassify struct {
 	BaseFromIcpiClassifyId int    `orm:"column(base_from_icpi_classify_id);pk"`
 	ClassifyName           string `description:"分类名称"`
@@ -76,6 +82,12 @@ func AddBaseFromIcpiClassify(item *BaseFromIcpiClassify) (lastId int64, err erro
 	return
 }
 
+func InsertOrUpdateBaseFromIcpiClassify(item *BaseFromIcpiClassify) (lastId int64, err error) {
+	o := orm.NewOrm()
+	lastId, err = o.InsertOrUpdate(item)
+	return
+}
+
 // GetBaseFromComTradeMaxDate 获取ICPI消费者指数最大数据
 func GetBaseFromIcpiMaxDate() (max_date time.Time, err error) {
 	o := orm.NewOrm()

+ 10 - 1
services/data/edb_info.go

@@ -135,9 +135,18 @@ func RefreshDataFromPb(wg *sync.WaitGroup) (err error) {
 		}
 		wg.Done()
 	}()
+	if utils.PbRefreshOpen == "0" {
+		return
+	}
 	var condition string
 	var pars []interface{}
-	condition += " AND source=? AND frequency in ('日度','周度') "
+	var frequencyStr string
+	if utils.PbRefreshFrequency == "" {
+		frequencyStr = "'日度','周度'"
+	} else {
+		frequencyStr = utils.PbRefreshFrequency
+	}
+	condition += " AND source=? AND frequency in (" + frequencyStr + ") "
 	pars = append(pars, utils.DATA_SOURCE_PB)
 
 	items, err := data_manage.GetEdbInfoByCondition(condition, pars, 0)

+ 8 - 8
services/eia_steo.go

@@ -34,7 +34,7 @@ func SyncEiaSteoIndex() (err error) {
 		return err
 	}
 	//获取所有指标信息  某一天的
-	allIndex, err := data_manage.GetBaseFromEiaSteoIndexAll(startDate)
+	/*allIndex, err := data_manage.GetBaseFromEiaSteoIndexAll(startDate)
 	if err != nil {
 		fmt.Println("get GetBaseFromEiaSteoIndexAll err:" + err.Error())
 		return
@@ -43,16 +43,16 @@ func SyncEiaSteoIndex() (err error) {
 	existIndexMap := make(map[string]*data_manage.BaseFromEiaSteoIndex)
 	for _, v := range allIndex {
 		existIndexMap[v.IndexCode] = v
-	}
+	}*/
 
 	for _, zv := range respObj.Data {
-		if _, ok := existIndexMap[zv.IndexCode]; !ok {
-			newID, err := data_manage.AddBaseFromEiaSteoIndex(zv)
-			if err != nil {
-				fmt.Println("insert error:", err)
-			}
-			fmt.Println("insert new indexID:", newID)
+		//if _, ok := existIndexMap[zv.IndexCode]; !ok {
+		newID, err := data_manage.InsertOrUpdateBaseFromEiaSteoIndex(zv)
+		if err != nil {
+			fmt.Println("InsertOrUpdateBaseFromEiaSteoIndex error:", err)
 		}
+		fmt.Println("InsertOrUpdateBaseFromEiaSteoIndex new indexID:", newID)
+		//}
 	}
 	return err
 }

+ 22 - 24
services/icpi.go

@@ -31,12 +31,12 @@ func SyncBaseFromIcpi() (err error) {
 	respObj := new(data_manage.IcpiIndexResp)
 	err = json.Unmarshal([]byte(result), &respObj)
 	if err != nil {
-		utils.FileLog.Info("err:",err.Error())
-		fmt.Println("err:",err.Error())
+		utils.FileLog.Info("err:", err.Error())
+		fmt.Println("err:", err.Error())
 		return err
 	}
 	//获取所有指标信息  某一天的
-	allIndex, err := data_manage.GetBaseFromIcpiIndexAll(startDate)
+	/*allIndex, err := data_manage.GetBaseFromIcpiIndexAll(startDate)
 	if err != nil {
 		return
 	}
@@ -44,18 +44,16 @@ func SyncBaseFromIcpi() (err error) {
 	existIndexMap := make(map[int]*data_manage.BaseFromIcpiIndex)
 	for _, v := range allIndex {
 		existIndexMap[v.BaseFromIcpiIndexId] = v
-	}
+	}*/
 
 	icpiObj := new(data_manage.BaseFromIcpiIndex)
 
 	for _, zv := range respObj.Data {
-		if _, ok := existIndexMap[zv.BaseFromIcpiIndexId]; !ok {
-			newID, err := icpiObj.AddBaseFromIcpiIndex(zv)
-			if err != nil {
-				fmt.Println("insert error:", err)
-			}
-			fmt.Println("insert new indexID:", newID)
+		newID, err := icpiObj.InsertOrUpdateBaseFromIcpiIndex(zv)
+		if err != nil {
+			fmt.Println("InsertOrUpdateBaseFromIcpiIndex error:", err)
 		}
+		fmt.Println("InsertOrUpdateBaseFromIcpiIndex new indexID:", newID)
 	}
 	return err
 }
@@ -78,24 +76,24 @@ func SyncBaseFromIcpiClassify() (err error) {
 		return err
 	}
 	//获取所有分类
-	allClassify, err := data_manage.GetBaseFromIcpiClassifyAll()
-	if err != nil {
-		return
-	}
+	/*	allClassify, err := data_manage.GetBaseFromIcpiClassifyAll()
+		if err != nil {
+			return
+		}
 
-	existIndexMap := make(map[int]*data_manage.BaseFromIcpiClassify)
-	for _, v := range allClassify {
-		existIndexMap[v.BaseFromIcpiClassifyId] = v
-	}
+		existIndexMap := make(map[int]*data_manage.BaseFromIcpiClassify)
+		for _, v := range allClassify {
+			existIndexMap[v.BaseFromIcpiClassifyId] = v
+		}*/
 
 	for _, item := range respObj.Data {
-		if _, ok := existIndexMap[item.BaseFromIcpiClassifyId]; !ok {
-			newID, err := data_manage.AddBaseFromIcpiClassify(item)
-			if err != nil {
-				fmt.Println("AddBaseFromIcpiClassify error:", err)
-			}
-			fmt.Println("AddBaseFromIcpiClassify new indexID:", newID)
+		//if _, ok := existIndexMap[item.BaseFromIcpiClassifyId]; !ok {
+		newID, err := data_manage.InsertOrUpdateBaseFromIcpiClassify(item)
+		if err != nil {
+			fmt.Println("InsertOrUpdateBaseFromIcpiClassify error:", err)
 		}
+		fmt.Println("InsertOrUpdateBaseFromIcpiClassify new indexID:", newID)
+		//}
 	}
 	return err
 }

+ 9 - 0
utils/config.go

@@ -115,6 +115,10 @@ var (
 	AccessKeySecret  string
 )
 
+var (
+	PbRefreshFrequency string // 彭博更新频度
+	PbRefreshOpen      string // 彭博刷新开关 0关闭,默认都是开启的
+)
 var HzDataApi string
 
 func init() {
@@ -249,4 +253,9 @@ func init() {
 	if HzDataApi == "" {
 		HzDataApi = "https://hzdataapi.hzinsights.com/hzdataapi/"
 	}
+
+	// 彭博刷新频度
+	PbRefreshFrequency = config["pb_refresh_frequency"]
+	// 彭博刷新开关
+	PbRefreshOpen = config["pb_refresh_open"]
 }