浏览代码

增加修复部分缺失的历史合约逻辑

xyxie 1 年之前
父节点
当前提交
79995e710e
共有 2 个文件被更改,包括 118 次插入2 次删除
  1. 92 0
      models/data_manage/trade_position_analysis.go
  2. 26 2
      services/data/trade_position_analysis.go

+ 92 - 0
models/data_manage/trade_position_analysis.go

@@ -98,6 +98,13 @@ func GetTradePositionTopByExchangeDataTime(exchange string, startDate, endDate s
 	return
 	return
 }
 }
 
 
+func GetTradePositionTopByExchangeDataTimeByClassify(exchange string, startDate, endDate string, classifyNames, classifyTypes []string) (list []*TradePositionTop, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT * FROM trade_position_` + exchange + `_top where data_time >= ? and data_time <= ? and deal_type in (1,2) and classify_name in (` + utils.GetOrmInReplace(len(classifyNames)) + `)  and classify_type in (` + utils.GetOrmInReplace(len(classifyTypes)) + `) ORDER BY classify_name, classify_type, deal_type, data_time, deal_value desc`
+	_, err = o.Raw(sql, startDate, endDate, classifyNames, classifyTypes).QueryRows(&list)
+	return
+}
+
 func GetTradePositionTopCountByExchangeDataTime(exchange string, startDate, endDate string) (count int64, err error) {
 func GetTradePositionTopCountByExchangeDataTime(exchange string, startDate, endDate string) (count int64, err error) {
 	o := orm.NewOrmUsingDB("data")
 	o := orm.NewOrmUsingDB("data")
 	sql := "SELECT count(*) FROM trade_position_" + exchange + "_top where data_time >= ? and data_time <= ? and deal_type in (1,2) ORDER BY classify_name, classify_type, deal_type, data_time, deal_value desc"
 	sql := "SELECT count(*) FROM trade_position_" + exchange + "_top where data_time >= ? and data_time <= ? and deal_type in (1,2) ORDER BY classify_name, classify_type, deal_type, data_time, deal_value desc"
@@ -112,6 +119,13 @@ func GetTradePositionTopByExchangeSourceType(exchange string, dataTime string, s
 	return
 	return
 }
 }
 
 
+func GetTradePositionTopByExchangeSourceTypeClassify(exchange string, dataTime string, sourceType int, classifyNames, classifyTypes []string) (list []*TradePositionTop, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT * FROM trade_position_` + exchange + `_top where data_time= ? and source_type = ? and classify_name in (` + utils.GetOrmInReplace(len(classifyNames)) + `)  and classify_type in (` + utils.GetOrmInReplace(len(classifyTypes)) + `) ORDER BY classify_name, classify_type, deal_type, deal_value desc`
+	_, err = o.Raw(sql, dataTime, sourceType, classifyNames, classifyTypes).QueryRows(&list)
+	return
+}
+
 type TradeTopClassify struct {
 type TradeTopClassify struct {
 	ClassifyName string //分类名称
 	ClassifyName string //分类名称
 	ClassifyType string //分类名称下的类型
 	ClassifyType string //分类名称下的类型
@@ -179,6 +193,13 @@ func DeletePositionTopByDataTime(exchange string, dataTime string, dealType int)
 	return
 	return
 }
 }
 
 
+func DeletePositionTopByDataTimeClassify(exchange string, dataTime string, dealType int, classifyNames, classifyTypes []string) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `delete from trade_position_` + exchange + `_top WHERE data_time=? and deal_type=? and classify_name in (` + utils.GetOrmInReplace(len(classifyNames)) + `)  and classify_type in (` + utils.GetOrmInReplace(len(classifyTypes)) + `)`
+	_, err = o.Raw(sql, dataTime, dealType, classifyNames, classifyTypes).Exec()
+	return
+}
+
 func GetTradePositionTopByExchangeDataTimeType(exchange string, dataTime string, dealType int) (list []TradePositionTop, err error) {
 func GetTradePositionTopByExchangeDataTimeType(exchange string, dataTime string, dealType int) (list []TradePositionTop, err error) {
 	o := orm.NewOrmUsingDB("data")
 	o := orm.NewOrmUsingDB("data")
 	sql := "select * from trade_position_" + exchange + "_top WHERE data_time=? and deal_type=?"
 	sql := "select * from trade_position_" + exchange + "_top WHERE data_time=? and deal_type=?"
@@ -186,6 +207,13 @@ func GetTradePositionTopByExchangeDataTimeType(exchange string, dataTime string,
 	return
 	return
 }
 }
 
 
+func GetTradePositionTopByExchangeDataTimeTypeClassify(exchange string, dataTime string, dealType int, classifyNames, classifyTypes []string) (list []TradePositionTop, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `select * from trade_position_` + exchange + `_top WHERE data_time=? and deal_type=? and classify_name in (` + utils.GetOrmInReplace(len(classifyNames)) + `)  and classify_type in (` + utils.GetOrmInReplace(len(classifyTypes)) + `)`
+	_, err = o.Raw(sql, dataTime, dealType, classifyNames, classifyTypes).QueryRows(&list)
+	return
+}
+
 func MultiInsertTradeBaseDataToTop(exchange string, startDate, endDate string) (err error) {
 func MultiInsertTradeBaseDataToTop(exchange string, startDate, endDate string) (err error) {
 	o := orm.NewOrmUsingDB("data")
 	o := orm.NewOrmUsingDB("data")
 	now := time.Now().Format(utils.FormatDateTime)
 	now := time.Now().Format(utils.FormatDateTime)
@@ -201,6 +229,21 @@ SELECT classify_name,classify_type,sold_short_name,sold_value,sold_change,data_t
 	return
 	return
 }
 }
 
 
+func MultiInsertTradeBaseDataToTopByClassify(exchange string, startDate, endDate string, classifyNames, classifyTypes []string) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	now := time.Now().Format(utils.FormatDateTime)
+	sql1 := `INSERT INTO trade_position_` + exchange + `_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
+	SELECT classify_name,classify_type,buy_short_name,buy_value,buy_change,data_time,1,0,rank,?,? FROM base_from_trade_` + exchange + `_index where rank <50 and buy_short_name !="" and data_time between ? and ? and classify_name in (` + utils.GetOrmInReplace(len(classifyNames)) + `)  and classify_type in (` + utils.GetOrmInReplace(len(classifyTypes)) + `)`
+	_, err = o.Raw(sql1, now, now, startDate, endDate, classifyNames, classifyTypes).Exec()
+	if err != nil {
+		return
+	}
+	sql2 := `INSERT INTO trade_position_` + exchange + `_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
+SELECT classify_name,classify_type,sold_short_name,sold_value,sold_change,data_time,2,0,rank,?,? FROM base_from_trade_` + exchange + `_index where rank <50 and sold_short_name !="" and data_time between ? and ? and classify_name in (` + utils.GetOrmInReplace(len(classifyNames)) + `)  and classify_type in (` + utils.GetOrmInReplace(len(classifyTypes)) + `)`
+	_, err = o.Raw(sql2, now, now, startDate, endDate, classifyNames, classifyTypes).Exec()
+	return
+}
+
 // GetTradePositionTopOriginDataTimes 获取榜单原始数据日期-正序
 // GetTradePositionTopOriginDataTimes 获取榜单原始数据日期-正序
 func GetTradePositionTopOriginDataTimes(exchange string) (dates []string, err error) {
 func GetTradePositionTopOriginDataTimes(exchange string) (dates []string, err error) {
 	o := orm.NewOrmUsingDB("data")
 	o := orm.NewOrmUsingDB("data")
@@ -279,6 +322,14 @@ func GetTradePositionTopCleanByExchangeDataTime(exchange string, startDate, endD
 	return
 	return
 }
 }
 
 
+// GetTradePositionTopCleanByExchangeDataTimeClassify 根据时间查询净多单和净空单的值
+func GetTradePositionTopCleanByExchangeDataTimeClassify(exchange string, startDate, endDate string, classifyNames, classifyTypes []string) (list []*TradePositionTop, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT * FROM trade_position_` + exchange + `_top where data_time >= ? and data_time <= ? and deal_type in (3,4) and classify_name in (` + utils.GetOrmInReplace(len(classifyNames)) + `)  and classify_type in (` + utils.GetOrmInReplace(len(classifyTypes)) + `) ORDER BY classify_name, classify_type, deal_type, data_time, deal_value desc`
+	_, err = o.Raw(sql, startDate, endDate, classifyNames, classifyTypes).QueryRows(&list)
+	return
+}
+
 // MultiUpdatePositionTopChangeVal 批量更新榜单里变化量的值
 // MultiUpdatePositionTopChangeVal 批量更新榜单里变化量的值
 func MultiUpdatePositionTopChangeVal(exchange string, updates []UpdateChangeVal) (err error) {
 func MultiUpdatePositionTopChangeVal(exchange string, updates []UpdateChangeVal) (err error) {
 	o := orm.NewOrmUsingDB("data")
 	o := orm.NewOrmUsingDB("data")
@@ -297,3 +348,44 @@ func MultiUpdatePositionTopChangeVal(exchange string, updates []UpdateChangeVal)
 	}
 	}
 	return
 	return
 }
 }
+
+func GetTradePositionOriginClassifyCountByExchangeDataTime(exchange string, startDate, endDate string) (count int64, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT COUNT(DISTINCT classify_name, classify_type) FROM base_from_trade_` + exchange + `_index where rank <50 and (buy_short_name !="" || sold_short_name !="" ) and data_time >= ? and data_time <= ?`
+	err = o.Raw(sql, startDate, endDate).QueryRow(&count)
+	return
+}
+
+func GetTradePositionTopClassifyCountByExchangeDataTime(exchange string, startDate, endDate string) (count int64, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := "SELECT COUNT(DISTINCT classify_name, classify_type)  FROM trade_position_" + exchange + "_top where data_time >= ? and data_time <= ? and deal_type in (1,2) "
+	err = o.Raw(sql, startDate, endDate).QueryRow(&count)
+	return
+}
+
+type TradePositionClassifyInfo struct {
+	ClassifyName string //分类名称
+	ClassifyType string //分类名称下的类型
+}
+
+func GetTradePositionOriginClassifyByExchangeDataTime(exchange string, startDate, endDate string) (list []TradePositionClassifyInfo, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT DISTINCT classify_name, classify_type FROM base_from_trade_` + exchange + `_index where rank <50 and (buy_short_name !="" || sold_short_name !="" ) and data_time >= ? and data_time <= ?`
+	_, err = o.Raw(sql, startDate, endDate).QueryRows(&list)
+	return
+}
+
+func GetTradePositionTopClassifyByExchangeDataTime(exchange string, startDate, endDate string) (list []TradePositionClassifyInfo, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := "SELECT DISTINCT classify_name, classify_type  FROM trade_position_" + exchange + "_top where data_time >= ? and data_time <= ? and deal_type in (1,2) "
+	_, err = o.Raw(sql, startDate, endDate).QueryRows(&list)
+	return
+}
+
+// DeleteTradePositionTopAllByExchangeDataTime 删除计算数据
+func DeleteTradePositionTopAllByExchangeDataTime(exchange string, startDate, endDate string) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := "DELETE FROM trade_position_" + exchange + "_top where data_time >= ? and data_time <= ? "
+	_, err = o.Raw(sql, startDate, endDate).Exec()
+	return
+}

+ 26 - 2
services/data/trade_position_analysis.go

@@ -116,7 +116,31 @@ func InitTradePosition(exchange, startDate, endDate string) (err error, errMsg s
 	}
 	}
 	if num > 0 {
 	if num > 0 {
 		//err = fmt.Errorf("数据已存在,无需处理")
 		//err = fmt.Errorf("数据已存在,无需处理")
-		return
+		//数据存在不同步的情况,有些合约会提早更新,有些合约会延迟更新
+		//判断合约数是否一致
+
+		originNum, tmpErr := data_manage.GetTradePositionOriginClassifyCountByExchangeDataTime(exchange, startDate, endDate)
+		if tmpErr != nil {
+			err = tmpErr
+			errMsg = "查询原始数据分类个数失败,GetTradePositionOriginClassifyCountByExchangeDataTime() Err: "
+			return
+		}
+		topNum, tmpErr := data_manage.GetTradePositionTopClassifyCountByExchangeDataTime(exchange, startDate, endDate)
+		if tmpErr != nil {
+			err = tmpErr
+			errMsg = "查询榜单数据分类个数失败,GetTradePositionTopClassifyCountByExchangeDataTime() Err: "
+			return
+		}
+		if originNum == topNum {
+			//err = fmt.Errorf("数据已存在,无需处理")
+			return
+		}
+		//如果合约数不一致,则删除今日数据
+		err = data_manage.DeleteTradePositionTopAllByExchangeDataTime(exchange, startDate, endDate)
+		if err != nil {
+			errMsg = "删除榜单数据失败,DeleteTradePositionTopAllByExchangeDataTime() Err: "
+			return
+		}
 	}
 	}
 	err = data_manage.MultiInsertTradeBaseDataToTop(exchange, startDate, endDate)
 	err = data_manage.MultiInsertTradeBaseDataToTop(exchange, startDate, endDate)
 	if err != nil {
 	if err != nil {
@@ -239,7 +263,7 @@ func InitTradePosition(exchange, startDate, endDate string) (err error, errMsg s
 		errMsg = "创建净多单,净空单数据失败,createAnalysisCleanTop() Err: "
 		errMsg = "创建净多单,净空单数据失败,createAnalysisCleanTop() Err: "
 		return
 		return
 	}
 	}
-	// 特殊处理起始日期前一天的数据
+
 	err = DealYesterdayData(exchange, startDate)
 	err = DealYesterdayData(exchange, startDate)
 	if err != nil {
 	if err != nil {
 		errMsg = "处理昨日数据失败,DealYesterdayData() Err: "
 		errMsg = "处理昨日数据失败,DealYesterdayData() Err: "