浏览代码

feat:新增un数据同步

Roc 1 年之前
父节点
当前提交
136bd61742
共有 6 个文件被更改,包括 350 次插入11 次删除
  1. 64 0
      models/data_manage/com_trade_data.go
  2. 92 0
      models/data_manage/com_trade_index.go
  3. 3 0
      models/db.go
  4. 164 0
      services/com_trade.go
  5. 2 2
      services/eia_steo.go
  6. 25 9
      services/sync_hz_data.go

+ 64 - 0
models/data_manage/com_trade_data.go

@@ -0,0 +1,64 @@
+package data_manage
+
+import (
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+// ComTradeData 联合国商品贸易数据表
+type ComTradeData struct {
+	ComTradeDataId     int       `orm:"column(com_trade_data_id);pk"`
+	ComTradeId         int       `description:"指标id"`
+	IndexCode          string    `description:"联合国商品贸易编码"`
+	Flow               string    `description:"贸易流向:X(Export:出口);M(Import:进口)"`
+	ReporterCode       int       `description:"出口国id"`
+	ReporterName       string    `description:"出口国名称"`
+	PartnerCode        int       `description:"进口国id"`
+	PartnerName        string    `description:"进口国名称"`
+	Partner2Code       int       `description:"第二进口国id"`
+	Partner2Name       string    `description:"第二进口国名称"`
+	DateType           int       `description:"日期类型,1:年度,2:月度"`
+	DataTime           string    `description:"数据日期"`
+	IndexTradeCode     string    `description:"离岸价编码"`
+	TradeValue         float64   `description:"离岸价(美元)"`
+	IndexNetWeightCode string    `description:"净重编码"`
+	NetWeightValue     float64   `description:"净重/公斤"`
+	ModifyTime         time.Time `description:"最新更新时间"`
+	CreateTime         time.Time `description:"创建时间"`
+}
+
+// GetAllComTradeDataList 获取联合国商品贸易数据列表
+func GetAllComTradeDataList(startDate string) (list []*ComTradeData, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT * FROM com_trade_data WHERE create_time>=?  ORDER BY com_trade_data_id ASC `
+	_, err = o.Raw(sql, startDate).QueryRows(&list)
+
+	return
+}
+
+// AddBaseFromComTradeData 添加数据
+func AddBaseFromComTradeData(item *ComTradeData) (lastId int64, err error) {
+	o := orm.NewOrm()
+	lastId, err = o.Insert(item)
+	return
+}
+
+// MultiAddBaseFromComTradeData 批量添加数据
+func MultiAddBaseFromComTradeData(items []*ComTradeData) (lastId int64, err error) {
+	num := len(items)
+	if num <= 0 {
+		return
+	}
+	o := orm.NewOrm()
+	lastId, err = o.InsertMulti(num, items)
+
+	return
+}
+
+// GetBaseFromComTradeMaxDate 获取un数据中的最大日期数据
+func GetBaseFromComTradeMaxDate() (max_date time.Time, err error) {
+	o := orm.NewOrm()
+	sql := ` SELECT max(a.create_time)as max_date FROM com_trade_data as a `
+	err = o.Raw(sql).QueryRow(&max_date)
+	return
+}

+ 92 - 0
models/data_manage/com_trade_index.go

@@ -0,0 +1,92 @@
+package data_manage
+
+import (
+	"github.com/beego/beego/v2/client/orm"
+	"github.com/rdlucklib/rdluck_tools/paging"
+	"time"
+)
+
+// ComTradeIndex 联合国商品贸易数据指标表
+type ComTradeIndex struct {
+	ComTradeId  int       `orm:"column(com_trade_id);pk"`
+	IndexCode   string    `description:"联合国商品贸易编码"`
+	IndexName   string    `description:"联合国商品名称"`
+	IndexNameCn string    `description:"联合国商品名称(中文)"`
+	UpdateTime  time.Time `description:"数据最近更新时间"`
+	CreateTime  time.Time `description:"创建时间"`
+}
+
+// GetAllComTradeIndexList 获取指标数据列表
+func GetAllComTradeIndexList(startDate string) (list []*ComTradeIndex, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT * FROM com_trade_index WHERE create_time>=?  ORDER BY com_trade_id DESC `
+	_, err = o.Raw(sql, startDate).QueryRows(&list)
+
+	return
+}
+
+func GetBaseFromComTradeCodeMappingAll(dateStr string) (list []*BaseFromEiaSteoIndex, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT * FROM com_trade_index WHERE create_time>=?`
+	_, err = o.Raw(sql, dateStr).QueryRows(&list)
+	return
+}
+
+// AddBaseFromComTradeIndex 添加新的指标
+func AddBaseFromComTradeIndex(item *ComTradeIndex) (lastId int64, err error) {
+	o := orm.NewOrm()
+	lastId, err = o.Insert(item)
+	return
+}
+
+// ComTradeCodeMapping 联合国商品贸易数据指标编码与分类编码的关系表
+type ComTradeCodeMapping struct {
+	Code       string    `orm:"column(code);pk"`
+	IndexCode  string    `description:"联合国商品贸易编码"`
+	Type       int       `description:"1:价格;2:重量"`
+	Name       string    `description:"指标名称"`
+	CreateTime time.Time `description:"创建时间"`
+}
+
+// GetAllComTradeCodeMappingList 获取联合国商品贸易数据指标编码与分类编码的关系列表
+func GetAllComTradeCodeMappingList(startDate string) (list []*ComTradeCodeMapping, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT * FROM com_trade_code_mapping WHERE create_time>=?  ORDER BY create_time ASC `
+	_, err = o.Raw(sql, startDate).QueryRows(&list)
+
+	return
+}
+
+// AddBaseFromComTradeCodeMapping 添加联合国商品贸易数据指标编码与分类编码的关系
+func AddBaseFromComTradeCodeMapping(item *ComTradeCodeMapping) (lastId int64, err error) {
+	o := orm.NewOrm()
+	lastId, err = o.Insert(item)
+	return
+}
+
+type ComTradeIndexMappingResp struct {
+	Ret     int
+	Msg     string
+	ErrMsg  string
+	ErrCode string
+	Data    ComTradeIndexAndMappingResp
+}
+
+type ComTradeIndexAndMappingResp struct {
+	IndexList   []ComTradeIndex
+	MappingList []ComTradeCodeMapping
+}
+
+type ComTradeIndexDataResp struct {
+	Ret     int
+	Msg     string
+	ErrMsg  string
+	ErrCode string
+	Data    ComTradeIndexAndDataResp
+}
+
+// ComTradeIndexAndDataResp 指标数据结构体
+type ComTradeIndexAndDataResp struct {
+	List   []*ComTradeData
+	Paging *paging.PagingItem `description:"分页数据"`
+}

+ 3 - 0
models/db.go

@@ -52,6 +52,9 @@ func initEdbDataTable() {
 		new(data_manage.BaseFromChangesVisitorsCovid),
 		new(data_manage.BaseFromEiaSteoIndex),
 		new(data_manage.BaseFromEiaSteoData),
+		new(data_manage.ComTradeIndex),       // 联合国指标表
+		new(data_manage.ComTradeCodeMapping), // 联合国指标关系表
+		new(data_manage.ComTradeData),        // 联合国数据表
 	)
 }
 

+ 164 - 0
services/com_trade.go

@@ -0,0 +1,164 @@
+package services
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"hongze/hongze_task_trial/models/data_manage"
+	"hongze/hongze_task_trial/utils"
+	"time"
+)
+
+// SyncComTradeIndex 同步UN指标
+func SyncComTradeIndex() (err error) {
+	var startDate string
+	maxDate, err := data_manage.GetBaseFromComTradeMaxDate()
+	if err != nil || maxDate.IsZero() {
+		startDate = time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + " 00:00:00"
+	} else {
+		startDate = maxDate.AddDate(0, 0, 1).Format(utils.FormatDate) + " 00:00:00"
+	}
+
+	method := `index/list`
+	data := make(map[string]interface{})
+	data["Source"] = utils.DATA_SOURCE_COM_TRADE
+	data["StartDate"] = startDate
+	//data["EndDate"] = endDate
+	result, err := HttpPost(method, data)
+	utils.FileLog.Info(result)
+	fmt.Println(result)
+
+	respObj := new(data_manage.ComTradeIndexMappingResp)
+	err = json.Unmarshal([]byte(result), &respObj)
+	if err != nil {
+		fmt.Println("json.Unmarshal err:" + err.Error())
+		return err
+	}
+
+	// 指标处理
+	{
+		//获取所有指标信息  某一天的
+		allComTradeIndex, tmpErr := data_manage.GetAllComTradeIndexList(startDate)
+		if tmpErr != nil {
+			fmt.Println("get GetAllComTradeCodeMappingList err:" + tmpErr.Error())
+			return
+		}
+
+		existCodeMappingMap := make(map[string]*data_manage.ComTradeIndex)
+		for _, v := range allComTradeIndex {
+			existCodeMappingMap[v.IndexCode] = v
+		}
+
+		for _, zv := range respObj.Data.IndexList {
+			if _, ok := existCodeMappingMap[zv.IndexCode]; !ok {
+				newID, tmpErr := data_manage.AddBaseFromComTradeIndex(&zv)
+				if tmpErr != nil {
+					fmt.Println("insert error:", tmpErr)
+				}
+				fmt.Println("insert new indexID:", newID)
+			}
+		}
+	}
+
+	// mapping处理
+	{
+		//获取所有指标信息  某一天的
+		allCodeMappingIndex, tmpErr := data_manage.GetAllComTradeCodeMappingList(startDate)
+		if tmpErr != nil {
+			fmt.Println("get GetAllComTradeCodeMappingList err:" + tmpErr.Error())
+			return
+		}
+
+		existCodeMappingMap := make(map[string]*data_manage.ComTradeCodeMapping)
+		for _, v := range allCodeMappingIndex {
+			existCodeMappingMap[v.Code] = v
+		}
+
+		for _, zv := range respObj.Data.MappingList {
+			if _, ok := existCodeMappingMap[zv.Code]; !ok {
+				_, err = data_manage.AddBaseFromComTradeCodeMapping(&zv)
+				if err != nil {
+					fmt.Println("AddBaseFromComTradeCodeMapping error:", err)
+				}
+				fmt.Println("AddBaseFromComTradeCodeMapping new Code:", zv.Code)
+			}
+		}
+	}
+
+	return err
+}
+
+// SyncComTradeData 同步UN指标数据
+func SyncComTradeData() (err error) {
+	startDate := time.Now().Format(utils.FormatDate) + " 00:00:00"
+	//var startDate string
+	maxDate, err := data_manage.GetBaseFromComTradeMaxDate()
+	if err != nil || maxDate.IsZero() {
+		startDate = time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + " 00:00:00"
+	} else {
+		startDate = maxDate.AddDate(0, 0, 1).Format(utils.FormatDate) + " 00:00:00"
+	}
+
+	method := `index/data/list_page`
+
+	existDataMap := make(map[string]*data_manage.ComTradeData)
+	allData, err1 := data_manage.GetAllComTradeDataList(startDate)
+	if err1 != nil {
+		fmt.Println("get GetBaseFromEiaSteoIndexDataAll err:" + err1.Error())
+		return
+	}
+	for _, dv := range allData {
+		tmpKey := dv.IndexTradeCode + "_" + dv.DataTime
+		existDataMap[tmpKey] = dv
+	}
+
+	//获取所有指标信息  某一天的
+	maxPage := 1
+
+	for currPage := 0; currPage < maxPage; currPage++ {
+		data := make(map[string]interface{})
+		data["Source"] = utils.DATA_SOURCE_COM_TRADE
+		data["StartDate"] = startDate
+		data["CurrPage"] = currPage
+		data["PageSize"] = 500 //
+
+		var result string
+		result, err = HttpPost(method, data)
+		utils.FileLog.Info(result)
+		fmt.Println(result)
+
+		respObj := new(data_manage.ComTradeIndexDataResp)
+		err = json.Unmarshal([]byte(result), &respObj)
+		if err != nil {
+			fmt.Println("json.Unmarshal err:" + err.Error())
+			return err
+		}
+		if respObj.Ret != 200 {
+			err = errors.New(respObj.ErrMsg)
+			return
+		}
+		// 总页码数
+		maxPage = respObj.Data.Paging.Pages
+
+		addDataList := make([]*data_manage.ComTradeData, 0)
+		if respObj.Data.List != nil && len(respObj.Data.List) > 0 {
+			for _, dv := range respObj.Data.List {
+				tmpKey := dv.IndexTradeCode + "_" + dv.DataTime
+				if _, ok := existDataMap[tmpKey]; !ok {
+					addDataList = append(addDataList, dv)
+					existDataMap[tmpKey] = dv
+				}
+			}
+		}
+
+		// 最后如果还有数据未插入,那么继续插入吧
+		if len(addDataList) > 0 {
+			_, err = data_manage.MultiAddBaseFromComTradeData(addDataList)
+			if err != nil {
+				fmt.Println("AddBaseFromComTradeData error:", err)
+			}
+		}
+	}
+
+	return err
+}

+ 2 - 2
services/eia_steo.go

@@ -8,7 +8,7 @@ import (
 	"time"
 )
 
-// EIA STEO报告 指标
+// SyncEiaSteoIndex EIA STEO报告 指标
 func SyncEiaSteoIndex() (err error) {
 	var startDate string
 	maxDate, err := data_manage.GetBaseFromEiaSteoIndexMaxDate()
@@ -57,7 +57,7 @@ func SyncEiaSteoIndex() (err error) {
 	return err
 }
 
-// EIA STEO报告 指标
+// SyncEiaSteoIndexData EIA STEO报告 指标
 func SyncEiaSteoIndexData() (err error) {
 	startDate := time.Now().Format(utils.FormatDate) + " 00:00:00"
 

+ 25 - 9
services/sync_hz_data.go

@@ -52,26 +52,42 @@ func SyncHzDataIndex(cont context.Context) (err error) {
 		return
 	}
 
-	//////EiaSteo
-	//err = SyncEiaSteoIndex()
-	//if err != nil {
-	//	fmt.Println("SyncEiaSteoIndex Err:" + err.Error())
-	//	return
-	//}
-	//SyncHzDataIndexData()
+	// EiaSteo
+	err = SyncEiaSteoIndex()
+	if err != nil {
+		fmt.Println("SyncEiaSteoIndex Err:" + err.Error())
+		return
+	}
+
+	// UN联合国数据
+	err = SyncComTradeIndex()
+	if err != nil {
+		fmt.Println("SyncComTradeIndexAndData Err:" + err.Error())
+		return
+	}
+
+	// 同步指标数据
+	SyncHzDataIndexData()
 	fmt.Println("SyncHzDataIndex end:", time.Now().Format(utils.FormatDateTime))
 	return err
 }
 
-// 同步指标数据
+// SyncHzDataIndexData 同步指标数据
 func SyncHzDataIndexData() {
 	var err error
-	////EiaSteo
+	//EiaSteo
 	err = SyncEiaSteoIndexData()
 	if err != nil {
 		fmt.Println("SyncEiaSteoIndexData Err:" + err.Error())
 		return
 	}
 
+	// 同步un数据
+	err = SyncComTradeData()
+	if err != nil {
+		fmt.Println("SyncComTradeData Err:" + err.Error())
+		return
+	}
+
 	fmt.Println(err)
 }