Browse Source

新增广期所数据同步

tuoling805 1 year ago
parent
commit
8a3a3027a9

+ 53 - 0
models/data_manage/base_from_icpi.go

@@ -2,6 +2,7 @@ package data_manage
 
 import (
 	"github.com/beego/beego/v2/client/orm"
+	"github.com/rdlucklib/rdluck_tools/paging"
 	"time"
 )
 
@@ -74,3 +75,55 @@ func AddBaseFromIcpiClassify(item *BaseFromIcpiClassify) (lastId int64, err erro
 	lastId, err = o.Insert(item)
 	return
 }
+
+// GetBaseFromComTradeMaxDate 获取ICPI消费者指数最大数据
+func GetBaseFromIcpiMaxDate() (max_date time.Time, err error) {
+	o := orm.NewOrm()
+	sql := ` SELECT max(a.create_time)as max_date FROM base_from_icpi_data as a `
+	err = o.Raw(sql).QueryRow(&max_date)
+	return
+}
+
+type BaseFromIcpiData struct {
+	BaseFromIcpiDataId  int       `orm:"column(base_from_icpi_data_id);pk"`
+	BaseFromIcpiIndexId int       `description:"指标id"`
+	IndexCode           string    `description:"指标编码"`
+	DataTime            string    `description:"日期"`
+	Value               string    `description:"值"`
+	CreateTime          time.Time `description:"创建时间"`
+	ModifyTime          time.Time `description:"修改时间"`
+}
+
+// GetAllComTradeDataList 获取ICPI消费者指数数据
+func GetAllBaseFromIcpiDataList(startDate string) (list []*BaseFromIcpiData, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT * FROM base_from_icpi_data WHERE create_time>=?  ORDER BY base_from_icpi_data_id ASC `
+	_, err = o.Raw(sql, startDate).QueryRows(&list)
+	return
+}
+
+type BaseFromIcpiDataResp struct {
+	Ret     int
+	Msg     string
+	ErrMsg  string
+	ErrCode string
+	Data    BaseFromIcpiDataIndexAndDataResp
+}
+
+// ComTradeIndexDataResp 分页列表响应体
+type BaseFromIcpiDataIndexAndDataResp struct {
+	List   []*BaseFromIcpiData
+	Paging *paging.PagingItem `description:"分页数据"`
+}
+
+// MultiAddBaseFromComTradeData 批量添加数据
+func MultiAddBaseFromIcpiDataIndex(items []*BaseFromIcpiData) (lastId int64, err error) {
+	num := len(items)
+	if num <= 0 {
+		return
+	}
+	o := orm.NewOrm()
+	lastId, err = o.InsertMulti(num, items)
+
+	return
+}

+ 84 - 0
models/data_manage/base_from_trade_guangzhou.go

@@ -2,6 +2,7 @@ package data_manage
 
 import (
 	"github.com/beego/beego/v2/client/orm"
+	"github.com/rdlucklib/rdluck_tools/paging"
 	"time"
 )
 
@@ -68,3 +69,86 @@ func AddBaseFromTradeGuangzhouClassify(item *BaseFromTradeGuangzhouClassify) (la
 	lastId, err = o.Insert(item)
 	return
 }
+
+type BaseFromTradeGuangzhouContract struct {
+	BaseFromTradeGuangzhouContractId int    `orm:"column(base_from_trade_guangzhou_contract_id);pk"`
+	BaseFromTradeGuangzhouClassifyId int    `description:"分类id"`
+	ClassifyCode                     string `description:"分类编码"`
+	Contract                         string `description:"合约编码"`
+	TradeDate                        string `description:"合约日期"`
+}
+
+// 新增合约
+func (obj *BaseFromTradeGuangzhouIndex) AddBaseFromTradeGuangzhouContract(item *BaseFromTradeGuangzhouContract) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.Insert(item)
+	return
+}
+
+type GuangzhouContractResp struct {
+	Ret     int
+	Msg     string
+	ErrMsg  string
+	ErrCode string
+	Data    []*BaseFromTradeGuangzhouContract
+}
+
+func GetBaseFromTradeGuangzhouContractAll() (list []*BaseFromTradeGuangzhouContract, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT * FROM base_from_trade_guangzhou_contract `
+	_, err = o.Raw(sql).QueryRows(&list)
+	return
+}
+
+// GetBaseFromComTradeMaxDate 获取广州期货交易所最大数据
+func GetBaseFromTradeGuangzhouMaxDate() (max_date time.Time, err error) {
+	o := orm.NewOrm()
+	sql := ` SELECT max(a.create_time)as max_date FROM base_from_trade_guangzhou_data as a `
+	err = o.Raw(sql).QueryRow(&max_date)
+	return
+}
+
+type BaseFromTradeGuangzhouData struct {
+	BaseFromTradeGuangzhouDataId  int       `orm:"column(base_from_trade_guangzhou_data_id);pk"`
+	BaseFromTradeGuangzhouIndexId int       `description:"指标id"`
+	IndexCode                     string    `description:"指标编码"`
+	DataTime                      string    `description:"数据日期"`
+	Value                         float64   `description:"数据值"`
+	QtySub                        float64   `description:"增减"`
+	CreateTime                    time.Time `description:"创建日期"`
+	ModifyTime                    time.Time `description:"修改日期"`
+}
+
+// GetAllComTradeDataList 获取广州期货交易所数据
+func GetAllBaseFromTradeGuangzhouDataList(startDate string) (list []*BaseFromTradeGuangzhouData, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT * FROM base_from_trade_guangzhou_data WHERE create_time>=?  ORDER BY base_from_trade_guangzhou_data_id ASC `
+	_, err = o.Raw(sql, startDate).QueryRows(&list)
+	return
+}
+
+type BaseFromTradeGuangzhouDataResp struct {
+	Ret     int
+	Msg     string
+	ErrMsg  string
+	ErrCode string
+	Data    BaseFromTradeGuangzhouIndexAndDataResp
+}
+
+// ComTradeIndexDataResp 分页列表响应体
+type BaseFromTradeGuangzhouIndexAndDataResp struct {
+	List   []*BaseFromTradeGuangzhouData
+	Paging *paging.PagingItem `description:"分页数据"`
+}
+
+// MultiAddBaseFromComTradeData 批量添加数据
+func MultiAddBaseFromTradeGuangzhouData(items []*BaseFromTradeGuangzhouData) (lastId int64, err error) {
+	num := len(items)
+	if num <= 0 {
+		return
+	}
+	o := orm.NewOrm()
+	lastId, err = o.InsertMulti(num, items)
+
+	return
+}

+ 4 - 0
models/db.go

@@ -114,9 +114,13 @@ func initEdbDataTable() {
 
 		//广期所
 		new(data_manage.BaseFromTradeGuangzhouClassify),
+		new(data_manage.BaseFromTradeGuangzhouContract),
+		new(data_manage.BaseFromTradeGuangzhouIndex),
+		new(data_manage.BaseFromTradeGuangzhouData),
 		//ICPI
 		new(data_manage.BaseFromIcpiIndex),
 		new(data_manage.BaseFromIcpiClassify),
+		new(data_manage.BaseFromIcpiData),
 	)
 }
 

+ 76 - 0
services/icpi.go

@@ -2,6 +2,7 @@ package services
 
 import (
 	"encoding/json"
+	"errors"
 	"eta/eta_task/models/data_manage"
 	"eta/eta_task/utils"
 	"fmt"
@@ -96,3 +97,78 @@ func SyncBaseFromIcpiClassify() (err error) {
 	}
 	return err
 }
+
+// ICPI消费指数-数据
+func SyncBaseFromIcpiData() (err error) {
+	startDate := time.Now().Format(utils.FormatDate) + " 00:00:00"
+	//var startDate string
+	maxDate, err := data_manage.GetBaseFromIcpiMaxDate()
+	if err != nil || maxDate.IsZero() {
+		startDate = time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + " 00:00:00"
+	} else {
+		startDate = maxDate.AddDate(0, 0, 1).Format(utils.FormatDate) + " 00:00:00"
+	}
+
+	method := `index/data/list_page`
+
+	existDataMap := make(map[string]*data_manage.BaseFromIcpiData)
+	allData, err1 := data_manage.GetAllBaseFromIcpiDataList(startDate)
+	if err1 != nil {
+		fmt.Println("get GetAllBaseFromIcpiDataList err:" + err1.Error())
+		return
+	}
+	for _, dv := range allData {
+		tmpKey := dv.IndexCode + "_" + dv.DataTime
+		existDataMap[tmpKey] = dv
+	}
+
+	//获取所有指标信息  某一天的
+	maxPage := 1
+
+	for currPage := 0; currPage < maxPage; currPage++ {
+		data := make(map[string]interface{})
+		data["Source"] = utils.DATA_SOURCE_ICPI
+		data["StartDate"] = startDate
+		data["CurrPage"] = currPage
+		data["PageSize"] = 500 //
+
+		var result string
+		result, err = HttpPost("SyncBaseFromIcpiData", method, data)
+		utils.FileLog.Info(result)
+		fmt.Println(result)
+
+		respObj := new(data_manage.BaseFromIcpiDataResp)
+		err = json.Unmarshal([]byte(result), &respObj)
+		if err != nil {
+			fmt.Println("json.Unmarshal err:" + err.Error())
+			return err
+		}
+		if respObj.Ret != 200 {
+			err = errors.New(respObj.ErrMsg)
+			return
+		}
+		// 总页码数
+		maxPage = respObj.Data.Paging.Pages
+
+		addDataList := make([]*data_manage.BaseFromIcpiData, 0)
+		if respObj.Data.List != nil && len(respObj.Data.List) > 0 {
+			for _, dv := range respObj.Data.List {
+				tmpKey := dv.IndexCode + "_" + dv.DataTime
+				if _, ok := existDataMap[tmpKey]; !ok {
+					addDataList = append(addDataList, dv)
+					existDataMap[tmpKey] = dv
+				}
+			}
+		}
+
+		// 最后如果还有数据未插入,那么继续插入吧
+		if len(addDataList) > 0 {
+			_, err = data_manage.MultiAddBaseFromIcpiDataIndex(addDataList)
+			if err != nil {
+				fmt.Println("MultiAddBaseFromIcpiDataIndex error:", err)
+			}
+		}
+	}
+
+	return err
+}

+ 18 - 2
services/sync_hz_data.go

@@ -82,11 +82,21 @@ func SyncHzDataIndex(cont context.Context) (err error) {
 			return
 		}
 		//合约信息
-
+		err = SyncFromGuangzhouContract()
+		if err != nil {
+			fmt.Println("SyncFromGuangzhouContract Err:" + err.Error())
+			return
+		}
 		//指标信息
 		err = SyncFromGuangzhouIndex()
 		if err != nil {
-			fmt.Println("SyncRankingFromCffex Err:" + err.Error())
+			fmt.Println("SyncFromGuangzhouIndex Err:" + err.Error())
+			return
+		}
+		//数据
+		err = SyncFromGuangzhouTradeData()
+		if err != nil {
+			fmt.Println("SyncFromGuangzhouTradeData Err:" + err.Error())
 			return
 		}
 	}
@@ -105,6 +115,11 @@ func SyncHzDataIndex(cont context.Context) (err error) {
 			fmt.Println("SyncBaseFromIcpi Err:" + err.Error())
 			return
 		}
+		err = SyncBaseFromIcpiData()
+		if err != nil {
+			fmt.Println("SyncBaseFromIcpiData Err:" + err.Error())
+			return
+		}
 	}
 
 	// 同步指标数据
@@ -131,5 +146,6 @@ func SyncHzDataIndexData() {
 		return
 	}
 
+	//
 	fmt.Println(err)
 }

+ 119 - 0
services/trade_guangzhou.go

@@ -2,9 +2,11 @@ package services
 
 import (
 	"encoding/json"
+	"errors"
 	"eta/eta_task/models/data_manage"
 	"eta/eta_task/utils"
 	"fmt"
+	"time"
 )
 
 // 广州商品交易所-分类
@@ -86,3 +88,120 @@ func SyncFromGuangzhouIndex() (err error) {
 	}
 	return err
 }
+
+// 广州商品交易所-合约
+func SyncFromGuangzhouContract() (err error) {
+	data := make(map[string]interface{})
+	data["Source"] = utils.DATA_SOURCE_GFEX
+	method := `contract/list`
+	result, err := HttpPost("SyncFromGuangzhouContract", method, data)
+	if err != nil {
+		fmt.Println("SyncFromGuangzhouContract HttpPost Err:", err.Error())
+	}
+	utils.FileLog.Info(result)
+	fmt.Println("SyncFromGuangzhouContract result:", result)
+
+	respObj := new(data_manage.GuangzhouContractResp)
+	err = json.Unmarshal([]byte(result), &respObj)
+	if err != nil {
+		return err
+	}
+	//获取所有指标信息  某一天的
+	allClassify, err := data_manage.GetBaseFromTradeGuangzhouContractAll()
+	if err != nil {
+		return
+	}
+
+	existContractMap := make(map[int]*data_manage.BaseFromTradeGuangzhouContract)
+	for _, v := range allClassify {
+		existContractMap[v.BaseFromTradeGuangzhouContractId] = v
+	}
+
+	obj := new(data_manage.BaseFromTradeGuangzhouIndex)
+
+	for _, item := range respObj.Data {
+		if _, ok := existContractMap[item.BaseFromTradeGuangzhouClassifyId]; !ok {
+			err = obj.AddBaseFromTradeGuangzhouContract(item)
+			if err != nil {
+				fmt.Println("AddBaseFromTradeGuangzhouContract error:", err)
+			}
+			fmt.Println("AddBaseFromTradeGuangzhouContract new indexID:")
+		}
+	}
+	return err
+}
+
+// SyncComTradeData 同步UN指标数据
+func SyncFromGuangzhouTradeData() (err error) {
+	startDate := time.Now().Format(utils.FormatDate) + " 00:00:00"
+	//var startDate string
+	maxDate, err := data_manage.GetBaseFromTradeGuangzhouMaxDate()
+	if err != nil || maxDate.IsZero() {
+		startDate = time.Now().AddDate(0, 0, -1).Format(utils.FormatDate) + " 00:00:00"
+	} else {
+		startDate = maxDate.AddDate(0, 0, 1).Format(utils.FormatDate) + " 00:00:00"
+	}
+
+	method := `index/data/list_page`
+
+	existDataMap := make(map[string]*data_manage.BaseFromTradeGuangzhouData)
+	allData, err1 := data_manage.GetAllBaseFromTradeGuangzhouDataList(startDate)
+	if err1 != nil {
+		fmt.Println("get GetAllBaseFromTradeGuangzhouDataList err:" + err1.Error())
+		return
+	}
+	for _, dv := range allData {
+		tmpKey := dv.IndexCode + "_" + dv.DataTime
+		existDataMap[tmpKey] = dv
+	}
+
+	//获取所有指标信息  某一天的
+	maxPage := 1
+
+	for currPage := 0; currPage < maxPage; currPage++ {
+		data := make(map[string]interface{})
+		data["Source"] = utils.DATA_SOURCE_GFEX
+		data["StartDate"] = startDate
+		data["CurrPage"] = currPage
+		data["PageSize"] = 500 //
+
+		var result string
+		result, err = HttpPost("SyncFromGuangzhouTradeData", method, data)
+		utils.FileLog.Info(result)
+		fmt.Println(result)
+
+		respObj := new(data_manage.BaseFromTradeGuangzhouDataResp)
+		err = json.Unmarshal([]byte(result), &respObj)
+		if err != nil {
+			fmt.Println("json.Unmarshal err:" + err.Error())
+			return err
+		}
+		if respObj.Ret != 200 {
+			err = errors.New(respObj.ErrMsg)
+			return
+		}
+		// 总页码数
+		maxPage = respObj.Data.Paging.Pages
+
+		addDataList := make([]*data_manage.BaseFromTradeGuangzhouData, 0)
+		if respObj.Data.List != nil && len(respObj.Data.List) > 0 {
+			for _, dv := range respObj.Data.List {
+				tmpKey := dv.IndexCode + "_" + dv.DataTime
+				if _, ok := existDataMap[tmpKey]; !ok {
+					addDataList = append(addDataList, dv)
+					existDataMap[tmpKey] = dv
+				}
+			}
+		}
+
+		// 最后如果还有数据未插入,那么继续插入吧
+		if len(addDataList) > 0 {
+			_, err = data_manage.MultiAddBaseFromTradeGuangzhouData(addDataList)
+			if err != nil {
+				fmt.Println("MultiAddBaseFromTradeGuangzhouData error:", err)
+			}
+		}
+	}
+
+	return err
+}