package data

import (
	"eta_gn/eta_task/models/data_manage"
	"eta_gn/eta_task/services/alarm_msg"
	"eta_gn/eta_task/utils"
	"fmt"
	"sort"
	"strconv"
	"time"
)

// FixPositionTask re-runs InitTradePositionClassify for every exchange in a
// fixed list, one calendar day at a time, walking from 194 days ago up to
// 2 days ago (relative to time.Now()).
//
// A failure for one (date, exchange) pair is printed and logged and the loop
// moves on. Because err is reset at the start of every attempt, the returned
// error only reflects the very last attempt (the most recent date, last
// exchange in the list).
// NOTE(review): confirm that callers do not rely on the return value to detect
// earlier failures.
func FixPositionTask() (err error) {
	// Zhengzhou, Dalian, Shanghai Futures, CFFEX, Shanghai INE and Guangzhou.
	exchanges := []string{"zhengzhou", "dalian", "shanghai", "cffex", "ine", "guangzhou"}

	for i := 194; i > 1; i-- {
		// Each pass handles exactly one day, so start and end are identical.
		startDate := time.Now().AddDate(0, 0, -i).Format(utils.FormatDate)
		endDate := startDate

		for _, exchange := range exchanges {
			err = nil
			fmt.Println("FixPositionTask: 启动:" + exchange)
			utils.FileLog.Info("FixPositionTask: 启动:" + exchange)

			fmt.Println("开始" + startDate + "结束" + endDate)
			utils.FileLog.Info(fmt.Sprintf("FixPositionTask:开始:%s; 结束:%s", startDate, endDate))

			// Guangzhou has its own TradePositionInterface implementation;
			// every other exchange uses the base one.
			var tradePosition TradePositionInterface
			if exchange == "guangzhou" {
				tradePosition = &GuangzhouTradePosition{}
			} else {
				tradePosition = &BaseTradePosition{}
			}

			tErr, errMsg := InitTradePositionClassify(exchange, startDate, endDate, tradePosition)
			if tErr != nil {
				err = tErr
				fmt.Println("FixPositionTask: 操作失败:" + errMsg + tErr.Error())
				utils.FileLog.Info(fmt.Sprintf("InitTradePosition: 操作失败:%s:%s", errMsg, tErr.Error()))
				continue
			}

			fmt.Println("FixPositionTask:" + exchange + "已完成")
			utils.FileLog.Info("FixPositionTask:" + exchange + "已完成")
		}
	}
	return
}

// InitTradePositionClassify fills in the "top" position table for the
// classifications (ClassifyName/ClassifyType pairs) that exist in the origin
// data of the given exchange and date range but are missing from the top data.
//
// Steps, in order:
//  1. Compare origin and top classification lists; return early when the
//     counts match or the origin list is empty.
//  2. Copy the missing classifications from the base data into the top table.
//  3. Run dealTradeOriginData on every copied row and insert the rows it
//     returns, in batches of 1000.
//  4. For every entry that has only one side (buy or sell), insert a
//     synthesized row for the missing side with SourceType 2 and a DealValue
//     of 70% (rounded) of the last known value, or 0 when none is known.
//  5. Build the net long / net short rows and then revise the previous
//     trading day's synthesized rows.
//
// It returns the error first and a human-readable prefix (errMsg) second; on
// any non-nil error a deferred hook logs the failure and sends an alarm.
func InitTradePositionClassify(exchange, startDate, endDate string, tradePosition TradePositionInterface) (err error, errMsg string) {
	defer func() {
		if err != nil {
			tips := fmt.Sprintf("持仓分析-补全缺失的品种数据 操作失败, Exchange: %s, Err: %s, Msg: %s", exchange, err.Error(), errMsg)
			utils.FileLog.Info(tips)
			alarm_msg.SendAlarmMsg(tips, 3)
		}
	}()

	originClassifyList, tmpErr := tradePosition.GetTradePositionOriginClassifyByExchangeDataTime(exchange, startDate, endDate)
	if tmpErr != nil {
		err = tmpErr
		errMsg = "查询原始数据分类个数失败,GetTradePositionOriginClassifyCountByExchangeDataTime() Err: "
		return
	}
	topClassifyList, tmpErr := data_manage.GetTradePositionTopClassifyByExchangeDataTime(exchange, startDate, endDate)
	if tmpErr != nil {
		err = tmpErr
		errMsg = "查询榜单数据分类个数失败,GetTradePositionTopClassifyCountByExchangeDataTime() Err: "
		return
	}

	// Equal counts are treated as "nothing is missing"; an empty origin list
	// means there is nothing to copy.
	if len(originClassifyList) == len(topClassifyList) {
		return
	}
	if len(originClassifyList) == 0 {
		return
	}

	// Set of "name_type" keys already present in the top data.
	topClassifySet := make(map[string]struct{})
	for _, v := range topClassifyList {
		topClassifySet[fmt.Sprintf("%s_%s", v.ClassifyName, v.ClassifyType)] = struct{}{}
	}

	// Collect the classifications that are missing from the top data. Types are
	// appended once per missing pair, names are de-duplicated.
	seenNames := make(map[string]struct{})
	classifyNames := make([]string, 0)
	classifyTypes := make([]string, 0)
	for _, v := range originClassifyList {
		key := fmt.Sprintf("%s_%s", v.ClassifyName, v.ClassifyType)
		if _, ok := topClassifySet[key]; ok {
			continue
		}
		classifyTypes = append(classifyTypes, v.ClassifyType)
		if _, ok := seenNames[v.ClassifyName]; !ok {
			classifyNames = append(classifyNames, v.ClassifyName)
			seenNames[v.ClassifyName] = struct{}{}
		}
	}
	if len(classifyTypes) == 0 {
		return
	}

	err = tradePosition.MultiInsertTradeBaseDataToTopByClassify(exchange, startDate, endDate, classifyNames, classifyTypes)
	if err != nil {
		errMsg = "新增原始数据失败,MultiInsertTradeBaseDataToTop() Err: "
		return
	}

	originList, err := data_manage.GetTradePositionTopByExchangeDataTimeByClassify(exchange, startDate, endDate, classifyNames, classifyTypes)
	if err != nil {
		errMsg = "查询原始数据失败, GetTradePositionTopByExchangeDataTime() Err: "
		return
	}
	if len(originList) <= 0 {
		// Missing data is only reported as an error on weekdays during the
		// 23:00 hour; at any other time the function returns quietly.
		wd := time.Now().Weekday()
		if wd == time.Saturday || wd == time.Sunday {
			return
		}
		if time.Now().Hour() != 23 {
			return
		}
		err = fmt.Errorf("原始数据没有值")
		return
	}

	dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange)
	if e != nil {
		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %w", e)
		return
	}

	now := time.Now()
	// The maps below are filled in by dealTradeOriginData (defined elsewhere).
	dataTimeMap := make(map[string]*data_manage.TradePositionTop)
	onlyEmptyMap := make(map[string]bool)
	onlyEmptyNameMap := make(map[string]*data_manage.TradePositionTop)
	topLastMap := make(map[string]int)
	topLastRankMap := make(map[string]int)
	list := make([]*data_manage.TradePositionTop, 0)

	for _, v := range originList {
		tmp0, tmpErr := dealTradeOriginData(dataTimeMap, onlyEmptyMap, onlyEmptyNameMap, v, topLastMap, topLastRankMap, startDate, now, dates)
		if tmpErr != nil {
			err = tmpErr
			errMsg = "处理原始数据失败 dealTradeOriginData() Err: "
			return
		}
		if tmp0 != nil {
			list = append(list, tmp0)
		}
		// Flush in batches of 1000 rows.
		if len(list) >= 1000 {
			err = data_manage.InsertMultiTradePositionTop(exchange, list)
			if err != nil {
				errMsg = "批量新增昨日数据失败,InsertMultiTradePositionTop() Err: "
				return
			}
			list = make([]*data_manage.TradePositionTop, 0)
		}
	}
	if len(list) > 0 {
		err = data_manage.InsertMultiTradePositionTop(exchange, list)
		if err != nil {
			errMsg = "批量新增昨日数据失败,InsertMultiTradePositionTop() Err: "
			return
		}
		list = make([]*data_manage.TradePositionTop, 0)
	}

	// Synthesize the missing side for entries that have exactly one of the
	// "_1" / "_2" markers in onlyEmptyMap.
	for k, v := range onlyEmptyNameMap {
		_, ok1 := onlyEmptyMap[k+"_1"]
		_, ok2 := onlyEmptyMap[k+"_2"]

		var dealType int
		switch {
		case ok1 && !ok2:
			dealType = 2 // only a buy order exists, no sell order
		case !ok1 && ok2:
			dealType = 1 // only a sell order exists, no buy order
		default:
			continue
		}

		// Estimate the missing value as 70% of the last known value, rounded
		// to the nearest integer; it stays 0 when no last value is recorded.
		lastKey := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + strconv.Itoa(dealType)
		dealValue := 0
		if lastVal, ok := topLastMap[lastKey]; ok {
			dealValue = int(float64(lastVal)*0.7 + 0.5)
		}

		list = append(list, &data_manage.TradePositionTop{
			ClassifyName:  v.ClassifyName,
			ClassifyType:  v.ClassifyType,
			DealShortName: v.DealShortName,
			DataTime:      v.DataTime,
			DealValue:     dealValue,
			CreateTime:    now,
			ModifyTime:    now,
			DealType:      dealType,
			SourceType:    2,
		})
		if len(list) >= 1000 {
			err = data_manage.InsertMultiTradePositionTop(exchange, list)
			if err != nil {
				errMsg = "批量新增前日数据失败,InsertMultiTradePositionTop() Err: "
				return
			}
			list = make([]*data_manage.TradePositionTop, 0)
		}
	}
	if len(list) > 0 {
		err = data_manage.InsertMultiTradePositionTop(exchange, list)
		if err != nil {
			errMsg = "批量新增前日数据失败,InsertMultiTradePositionTop() Err: "
			return
		}
	}

	err = createAnalysisCleanTopClassify(exchange, startDate, endDate, classifyNames, classifyTypes, tradePosition)
	if err != nil {
		errMsg = "创建净多单,净空单数据失败,createAnalysisCleanTop() Err: "
		return
	}

	err = DealYesterdayDataClassify(exchange, startDate, classifyNames, classifyTypes, tradePosition)
	if err != nil {
		errMsg = "处理昨日数据失败,DealYesterdayData() Err: "
		return
	}
	return
}

// DealYesterdayDataClassify revises the rows of the previous trading day that
// were stored with source type 2, using the data of startDate.
//
// For each such row whose counterpart exists in the startDate data (rows with
// SourceType 0 only), the value is recomputed as DealValue - DealChange of the
// startDate row, the change is recomputed against the day before the previous
// trading day (rows with SourceType 2 are ignored there), and the row is
// updated with SourceType 1. If anything was updated, the previous day's rows
// of type 3 and 4 are deleted and rebuilt, and startDate's change values are
// refreshed.
//
// Nothing is done when startDate equals the DataTime of the first base record
// or when the previous trading day has no source-type-2 rows.
func DealYesterdayDataClassify(exchange, startDate string, classifyNames, classifyTypes []string, tradePosition TradePositionInterface) (err error) {
	firstItem, err := tradePosition.GetFirstBaseFromTradeIndexByDate(exchange)
	if err != nil {
		return
	}
	// On the very first day there is no previous day to revise.
	if startDate == firstItem.DataTime {
		return
	}

	dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange)
	if e != nil {
		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %w", e)
		return
	}
	yesterdayStr := getPrevTradeDataDate(startDate, dates)
	beforeYesterdayStr := getPrevTradeDataDate(yesterdayStr, dates)

	originList, err := data_manage.GetTradePositionTopByExchangeDataTimeByClassify(exchange, startDate, startDate, classifyNames, classifyTypes)
	if err != nil {
		return
	}
	// Index startDate rows by "name_type_company", split by deal type
	// (1 goes to the buy map, 2 to the sold map).
	originBuyMap := make(map[string]*data_manage.TradePositionTop)
	originSoldMap := make(map[string]*data_manage.TradePositionTop)
	for _, v := range originList {
		if v.SourceType != 0 {
			continue
		}
		key := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DealShortName
		switch v.DealType {
		case 1:
			originBuyMap[key] = v
		case 2:
			originSoldMap[key] = v
		}
	}

	changeList, err := data_manage.GetTradePositionTopByExchangeSourceTypeClassify(exchange, yesterdayStr, 2, classifyNames, classifyTypes)
	if err != nil {
		return
	}
	if len(changeList) <= 0 {
		return
	}

	beforeYesterdayList, err := data_manage.GetTradePositionTopByExchangeDataTimeByClassify(exchange, beforeYesterdayStr, beforeYesterdayStr, classifyNames, classifyTypes)
	if err != nil {
		return
	}
	beforeYesterdayMap1 := make(map[string]int)
	beforeYesterdayMap2 := make(map[string]int)
	for _, v := range beforeYesterdayList {
		if v.SourceType == 2 {
			continue
		}
		key := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DealShortName
		switch v.DealType {
		case 1:
			beforeYesterdayMap1[key] = v.DealValue
		case 2:
			beforeYesterdayMap2[key] = v.DealValue
		}
	}

	now := time.Now()
	var updateAnalysisData []data_manage.UpdateDealValueChange
	for _, v := range changeList {
		// Pick the lookup tables that match this row's deal type; rows of any
		// other type are left untouched.
		var (
			originMap map[string]*data_manage.TradePositionTop
			beforeMap map[string]int
		)
		switch v.DealType {
		case 1:
			originMap, beforeMap = originBuyMap, beforeYesterdayMap1
		case 2:
			originMap, beforeMap = originSoldMap, beforeYesterdayMap2
		default:
			continue
		}

		key := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DealShortName
		n, ok := originMap[key]
		if !ok {
			continue
		}
		dealValue := n.DealValue - n.DealChange
		dealChange := 0
		if beforeVal, found := beforeMap[key]; found {
			dealChange = dealValue - beforeVal
		}
		updateAnalysisData = append(updateAnalysisData, data_manage.UpdateDealValueChange{
			Id:         v.Id,
			DealValue:  dealValue,
			DealChange: dealChange,
			SourceType: 1,
			ModifyTime: now,
		})
	}

	if len(updateAnalysisData) == 0 {
		return
	}

	err = data_manage.MultiUpdatePositionTop(exchange, updateAnalysisData)
	if err != nil {
		return
	}
	// Remove the previous day's type 3 and type 4 rows before rebuilding them.
	err = data_manage.DeletePositionTopByDataTimeClassify(exchange, yesterdayStr, 3, classifyNames, classifyTypes)
	if err != nil {
		return
	}
	err = data_manage.DeletePositionTopByDataTimeClassify(exchange, yesterdayStr, 4, classifyNames, classifyTypes)
	if err != nil {
		return
	}
	err = createAnalysisCleanTopClassify(exchange, yesterdayStr, yesterdayStr, classifyNames, classifyTypes, tradePosition)
	if err != nil {
		return
	}
	err = updateAnalysisCleanTopChangeValClassify(exchange, startDate, yesterdayStr, classifyNames, classifyTypes)
	if err != nil {
		return
	}
	return
}

// createAnalysisCleanTopClassify builds the net long (DealType 3) and net
// short (DealType 4) rows for the given classifications and date range and
// inserts them into the top table in batches of 1000.
//
// For every sell row (DealType 2) the net value is buy - sell of the same
// classification, date and company: a non-negative result becomes a type 3
// row, a negative one becomes a type 4 row holding the absolute value. Rows
// are ranked per classification, date and type in the order produced by
// sort.Sort on TradePositionSubList, and DealChange is computed against the
// previous trading day's row of the same type when one exists.
//
// Any non-nil error is also printed by a deferred hook.
func createAnalysisCleanTopClassify(exchange, startDate, endDate string, classifyNames, classifyTypes []string, tradePosition TradePositionInterface) (err error) {
	defer func() {
		if err != nil {
			fmt.Println("createAnalysisCleanTop err: " + err.Error())
		}
	}()

	topList := make([]*data_manage.TradePositionTop, 0)
	now := time.Now()
	var subDataList data_manage.TradePositionSubList
	subChangeMap1 := make(map[string]int) // net long values
	subChangeMap2 := make(map[string]int) // net short values

	dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange)
	if e != nil {
		err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %w", e)
		return
	}
	yesterday := getPrevTradeDataDate(startDate, dates)

	// Seed both maps with the previous trading day's net values.
	yesterdayTopList1, tErr := data_manage.GetTradePositionTopByExchangeDataTimeTypeClassify(exchange, yesterday, 3, classifyNames, classifyTypes)
	if tErr != nil {
		err = tErr
		return
	}
	for _, v := range yesterdayTopList1 {
		key := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName
		subChangeMap1[key] = v.DealValue
	}

	yesterdayTopList2, tErr := data_manage.GetTradePositionTopByExchangeDataTimeTypeClassify(exchange, yesterday, 4, classifyNames, classifyTypes)
	if tErr != nil {
		err = tErr
		return
	}
	for _, v := range yesterdayTopList2 {
		key := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName
		subChangeMap2[key] = v.DealValue
	}

	originDataList, err := data_manage.GetTradePositionTopByExchangeDataTimeByClassify(exchange, startDate, endDate, classifyNames, classifyTypes)
	if err != nil {
		return
	}

	// NOTE(review): a sell row is matched against buyDataMap at the moment it
	// is visited, so a sell row that precedes its buy row in originDataList is
	// stored with SubValue 0 and DealType 0. Presumably the query returns buy
	// rows first; confirm the ordering.
	buyDataMap := make(map[string]int)
	for _, v := range originDataList {
		key := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName
		switch v.DealType {
		case 1:
			buyDataMap[key] = v.DealValue
		case 2:
			subValue := 0
			subType := 0
			if buy, ok := buyDataMap[key]; ok {
				subValue = buy - v.DealValue
				if subValue >= 0 {
					subType = 3
				} else {
					subValue = -subValue
					subType = 4
				}
			}
			subDataList = append(subDataList, &data_manage.TradePositionSub{
				ClassifyName:  v.ClassifyName,
				ClassifyType:  v.ClassifyType,
				DataTime:      v.DataTime,
				DealShortName: v.DealShortName,
				SubValue:      subValue,
				DealType:      subType,
			})
		}
	}

	if len(subDataList) > 0 {
		sort.Sort(subDataList)
	}

	// NOTE(review): dealType is declared outside the loop on purpose to keep
	// the existing behaviour: an entry whose DealType is neither 3 nor 4
	// inherits the type chosen for the previous entry. This looks unintended;
	// confirm before changing it.
	var dealType int
	rankMap := make(map[string]int)
	for _, v := range subDataList {
		subValue := v.SubValue
		rankPrefix := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_"
		nameStr := rankPrefix + v.DealShortName

		switch v.DealType {
		case 3:
			subChangeMap1[nameStr] = subValue
			dealType = 3
			rankMap[rankPrefix+"3"]++ // a missing key starts at 0, so the first rank is 1
		case 4:
			subChangeMap2[nameStr] = subValue
			dealType = 4
			rankMap[rankPrefix+"4"]++
		}

		// The change is measured against the previous trading day's net value
		// of the same type; it stays 0 when there is none.
		yesterdayStr := v.ClassifyName + "_" + v.ClassifyType + "_" + yesterday + "_" + v.DealShortName
		dealChange := 0
		switch dealType {
		case 3:
			if c, ok := subChangeMap1[yesterdayStr]; ok {
				dealChange = subValue - c
			}
		case 4:
			if c, ok := subChangeMap2[yesterdayStr]; ok {
				dealChange = subValue - c
			}
		}

		topList = append(topList, &data_manage.TradePositionTop{
			ClassifyName:  v.ClassifyName,
			ClassifyType:  v.ClassifyType,
			DataTime:      v.DataTime,
			CreateTime:    now,
			ModifyTime:    now,
			DealShortName: v.DealShortName,
			DealValue:     subValue,
			DealChange:    dealChange,
			DealType:      dealType,
			Rank:          rankMap[rankPrefix+strconv.Itoa(dealType)],
		})
		if len(topList) >= 1000 {
			err = data_manage.InsertMultiTradePositionTop(exchange, topList)
			if err != nil {
				return
			}
			topList = make([]*data_manage.TradePositionTop, 0)
		}
	}

	// Only flush when rows remain; the previous `>= 0` check was always true
	// and issued an insert with an empty batch.
	if len(topList) > 0 {
		err = data_manage.InsertMultiTradePositionTop(exchange, topList)
		if err != nil {
			return
		}
	}
	return
}

// updateAnalysisCleanTopChangeValClassify recomputes DealChange for the
// startDate rows returned by GetTradePositionTopCleanByExchangeDataTimeClassify
// by comparing each one with the row of the same classification, company and
// deal type dated yesterday. When no such row exists the change is 0. Only
// rows whose stored DealChange differs from the recomputed value are updated.
//
// Any non-nil error is also printed by a deferred hook.
func updateAnalysisCleanTopChangeValClassify(exchange, startDate, yesterday string, classifyNames, classifyTypes []string) (err error) {
	defer func() {
		if err != nil {
			fmt.Println("updateAnalysisCleanTopChangeVal err: " + err.Error())
		}
	}()

	// Rows for both days (yesterday and startDate).
	topList, err := data_manage.GetTradePositionTopCleanByExchangeDataTimeClassify(exchange, yesterday, startDate, classifyNames, classifyTypes)
	if err != nil {
		return
	}
	if len(topList) == 0 {
		return
	}

	todayTopList := make([]*data_manage.TradePositionTop, 0) // startDate rows
	yesterdayTopListMap := make(map[string]int)              // yesterday's values by key
	for _, v := range topList {
		switch v.DataTime {
		case startDate:
			todayTopList = append(todayTopList, v)
		case yesterday:
			key := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName + "_" + strconv.Itoa(v.DealType)
			yesterdayTopListMap[key] = v.DealValue
		}
	}
	if len(todayTopList) == 0 {
		return
	}

	now := time.Now()
	updateList := make([]data_manage.UpdateChangeVal, 0)
	for _, v := range todayTopList {
		key := v.ClassifyName + "_" + v.ClassifyType + "_" + yesterday + "_" + v.DealShortName + "_" + strconv.Itoa(v.DealType)
		dealChange := 0
		if c, ok := yesterdayTopListMap[key]; ok {
			dealChange = v.DealValue - c
		}
		if dealChange == v.DealChange {
			continue
		}
		updateList = append(updateList, data_manage.UpdateChangeVal{
			Id:         v.Id,
			ModifyTime: now,
			DealChange: dealChange,
		})
	}

	if len(updateList) > 0 {
		err = data_manage.MultiUpdatePositionTopChangeVal(exchange, updateList)
		if err != nil {
			return
		}
	}
	return
}