Browse Source

增加广期所持仓分析

xyxie 11 months ago
parent
commit
32b46ce397

+ 2 - 6
models/data_manage/base_from_trade_index.go

@@ -1,12 +1,8 @@
 package data_manage
 
-import (
-	"github.com/beego/beego/v2/client/orm"
-)
-
-func GetFirstBaseFromTradeIndexByDate(exchange string) (item *BaseFromTradeShanghaiIndex, err error) {
+/*func GetFirstBaseFromTradeIndexByDate(exchange string) (item *BaseFromTradeShanghaiIndex, err error) {
 	o := orm.NewOrm()
 	sql := "SELECT * FROM base_from_trade_" + exchange + "_index where rank < 50 order by data_time asc"
 	err = o.Raw(sql).QueryRow(&item)
 	return
-}
+}*/

+ 23 - 0
models/data_manage/trade_position_analysis.go

@@ -44,6 +44,10 @@ type TradePositionIneTop struct {
 	TradePositionTop
 }
 
+type TradePositionGuangzhouTop struct {
+	TradePositionTop
+}
+
 func InsertMultiTradePositionTop(exchange string, items []*TradePositionTop) (err error) {
 	o := orm.NewOrm()
 	if exchange == "dalian" {
@@ -86,6 +90,14 @@ func InsertMultiTradePositionTop(exchange string, items []*TradePositionTop) (er
 		}
 		_, err = o.InsertMulti(len(list), list)
 		return
+	} else if exchange == "guangzhou" {
+		list := make([]*TradePositionGuangzhouTop, 0)
+		for _, v := range items {
+			tmp := (*TradePositionGuangzhouTop)(unsafe.Pointer(v))
+			list = append(list, tmp)
+		}
+		_, err = o.InsertMulti(len(list), list)
+		return
 	}
 
 	return
@@ -389,3 +401,14 @@ func DeleteTradePositionTopAllByExchangeDataTime(exchange string, startDate, end
 	_, err = o.Raw(sql, startDate, endDate).Exec()
 	return
 }
+
+type GetFirstBaseFromTradeIndeDate struct {
+	DataTime string
+}
+
+func GetFirstBaseFromTradeIndexByDate(exchange string) (item *GetFirstBaseFromTradeIndeDate, err error) {
+	o := orm.NewOrm()
+	sql := "SELECT * FROM base_from_trade_" + exchange + "_index where rank < 50 order by data_time asc"
+	err = o.Raw(sql).QueryRow(&item)
+	return
+}

+ 202 - 0
models/data_manage/trade_position_analysis_guangzhou.go

@@ -0,0 +1,202 @@
+package data_manage
+
+import (
+	"eta/eta_task/utils"
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+func MultiInsertTradeGuangzhouDataToTop(exchange string, startDate, endDate string) (err error) {
+	o := orm.NewOrm()
+	now := time.Now().Format(utils.FormatDateTime)
+
+	//新增买单榜单
+	sql1 := `INSERT INTO trade_position_guangzhou_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
+SELECT
+	n.classify_code,n.contract,SUBSTRING_INDEX(c.index_name, '_', 1) AS index_name,a.value,a.qty_sub,a.data_time,1,0,0,?,?
+FROM
+	base_from_trade_guangzhou_data a
+	LEFT JOIN base_from_trade_guangzhou_index c ON a.base_from_trade_guangzhou_index_id = c.base_from_trade_guangzhou_index_id 
+	LEFT JOIN base_from_trade_guangzhou_contract n on c.base_from_trade_guangzhou_contract_id = n.base_from_trade_guangzhou_contract_id
+WHERE
+	a.data_time between ? and ?
+	and c.base_from_trade_guangzhou_classify_id in (7,8)
+	and c.index_name like "%持买单量%"
+	and c.index_name not like "%日成交持仓排名%"`
+	_, err = o.Raw(sql1, now, now, startDate, endDate).Exec()
+	if err != nil {
+		return
+	}
+	//新增卖单榜单
+	sql2 := `INSERT INTO trade_position_guangzhou_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
+SELECT
+	n.classify_code,n.contract,SUBSTRING_INDEX(c.index_name, '_', 1) AS index_name,a.value,a.qty_sub,a.data_time,2,0,0,?,?
+FROM
+	base_from_trade_guangzhou_data a
+	LEFT JOIN base_from_trade_guangzhou_index c ON a.base_from_trade_guangzhou_index_id = c.base_from_trade_guangzhou_index_id 
+	LEFT JOIN base_from_trade_guangzhou_contract n on c.base_from_trade_guangzhou_contract_id = n.base_from_trade_guangzhou_contract_id
+WHERE
+	a.data_time between ? and ?
+	and c.base_from_trade_guangzhou_classify_id in (7,8)
+	and c.index_name like "%持卖单量%"
+	and c.index_name not like "%日成交持仓排名%"`
+	_, err = o.Raw(sql2, now, now, startDate, endDate).Exec()
+	if err != nil {
+		return
+	}
+	//更新买单、卖单榜单排名字段
+	sql3 := `update trade_position_guangzhou_top s
+JOIN (
+SELECT
+  	classify_type,
+	deal_short_name,
+	data_time,
+	deal_type,
+  (@row_number := IF(@prev_year = CONCAT_WS( "_", data_time, deal_type, classify_type), @row_number + 1, 1) ) AS row_number,
+  @prev_year := CONCAT_WS( "_", data_time, deal_type, classify_type)
+FROM
+  trade_position_guangzhou_top,(SELECT @row_number := 0, @prev_year := NULL) r
+	where data_time between ? and ?
+ORDER BY
+  data_time asc,
+  deal_type asc,
+  classify_type asc,
+  deal_value DESC
+	) t
+	ON s.classify_type = t.classify_type AND s.deal_short_name = t.deal_short_name 	
+	AND s.data_time = t.data_time AND s.deal_type = t.deal_type
+SET s.rank = t.row_number where s.data_time between ? and ?;`
+
+	_, err = o.Raw(sql3, startDate, endDate, startDate, endDate).Exec()
+
+	return
+}
+
+// GetTradePositionTopOriginGuangzhouDataTimes 获取榜单原始数据日期-正序
+func GetTradePositionTopOriginGuangzhouDataTimes(exchange string) (dates []string, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT
+	DISTINCT a.data_time
+FROM
+	base_from_trade_guangzhou_data a
+	LEFT JOIN base_from_trade_guangzhou_index c ON a.base_from_trade_guangzhou_index_id = c.base_from_trade_guangzhou_index_id
+WHERE
+    c.base_from_trade_guangzhou_classify_id IN ( 7, 8 ) 
+	AND ( c.index_name LIKE "%持买单量%" OR c.index_name LIKE "%持卖单量%" ) 
+	AND c.index_name NOT LIKE "%日成交持仓排名%" 
+ORDER BY
+	a.data_time asc`
+	_, err = o.Raw(sql).QueryRows(&dates)
+	return
+}
+
+func GetTradePositionOriginGuangzhouClassifyCountByExchangeDataTime(exchange string, startDate, endDate string) (count int64, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT
+	count(DISTINCT n.classify_code, n.contract ) 
+FROM
+	base_from_trade_guangzhou_data a
+	LEFT JOIN base_from_trade_guangzhou_index c ON a.base_from_trade_guangzhou_index_id = c.base_from_trade_guangzhou_index_id
+	LEFT JOIN base_from_trade_guangzhou_contract n ON c.base_from_trade_guangzhou_contract_id = n.base_from_trade_guangzhou_contract_id 
+WHERE
+	a.data_time between ? and ?
+	AND c.base_from_trade_guangzhou_classify_id IN ( 7, 8 ) 
+	AND ( c.index_name LIKE "%持买单量%" OR c.index_name LIKE "%持卖单量%" ) 
+	AND c.index_name NOT LIKE "%日成交持仓排名%" 
+ORDER BY
+	a.value DESC`
+	err = o.Raw(sql, startDate, endDate).QueryRow(&count)
+	return
+}
+
+func GetFirstBaseFromTradeGuangzhouIndexByDate(exchange string) (item *GetFirstBaseFromTradeIndeDate, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT
+	a.data_time
+FROM
+	base_from_trade_guangzhou_data a
+	LEFT JOIN base_from_trade_guangzhou_index c ON a.base_from_trade_guangzhou_index_id = c.base_from_trade_guangzhou_index_id
+WHERE
+	c.base_from_trade_guangzhou_classify_id IN ( 7, 8 ) 
+	AND ( c.index_name LIKE "%持买单量%" OR c.index_name LIKE "%持卖单量%" ) 
+	AND c.index_name NOT LIKE "%日成交持仓排名%" 
+ORDER BY
+	a.data_time asc`
+	err = o.Raw(sql).QueryRow(&item)
+	return
+}
+
+func GetTradePositionOriginClassifyGuangzhouByExchangeDataTime(exchange, startDate, endDate string) (list []TradePositionClassifyInfo, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT DISTINCT classify_code as classify_name, contract as classify_type FROM base_from_trade_guangzhou_contract where  trade_date >= ? and trade_date <= ?`
+	_, err = o.Raw(sql, startDate, endDate).QueryRows(&list)
+	return
+}
+
+func MultiInsertTradeBaseDataToTopGuangzhouByClassify(exchange string, startDate, endDate string, classifyNames, classifyTypes []string) (err error) {
+	o := orm.NewOrm()
+	now := time.Now().Format(utils.FormatDateTime)
+
+	//新增买单榜单
+	sql1 := `INSERT INTO trade_position_guangzhou_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
+SELECT
+	n.classify_code,n.contract,SUBSTRING_INDEX(c.index_name, '_', 1) AS index_name,a.value,a.qty_sub,a.data_time,1,0,0,?,?
+FROM
+	base_from_trade_guangzhou_data a
+	LEFT JOIN base_from_trade_guangzhou_index c ON a.base_from_trade_guangzhou_index_id = c.base_from_trade_guangzhou_index_id 
+	LEFT JOIN base_from_trade_guangzhou_contract n on c.base_from_trade_guangzhou_contract_id = n.base_from_trade_guangzhou_contract_id
+WHERE
+	a.data_time between ? and ?
+	and c.base_from_trade_guangzhou_classify_id in (7,8)
+    and n.classify_code in (` + utils.GetOrmInReplace(len(classifyNames)) + `) and n.contract in (` + utils.GetOrmInReplace(len(classifyTypes)) + `)
+	and c.index_name like "%持买单量%"
+	and c.index_name not like "%日成交持仓排名%"`
+	_, err = o.Raw(sql1, now, now, startDate, endDate, classifyNames, classifyTypes).Exec()
+	if err != nil {
+		return
+	}
+	//新增卖单榜单
+	sql2 := `INSERT INTO trade_position_guangzhou_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
+SELECT
+	n.classify_code,n.contract,SUBSTRING_INDEX(c.index_name, '_', 1) AS index_name,a.value,a.qty_sub,a.data_time,2,0,0,?,?
+FROM
+	base_from_trade_guangzhou_data a
+	LEFT JOIN base_from_trade_guangzhou_index c ON a.base_from_trade_guangzhou_index_id = c.base_from_trade_guangzhou_index_id 
+	LEFT JOIN base_from_trade_guangzhou_contract n on c.base_from_trade_guangzhou_contract_id = n.base_from_trade_guangzhou_contract_id
+WHERE
+	a.data_time between ? and ?
+	and c.base_from_trade_guangzhou_classify_id in (7,8)
+  	and n.classify_code in (` + utils.GetOrmInReplace(len(classifyNames)) + `) and n.contract in (` + utils.GetOrmInReplace(len(classifyTypes)) + `)
+	and c.index_name like "%持卖单量%"
+	and c.index_name not like "%日成交持仓排名%"`
+	_, err = o.Raw(sql2, now, now, startDate, endDate, classifyNames, classifyTypes).Exec()
+	if err != nil {
+		return
+	}
+	//更新买单、卖单榜单排名字段
+	sql3 := `update trade_position_guangzhou_top s
+JOIN (
+SELECT
+  	classify_type,
+	deal_short_name,
+	data_time,
+	deal_type,
+  (@row_number := IF(@prev_year = CONCAT_WS( "_", data_time, deal_type, classify_type), @row_number + 1, 1) ) AS row_number,
+  @prev_year := CONCAT_WS( "_", data_time, deal_type, classify_type)
+FROM
+  trade_position_guangzhou_top,(SELECT @row_number := 0, @prev_year := NULL) r
+	where data_time between ? and ? and classify_type in (` + utils.GetOrmInReplace(len(classifyTypes)) + `)
+ORDER BY
+  data_time asc,
+  deal_type asc,
+  classify_type asc,
+  deal_value DESC
+	) t
+	ON s.classify_type = t.classify_type AND s.deal_short_name = t.deal_short_name 	
+	AND s.data_time = t.data_time AND s.deal_type = t.deal_type
+SET s.rank = t.row_number where s.data_time between ? and ? and s.classify_type in (` + utils.GetOrmInReplace(len(classifyTypes)) + `);`
+
+	_, err = o.Raw(sql3, startDate, endDate, classifyTypes, startDate, endDate, classifyTypes).Exec()
+
+	return
+}

+ 1 - 0
models/db.go

@@ -147,6 +147,7 @@ func initTradePositionTop() {
 		new(data_manage.TradePositionIneTop),
 		new(data_manage.TradePositionCffexTop),
 		new(data_manage.BaseFromTradeClassify), // 交易所分类
+		new(data_manage.TradePositionGuangzhouTop),
 	)
 }
 

+ 118 - 14
services/data/trade_position_analysis.go

@@ -13,7 +13,7 @@ import (
 
 // InitPositionTask 统计今日交易所的持仓分析数据
 func InitPositionTask(cont context.Context) (err error) {
-	exchanges := []string{"zhengzhou", "dalian", "shanghai", "cffex", "ine"} //郑商所,大商所,上期所,中金所,上期能源
+	exchanges := []string{"zhengzhou", "dalian", "shanghai", "cffex", "ine", "guangzhou"} //郑商所,大商所,上期所,中金所,上期能源,广期所
 	for i := 1; i >= 0; i-- {
 		startDate := time.Now().AddDate(0, 0, -i).Format(utils.FormatDate)
 		endDate := startDate
@@ -25,7 +25,110 @@ func InitPositionTask(cont context.Context) (err error) {
 
 			fmt.Println("开始" + startDate + "结束" + endDate)
 			utils.FileLog.Info(fmt.Sprintf("InitTradePosition:开始:%s; 结束:%s", startDate, endDate))
-			tErr, errMsg := InitTradePosition(exchange, startDate, endDate)
+			var tradePosition TradePositionInterface
+			if exchange == "guangzhou" {
+				tradePosition = &GuangzhouTradePosition{}
+			} else {
+				tradePosition = &BaseTradePosition{}
+			}
+			tErr, errMsg := InitTradePosition(exchange, startDate, endDate, tradePosition)
+			if tErr != nil {
+				err = tErr
+				fmt.Println("InitTradePosition: 操作失败:" + errMsg + tErr.Error())
+				utils.FileLog.Info(fmt.Sprintf("InitTradePosition: 操作失败:%s:%s", errMsg, tErr.Error()))
+				continue
+			}
+
+			fmt.Println("InitTradePosition:" + exchange + "已完成")
+			utils.FileLog.Info("InitTradePosition:" + exchange + "已完成")
+		}
+	}
+
+	// 处理交易所的分类
+	{
+		allBaseFromTradeClassify, tmpErr := data_manage.GetAllBaseFromTradeClassify()
+		if tmpErr != nil {
+			utils.FileLog.Info(fmt.Sprintf("获取所有交易所分类失败,;err:%s", tmpErr.Error()))
+			return
+		}
+		tradeClassifyMap := make(map[string]*data_manage.BaseFromTradeClassify)
+		for _, v := range allBaseFromTradeClassify {
+			key := fmt.Sprintf("%s_%s_%s", v.Exchange, v.ClassifyName, v.ClassifyType)
+			tradeClassifyMap[key] = v
+		}
+
+		baseFromTradeClassifyList := make([]*data_manage.BaseFromTradeClassify, 0)
+		for _, v := range exchanges {
+			tradeClassifyNameList, tmpErr := data_manage.GetExchangeClassify(v)
+			if tmpErr != nil {
+				utils.FileLog.Info(fmt.Sprintf("获取%s分类失败,;err:%s", v, tmpErr.Error()))
+				continue
+			}
+
+			for _, classify := range tradeClassifyNameList {
+				key := fmt.Sprintf("%s_%s_%s", v, classify.ClassifyName, classify.ClassifyType)
+
+				if tradeClassify, ok := tradeClassifyMap[key]; !ok {
+					baseFromTradeClassifyList = append(baseFromTradeClassifyList, &data_manage.BaseFromTradeClassify{
+						Id:           0,
+						ClassifyName: classify.ClassifyName,
+						ClassifyType: classify.ClassifyType,
+						Exchange:     v,
+						LatestDate:   classify.DataTime,
+						CreateTime:   time.Now(),
+						ModifyTime:   classify.ModifyTime,
+					})
+				} else {
+					if tradeClassify.LatestDate.Before(classify.DataTime) {
+						tradeClassify.LatestDate = classify.DataTime
+						tradeClassify.ModifyTime = classify.ModifyTime
+						tradeClassify.Update([]string{"LatestDate", "ModifyTime"})
+					}
+				}
+			}
+		}
+
+		lenAddList := len(baseFromTradeClassifyList)
+		if lenAddList > 0 {
+			baseAddNum := 500
+			num := lenAddList / baseAddNum
+			lastNum := lenAddList % baseAddNum
+			for i := 0; i <= num; i++ {
+				tmpNum := baseAddNum
+				if i == num && lastNum > 0 {
+					tmpNum = lastNum
+				}
+				data_manage.MultiAddBaseFromTradeClassify(baseFromTradeClassifyList[i*baseAddNum : (i*baseAddNum + tmpNum)])
+			}
+		}
+	}
+
+	return
+}
+
+// InitGuangzhouPositionTask 初始化广期所持仓分析排名情况
+func InitGuangzhouPositionTask() (err error) {
+	//exchanges := []string{"zhengzhou", "dalian", "shanghai", "cffex", "ine", "guangzhou"} //郑商所,大商所,上期所,中金所,上期能源
+	exchanges := []string{"guangzhou"} //郑商所,大商所,上期所,中金所,上期能源
+	for i := 0; i >= 0; i-- {
+		//startDate := time.Now().AddDate(0, 0, -i).Format(utils.FormatDate)
+		startDate := "2023-09-20"
+		endDate := startDate
+		for _, v := range exchanges {
+			exchange := v
+			err = nil
+			fmt.Println("InitPositionTask:	启动:" + exchange)
+			utils.FileLog.Info("InitPositionTask:	启动:" + exchange)
+
+			fmt.Println("开始" + startDate + "结束" + endDate)
+			utils.FileLog.Info(fmt.Sprintf("InitTradePosition:开始:%s; 结束:%s", startDate, endDate))
+			var tradePosition TradePositionInterface
+			if exchange == "guangzhou" {
+				tradePosition = &GuangzhouTradePosition{}
+			} else {
+				tradePosition = &BaseTradePosition{}
+			}
+			tErr, errMsg := InitTradePosition(exchange, startDate, endDate, tradePosition)
 			if tErr != nil {
 				err = tErr
 				fmt.Println("InitTradePosition: 操作失败:" + errMsg + tErr.Error())
@@ -100,7 +203,7 @@ func InitPositionTask(cont context.Context) (err error) {
 	return
 }
 
-func InitTradePosition(exchange, startDate, endDate string) (err error, errMsg string) {
+func InitTradePosition(exchange, startDate, endDate string, tradePosition TradePositionInterface) (err error, errMsg string) {
 	defer func() {
 		if err != nil {
 			tips := fmt.Sprintf("统计今日交易所的持仓分析数据失败, Exchange: %s, Err: %s, Msg: %s", exchange, err.Error(), errMsg)
@@ -119,7 +222,7 @@ func InitTradePosition(exchange, startDate, endDate string) (err error, errMsg s
 		//数据存在不同步的情况,有些合约会提早更新,有些合约会延迟更新
 		//判断合约数是否一致
 
-		originNum, tmpErr := data_manage.GetTradePositionOriginClassifyCountByExchangeDataTime(exchange, startDate, endDate)
+		originNum, tmpErr := tradePosition.GetTradePositionOriginClassifyCountByExchangeDataTime(exchange, startDate, endDate)
 		if tmpErr != nil {
 			err = tmpErr
 			errMsg = "查询原始数据分类个数失败,GetTradePositionOriginClassifyCountByExchangeDataTime() Err: "
@@ -142,7 +245,8 @@ func InitTradePosition(exchange, startDate, endDate string) (err error, errMsg s
 			return
 		}
 	}
-	err = data_manage.MultiInsertTradeBaseDataToTop(exchange, startDate, endDate)
+
+	err = tradePosition.MultiInsertTradeBaseDataToTop(exchange, startDate, endDate)
 	if err != nil {
 		errMsg = "新增原始数据失败,MultiInsertTradeBaseDataToTop() Err: "
 		return
@@ -167,7 +271,7 @@ func InitTradePosition(exchange, startDate, endDate string) (err error, errMsg s
 	}
 
 	// 原始数据日期
-	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange)
 	if e != nil {
 		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
 		return
@@ -258,13 +362,13 @@ func InitTradePosition(exchange, startDate, endDate string) (err error, errMsg s
 	}
 
 	//生成净多单,净空单榜单
-	err = createAnalysisCleanTop(exchange, startDate, endDate)
+	err = createAnalysisCleanTop(exchange, startDate, endDate, tradePosition)
 	if err != nil {
 		errMsg = "创建净多单,净空单数据失败,createAnalysisCleanTop() Err: "
 		return
 	}
 
-	err = DealYesterdayData(exchange, startDate)
+	err = DealYesterdayData(exchange, startDate, tradePosition)
 	if err != nil {
 		errMsg = "处理昨日数据失败,DealYesterdayData() Err: "
 		return
@@ -329,9 +433,9 @@ func dealTradeOriginData(dataTimeMap map[string]*data_manage.TradePositionTop, o
 }
 
 // DealYesterdayData 更新昨日数据
-func DealYesterdayData(exchange, startDate string) (err error) {
+func DealYesterdayData(exchange, startDate string, tradePosition TradePositionInterface) (err error) {
 	// 查询最早的日期
-	firstItem, err := data_manage.GetFirstBaseFromTradeIndexByDate(exchange)
+	firstItem, err := tradePosition.GetFirstBaseFromTradeIndexByDate(exchange)
 	if err != nil {
 		return
 	}
@@ -340,7 +444,7 @@ func DealYesterdayData(exchange, startDate string) (err error) {
 	}
 
 	// 前一个交易日, 前两个交易日
-	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange)
 	if e != nil {
 		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
 		return
@@ -462,7 +566,7 @@ func DealYesterdayData(exchange, startDate string) (err error) {
 		}
 
 		//重新生成净多单和净空单的榜单
-		err = createAnalysisCleanTop(exchange, yesterdayStr, yesterdayStr)
+		err = createAnalysisCleanTop(exchange, yesterdayStr, yesterdayStr, tradePosition)
 		if err != nil {
 			return
 		}
@@ -478,7 +582,7 @@ func DealYesterdayData(exchange, startDate string) (err error) {
 }
 
 // createAnalysisCleanTop 生成净多单,净空单榜单
-func createAnalysisCleanTop(exchange, startDate, endDate string) (err error) {
+func createAnalysisCleanTop(exchange, startDate, endDate string, tradePosition TradePositionInterface) (err error) {
 	defer func() {
 		if err != nil {
 			fmt.Println("createAnalysisCleanTop err: " + err.Error())
@@ -493,7 +597,7 @@ func createAnalysisCleanTop(exchange, startDate, endDate string) (err error) {
 	subChangeMap2 := make(map[string]int) //净空单map
 
 	// 2023-05-10 此处取前一个交易日, 不一定是昨日
-	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange)
 	if e != nil {
 		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
 		return

+ 24 - 14
services/data/trade_position_analysis_classify.go

@@ -12,10 +12,12 @@ import (
 
 // FixPositionTask 补全缺失的合约, 注意和原先的定时任务区分开来
 func FixPositionTask() (err error) {
-	exchanges := []string{"zhengzhou", "dalian", "shanghai", "cffex", "ine"} //郑商所,大商所,上期所,中金所,上期能源
+	exchanges := []string{"zhengzhou", "dalian", "shanghai", "cffex", "ine", "guangzhou"} //郑商所,大商所,上期所,中金所,上期能源
+	//exchanges := []string{"guangzhou"} //郑商所,大商所,上期所,中金所,上期能源
 	for i := 194; i > 1; i-- {
 		// 定时任务避开昨日和今日的数据,以免和原先的定时任务InitPositionTask冲突
 		startDate := time.Now().AddDate(0, 0, -i).Format(utils.FormatDate)
+		//startDate := "2024-01-08"
 		endDate := startDate
 		for _, v := range exchanges {
 			exchange := v
@@ -25,7 +27,14 @@ func FixPositionTask() (err error) {
 
 			fmt.Println("开始" + startDate + "结束" + endDate)
 			utils.FileLog.Info(fmt.Sprintf("FixPositionTask:开始:%s; 结束:%s", startDate, endDate))
-			tErr, errMsg := InitTradePositionClassify(exchange, startDate, endDate)
+
+			var tradePosition TradePositionInterface
+			if exchange == "guangzhou" {
+				tradePosition = &GuangzhouTradePosition{}
+			} else {
+				tradePosition = &BaseTradePosition{}
+			}
+			tErr, errMsg := InitTradePositionClassify(exchange, startDate, endDate, tradePosition)
 			if tErr != nil {
 				err = tErr
 				fmt.Println("FixPositionTask: 操作失败:" + errMsg + tErr.Error())
@@ -41,16 +50,17 @@ func FixPositionTask() (err error) {
 }
 
 // InitTradePositionClassify 持仓分析补全缺失的合约
-func InitTradePositionClassify(exchange, startDate, endDate string) (err error, errMsg string) {
+func InitTradePositionClassify(exchange, startDate, endDate string, tradePosition TradePositionInterface) (err error, errMsg string) {
 	defer func() {
 		if err != nil {
 			tips := fmt.Sprintf("持仓分析-补全缺失的品种数据 操作失败, Exchange: %s, Err: %s, Msg: %s", exchange, err.Error(), errMsg)
+			utils.FileLog.Info(tips)
 			alarm_msg.SendAlarmMsg(tips, 3)
 		}
 	}()
 
 	//判断合约数与数据源是否一致
-	originClassifyList, tmpErr := data_manage.GetTradePositionOriginClassifyByExchangeDataTime(exchange, startDate, endDate)
+	originClassifyList, tmpErr := tradePosition.GetTradePositionOriginClassifyByExchangeDataTime(exchange, startDate, endDate)
 	if tmpErr != nil {
 		err = tmpErr
 		errMsg = "查询原始数据分类个数失败,GetTradePositionOriginClassifyCountByExchangeDataTime() Err: "
@@ -94,7 +104,7 @@ func InitTradePositionClassify(exchange, startDate, endDate string) (err error,
 	}
 
 	//批量导入缺失的合约
-	err = data_manage.MultiInsertTradeBaseDataToTopByClassify(exchange, startDate, endDate, classifyNames, classifyTypes)
+	err = tradePosition.MultiInsertTradeBaseDataToTopByClassify(exchange, startDate, endDate, classifyNames, classifyTypes)
 	if err != nil {
 		errMsg = "新增原始数据失败,MultiInsertTradeBaseDataToTop() Err: "
 		return
@@ -119,7 +129,7 @@ func InitTradePositionClassify(exchange, startDate, endDate string) (err error,
 	}
 
 	// 原始数据日期
-	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange)
 	if e != nil {
 		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
 		return
@@ -210,13 +220,13 @@ func InitTradePositionClassify(exchange, startDate, endDate string) (err error,
 	}
 
 	//生成净多单,净空单榜单
-	err = createAnalysisCleanTopClassify(exchange, startDate, endDate, classifyNames, classifyTypes)
+	err = createAnalysisCleanTopClassify(exchange, startDate, endDate, classifyNames, classifyTypes, tradePosition)
 	if err != nil {
 		errMsg = "创建净多单,净空单数据失败,createAnalysisCleanTop() Err: "
 		return
 	}
 
-	err = DealYesterdayDataClassify(exchange, startDate, classifyNames, classifyTypes)
+	err = DealYesterdayDataClassify(exchange, startDate, classifyNames, classifyTypes, tradePosition)
 	if err != nil {
 		errMsg = "处理昨日数据失败,DealYesterdayData() Err: "
 		return
@@ -225,9 +235,9 @@ func InitTradePositionClassify(exchange, startDate, endDate string) (err error,
 }
 
 // DealYesterdayDataClassify 更新部分合约的昨日数据
-func DealYesterdayDataClassify(exchange, startDate string, classifyNames, classifyTypes []string) (err error) {
+func DealYesterdayDataClassify(exchange, startDate string, classifyNames, classifyTypes []string, tradePosition TradePositionInterface) (err error) {
 	// 查询最早的日期
-	firstItem, err := data_manage.GetFirstBaseFromTradeIndexByDate(exchange)
+	firstItem, err := tradePosition.GetFirstBaseFromTradeIndexByDate(exchange)
 	if err != nil {
 		return
 	}
@@ -236,7 +246,7 @@ func DealYesterdayDataClassify(exchange, startDate string, classifyNames, classi
 	}
 
 	// 前一个交易日, 前两个交易日
-	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange)
 	if e != nil {
 		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
 		return
@@ -349,7 +359,7 @@ func DealYesterdayDataClassify(exchange, startDate string, classifyNames, classi
 		}
 
 		//重新生成净多单和净空单的榜单
-		err = createAnalysisCleanTopClassify(exchange, yesterdayStr, yesterdayStr, classifyNames, classifyTypes)
+		err = createAnalysisCleanTopClassify(exchange, yesterdayStr, yesterdayStr, classifyNames, classifyTypes, tradePosition)
 		if err != nil {
 			return
 		}
@@ -365,7 +375,7 @@ func DealYesterdayDataClassify(exchange, startDate string, classifyNames, classi
 }
 
 // createAnalysisCleanTopClassify 生成部分合约的净多单,净空单榜单
-func createAnalysisCleanTopClassify(exchange, startDate, endDate string, classifyNames, classifyTypes []string) (err error) {
+func createAnalysisCleanTopClassify(exchange, startDate, endDate string, classifyNames, classifyTypes []string, tradePosition TradePositionInterface) (err error) {
 	defer func() {
 		if err != nil {
 			fmt.Println("createAnalysisCleanTop err: " + err.Error())
@@ -380,7 +390,7 @@ func createAnalysisCleanTopClassify(exchange, startDate, endDate string, classif
 	subChangeMap2 := make(map[string]int) //净空单map
 
 	// 2023-05-10 此处取前一个交易日, 不一定是昨日
-	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange)
 	if e != nil {
 		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
 		return

+ 78 - 0
services/data/trade_position_analysis_interface.go

@@ -0,0 +1,78 @@
+package data
+
+import "eta/eta_task/models/data_manage"
+
+type TradePositionInterface interface {
+	MultiInsertTradeBaseDataToTop(exchange, startDate, endDate string) (err error)
+	GetTradePositionTopOriginDataTimes(exchange string) (dates []string, err error)
+	GetTradePositionOriginClassifyCountByExchangeDataTime(exchange string, startDate, endDate string) (count int64, err error)
+	GetFirstBaseFromTradeIndexByDate(exchange string) (item *data_manage.GetFirstBaseFromTradeIndeDate, err error)
+	GetTradePositionOriginClassifyByExchangeDataTime(exchange, startDate, endDate string) (list []data_manage.TradePositionClassifyInfo, err error)
+	MultiInsertTradeBaseDataToTopByClassify(exchange string, startDate, endDate string, classifyNames, classifyTypes []string) (err error)
+}
+
+type BaseTradePosition struct{}
+
+func (b *BaseTradePosition) MultiInsertTradeBaseDataToTop(exchange, startDate, endDate string) (err error) {
+	err = data_manage.MultiInsertTradeBaseDataToTop(exchange, startDate, endDate)
+	return
+}
+
+func (b *BaseTradePosition) GetTradePositionTopOriginDataTimes(exchange string) (dates []string, err error) {
+	dates, err = data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	return
+}
+
+func (b *BaseTradePosition) GetTradePositionOriginClassifyCountByExchangeDataTime(exchange string, startDate, endDate string) (count int64, err error) {
+	count, err = data_manage.GetTradePositionOriginClassifyCountByExchangeDataTime(exchange, startDate, endDate)
+	return
+}
+
+func (b *BaseTradePosition) GetFirstBaseFromTradeIndexByDate(exchange string) (item *data_manage.GetFirstBaseFromTradeIndeDate, err error) {
+	item, err = data_manage.GetFirstBaseFromTradeIndexByDate(exchange)
+	return
+}
+
+func (b *BaseTradePosition) GetTradePositionOriginClassifyByExchangeDataTime(exchange, startDate, endDate string) (list []data_manage.TradePositionClassifyInfo, err error) {
+	list, err = data_manage.GetTradePositionOriginClassifyByExchangeDataTime(exchange, startDate, endDate)
+	return
+}
+
+func (b *BaseTradePosition) MultiInsertTradeBaseDataToTopByClassify(exchange string, startDate, endDate string, classifyNames, classifyTypes []string) (err error) {
+	err = data_manage.MultiInsertTradeBaseDataToTopByClassify(exchange, startDate, endDate, classifyNames, classifyTypes)
+	return
+}
+
+type GuangzhouTradePosition struct{}
+
+func (g *GuangzhouTradePosition) MultiInsertTradeBaseDataToTop(exchange, startDate, endDate string) (err error) {
+	err = data_manage.MultiInsertTradeGuangzhouDataToTop(exchange, startDate, endDate)
+
+	//查询出今日的value值
+	return
+}
+
+func (g *GuangzhouTradePosition) GetTradePositionTopOriginDataTimes(exchange string) (dates []string, err error) {
+	dates, err = data_manage.GetTradePositionTopOriginGuangzhouDataTimes(exchange)
+	return
+}
+
+func (g *GuangzhouTradePosition) GetTradePositionOriginClassifyCountByExchangeDataTime(exchange string, startDate, endDate string) (count int64, err error) {
+	count, err = data_manage.GetTradePositionOriginGuangzhouClassifyCountByExchangeDataTime(exchange, startDate, endDate)
+	return
+}
+
+func (g *GuangzhouTradePosition) GetFirstBaseFromTradeIndexByDate(exchange string) (item *data_manage.GetFirstBaseFromTradeIndeDate, err error) {
+	item, err = data_manage.GetFirstBaseFromTradeGuangzhouIndexByDate(exchange)
+	return
+}
+
+func (g *GuangzhouTradePosition) GetTradePositionOriginClassifyByExchangeDataTime(exchange, startDate, endDate string) (list []data_manage.TradePositionClassifyInfo, err error) {
+	list, err = data_manage.GetTradePositionOriginClassifyGuangzhouByExchangeDataTime(exchange, startDate, endDate)
+	return
+}
+
+func (g *GuangzhouTradePosition) MultiInsertTradeBaseDataToTopByClassify(exchange string, startDate, endDate string, classifyNames, classifyTypes []string) (err error) {
+	err = data_manage.MultiInsertTradeBaseDataToTopGuangzhouByClassify(exchange, startDate, endDate, classifyNames, classifyTypes)
+	return
+}