package data import ( "eta_gn/eta_task/models/data_manage" "eta_gn/eta_task/services/alarm_msg" "eta_gn/eta_task/utils" "fmt" "sort" "strconv" "time" ) // FixPositionTask 补全缺失的合约, 注意和原先的定时任务区分开来 func FixPositionTask() (err error) { exchanges := []string{"zhengzhou", "dalian", "shanghai", "cffex", "ine", "guangzhou"} //郑商所,大商所,上期所,中金所,上期能源 //exchanges := []string{"guangzhou"} //郑商所,大商所,上期所,中金所,上期能源 for i := 194; i > 1; i-- { // 定时任务避开昨日和今日的数据,以免和原先的定时任务InitPositionTask冲突 startDate := time.Now().AddDate(0, 0, -i).Format(utils.FormatDate) //startDate := "2024-01-08" endDate := startDate for _, v := range exchanges { exchange := v err = nil fmt.Println("FixPositionTask: 启动:" + exchange) utils.FileLog.Info("FixPositionTask: 启动:" + exchange) fmt.Println("开始" + startDate + "结束" + endDate) utils.FileLog.Info(fmt.Sprintf("FixPositionTask:开始:%s; 结束:%s", startDate, endDate)) var tradePosition TradePositionInterface if exchange == "guangzhou" { tradePosition = &GuangzhouTradePosition{} } else { tradePosition = &BaseTradePosition{} } tErr, errMsg := InitTradePositionClassify(exchange, startDate, endDate, tradePosition) if tErr != nil { err = tErr fmt.Println("FixPositionTask: 操作失败:" + errMsg + tErr.Error()) utils.FileLog.Info(fmt.Sprintf("InitTradePosition: 操作失败:%s:%s", errMsg, tErr.Error())) continue } fmt.Println("FixPositionTask:" + exchange + "已完成") utils.FileLog.Info("FixPositionTask:" + exchange + "已完成") } } return } // InitTradePositionClassify 持仓分析补全缺失的合约 func InitTradePositionClassify(exchange, startDate, endDate string, tradePosition TradePositionInterface) (err error, errMsg string) { defer func() { if err != nil { tips := fmt.Sprintf("持仓分析-补全缺失的品种数据 操作失败, Exchange: %s, Err: %s, Msg: %s", exchange, err.Error(), errMsg) utils.FileLog.Info(tips) alarm_msg.SendAlarmMsg(tips, 3) } }() //判断合约数与数据源是否一致 originClassifyList, tmpErr := tradePosition.GetTradePositionOriginClassifyByExchangeDataTime(exchange, startDate, endDate) if tmpErr != nil { err = tmpErr errMsg = "查询原始数据分类个数失败,GetTradePositionOriginClassifyCountByExchangeDataTime() Err: " return } topClassifyList, tmpErr := data_manage.GetTradePositionTopClassifyByExchangeDataTime(exchange, startDate, endDate) if tmpErr != nil { err = tmpErr errMsg = "查询榜单数据分类个数失败,GetTradePositionTopClassifyCountByExchangeDataTime() Err: " return } if len(originClassifyList) == len(topClassifyList) { //合约数目一致,无需补全 return } if len(originClassifyList) == 0 { return } topClassifyMap := make(map[string][]data_manage.TradePositionClassifyInfo) for _, v := range topClassifyList { str := fmt.Sprintf("%s_%s", v.ClassifyName, v.ClassifyType) topClassifyMap[str] = append(topClassifyMap[str], v) } classifyMap := make(map[string]struct{}) classifyNames := make([]string, 0) classifyTypes := make([]string, 0) for _, v := range originClassifyList { str := fmt.Sprintf("%s_%s", v.ClassifyName, v.ClassifyType) if _, ok := topClassifyMap[str]; !ok { classifyTypes = append(classifyTypes, v.ClassifyType) if _, ok1 := classifyMap[v.ClassifyName]; !ok1 { classifyNames = append(classifyNames, v.ClassifyName) classifyMap[v.ClassifyName] = struct{}{} } } } if len(classifyTypes) == 0 { return } //批量导入缺失的合约 err = tradePosition.MultiInsertTradeBaseDataToTopByClassify(exchange, startDate, endDate, classifyNames, classifyTypes) if err != nil { errMsg = "新增原始数据失败,MultiInsertTradeBaseDataToTop() Err: " return } originList, err := data_manage.GetTradePositionTopByExchangeDataTimeByClassify(exchange, startDate, endDate, classifyNames, classifyTypes) if err != nil { errMsg = "查询原始数据失败, GetTradePositionTopByExchangeDataTime() Err: " return } if len(originList) <= 0 { // 忽略周末 w := time.Now().Weekday().String() if w == "Saturday" || w == "Sunday" { return } // 每天最后一个小时执行依旧无数据时, 才进行邮件提示 if time.Now().Hour() != 23 { return } err = fmt.Errorf("原始数据没有值") return } // 原始数据日期 dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange) if e != nil { err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error()) return } now := time.Now() dataTimeMap := make(map[string]*data_manage.TradePositionTop) onlyEmptyMap := make(map[string]bool) onlyEmptyNameMap := make(map[string]*data_manage.TradePositionTop) topLastMap := make(map[string]int) topLastRankMap := make(map[string]int) list := make([]*data_manage.TradePositionTop, 0) for _, v := range originList { tmp0, tmpErr := dealTradeOriginData(dataTimeMap, onlyEmptyMap, onlyEmptyNameMap, v, topLastMap, topLastRankMap, startDate, now, dates) if tmpErr != nil { err = tmpErr errMsg = "处理原始数据失败 dealTradeOriginData() Err: " return } if tmp0 != nil { list = append(list, tmp0) } if len(list) >= 1000 { err = data_manage.InsertMultiTradePositionTop(exchange, list) if err != nil { errMsg = "批量新增昨日数据失败,InsertMultiTradePositionTop() Err: " return } list = make([]*data_manage.TradePositionTop, 0) } } if len(list) > 0 { err = data_manage.InsertMultiTradePositionTop(exchange, list) if err != nil { errMsg = "批量新增昨日数据失败,InsertMultiTradePositionTop() Err: " return } list = make([]*data_manage.TradePositionTop, 0) } // 处理某个期货公司只有买单没有卖单,或者只有卖单没有买单的情况 for k, v := range onlyEmptyNameMap { _, ok1 := onlyEmptyMap[k+"_1"] _, ok2 := onlyEmptyMap[k+"_2"] var dealType int if ok1 && !ok2 { dealType = 2 //只有买单没有卖单 } else if !ok1 && ok2 { dealType = 1 //只有卖单没有买单的情况 } else { continue } if dealType > 0 { str := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + strconv.Itoa(dealType) dealValue := 0 if lastVal, ok := topLastMap[str]; ok { dealValue = int(float64(lastVal)*0.7 + 0.5) } tmp := &data_manage.TradePositionTop{ ClassifyName: v.ClassifyName, ClassifyType: v.ClassifyType, DealShortName: v.DealShortName, DataTime: v.DataTime, DealValue: dealValue, CreateTime: now, ModifyTime: now, DealType: dealType, SourceType: 2, } list = append(list, tmp) if len(list) >= 1000 { err = data_manage.InsertMultiTradePositionTop(exchange, list) if err != nil { errMsg = "批量新增前日数据失败,InsertMultiTradePositionTop() Err: " return } list = make([]*data_manage.TradePositionTop, 0) } } } if len(list) > 0 { err = data_manage.InsertMultiTradePositionTop(exchange, list) if err != nil { errMsg = "批量新增前日数据失败,InsertMultiTradePositionTop() Err: " return } } //生成净多单,净空单榜单 err = createAnalysisCleanTopClassify(exchange, startDate, endDate, classifyNames, classifyTypes, tradePosition) if err != nil { errMsg = "创建净多单,净空单数据失败,createAnalysisCleanTop() Err: " return } err = DealYesterdayDataClassify(exchange, startDate, classifyNames, classifyTypes, tradePosition) if err != nil { errMsg = "处理昨日数据失败,DealYesterdayData() Err: " return } return } // DealYesterdayDataClassify 更新部分合约的昨日数据 func DealYesterdayDataClassify(exchange, startDate string, classifyNames, classifyTypes []string, tradePosition TradePositionInterface) (err error) { // 查询最早的日期 firstItem, err := tradePosition.GetFirstBaseFromTradeIndexByDate(exchange) if err != nil { return } if startDate == firstItem.DataTime { //如果当前是起始日,则无需统计修改前一天的数据 return } // 前一个交易日, 前两个交易日 dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange) if e != nil { err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error()) return } yesterdayStr := getPrevTradeDataDate(startDate, dates) beforeYesterdayStr := getPrevTradeDataDate(yesterdayStr, dates) // 先查出T日最原始的数据 originList, err := data_manage.GetTradePositionTopByExchangeDataTimeByClassify(exchange, startDate, startDate, classifyNames, classifyTypes) if err != nil { return } originBuyMap := make(map[string]*data_manage.TradePositionTop) originSoldMap := make(map[string]*data_manage.TradePositionTop) for _, v := range originList { if v.SourceType != 0 { continue } str := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DealShortName if v.DealType == 1 { originBuyMap[str] = v } else if v.DealType == 2 { originSoldMap[str] = v } } // 然后查询T-1中数据来源类型是2的数据 changeList, err := data_manage.GetTradePositionTopByExchangeSourceTypeClassify(exchange, yesterdayStr, 2, classifyNames, classifyTypes) if err != nil { return } if len(changeList) <= 0 { //err = fmt.Errorf("前天的数据无需修改") return } // 查询出前日的成交量 beforeYesterdayList, err := data_manage.GetTradePositionTopByExchangeDataTimeByClassify(exchange, beforeYesterdayStr, beforeYesterdayStr, classifyNames, classifyTypes) if err != nil { return } beforeYesterdayMap1 := make(map[string]int) beforeYesterdayMap2 := make(map[string]int) if len(beforeYesterdayList) > 0 { for _, v := range beforeYesterdayList { if v.SourceType == 2 { continue } str := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DealShortName if v.DealType == 1 { beforeYesterdayMap1[str] = v.DealValue } else if v.DealType == 2 { beforeYesterdayMap2[str] = v.DealValue } } } // 根据原始数据中的值推算出最新的值 now := time.Now() // 批量更新到分析表中, var updateAnalysisData []data_manage.UpdateDealValueChange for _, v := range changeList { str := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DealShortName dealValue := 0 dealChange := 0 if v.DealType == 1 { if n, ok := originBuyMap[str]; ok { dealValue = n.DealValue - n.DealChange if beforeVal, ok1 := beforeYesterdayMap1[str]; ok1 { dealChange = dealValue - beforeVal } tmp := data_manage.UpdateDealValueChange{ Id: v.Id, DealValue: dealValue, DealChange: dealChange, SourceType: 1, ModifyTime: now, } updateAnalysisData = append(updateAnalysisData, tmp) } } else if v.DealType == 2 { if n, ok := originSoldMap[str]; ok { dealValue = n.DealValue - n.DealChange if beforeVal, ok1 := beforeYesterdayMap2[str]; ok1 { dealChange = dealValue - beforeVal } tmp := data_manage.UpdateDealValueChange{ Id: v.Id, DealValue: dealValue, DealChange: dealChange, SourceType: 1, ModifyTime: now, } updateAnalysisData = append(updateAnalysisData, tmp) } } } if len(updateAnalysisData) > 0 { err = data_manage.MultiUpdatePositionTop(exchange, updateAnalysisData) if err != nil { return } //删除T-1日净多单和净空单的榜单 err = data_manage.DeletePositionTopByDataTimeClassify(exchange, yesterdayStr, 3, classifyNames, classifyTypes) if err != nil { return } err = data_manage.DeletePositionTopByDataTimeClassify(exchange, yesterdayStr, 4, classifyNames, classifyTypes) if err != nil { return } //重新生成净多单和净空单的榜单 err = createAnalysisCleanTopClassify(exchange, yesterdayStr, yesterdayStr, classifyNames, classifyTypes, tradePosition) if err != nil { return } //T-1日重新生成净多单和净空单的榜单后,需要更新T日净多单和净空单榜单里的变化量 err = updateAnalysisCleanTopChangeValClassify(exchange, startDate, yesterdayStr, classifyNames, classifyTypes) if err != nil { return } } return } // createAnalysisCleanTopClassify 生成部分合约的净多单,净空单榜单 func createAnalysisCleanTopClassify(exchange, startDate, endDate string, classifyNames, classifyTypes []string, tradePosition TradePositionInterface) (err error) { defer func() { if err != nil { fmt.Println("createAnalysisCleanTop err: " + err.Error()) } }() topList := make([]*data_manage.TradePositionTop, 0) now := time.Now() var subDataList data_manage.TradePositionSubList subChangeMap1 := make(map[string]int) //净多单map subChangeMap2 := make(map[string]int) //净空单map // 2023-05-10 此处取前一个交易日, 不一定是昨日 dates, e := tradePosition.GetTradePositionTopOriginDataTimes(exchange) if e != nil { err = fmt.Errorf("GetTradePositionTopOriginDataTimes err: %s", e.Error()) return } yesterday := getPrevTradeDataDate(startDate, dates) // 上一个交易日的净多单 yesterdayTopList1, tErr := data_manage.GetTradePositionTopByExchangeDataTimeTypeClassify(exchange, yesterday, 3, classifyNames, classifyTypes) if tErr != nil { err = tErr return } if len(yesterdayTopList1) > 0 { for _, v := range yesterdayTopList1 { nameStr := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName subChangeMap1[nameStr] = v.DealValue } } // 上一个交易日的净空单 yesterdayTopList2, tErr := data_manage.GetTradePositionTopByExchangeDataTimeTypeClassify(exchange, yesterday, 4, classifyNames, classifyTypes) if tErr != nil { err = tErr return } if len(yesterdayTopList2) > 0 { for _, v := range yesterdayTopList2 { nameStr := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName subChangeMap2[nameStr] = v.DealValue } } // 根据当日多单/空单数据, 生成净多单/净空单数据 originDataList, err := data_manage.GetTradePositionTopByExchangeDataTimeByClassify(exchange, startDate, endDate, classifyNames, classifyTypes) if err != nil { return } buyDataMap := make(map[string]int) for _, v := range originDataList { str := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName if v.DealType == 1 { buyDataMap[str] = v.DealValue } else if v.DealType == 2 { subValue := 0 dealType := 0 if buy, ok := buyDataMap[str]; ok { subValue = buy - v.DealValue if subValue >= 0 { dealType = 3 } else { subValue = -subValue dealType = 4 } } tmp := &data_manage.TradePositionSub{ ClassifyName: v.ClassifyName, ClassifyType: v.ClassifyType, DataTime: v.DataTime, DealShortName: v.DealShortName, SubValue: subValue, DealType: dealType, } subDataList = append(subDataList, tmp) } } if len(subDataList) > 0 { sort.Sort(subDataList) } // 根据净多单/净空单数据, 比对上一个交易日的日期计算成交变化量, 并写入 var dealType int rankMap := make(map[string]int) for _, v := range subDataList { subValue := v.SubValue nameStr := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName if v.DealType == 3 { subChangeMap1[nameStr] = subValue dealType = 3 if _, ok := rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_3"]; !ok { rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_3"] = 1 } else { rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_3"]++ } } else if v.DealType == 4 { subChangeMap2[nameStr] = subValue dealType = 4 if _, ok := rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_4"]; !ok { rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_4"] = 1 } else { rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_4"]++ } } // 2023-05-10 目前看该方法的引用startDate和endDate其实是同一天, 所以前一个交易日直接用上面的yesterday tmpTimeStr := yesterday //和T-1日比较差值 //var tmpTimeStr string //tmpTimeStr, err = getYesterdayDate(v.DataTime) //if err != nil { // return //} yesterdayStr := v.ClassifyName + "_" + v.ClassifyType + "_" + tmpTimeStr + "_" + v.DealShortName dealChange := 0 if dealType == 3 { if c, ok := subChangeMap1[yesterdayStr]; ok { dealChange = subValue - c } } else if dealType == 4 { if c, ok := subChangeMap2[yesterdayStr]; ok { dealChange = subValue - c } } tmp := &data_manage.TradePositionTop{ ClassifyName: v.ClassifyName, ClassifyType: v.ClassifyType, DataTime: v.DataTime, CreateTime: now, ModifyTime: now, DealShortName: v.DealShortName, DealValue: subValue, DealChange: dealChange, DealType: dealType, Rank: rankMap[v.ClassifyName+"_"+v.ClassifyType+"_"+v.DataTime+"_"+strconv.Itoa(dealType)], } topList = append(topList, tmp) if len(topList) >= 1000 { err = data_manage.InsertMultiTradePositionTop(exchange, topList) if err != nil { return } topList = make([]*data_manage.TradePositionTop, 0) } } if len(topList) >= 0 { err = data_manage.InsertMultiTradePositionTop(exchange, topList) if err != nil { return } } return } // updateAnalysisCleanTopChangeValClassify T-1日重新生成净多单和净空单的榜单后,需要更新T日净多单和净空单榜单里的变化量 func updateAnalysisCleanTopChangeValClassify(exchange, startDate, yesterday string, classifyNames, classifyTypes []string) (err error) { defer func() { if err != nil { fmt.Println("updateAnalysisCleanTopChangeVal err: " + err.Error()) } }() //查询T日的净多单和净空单榜单列表 //查询T-1日的净多单和净空单列表 //组装数据,计算T日与T-1日的变更值 //更新变更值 topList := make([]*data_manage.TradePositionTop, 0) //T日和T+1日列表 todayTopList := make([]*data_manage.TradePositionTop, 0) //T日列表 yesterdayTopListMap := make(map[string]int) //净多单净空单持仓量map // 查询T日和T-1日的净多单和净空单列表 topList, err = data_manage.GetTradePositionTopCleanByExchangeDataTimeClassify(exchange, yesterday, startDate, classifyNames, classifyTypes) if err != nil { return } if len(topList) == 0 { return } for _, v := range topList { if v.DataTime == startDate { todayTopList = append(todayTopList, v) } else if v.DataTime == yesterday { nameStr := v.ClassifyName + "_" + v.ClassifyType + "_" + v.DataTime + "_" + v.DealShortName + "_" + strconv.Itoa(v.DealType) yesterdayTopListMap[nameStr] = v.DealValue } } if len(todayTopList) == 0 { return } // 根据净多单/净空单数据, 比对上一个交易日的日期计算成交变化量, 并写入 now := time.Now() updateList := make([]data_manage.UpdateChangeVal, 0) for _, v := range todayTopList { //T日值-T-1日值 yesterdayStr := v.ClassifyName + "_" + v.ClassifyType + "_" + yesterday + "_" + v.DealShortName + "_" + strconv.Itoa(v.DealType) dealChange := 0 if c, ok := yesterdayTopListMap[yesterdayStr]; ok { dealChange = v.DealValue - c } if dealChange != v.DealChange { tmp := data_manage.UpdateChangeVal{ Id: v.Id, ModifyTime: now, DealChange: dealChange, } updateList = append(updateList, tmp) } } if len(updateList) > 0 { err = data_manage.MultiUpdatePositionTopChangeVal(exchange, updateList) if err != nil { return } } return }