瀏覽代碼

feat:新增持仓分析

Roc 1 年之前
父節點
當前提交
8a290618f0

+ 12 - 0
models/data_manage/base_from_trade_index.go

@@ -0,0 +1,12 @@
+package data_manage
+
+import (
+	"github.com/beego/beego/v2/client/orm"
+)
+
+func GetFirstBaseFromTradeIndexByDate(exchange string) (item *BaseFromTradeShanghaiIndex, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := "SELECT * FROM base_from_trade_" + exchange + "_index where rank < 50 order by data_time asc"
+	err = o.Raw(sql).QueryRow(&item)
+	return
+}

+ 256 - 0
models/data_manage/trade_position_analysis.go

@@ -0,0 +1,256 @@
+package data_manage
+
+import (
+	"fmt"
+	"github.com/beego/beego/v2/client/orm"
+	"hongze/hongze_task_trial/utils"
+	"time"
+	"unsafe"
+)
+
+// 持仓榜单表
+type TradePositionTop struct {
+	Id            uint64    `orm:"column(id);pk"`
+	ClassifyName  string    //分类名称
+	ClassifyType  string    //分类名称下的类型
+	DataTime      string    //数据日期
+	CreateTime    time.Time //插入时间
+	ModifyTime    time.Time //修改时间
+	DealShortName string    //成交量公司简称
+	DealValue     int       //成交量
+	DealChange    int       //成交变化量
+	DealType      int       //交易类型:1多单,2空单,3净多单,4净空单
+	SourceType    int       //数据来源,0是原始数据的值,1是由T+1日推算出的值,2是由T日的榜单数据推算出的值
+	Rank          int       //排名
+}
+
+type TradePositionDalianTop struct {
+	TradePositionTop
+}
+
+type TradePositionZhengzhouTop struct {
+	TradePositionTop
+}
+
+type TradePositionCffexTop struct {
+	TradePositionTop
+}
+
+type TradePositionShanghaiTop struct {
+	TradePositionTop
+}
+
+type TradePositionIneTop struct {
+	TradePositionTop
+}
+
+func InsertMultiTradePositionTop(exchange string, items []*TradePositionTop) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	if exchange == "dalian" {
+		list := make([]*TradePositionDalianTop, 0)
+		for _, v := range items {
+			tmp := (*TradePositionDalianTop)(unsafe.Pointer(v))
+			list = append(list, tmp)
+		}
+		_, err = o.InsertMulti(len(list), list)
+		return
+	} else if exchange == "zhengzhou" {
+		list := make([]*TradePositionZhengzhouTop, 0)
+		for _, v := range items {
+			tmp := (*TradePositionZhengzhouTop)(unsafe.Pointer(v))
+			list = append(list, tmp)
+		}
+		_, err = o.InsertMulti(len(list), list)
+		return
+	} else if exchange == "cffex" {
+		list := make([]*TradePositionCffexTop, 0)
+		for _, v := range items {
+			tmp := (*TradePositionCffexTop)(unsafe.Pointer(v))
+			list = append(list, tmp)
+		}
+		_, err = o.InsertMulti(len(list), list)
+		return
+	} else if exchange == "shanghai" {
+		list := make([]*TradePositionShanghaiTop, 0)
+		for _, v := range items {
+			tmp := (*TradePositionShanghaiTop)(unsafe.Pointer(v))
+			list = append(list, tmp)
+		}
+		_, err = o.InsertMulti(len(list), list)
+		return
+	} else if exchange == "ine" {
+		list := make([]*TradePositionIneTop, 0)
+		for _, v := range items {
+			tmp := (*TradePositionIneTop)(unsafe.Pointer(v))
+			list = append(list, tmp)
+		}
+		_, err = o.InsertMulti(len(list), list)
+		return
+	}
+
+	return
+}
+
+func GetTradePositionTopByExchangeDataTime(exchange string, startDate, endDate string) (list []*TradePositionTop, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := "SELECT * FROM trade_position_" + exchange + "_top where data_time >= ? and data_time <= ? and deal_type in (1,2) ORDER BY classify_name, classify_type, deal_type, data_time, deal_value desc"
+	_, err = o.Raw(sql, startDate, endDate).QueryRows(&list)
+	return
+}
+
+func GetTradePositionTopCountByExchangeDataTime(exchange string, startDate, endDate string) (count int64, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := "SELECT count(*) FROM trade_position_" + exchange + "_top where data_time >= ? and data_time <= ? and deal_type in (1,2) ORDER BY classify_name, classify_type, deal_type, data_time, deal_value desc"
+	err = o.Raw(sql, startDate, endDate).QueryRow(&count)
+	return
+}
+
+func GetTradePositionTopByExchangeSourceType(exchange string, dataTime string, sourceType int) (list []*TradePositionTop, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := "SELECT * FROM trade_position_" + exchange + "_top where data_time= ? and source_type = ? ORDER BY classify_name, classify_type, deal_type, deal_value desc"
+	_, err = o.Raw(sql, dataTime, sourceType).QueryRows(&list)
+	return
+}
+
+type TradeTopClassify struct {
+	ClassifyName string //分类名称
+	ClassifyType string //分类名称下的类型
+}
+
+type TradePositionSub struct {
+	ClassifyName  string //分类名称
+	ClassifyType  string //分类名称下的类型
+	DataTime      string //数据日期
+	DealShortName string //成交量公司简称
+	SubValue      int    //差值
+	DealType      int
+}
+
+type TradePositionSubList []*TradePositionSub
+
+func (v TradePositionSubList) Len() int {
+	return len(v)
+}
+
+func (v TradePositionSubList) Swap(i, j int) {
+	v[i], v[j] = v[j], v[i]
+}
+
+func (v TradePositionSubList) Less(i, j int) bool {
+	return v[i].SubValue > v[j].SubValue
+}
+
+type UpdateDealValueChange struct {
+	Id         uint64
+	DealValue  int //成交量
+	DealChange int
+	SourceType int
+	ModifyTime time.Time //修改时间
+}
+
+func MultiUpdatePositionTop(exchange string, updates []UpdateDealValueChange) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	p, err := o.Raw("UPDATE trade_position_" + exchange + "_top SET deal_value=?, deal_change=?, source_type=?, modify_time=? WHERE id = ?").Prepare()
+	if err != nil {
+		return
+	}
+	defer func() {
+		_ = p.Close() // 别忘记关闭 statement
+	}()
+	for _, v := range updates {
+		_, err = p.Exec(v.DealValue, v.DealChange, v.SourceType, v.ModifyTime, v.Id)
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+func DeletePositionTopByDataTime(exchange string, dataTime string, dealType int) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := "delete from trade_position_" + exchange + "_Top WHERE data_time=? and deal_type=?"
+	_, err = o.Raw(sql, dataTime, dealType).Exec()
+	return
+}
+
+func GetTradePositionTopByExchangeDataTimeType(exchange string, dataTime string, dealType int) (list []TradePositionTop, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := "select * from trade_position_" + exchange + "_Top WHERE data_time=? and deal_type=?"
+	_, err = o.Raw(sql, dataTime, dealType).QueryRows(&list)
+	return
+}
+
+func MultiInsertTradeBaseDataToTop(exchange string, startDate, endDate string) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	now := time.Now().Format(utils.FormatDateTime)
+	sql1 := `INSERT INTO trade_position_` + exchange + `_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
+	SELECT classify_name,classify_type,buy_short_name,buy_value,buy_change,data_time,1,0,rank,?,? FROM base_from_trade_` + exchange + `_index where rank <50 and buy_short_name !="" and data_time between ? and ?`
+	_, err = o.Raw(sql1, now, now, startDate, endDate).Exec()
+	if err != nil {
+		return
+	}
+	sql2 := `INSERT INTO trade_position_` + exchange + `_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
+SELECT classify_name,classify_type,sold_short_name,sold_value,sold_change,data_time,2,0,rank,?,? FROM base_from_trade_` + exchange + `_index where rank <50 and sold_short_name !="" and data_time between ? and ?`
+	_, err = o.Raw(sql2, now, now, startDate, endDate).Exec()
+	return
+}
+
+// GetTradePositionTopOriginDataTimes 获取榜单原始数据日期-正序
+func GetTradePositionTopOriginDataTimes(exchange string) (dates []string, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT DISTINCT data_time FROM base_from_trade_%s_index ORDER BY data_time ASC`
+	sql = fmt.Sprintf(sql, exchange)
+	_, err = o.Raw(sql).QueryRows(&dates)
+	return
+}
+
+// BaseFromTradeClassify 交易所分类表
+type BaseFromTradeClassify struct {
+	Id           uint64    `orm:"column(id);pk"`
+	ClassifyName string    //分类名称
+	ClassifyType string    //分类名称下的类型
+	Exchange     string    //交易所
+	CreateTime   time.Time //插入时间
+	ModifyTime   time.Time //修改时间
+}
+
+// GetAllBaseFromTradeClassify 获取所有的交易所分类列表
+func GetAllBaseFromTradeClassify() (list []BaseFromTradeClassify, err error) {
+	sql := `SELECT * FROM base_from_trade_classify   `
+
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.Raw(sql).QueryRows(&list)
+
+	return
+}
+
+// MultiAddBaseFromTradeClassify 批量插入交易所分类
+func MultiAddBaseFromTradeClassify(items []*BaseFromTradeClassify) (err error) {
+	if len(items) == 0 {
+		return
+	}
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.InsertMulti(len(items), items)
+	return
+}
+
+type TradeClassifyName struct {
+	ClassifyName string //分类名称
+	ClassifyType string //分类名称下的类型
+}
+
+// GetExchangeClassify 获取交易所分类列表
+func GetExchangeClassify(exchange string) (list []TradeClassifyName, err error) {
+	tableName := "base_from_trade_" + exchange + "_index"
+	orderStr := "classify_name DESC, classify_type asc"
+	if exchange == "zhengzhou" {
+		orderStr = "classify_name asc"
+	}
+	sql := `SELECT classify_name, classify_type FROM ` + tableName + ` WHERE rank <=20 and rank > 0 GROUP BY classify_name, classify_type  `
+	sql += ` ORDER BY ` + orderStr
+
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.Raw(sql).QueryRows(&list)
+
+	return
+}

+ 643 - 0
services/data/trade_position_analysis.go

@@ -0,0 +1,643 @@
+package data
+
+import (
+	"context"
+	"fmt"
+	"hongze/hongze_task_trial/models/data_manage"
+	"hongze/hongze_task_trial/services/alarm_msg"
+	"hongze/hongze_task_trial/utils"
+	"sort"
+	"strconv"
+	"time"
+)
+
+// InitPositionTask 统计今日交易所的持仓分析数据
+func InitPositionTask(cont context.Context) (err error) {
+	exchanges := []string{"zhengzhou", "dalian", "shanghai", "cffex", "ine"} //郑商所,大商所,上期所,中金所,上期能源
+	for i := 1; i >= 0; i-- {
+		startDate := time.Now().AddDate(0, 0, -i).Format(utils.FormatDate)
+		endDate := startDate
+		for _, v := range exchanges {
+			exchange := v
+			err = nil
+			fmt.Println("InitPositionTask:	启动:" + exchange)
+			utils.FileLog.Info("InitPositionTask:	启动:" + exchange)
+
+			fmt.Println("开始" + startDate + "结束" + endDate)
+			utils.FileLog.Info(fmt.Sprintf("InitTradePosition:开始:%s; 结束:%s", startDate, endDate))
+			tErr, errMsg := InitTradePosition(exchange, startDate, endDate)
+			if tErr != nil {
+				err = tErr
+				fmt.Println("InitTradePosition: 操作失败:" + errMsg + tErr.Error())
+				utils.FileLog.Info(fmt.Sprintf("InitTradePosition: 操作失败:%s:%s", errMsg, tErr.Error()))
+				continue
+			}
+
+			fmt.Println("InitTradePosition:" + exchange + "已完成")
+			utils.FileLog.Info("InitTradePosition:" + exchange + "已完成")
+		}
+	}
+
+	// 处理交易所的分类
+	{
+		allBaseFromTradeClassify, tmpErr := data_manage.GetAllBaseFromTradeClassify()
+		if tmpErr != nil {
+			utils.FileLog.Info(fmt.Sprintf("获取所有交易所分类失败,;err:%s", tmpErr.Error()))
+			return
+		}
+		tradeClassifyMap := make(map[string]int)
+		for _, v := range allBaseFromTradeClassify {
+			key := fmt.Sprintf("%s_%s_%s", v.Exchange, v.ClassifyName, v.ClassifyType)
+			tradeClassifyMap[key] = 1
+		}
+
+		baseFromTradeClassifyList := make([]*data_manage.BaseFromTradeClassify, 0)
+		for _, v := range exchanges {
+			tradeClassifyNameList, tmpErr := data_manage.GetExchangeClassify(v)
+			if tmpErr != nil {
+				utils.FileLog.Info(fmt.Sprintf("获取%s分类失败,;err:%s", v, tmpErr.Error()))
+				continue
+			}
+
+			for _, classify := range tradeClassifyNameList {
+				key := fmt.Sprintf("%s_%s_%s", v, classify.ClassifyName, classify.ClassifyType)
+
+				if _, ok := tradeClassifyMap[key]; !ok {
+					baseFromTradeClassifyList = append(baseFromTradeClassifyList, &data_manage.BaseFromTradeClassify{
+						Id:           0,
+						ClassifyName: classify.ClassifyName,
+						ClassifyType: classify.ClassifyType,
+						Exchange:     v,
+						CreateTime:   time.Now(),
+						ModifyTime:   time.Now(),
+					})
+				}
+			}
+		}
+
+		lenAddList := len(baseFromTradeClassifyList)
+		if lenAddList > 0 {
+			baseAddNum := 500
+			num := lenAddList / baseAddNum
+			lastNum := lenAddList % baseAddNum
+			for i := 0; i <= num; i++ {
+				tmpNum := baseAddNum
+				if i == num && lastNum > 0 {
+					tmpNum = lastNum
+				}
+				data_manage.MultiAddBaseFromTradeClassify(baseFromTradeClassifyList[i*baseAddNum : (i*baseAddNum + tmpNum)])
+			}
+		}
+	}
+
+	return
+}
+
+func InitTradePosition(exchange, startDate, endDate string) (err error, errMsg string) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("统计今日交易所的持仓分析数据失败, Exchange: %s, Err: %s, Msg: %s", exchange, err.Error(), errMsg)
+			alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	// 批量插入今日的初始值
+	num, err := data_manage.GetTradePositionTopCountByExchangeDataTime(exchange, startDate, endDate)
+	if err != nil {
+		errMsg = "查询原始数据失败,GetTradePositionTopCountByExchangeDataTime() Err: "
+		return
+	}
+	if num > 0 {
+		//err = fmt.Errorf("数据已存在,无需处理")
+		return
+	}
+	err = data_manage.MultiInsertTradeBaseDataToTop(exchange, startDate, endDate)
+	if err != nil {
+		errMsg = "新增原始数据失败,MultiInsertTradeBaseDataToTop() Err: "
+		return
+	}
+	originList, err := data_manage.GetTradePositionTopByExchangeDataTime(exchange, startDate, endDate)
+	if err != nil {
+		errMsg = "查询原始数据失败, GetTradePositionTopByExchangeDataTime() Err: "
+		return
+	}
+	if len(originList) <= 0 {
+		// 忽略周末
+		w := time.Now().Weekday().String()
+		if w == "Saturday" || w == "Sunday" {
+			return
+		}
+		// 每天最后一个小时执行依旧无数据时, 才进行邮件提示
+		if time.Now().Hour() != 23 {
+			return
+		}
+		err = fmt.Errorf("原始数据没有值")
+		return
+	}
+
+	// 原始数据日期
+	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	if e != nil {
+		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
+		return
+	}
+
+	now := time.Now()
+	dataTimeMap := make(map[string]*data_manage.TradePositionTop)
+	onlyEmptyMap := make(map[string]bool)
+	onlyEmptyNameMap := make(map[string]*data_manage.TradePositionTop)
+	topLastMap := make(map[string]int)
+	topLastRankMap := make(map[string]int)
+	list := make([]*data_manage.TradePositionTop, 0)
+	for _, v := range originList {
+		tmp0, tmpErr := dealTradeOriginData(dataTimeMap, onlyEmptyMap, onlyEmptyNameMap, v, topLastMap, topLastRankMap, startDate, now, dates)
+		if tmpErr != nil {
+			err = tmpErr
+			errMsg = "处理原始数据失败 dealTradeOriginData() Err: "
+			return
+		}
+		if tmp0 != nil {
+			list = append(list, tmp0)
+		}
+		if len(list) >= 1000 {
+			err = data_manage.InsertMultiTradePositionTop(exchange, list)
+			if err != nil {
+				errMsg = "批量新增昨日数据失败,InsertMultiTradePositionTop() Err: "
+				return
+			}
+			list = make([]*data_manage.TradePositionTop, 0)
+		}
+	}
+	if len(list) > 0 {
+		err = data_manage.InsertMultiTradePositionTop(exchange, list)
+		if err != nil {
+			errMsg = "批量新增昨日数据失败,InsertMultiTradePositionTop() Err: "
+			return
+		}
+		list = make([]*data_manage.TradePositionTop, 0)
+	}
+	// 处理某个期货公司只有买单没有卖单,或者只有卖单没有买单的情况
+	for k, v := range onlyEmptyNameMap {
+		_, ok1 := onlyEmptyMap[k+"_1"]
+		_, ok2 := onlyEmptyMap[k+"_2"]
+		var dealType int
+		if ok1 && !ok2 {
+			dealType = 2 //只有买单没有卖单
+		} else if !ok1 && ok2 {
+			dealType = 1 //只有卖单没有买单的情况
+		} else {
+			continue
+		}
+		if dealType > 0 {
+			str := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + strconv.Itoa(dealType)
+			dealValue := 0
+			if lastVal, ok := topLastMap[str]; ok {
+				dealValue = int(float64(lastVal)*0.7 + 0.5)
+			}
+
+			tmp := &data_manage.TradePositionTop{
+				ClassifyName:  v.ClassifyName,
+				ClassifyType:  v.ClassifyType,
+				DealShortName: v.DealShortName,
+				DataTime:      v.DataTime,
+				DealValue:     dealValue,
+				CreateTime:    now,
+				ModifyTime:    now,
+				DealType:      dealType,
+				SourceType:    2,
+			}
+			list = append(list, tmp)
+			if len(list) >= 1000 {
+				err = data_manage.InsertMultiTradePositionTop(exchange, list)
+				if err != nil {
+					errMsg = "批量新增前日数据失败,InsertMultiTradePositionTop() Err: "
+					return
+				}
+				list = make([]*data_manage.TradePositionTop, 0)
+			}
+		}
+	}
+
+	if len(list) > 0 {
+		err = data_manage.InsertMultiTradePositionTop(exchange, list)
+		if err != nil {
+			errMsg = "批量新增前日数据失败,InsertMultiTradePositionTop() Err: "
+			return
+		}
+	}
+
+	//生成净多单,净空单榜单
+	err = createAnalysisCleanTop(exchange, startDate, endDate)
+	if err != nil {
+		errMsg = "创建净多单,净空单数据失败,createAnalysisCleanTop() Err: "
+		return
+	}
+	// 特殊处理起始日期前一天的数据
+	err = DealYesterdayData(exchange, startDate)
+	if err != nil {
+		errMsg = "处理昨日数据失败,DealYesterdayData() Err: "
+		return
+	}
+	return
+}
+
+func dealTradeOriginData(dataTimeMap map[string]*data_manage.TradePositionTop, onlyEmptyMap map[string]bool, onlyEmptyNameMap map[string]*data_manage.TradePositionTop, currentItem *data_manage.TradePositionTop, topLastMap map[string]int, topLastRankMap map[string]int, startDate string, now time.Time, dates []string) (tmp0 *data_manage.TradePositionTop, err error) {
+	classifyName := currentItem.ClassifyName
+	classifyType := currentItem.ClassifyType
+	dealShortName := currentItem.DealShortName
+	dealValue := currentItem.DealValue
+	dealChange := currentItem.DealChange
+	dataTime := currentItem.DataTime
+	dealType := currentItem.DealType
+	dealTypeStr := strconv.Itoa(dealType)
+	dataTimeMap[classifyName+"_"+classifyType+"_"+dealTypeStr+"_"+dealShortName+"_"+dataTime] = currentItem
+	onlyEmptyMap[classifyName+"_"+classifyType+"_"+dataTime+"_"+dealShortName+"_"+dealTypeStr] = true
+	onlyEmptyNameMap[classifyName+"_"+classifyType+"_"+dataTime+"_"+dealShortName] = currentItem
+	if currentItem.Rank > topLastRankMap[classifyName+"_"+classifyType+"_"+dataTime+"_"+dealTypeStr] {
+		topLastMap[classifyName+"_"+classifyType+"_"+dataTime+"_"+dealTypeStr] = dealValue
+		topLastRankMap[classifyName+"_"+classifyType+"_"+dataTime+"_"+dealTypeStr] = currentItem.Rank
+	}
+	if dataTime > startDate {
+		//tmpTimeStr, tErr := getYesterdayDate(dataTime)
+		//if tErr != nil {
+		//	err = tErr
+		//	return
+		//}
+		tmpTimeStr := getPrevTradeDataDate(dataTime, dates)
+		if tmpTimeStr < startDate {
+			return
+		}
+		// 判断T-1日是否有值, 如果T-1日为空,则根据T日的值计算出T-1的值
+		if _, ok := dataTimeMap[classifyName+"_"+classifyType+"_"+dealTypeStr+"_"+dealShortName+"_"+tmpTimeStr]; !ok {
+			yesterdayVal := dealValue - dealChange
+			yesterdayChange := 0
+			//beforeYesterday, _ := getYesterdayDate(tmpTimeStr)
+			beforeYesterday := getPrevTradeDataDate(tmpTimeStr, dates)
+			beforeYesterdayItem, ok1 := dataTimeMap[classifyName+"_"+classifyType+"_"+dealTypeStr+"_"+dealShortName+"_"+beforeYesterday]
+			if ok1 {
+				yesterdayChange = yesterdayVal - beforeYesterdayItem.DealValue
+			}
+			tmp0 = &data_manage.TradePositionTop{
+				ClassifyName:  classifyName,
+				ClassifyType:  classifyType,
+				DealShortName: dealShortName,
+				DealValue:     yesterdayVal,
+				DealChange:    yesterdayChange,
+				DataTime:      tmpTimeStr,
+				CreateTime:    now,
+				ModifyTime:    now,
+				DealType:      dealType,
+				SourceType:    1,
+			}
+			dataTimeMap[classifyName+"_"+classifyType+"_"+dealTypeStr+"_"+dealShortName+"_"+tmpTimeStr] = tmp0
+			onlyEmptyMap[classifyName+"_"+classifyType+"_"+tmpTimeStr+"_"+dealShortName+"_"+dealTypeStr] = true
+			onlyEmptyNameMap[classifyName+"_"+classifyType+"_"+tmpTimeStr+"_"+dealShortName] = tmp0
+		}
+	}
+	return
+}
+
+// DealYesterdayData 更新昨日数据
+func DealYesterdayData(exchange, startDate string) (err error) {
+	// 查询最早的日期
+	firstItem, err := data_manage.GetFirstBaseFromTradeIndexByDate(exchange)
+	if err != nil {
+		return
+	}
+	if startDate == firstItem.DataTime { //如果当前是起始日,则无需统计修改前一天的数据
+		return
+	}
+
+	// 前一个交易日, 前两个交易日
+	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	if e != nil {
+		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
+		return
+	}
+	yesterdayStr := getPrevTradeDataDate(startDate, dates)
+	beforeYesterdayStr := getPrevTradeDataDate(yesterdayStr, dates)
+
+	//yesterdayStr, err := getYesterdayDate(startDate)
+	//if err != nil {
+	//	return
+	//}
+	////查找前日的值,并更新对应的更改
+	//beforeYesterdayStr, err := getYesterdayDate(yesterdayStr)
+	//if err != nil {
+	//	return
+	//}
+	// 先查出T日最原始的数据
+	originList, err := data_manage.GetTradePositionTopByExchangeDataTime(exchange, startDate, startDate)
+	if err != nil {
+		return
+	}
+	originBuyMap := make(map[string]*data_manage.TradePositionTop)
+	originSoldMap := make(map[string]*data_manage.TradePositionTop)
+	for _, v := range originList {
+		if v.SourceType != 0 {
+			continue
+		}
+		str := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DealShortName
+		if v.DealType == 1 {
+			originBuyMap[str] = v
+		} else if v.DealType == 2 {
+			originSoldMap[str] = v
+		}
+	}
+
+	// 然后查询T-1中数据来源类型是2的数据
+	changeList, err := data_manage.GetTradePositionTopByExchangeSourceType(exchange, yesterdayStr, 2)
+	if err != nil {
+		return
+	}
+	if len(changeList) <= 0 {
+		//err = fmt.Errorf("前天的数据无需修改")
+		return
+	}
+	// 查询出前日的成交量
+	beforeYesterdayList, err := data_manage.GetTradePositionTopByExchangeDataTime(exchange, beforeYesterdayStr, beforeYesterdayStr)
+	if err != nil {
+		return
+	}
+	beforeYesterdayMap1 := make(map[string]int)
+	beforeYesterdayMap2 := make(map[string]int)
+	if len(beforeYesterdayList) > 0 {
+		for _, v := range beforeYesterdayList {
+			if v.SourceType == 2 {
+				continue
+			}
+			str := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DealShortName
+			if v.DealType == 1 {
+				beforeYesterdayMap1[str] = v.DealValue
+			} else if v.DealType == 2 {
+				beforeYesterdayMap2[str] = v.DealValue
+			}
+		}
+	}
+	// 根据原始数据中的值推算出最新的值
+	now := time.Now()
+	// 批量更新到分析表中,
+	var updateAnalysisData []data_manage.UpdateDealValueChange
+	for _, v := range changeList {
+		str := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DealShortName
+		dealValue := 0
+		dealChange := 0
+		if v.DealType == 1 {
+			if n, ok := originBuyMap[str]; ok {
+				dealValue = n.DealValue - n.DealChange
+				if beforeVal, ok1 := beforeYesterdayMap1[str]; ok1 {
+					dealChange = dealValue - beforeVal
+				}
+				tmp := data_manage.UpdateDealValueChange{
+					Id:         v.Id,
+					DealValue:  dealValue,
+					DealChange: dealChange,
+					SourceType: 1,
+					ModifyTime: now,
+				}
+				updateAnalysisData = append(updateAnalysisData, tmp)
+			}
+		} else if v.DealType == 2 {
+			if n, ok := originSoldMap[str]; ok {
+				dealValue = n.DealValue - n.DealChange
+				if beforeVal, ok1 := beforeYesterdayMap2[str]; ok1 {
+					dealChange = dealValue - beforeVal
+				}
+				tmp := data_manage.UpdateDealValueChange{
+					Id:         v.Id,
+					DealValue:  dealValue,
+					DealChange: dealChange,
+					SourceType: 1,
+					ModifyTime: now,
+				}
+				updateAnalysisData = append(updateAnalysisData, tmp)
+			}
+		}
+	}
+	if len(updateAnalysisData) > 0 {
+		err = data_manage.MultiUpdatePositionTop(exchange, updateAnalysisData)
+		if err != nil {
+			return
+		}
+		//删除T-1日净多单和净空单的榜单
+		err = data_manage.DeletePositionTopByDataTime(exchange, yesterdayStr, 3)
+		if err != nil {
+			return
+		}
+
+		err = data_manage.DeletePositionTopByDataTime(exchange, yesterdayStr, 4)
+		if err != nil {
+			return
+		}
+
+		//重新生成净多单和净空单的榜单
+		err = createAnalysisCleanTop(exchange, yesterdayStr, yesterdayStr)
+		if err != nil {
+			return
+		}
+	}
+
+	return
+}
+
+// createAnalysisCleanTop 生成净多单,净空单榜单
+func createAnalysisCleanTop(exchange, startDate, endDate string) (err error) {
+	defer func() {
+		if err != nil {
+			fmt.Println("createAnalysisCleanTop err: " + err.Error())
+		}
+	}()
+
+	topList := make([]*data_manage.TradePositionTop, 0)
+	now := time.Now()
+	var subDataList data_manage.TradePositionSubList
+
+	subChangeMap1 := make(map[string]int) //净多单map
+	subChangeMap2 := make(map[string]int) //净空单map
+
+	// 2023-05-10 此处取前一个交易日, 不一定是昨日
+	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	if e != nil {
+		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
+		return
+	}
+	yesterday := getPrevTradeDataDate(startDate, dates)
+
+	//查询所有差值数据,
+	//yesterday, err := getYesterdayDate(startDate)
+	//if err != nil {
+	//	return
+	//}
+
+	// 上一个交易日的净多单
+	yesterdayTopList1, tErr := data_manage.GetTradePositionTopByExchangeDataTimeType(exchange, yesterday, 3)
+	if tErr != nil {
+		err = tErr
+		return
+	}
+	if len(yesterdayTopList1) > 0 {
+		for _, v := range yesterdayTopList1 {
+			nameStr := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName
+			subChangeMap1[nameStr] = v.DealValue
+		}
+	}
+
+	// 上一个交易日的净空单
+	yesterdayTopList2, tErr := data_manage.GetTradePositionTopByExchangeDataTimeType(exchange, yesterday, 4)
+	if tErr != nil {
+		err = tErr
+		return
+	}
+	if len(yesterdayTopList2) > 0 {
+		for _, v := range yesterdayTopList2 {
+			nameStr := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName
+			subChangeMap2[nameStr] = v.DealValue
+		}
+	}
+
+	// 根据当日多单/空单数据, 生成净多单/净空单数据
+	originDataList, err := data_manage.GetTradePositionTopByExchangeDataTime(exchange, startDate, endDate)
+	if err != nil {
+		return
+	}
+	buyDataMap := make(map[string]int)
+	for _, v := range originDataList {
+		str := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName
+		if v.DealType == 1 {
+			buyDataMap[str] = v.DealValue
+		} else if v.DealType == 2 {
+			subValue := 0
+			dealType := 0
+			if buy, ok := buyDataMap[str]; ok {
+				subValue = buy - v.DealValue
+				if subValue >= 0 {
+					dealType = 3
+				} else {
+					subValue = -subValue
+					dealType = 4
+				}
+			}
+			tmp := &data_manage.TradePositionSub{
+				ClassifyName:  v.ClassifyName,
+				ClassifyType:  v.ClassifyType,
+				DataTime:      v.DataTime,
+				DealShortName: v.DealShortName,
+				SubValue:      subValue,
+				DealType:      dealType,
+			}
+			subDataList = append(subDataList, tmp)
+		}
+	}
+	if len(subDataList) > 0 {
+		sort.Sort(subDataList)
+	}
+
+	// 根据净多单/净空单数据, 比对上一个交易日的日期计算成交变化量, 并写入
+	var dealType int
+	rankMap := make(map[string]int)
+	for _, v := range subDataList {
+		subValue := v.SubValue
+		nameStr := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName
+		if v.DealType == 3 {
+			subChangeMap1[nameStr] = subValue
+			dealType = 3
+			if _, ok := rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_3"]; !ok {
+				rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_3"] = 1
+			} else {
+				rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_3"]++
+			}
+		} else if v.DealType == 4 {
+			subChangeMap2[nameStr] = subValue
+			dealType = 4
+			if _, ok := rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_4"]; !ok {
+				rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_4"] = 1
+			} else {
+				rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_4"]++
+			}
+		}
+
+		// 2023-05-10 目前看该方法的引用startDate和endDate其实是同一天, 所以前一个交易日直接用上面的yesterday
+		tmpTimeStr := yesterday
+
+		//和T-1日比较差值
+		//var tmpTimeStr string
+		//tmpTimeStr, err = getYesterdayDate(v.DataTime)
+		//if err != nil {
+		//	return
+		//}
+		yesterdayStr := v.ClassifyName + "_" + v.ClassifyType + "_" + tmpTimeStr + "_" + v.DealShortName
+		dealChange := 0
+		if dealType == 3 {
+			if c, ok := subChangeMap1[yesterdayStr]; ok {
+				dealChange = subValue - c
+			}
+		} else if dealType == 4 {
+			if c, ok := subChangeMap2[yesterdayStr]; ok {
+				dealChange = subValue - c
+			}
+		}
+		tmp := &data_manage.TradePositionTop{
+			ClassifyName:  v.ClassifyName,
+			ClassifyType:  v.ClassifyType,
+			DataTime:      v.DataTime,
+			CreateTime:    now,
+			ModifyTime:    now,
+			DealShortName: v.DealShortName,
+			DealValue:     subValue,
+			DealChange:    dealChange,
+			DealType:      dealType,
+			Rank:          rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_"+strconv.Itoa(dealType)],
+		}
+		topList = append(topList, tmp)
+		if len(topList) >= 1000 {
+			err = data_manage.InsertMultiTradePositionTop(exchange, topList)
+			if err != nil {
+				return
+			}
+			topList = make([]*data_manage.TradePositionTop, 0)
+		}
+	}
+
+	if len(topList) >= 0 {
+		err = data_manage.InsertMultiTradePositionTop(exchange, topList)
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+func getYesterdayDate(today string) (yesterday string, err error) {
+	i := 1
+	tmpTime, err := time.ParseInLocation(utils.FormatDate, today, time.Local)
+	if err != nil {
+		return
+	}
+	tmpTimeDate := tmpTime.AddDate(0, 0, -i)
+	weekStr := tmpTimeDate.Weekday().String()
+	if weekStr == "Sunday" {
+		i += 2
+	} else if weekStr == "Saturday" {
+		i += 1
+	}
+	tmpTimeDate = tmpTime.AddDate(0, 0, -i)
+	yesterday = tmpTimeDate.Format(utils.FormatDate)
+	return
+}
+
+// getPrevTradeDataDate 获取指定日期上一个交易日日期
+func getPrevTradeDataDate(date string, dates []string) string {
+	pre := -1
+	for k, v := range dates {
+		n := k - 1
+		if v == date && n >= 0 {
+			pre = n
+			break
+		}
+	}
+	// 找不到就随便给个不存在日期
+	if pre == -1 {
+		return "1980-01-01"
+	}
+	return dates[pre]
+}

+ 4 - 0
services/task.go

@@ -26,6 +26,10 @@ func releaseTask() {
 	syncHzDataIndex := task.NewTask("syncHzDataIndex", "0 30 0,18,21 * * *", SyncHzDataIndex)
 	task.AddTask("syncHzDataIndex", syncHzDataIndex)
 
+	// 定时统计交易所的持仓分析数据
+	initPositionTask := task.NewTask("initPositionTask", "0 0 1,19,22 * * *", data.InitPositionTask)
+	task.AddTask("initPositionTask", initPositionTask)
+
 	//刷新指标数据
 	refreshData := task.NewTask("refreshData", "0 30 0,19 * * *", RefreshData)
 	task.AddTask("refreshData", refreshData)