瀏覽代碼

Merge branch 'hotfix/bug8100_eia_steo' of eta_server/eta_task into master

xyxie 1 周之前
父節點
當前提交
797f5aad84
共有 3 個文件被更改,包括 125 次插入57 次删除
  1. 58 1
      models/data_manage/base_from_eia_steo.go
  2. 65 55
      services/eia_steo.go
  3. 2 1
      services/sync_hz_data.go

+ 58 - 1
models/data_manage/base_from_eia_steo.go

@@ -6,6 +6,8 @@ import (
 	"eta/eta_task/utils"
 
 	"time"
+
+	"github.com/rdlucklib/rdluck_tools/paging"
 )
 
 // BaseFromEiaSteoIndex EiaSteo指标
@@ -218,7 +220,22 @@ func GetBaseFromEiaSteoIndexMaxDate() (maxDate time.Time, err error) {
 
 func GetBaseFromEiaSteoIndexMaxCreateDate() (maxDate time.Time, err error) {
 	o := global.DbMap[utils.DbNameIndex]
-	sql := ` SELECT MAX(a.create_time) AS max_date FROM base_from_eia_steo_index AS a `
+	sql := ` SELECT MAX(a.modify_time) AS max_date FROM base_from_eia_steo_index AS a `
+	var timeNull sql2.NullTime
+	err = o.Raw(sql).Scan(&timeNull).Error
+	if err != nil {
+		return
+	}
+	if timeNull.Valid {
+		maxDate = timeNull.Time
+	}
+	return
+}
+
+
+func GetBaseFromEiaSteoDataMaxModifyDate() (maxDate time.Time, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := ` SELECT MAX(a.modify_time) AS max_date FROM base_from_eia_steo_data AS a `
 	var timeNull sql2.NullTime
 	err = o.Raw(sql).Scan(&timeNull).Error
 	if err != nil {
@@ -229,3 +246,43 @@ func GetBaseFromEiaSteoIndexMaxCreateDate() (maxDate time.Time, err error) {
 	}
 	return
 }
+
+// GetAllBaseFromEiaSteoDataList 获取EiaSteo数据
+func GetAllBaseFromEiaSteoDataList(startDate string) (list []*BaseFromEiaSteoData, err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	sql := `SELECT * FROM base_from_eia_steo_data WHERE modify_time>=?  ORDER BY base_from_eia_steo_data_id ASC `
+	err = o.Raw(sql, startDate).Find(&list).Error
+	return
+}	
+
+
+type BaseFromEiaSteoDataResp struct {
+	Ret     int
+	Msg     string
+	ErrMsg  string
+	ErrCode string
+	Data    BaseFromEiaSteoDataIndexAndDataResp
+}
+
+// BaseFromEiaSteoDataIndexAndDataResp 分页列表响应体
+type BaseFromEiaSteoDataIndexAndDataResp struct {
+	List   []*BaseFromEiaSteoData
+	Paging *paging.PagingItem `description:"分页数据"`
+}
+
+// MultiAddBaseFromEiaSteoDataIndex 批量添加数据
+func MultiAddBaseFromEiaSteoDataIndex(items []*BaseFromEiaSteoData) (lastId int64, err error) {
+	num := len(items)
+	if num <= 0 {
+		return
+	}
+	o := global.DbMap[utils.DbNameIndex]
+	err = o.CreateInBatches(items, utils.MultiAddNum).Error
+	return
+}
+
+func UpdateBaseFromEiaSteoData(item *BaseFromEiaSteoData) (err error) {
+	o := global.DbMap[utils.DbNameIndex]
+	err = o.Model(item).Where("base_from_eia_steo_data_id = ?", item.BaseFromEiaSteoDataId).Updates(item).Error
+	return
+}

+ 65 - 55
services/eia_steo.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"eta/eta_task/models/data_manage"
 	"eta/eta_task/utils"
+	"errors"
 	"fmt"
 	"time"
 )
@@ -15,7 +16,7 @@ func SyncEiaSteoIndex() (err error) {
 	if err != nil || maxDate.IsZero() {
 		startDate = time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + " 00:00:00"
 	} else {
-		startDate = maxDate.AddDate(0, 0, 1).Format(utils.FormatDate) + " 00:00:00"
+		startDate = maxDate.Format(utils.FormatDateTime)
 	}
 
 	method := `index/list`
@@ -29,8 +30,8 @@ func SyncEiaSteoIndex() (err error) {
 		utils.FileLog.Info("HttpPost err:", err)
 		return
 	}
-	utils.FileLog.Info(result)
-	fmt.Println(result)
+	//utils.FileLog.Info(result)
+	//fmt.Println(result)
 
 	respObj := new(data_manage.EiaSteoIndexResp)
 	err = json.Unmarshal([]byte(result), &respObj)
@@ -52,14 +53,14 @@ func SyncEiaSteoIndex() (err error) {
 
 	for _, zv := range respObj.Data {
 		//if _, ok := existIndexMap[zv.IndexCode]; !ok {
-		if zv.BaseFromEiaSteoIndexId <= 0 {
-			continue
-		}
-		newID, err := data_manage.InsertOrUpdateBaseFromEiaSteoIndex(zv)
+		// if zv.BaseFromEiaSteoIndexId <= 0 {
+		// 	continue
+		// }
+		_, err := data_manage.InsertOrUpdateBaseFromEiaSteoIndex(zv)
 		if err != nil {
-			fmt.Println("InsertOrUpdateBaseFromEiaSteoIndex error:", err)
+			utils.FileLog.Info("InsertOrUpdateBaseFromEiaSteoIndex error:", err)
 		}
-		fmt.Println("InsertOrUpdateBaseFromEiaSteoIndex new indexID:", newID)
+		//fmt.Println("InsertOrUpdateBaseFromEiaSteoIndex new indexID:", newID)
 		//}
 	}
 	return err
@@ -202,76 +203,85 @@ func SyncEiaSteoClassify() (err error) {
 
 func SyncEiaSteoIndexDataV2() (err error) {
 	startDate := time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + " 00:00:00"
+	//var startDate string
+	maxDate, err := data_manage.GetBaseFromEiaSteoDataMaxModifyDate()
+	if err != nil || maxDate.IsZero() {
+		startDate = time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + " 00:00:00"
+	} else {
+		startDate = maxDate.Format(utils.FormatDateTime)
+	}
 
-	method := `index/data/list`
+	existDataMap := make(map[string]*data_manage.BaseFromEiaSteoData)
+    allData, err1 := data_manage.GetAllBaseFromEiaSteoDataList(startDate)
+    if err1 != nil {
+        fmt.Println("get GetAllBaseFromEiaSteoDataList err:" + err1.Error())
+        return
+    }
+    for _, dv := range allData {
+        tmpKey := dv.IndexCode + "_" + dv.DataTime.Format(utils.FormatDate)
+        existDataMap[tmpKey] = dv
+    }
+
+	method := `index/data/list_page`
 
 	//获取所有指标信息  某一天的
-	allIndexCode, err := data_manage.GetBaseFromEiaSteoIndexCodeListByDate(startDate)
-	if err != nil {
-		fmt.Println("get GetBaseFromEiaSteoIndexAll err:" + err.Error())
-		utils.FileLog.Info("get GetBaseFromEiaSteoIndexCodeList err:", err)
-		return
-	}
+	maxPage := 1
 
-	for _, indexCode := range allIndexCode {
+	for currPage := 0; currPage < maxPage; currPage++ {
 		data := make(map[string]interface{})
 		data["Source"] = utils.DATA_SOURCE_EIA_STEO
 		data["StartDate"] = startDate
-		data["IndexCode"] = indexCode
+		data["CurrPage"] = currPage
+		data["PageSize"] = 500 //
 
 		var result string
-		result, err = HttpPost("SyncEiaSteoIndexData", method, data)
+		result, err = HttpPost("SyncBaseFromEiaSteoData", method, data)
 		if err != nil {
-			fmt.Println("HttpPost err:", err)
-			utils.FileLog.Info("HttpPost err:", err)
+			err = fmt.Errorf("获取分页指标信息失败")
 			return
 		}
-		utils.FileLog.Info(result)
-		fmt.Println(result)
+		//utils.FileLog.Info(result)
+		//fmt.Println(result)
 
-		respObj := new(data_manage.EiaSteoIndexDataResp)
+		respObj := new(data_manage.BaseFromEiaSteoDataResp)
 		err = json.Unmarshal([]byte(result), &respObj)
 		if err != nil {
 			fmt.Println("json.Unmarshal err:" + err.Error())
 			return err
 		}
-
-		existDataMap := make(map[int]*data_manage.BaseFromEiaSteoData)
-		allData, err1 := data_manage.GetBaseFromEiaSteoIndexDataByIndexCode(indexCode, startDate)
-		if err1 != nil {
-			utils.FileLog.Error("get GetBaseFromEiaSteoIndexDataByDate err:", err1.Error())
+		if respObj.Ret != 200 {
+			err = errors.New(respObj.ErrMsg)
 			return
 		}
-		for _, dv := range allData {
-			existDataMap[dv.BaseFromEiaSteoDataId] = dv
-		}
+		// 总页码数
+		maxPage = respObj.Data.Paging.Pages
 
-		for _, dv := range respObj.Data {
-			if v, ok := existDataMap[dv.BaseFromEiaSteoDataId]; !ok {
-				newID, err := data_manage.AddBaseFromEiaSteoData(dv)
-				if err != nil {
-					utils.FileLog.Error("add error:", err)
-				}
-				fmt.Println("insert new indexID:", newID)
-			} else {
-				upDateCols := make([]string, 0)
-				if !v.DataTime.Equal(dv.DataTime) {
-					v.DataTime = dv.DataTime
-					upDateCols = append(upDateCols, "data_time")
-				}
-				if v.Value != dv.Value {
-					v.Value = dv.Value
-					v.ModifyTime = dv.ModifyTime
-					upDateCols = append(upDateCols, []string{"value", "modify_time"}...)
-				}
-				if len(upDateCols) > 0 {
-					err = v.Update(upDateCols)
+	
+		addDataList := make([]*data_manage.BaseFromEiaSteoData, 0)
+        if respObj.Data.List != nil && len(respObj.Data.List) > 0 {
+            for _, dv := range respObj.Data.List {
+                tmpKey := dv.IndexCode + "_" + dv.DataTime.Format(utils.FormatDate)
+                if oldVal, ok := existDataMap[tmpKey]; !ok {
+                    addDataList = append(addDataList, dv)
+                    existDataMap[tmpKey] = dv
+                }else if dv.Value != oldVal.Value {
+					// 更新指标数据
+					err = data_manage.UpdateBaseFromEiaSteoData(dv)
 					if err != nil {
-						utils.FileLog.Error("update error:", err)
+						utils.FileLog.Info("UpdateBaseFromEiaSteoData error:"+err.Error())
 					}
 				}
-			}
-		}
+            }
+        }
+
+        // 最后如果还有数据未插入,那么继续插入吧
+        if len(addDataList) > 0 {
+            _, err = data_manage.MultiAddBaseFromEiaSteoDataIndex(addDataList)
+            if err != nil {
+                utils.FileLog.Info("MultiAddBaseFromEiaSteoDataIndex error:"+err.Error())
+            }
+        }
 	}
+
 	return err
 }

+ 2 - 1
services/sync_hz_data.go

@@ -189,7 +189,8 @@ func SyncHzDataIndexData() {
 	//EiaSteo
 	err = SyncEiaSteoIndexDataV2()
 	if err != nil {
-		fmt.Println("SyncEiaSteoIndexData Err:" + err.Error())
+		fmt.Println("SyncEiaSteoIndexDataV2 Err:" + err.Error())
+		utils.FileLog.Info("SyncEiaSteoIndexDataV2 Err:" + err.Error())
 		return
 	}