hsun há 1 ano atrás
pai
commit
815c4ef066

+ 2 - 2
main.go

@@ -19,8 +19,8 @@ func main() {
 		web.BConfig.WebConfig.DirectoryIndex = true
 		web.BConfig.WebConfig.StaticDir["/swagger"] = "swagger"
 	}
-	go services.Task()
-	//services.TaskTest()
+	//go services.Task()
+	services.TaskTest()
 	web.BConfig.RecoverFunc = Recover
 	web.Run()
 }

+ 22 - 12
models/data_manage/trade_position_analysis.go

@@ -1,6 +1,7 @@
 package data_manage
 
 import (
+	"fmt"
 	"github.com/beego/beego/v2/client/orm"
 	"hongze/hongze_task/utils"
 	"time"
@@ -90,27 +91,27 @@ func InsertMultiTradePositionTop(exchange string, items []*TradePositionTop) (er
 	return
 }
 
-
 func GetTradePositionTopByExchangeDataTime(exchange string, startDate, endDate string) (list []*TradePositionTop, err error) {
 	o := orm.NewOrmUsingDB("data")
-	sql := "SELECT * FROM trade_position_"+exchange+"_top where data_time >= ? and data_time <= ? and deal_type in (1,2) ORDER BY classify_name, classify_type, deal_type, data_time, deal_value desc"
+	sql := "SELECT * FROM trade_position_" + exchange + "_top where data_time >= ? and data_time <= ? and deal_type in (1,2) ORDER BY classify_name, classify_type, deal_type, data_time, deal_value desc"
 	_, err = o.Raw(sql, startDate, endDate).QueryRows(&list)
 	return
 }
 
 func GetTradePositionTopCountByExchangeDataTime(exchange string, startDate, endDate string) (count int64, err error) {
 	o := orm.NewOrmUsingDB("data")
-	sql := "SELECT count(*) FROM trade_position_"+exchange+"_top where data_time >= ? and data_time <= ? and deal_type in (1,2) ORDER BY classify_name, classify_type, deal_type, data_time, deal_value desc"
+	sql := "SELECT count(*) FROM trade_position_" + exchange + "_top where data_time >= ? and data_time <= ? and deal_type in (1,2) ORDER BY classify_name, classify_type, deal_type, data_time, deal_value desc"
 	err = o.Raw(sql, startDate, endDate).QueryRow(&count)
 	return
 }
 
 func GetTradePositionTopByExchangeSourceType(exchange string, dataTime string, sourceType int) (list []*TradePositionTop, err error) {
 	o := orm.NewOrmUsingDB("data")
-	sql := "SELECT * FROM trade_position_"+exchange+"_top where data_time= ? and source_type = ? ORDER BY classify_name, classify_type, deal_type, deal_value desc"
+	sql := "SELECT * FROM trade_position_" + exchange + "_top where data_time= ? and source_type = ? ORDER BY classify_name, classify_type, deal_type, deal_value desc"
 	_, err = o.Raw(sql, dataTime, sourceType).QueryRows(&list)
 	return
 }
+
 type TradeTopClassify struct {
 	ClassifyName string //分类名称
 	ClassifyType string //分类名称下的类型
@@ -149,7 +150,7 @@ type UpdateDealValueChange struct {
 
 func MultiUpdatePositionTop(exchange string, updates []UpdateDealValueChange) (err error) {
 	o := orm.NewOrmUsingDB("data")
-	p, err := o.Raw("UPDATE trade_position_"+exchange+"_top SET deal_value=?, deal_change=?, source_type=?, modify_time=? WHERE id = ?").Prepare()
+	p, err := o.Raw("UPDATE trade_position_" + exchange + "_top SET deal_value=?, deal_change=?, source_type=?, modify_time=? WHERE id = ?").Prepare()
 	if err != nil {
 		return
 	}
@@ -167,29 +168,38 @@ func MultiUpdatePositionTop(exchange string, updates []UpdateDealValueChange) (e
 
 func DeletePositionTopByDataTime(exchange string, dataTime string, dealType int) (err error) {
 	o := orm.NewOrmUsingDB("data")
-	sql := "delete from trade_position_"+exchange+"_Top WHERE data_time=? and deal_type=?"
+	sql := "delete from trade_position_" + exchange + "_Top WHERE data_time=? and deal_type=?"
 	_, err = o.Raw(sql, dataTime, dealType).Exec()
 	return
 }
 
 func GetTradePositionTopByExchangeDataTimeType(exchange string, dataTime string, dealType int) (list []TradePositionTop, err error) {
 	o := orm.NewOrmUsingDB("data")
-	sql := "select * from trade_position_"+exchange+"_Top WHERE data_time=? and deal_type=?"
-	_, err = o.Raw(sql,dataTime, dealType).QueryRows(&list)
+	sql := "select * from trade_position_" + exchange + "_Top WHERE data_time=? and deal_type=?"
+	_, err = o.Raw(sql, dataTime, dealType).QueryRows(&list)
 	return
 }
 
 func MultiInsertTradeBaseDataToTop(exchange string, startDate, endDate string) (err error) {
 	o := orm.NewOrmUsingDB("data")
 	now := time.Now().Format(utils.FormatDateTime)
-	sql1 := `INSERT INTO trade_position_`+exchange+`_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
-	SELECT classify_name,classify_type,buy_short_name,buy_value,buy_change,data_time,1,0,rank,?,? FROM base_from_trade_`+exchange+`_index where rank <50 and buy_short_name !="" and data_time between ? and ?`
+	sql1 := `INSERT INTO trade_position_` + exchange + `_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
+	SELECT classify_name,classify_type,buy_short_name,buy_value,buy_change,data_time,1,0,rank,?,? FROM base_from_trade_` + exchange + `_index where rank <50 and buy_short_name !="" and data_time between ? and ?`
 	_, err = o.Raw(sql1, now, now, startDate, endDate).Exec()
 	if err != nil {
 		return
 	}
-	sql2 := `INSERT INTO trade_position_`+exchange+`_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
-SELECT classify_name,classify_type,sold_short_name,sold_value,sold_change,data_time,2,0,rank,?,? FROM base_from_trade_`+exchange+`_index where rank <50 and sold_short_name !="" and data_time between ? and ?`
+	sql2 := `INSERT INTO trade_position_` + exchange + `_top(classify_name,classify_type,deal_short_name,deal_value,deal_change,data_time,deal_type,source_type,rank,create_time,modify_time)
+SELECT classify_name,classify_type,sold_short_name,sold_value,sold_change,data_time,2,0,rank,?,? FROM base_from_trade_` + exchange + `_index where rank <50 and sold_short_name !="" and data_time between ? and ?`
 	_, err = o.Raw(sql2, now, now, startDate, endDate).Exec()
 	return
 }
+
+// GetTradePositionTopOriginDataTimes 获取榜单原始数据日期-正序
+func GetTradePositionTopOriginDataTimes(exchange string) (dates []string, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT DISTINCT data_time FROM base_from_trade_%s_index ORDER BY data_time ASC`
+	sql = fmt.Sprintf(sql, exchange)
+	_, err = o.Raw(sql).QueryRows(&dates)
+	return
+}

+ 91 - 31
services/data/trade_position_analysis.go

@@ -12,27 +12,27 @@ import (
 
 // InitPositionTask 统计今日交易所的持仓分析数据
 func InitPositionTask(cont context.Context) (err error) {
-	exchanges := []string{"zhengzhou","dalian","shanghai","cffex","ine"} //郑商所,大商所,上期所,中金所,上期能源
+	exchanges := []string{"zhengzhou", "dalian", "shanghai", "cffex", "ine"} //郑商所,大商所,上期所,中金所,上期能源
 	startDate := time.Now().Format(utils.FormatDate)
 	endDate := startDate
 	for _, v := range exchanges {
 		exchange := v
 		err = nil
-		fmt.Println("InitPositionTask:	启动:"+exchange)
-		utils.FileLog.Info("InitPositionTask:	启动:"+exchange)
+		fmt.Println("InitPositionTask:	启动:" + exchange)
+		utils.FileLog.Info("InitPositionTask:	启动:" + exchange)
 
-		fmt.Println("开始"+startDate+"结束"+endDate)
+		fmt.Println("开始" + startDate + "结束" + endDate)
 		utils.FileLog.Info(fmt.Sprintf("InitTradePosition:开始:%s; 结束:%s", startDate, endDate))
 		tErr, errMsg := InitTradePosition(exchange, startDate, endDate)
 		if tErr != nil {
 			err = tErr
-			fmt.Println("InitTradePosition: 操作失败:"+errMsg+tErr.Error())
+			fmt.Println("InitTradePosition: 操作失败:" + errMsg + tErr.Error())
 			utils.FileLog.Info(fmt.Sprintf("InitTradePosition: 操作失败:%s:%s", errMsg, tErr.Error()))
 			continue
 		}
 
-		fmt.Println("InitTradePosition:"+exchange+"已完成")
-		utils.FileLog.Info("InitTradePosition:"+exchange+"已完成")
+		fmt.Println("InitTradePosition:" + exchange + "已完成")
+		utils.FileLog.Info("InitTradePosition:" + exchange + "已完成")
 	}
 	return
 }
@@ -63,6 +63,13 @@ func InitTradePosition(exchange, startDate, endDate string) (err error, errMsg s
 		return
 	}
 
+	// 原始数据日期
+	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	if e != nil {
+		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
+		return
+	}
+
 	now := time.Now()
 	dataTimeMap := make(map[string]*data_manage.TradePositionTop)
 	onlyEmptyMap := make(map[string]bool)
@@ -71,7 +78,7 @@ func InitTradePosition(exchange, startDate, endDate string) (err error, errMsg s
 	topLastRankMap := make(map[string]int)
 	list := make([]*data_manage.TradePositionTop, 0)
 	for _, v := range originList {
-		tmp0, tmpErr := dealTradeOriginData(dataTimeMap, onlyEmptyMap, onlyEmptyNameMap, v, topLastMap, topLastRankMap, startDate, now)
+		tmp0, tmpErr := dealTradeOriginData(dataTimeMap, onlyEmptyMap, onlyEmptyNameMap, v, topLastMap, topLastRankMap, startDate, now, dates)
 		if tmpErr != nil {
 			err = tmpErr
 			errMsg = "处理原始数据失败 dealTradeOriginData() Err: "
@@ -162,7 +169,7 @@ func InitTradePosition(exchange, startDate, endDate string) (err error, errMsg s
 	return
 }
 
-func dealTradeOriginData(dataTimeMap map[string]*data_manage.TradePositionTop, onlyEmptyMap map[string]bool, onlyEmptyNameMap map[string]*data_manage.TradePositionTop, currentItem *data_manage.TradePositionTop, topLastMap map[string]int, topLastRankMap map[string]int, startDate string, now time.Time) (tmp0 *data_manage.TradePositionTop, err error) {
+func dealTradeOriginData(dataTimeMap map[string]*data_manage.TradePositionTop, onlyEmptyMap map[string]bool, onlyEmptyNameMap map[string]*data_manage.TradePositionTop, currentItem *data_manage.TradePositionTop, topLastMap map[string]int, topLastRankMap map[string]int, startDate string, now time.Time, dates []string) (tmp0 *data_manage.TradePositionTop, err error) {
 	classifyName := currentItem.ClassifyName
 	classifyType := currentItem.ClassifyType
 	dealShortName := currentItem.DealShortName
@@ -179,11 +186,12 @@ func dealTradeOriginData(dataTimeMap map[string]*data_manage.TradePositionTop, o
 		topLastRankMap[classifyName+"_"+classifyType+"_"+dataTime+"_"+dealTypeStr] = currentItem.Rank
 	}
 	if dataTime > startDate {
-		tmpTimeStr, tErr := getYesterdayDate(dataTime)
-		if tErr != nil {
-			err = tErr
-			return
-		}
+		//tmpTimeStr, tErr := getYesterdayDate(dataTime)
+		//if tErr != nil {
+		//	err = tErr
+		//	return
+		//}
+		tmpTimeStr := getPrevTradeDataDate(dataTime, dates)
 		if tmpTimeStr < startDate {
 			return
 		}
@@ -191,7 +199,8 @@ func dealTradeOriginData(dataTimeMap map[string]*data_manage.TradePositionTop, o
 		if _, ok := dataTimeMap[classifyName+"_"+classifyType+"_"+dealTypeStr+"_"+dealShortName+"_"+tmpTimeStr]; !ok {
 			yesterdayVal := dealValue - dealChange
 			yesterdayChange := 0
-			beforeYesterday, _ := getYesterdayDate(tmpTimeStr)
+			//beforeYesterday, _ := getYesterdayDate(tmpTimeStr)
+			beforeYesterday := getPrevTradeDataDate(tmpTimeStr, dates)
 			beforeYesterdayItem, ok1 := dataTimeMap[classifyName+"_"+classifyType+"_"+dealTypeStr+"_"+dealShortName+"_"+beforeYesterday]
 			if ok1 {
 				yesterdayChange = yesterdayVal - beforeYesterdayItem.DealValue
@@ -216,7 +225,7 @@ func dealTradeOriginData(dataTimeMap map[string]*data_manage.TradePositionTop, o
 	return
 }
 
-// 更新昨日数据
+// DealYesterdayData 更新昨日数据
 func DealYesterdayData(exchange, startDate string) (err error) {
 	// 查询最早的日期
 	firstItem, err := data_manage.GetFirstBaseFromTradeIndexByDate(exchange)
@@ -227,15 +236,24 @@ func DealYesterdayData(exchange, startDate string) (err error) {
 		return
 	}
 
-	yesterdayStr, err := getYesterdayDate(startDate)
-	if err != nil {
-		return
-	}
-	//查找前日的值,并更新对应的更改
-	beforeYesterdayStr, err := getYesterdayDate(yesterdayStr)
-	if err != nil {
+	// 前一个交易日, 前两个交易日
+	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	if e != nil {
+		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
 		return
 	}
+	yesterdayStr := getPrevTradeDataDate(startDate, dates)
+	beforeYesterdayStr := getPrevTradeDataDate(yesterdayStr, dates)
+
+	//yesterdayStr, err := getYesterdayDate(startDate)
+	//if err != nil {
+	//	return
+	//}
+	////查找前日的值,并更新对应的更改
+	//beforeYesterdayStr, err := getYesterdayDate(yesterdayStr)
+	//if err != nil {
+	//	return
+	//}
 	// 先查出T日最原始的数据
 	originList, err := data_manage.GetTradePositionTopByExchangeDataTime(exchange, startDate, startDate)
 	if err != nil {
@@ -352,17 +370,34 @@ func DealYesterdayData(exchange, startDate string) (err error) {
 
 // createAnalysisCleanTop 生成净多单,净空单榜单
 func createAnalysisCleanTop(exchange, startDate, endDate string) (err error) {
+	defer func() {
+		if err != nil {
+			fmt.Println("createAnalysisCleanTop err: " + err.Error())
+		}
+	}()
+
 	topList := make([]*data_manage.TradePositionTop, 0)
 	now := time.Now()
 	var subDataList data_manage.TradePositionSubList
 
 	subChangeMap1 := make(map[string]int) //净多单map
 	subChangeMap2 := make(map[string]int) //净空单map
-	//查询所有差值数据,
-	yesterday, err := getYesterdayDate(startDate)
-	if err != nil {
+
+	// 2023-05-10 此处取前一个交易日, 不一定是昨日
+	dates, e := data_manage.GetTradePositionTopOriginDataTimes(exchange)
+	if e != nil {
+		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error())
 		return
 	}
+	yesterday := getPrevTradeDataDate(startDate, dates)
+
+	//查询所有差值数据,
+	//yesterday, err := getYesterdayDate(startDate)
+	//if err != nil {
+	//	return
+	//}
+
+	// 上一个交易日的净多单
 	yesterdayTopList1, tErr := data_manage.GetTradePositionTopByExchangeDataTimeType(exchange, yesterday, 3)
 	if tErr != nil {
 		err = tErr
@@ -375,6 +410,7 @@ func createAnalysisCleanTop(exchange, startDate, endDate string) (err error) {
 		}
 	}
 
+	// 上一个交易日的净空单
 	yesterdayTopList2, tErr := data_manage.GetTradePositionTopByExchangeDataTimeType(exchange, yesterday, 4)
 	if tErr != nil {
 		err = tErr
@@ -387,6 +423,7 @@ func createAnalysisCleanTop(exchange, startDate, endDate string) (err error) {
 		}
 	}
 
+	// 根据当日多单/空单数据, 生成净多单/净空单数据
 	originDataList, err := data_manage.GetTradePositionTopByExchangeDataTime(exchange, startDate, endDate)
 	if err != nil {
 		return
@@ -422,6 +459,8 @@ func createAnalysisCleanTop(exchange, startDate, endDate string) (err error) {
 	if len(subDataList) > 0 {
 		sort.Sort(subDataList)
 	}
+
+	// 根据净多单/净空单数据, 比对上一个交易日的日期计算成交变化量, 并写入
 	var dealType int
 	rankMap := make(map[string]int)
 	for _, v := range subDataList {
@@ -444,12 +483,16 @@ func createAnalysisCleanTop(exchange, startDate, endDate string) (err error) {
 				rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_4"]++
 			}
 		}
+
+		// 2023-05-10 目前看该方法的引用startDate和endDate其实是同一天, 所以前一个交易日直接用上面的yesterday
+		tmpTimeStr := yesterday
+
 		//和T-1日比较差值
-		var tmpTimeStr string
-		tmpTimeStr, err = getYesterdayDate(v.DataTime)
-		if err != nil {
-			return
-		}
+		//var tmpTimeStr string
+		//tmpTimeStr, err = getYesterdayDate(v.DataTime)
+		//if err != nil {
+		//	return
+		//}
 		yesterdayStr := v.ClassifyName + "_" + v.ClassifyType + "_" + tmpTimeStr + "_" + v.DealShortName
 		dealChange := 0
 		if dealType == 3 {
@@ -509,3 +552,20 @@ func getYesterdayDate(today string) (yesterday string, err error) {
 	yesterday = tmpTimeDate.Format(utils.FormatDate)
 	return
 }
+
+// getPrevTradeDataDate 获取指定日期上一个交易日日期
+func getPrevTradeDataDate(date string, dates []string) string {
+	pre := -1
+	for k, v := range dates {
+		n := k - 1
+		if v == date && n >= 0 {
+			pre = n
+			break
+		}
+	}
+	// 找不到就随便给个不存在日期
+	if pre == -1 {
+		return "1980-01-01"
+	}
+	return dates[pre]
+}

+ 11 - 10
services/task.go

@@ -213,12 +213,12 @@ func releaseTask() {
 	task.AddTask("resetEdbInfoIsUpdate", resetEdbInfoIsUpdate)
 
 	// 定时检测同花顺客群推送消息状态
-	checkThsReportList := task.NewTask("checkThsReportList", "0 */2 * * * * ", CheckThsReportList)
-	task.AddTask("checkThsReportList", checkThsReportList)
+	//checkThsReportList := task.NewTask("checkThsReportList", "0 */2 * * * * ", CheckThsReportList)
+	//task.AddTask("checkThsReportList", checkThsReportList)
 
 	// 定时统计交易所的持仓分析数据
-	InitPositionTask := task.NewTask("checkThsReportList", "0 30 16-20 * * *", data.InitPositionTask)
-	task.AddTask("checkThsReportList", InitPositionTask)
+	initPositionTask := task.NewTask("initPositionTask", "0 30 16-20 * * *", data.InitPositionTask)
+	task.AddTask("initPositionTask", initPositionTask)
 
 	// 每日2:45更新每刻报销-客户档案
 	syncMaycurCompanyProfile := task.NewTask("syncMaycurCompanyProfile", "0 45 2 * * * ", maycur.DailyUpdateCompanyProfile)
@@ -227,13 +227,14 @@ func releaseTask() {
 
 func TaskTest() {
 	fmt.Println("The task is start")
-	//companyReportPermissionClose := task.NewTask("companyTryOut", "0 5 0 * * *", CompanyReportPermissionClose)
-	//companyReportPermissionClose := task.NewTask("companyReportPermissionClose", "0/30 * * * * *", CompanyReportPermissionClose)
-	//task.AddTask("用户产品权限试用-->关闭", companyReportPermissionClose)
-	//publishVoiceBroadcast := task.NewTask("publishVoiceBroadcast", "0 */1 * * * *", PublishVoiceBroadcast)
-	//task.AddTask("定时发布研报语音播报", publishVoiceBroadcast)
 
-	task.StartTask()
+	e, msg := data.InitTradePosition("shanghai", "2023-02-13", "2023-02-13")
+	if e != nil {
+		fmt.Println(e.Error())
+		fmt.Println(msg)
+	}
+
+	//task.StartTask()
 	fmt.Println("The task is end")
 }