package models import ( "encoding/json" "eta/eta_index_lib/models/mgo" "eta/eta_index_lib/utils" "fmt" "github.com/beego/beego/v2/client/orm" "github.com/shopspring/decimal" "go.mongodb.org/mongo-driver/bson" "reflect" "sort" "time" ) // EdbThsHf 自有数据 type EdbThsHf struct{} // GetSource 获取来源编码id func (obj EdbThsHf) GetSource() int { return utils.DATA_SOURCE_THS } // GetSubSource 获取子来源编码id func (obj EdbThsHf) GetSubSource() int { return utils.DATA_SUB_SOURCE_HIGH_FREQUENCY } // GetSourceName 获取来源名称 func (obj EdbThsHf) GetSourceName() string { return utils.DATA_SOURCE_NAME_THS } // GetEdbType 获取指标类型 func (obj EdbThsHf) GetEdbType() int { return utils.DEFAULT_EDB_TYPE } // ThsHfAddBaseParams // @Description: 基础指标的添加参数 type ThsHfAddBaseParams struct { EdbCode string `description:"指标编码"` EdbName string `description:"指标名称"` Unit string `description:"单位"` Frequency string `description:"频度"` Sort int `description:"排序"` ClassifyId int `description:"所属分类"` SysUserId int `description:"用户id"` SysUserRealName string `description:"用户真实名称"` UniqueCode string `description:"唯一编码"` ConvertRule string `description:"转换规则"` } // ThsHfEditBaseParams // @Description: 基础指标的修改参数 type ThsHfEditBaseParams struct { EdbCode string `description:"指标编码"` EdbName string `description:"指标名称"` EdbNameEn string `description:"指标名称(英文)"` Unit string `description:"单位"` UnitEn string `description:"单位(英文)"` ClassifyId int `description:"所属分类"` SysUserId int `description:"用户id"` SysUserRealName string `description:"用户真实名称"` UniqueCode string `description:"编码"` Lang string `description:"语言版本"` EdbInfo *EdbInfo `description:"指标信息"` } type ThsHfRefreshBaseParams struct { EdbInfo *EdbInfo StartDate string EndDate string } // Add // @Description: 添加指标 func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex) (edbInfo *EdbInfo, err error) { o := orm.NewOrm() tx, e := o.Begin() if e != nil { err = fmt.Errorf("orm begin err: %v", e) return } defer func() { if err != nil { _ = tx.Rollback() utils.FileLog.Info(fmt.Sprintf("%s err: %v", reflect.TypeOf(obj).Name(), err)) return } _ = tx.Commit() }() // 新增指标 edbInfo = new(EdbInfo) edbInfo.Source = obj.GetSource() edbInfo.SubSource = obj.GetSubSource() edbInfo.SourceName = obj.GetSourceName() edbInfo.EdbType = obj.GetEdbType() edbInfo.EdbCode = params.EdbCode edbInfo.EdbName = params.EdbName edbInfo.EdbNameEn = params.EdbName edbInfo.EdbNameSource = params.EdbName edbInfo.Frequency = params.Frequency edbInfo.Unit = params.Unit edbInfo.UnitEn = params.Unit edbInfo.StartDate = baseIndex.StartDate.Format(utils.FormatDate) // 默认取源指标的时间, 刷新完成后更新 edbInfo.EndDate = baseIndex.EndDate.Format(utils.FormatDate) edbInfo.ClassifyId = params.ClassifyId edbInfo.SysUserId = params.SysUserId edbInfo.SysUserRealName = params.SysUserRealName edbInfo.Sort = params.Sort edbInfo.TerminalCode = baseIndex.TerminalCode edbInfo.UniqueCode = params.UniqueCode edbInfo.CreateTime = time.Now() edbInfo.ModifyTime = time.Now() edbInfoId, e := tx.Insert(edbInfo) if e != nil { err = fmt.Errorf("insert edb err: %v", e) return } edbInfo.EdbInfoId = int(edbInfoId) // 新增指标关联 edbMapping := new(BaseFromEdbMapping) edbMapping.BaseFromIndexId = baseIndex.BaseFromThsHfIndexId edbMapping.BaseIndexCode = baseIndex.IndexCode edbMapping.EdbInfoId = edbInfo.EdbInfoId edbMapping.EdbCode = edbInfo.EdbCode edbMapping.Source = obj.GetSource() edbMapping.SubSource = obj.GetSubSource() edbMapping.ConvertRule = params.ConvertRule edbMapping.CreateTime = time.Now().Local() edbMapping.ModifyTime = time.Now().Local() edbMappingId, e := tx.Insert(edbMapping) if e != nil { err = fmt.Errorf("insert base edb mapping err: %v", e) return } edbMapping.Id = int(edbMappingId) // 刷新数据 err = obj.Refresh(edbInfo, edbMapping, "") return } func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) { if utils.UseMongo { return obj.refreshByMongo(edbInfo, edbBaseMapping, startDate) } return obj.refreshByMysql(edbInfo, edbBaseMapping, startDate) } func (obj EdbThsHf) refreshByMysql(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) { if edbInfo == nil || edbBaseMapping == nil { err = fmt.Errorf("指标信息/关联信息有误, EdbInfo: %v, EdbBaseMapping: %v", edbInfo, edbBaseMapping) return } // 真实数据的最大日期, 插入规则配置的日期 var realDataMaxDate, edbDataInsertConfigDate time.Time var edbDataInsertConfig *EdbDataInsertConfig var isFindConfigDateRealData bool { conf, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId) if e != nil && e.Error() != utils.ErrNoRow() { err = fmt.Errorf("GetEdbDataInsertConfigByEdbId err: %v", e) return } edbDataInsertConfig = conf if edbDataInsertConfig != nil { edbDataInsertConfigDate = edbDataInsertConfig.Date } } // 查询时间为开始时间-3d var queryDate string if startDate != "" { st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local) if e != nil { err = fmt.Errorf("刷新开始时间有误, %v", e) return } queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate) } // 源指标数据 baseDataList := make([]*BaseFromThsHfData, 0) { ob := new(BaseFromThsHfData) cond := fmt.Sprintf(" AND %s = ?", ob.Cols().IndexCode) pars := make([]interface{}, 0) pars = append(pars, edbBaseMapping.BaseIndexCode) if queryDate != "" { cond += fmt.Sprintf(" AND %s >= ?", ob.Cols().DataTime) pars = append(pars, queryDate) } list, e := ob.GetItemsByCondition(cond, pars, []string{}, fmt.Sprintf("%s ASC", ob.Cols().DataTime)) if e != nil { err = fmt.Errorf("获取数据源数据失败, %v", e) return } baseDataList = list } // 转换数据 convertRule := new(ThsHfIndexConvert2EdbRule) if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil { err = fmt.Errorf("转换规则有误, %v", e) return } convertOriginData := make([]*ThsHfConvertOriginData, 0) for _, v := range baseDataList { convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{ DataTime: v.DataTime, Value: v.Value, }) } convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule) if e != nil { err = fmt.Errorf("转换数据失败, %v", e) return } if len(convertData) == 0 { utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode) return } // 获取已有数据 dataOb := new(EdbDataThsHf) dataExists := make(map[string]*EdbDataThsHf) searchExistMap := make(map[string]*EdbInfoSearchData) { cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().EdbInfoId) pars := make([]interface{}, 0) pars = append(pars, edbInfo.EdbInfoId) if queryDate != "" { cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime) pars = append(pars, queryDate) } list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "") if e != nil { err = fmt.Errorf("获取指标数据失败, %v", e) return } for _, v := range list { dataExists[v.DataTime.Format(utils.FormatDate)] = v searchExistMap[v.DataTime.Format(utils.FormatDate)] = &EdbInfoSearchData{ EdbDataId: v.EdbDataId, EdbInfoId: v.EdbInfoId, DataTime: v.DataTime.Format(utils.FormatDate), Value: v.Value, EdbCode: v.EdbCode, DataTimestamp: v.DataTimestamp, } } } // 比对数据 insertExist := make(map[string]bool) insertData := make([]*EdbDataThsHf, 0) updateData := make([]*EdbDataThsHf, 0) for k, v := range convertData { strDate := k.Format(utils.FormatDate) // 手动插入数据的判断 if realDataMaxDate.IsZero() || k.After(realDataMaxDate) { realDataMaxDate = k } if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) { isFindConfigDateRealData = true } // 入库值 saveVal := decimal.NewFromFloat(v).Round(4).String() d, e := decimal.NewFromString(saveVal) if e != nil { utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e)) continue } saveFloat, _ := d.Float64() // 更新 exists := dataExists[strDate] if exists != nil { existVal := decimal.NewFromFloat(exists.Value).Round(4).String() if saveVal != existVal { exists.Value = saveFloat updateData = append(updateData, exists) } continue } // 新增 if insertExist[strDate] { continue } insertExist[strDate] = true timestamp := k.UnixNano() / 1e6 insertData = append(insertData, &EdbDataThsHf{ EdbInfoId: edbInfo.EdbInfoId, EdbCode: edbInfo.EdbCode, DataTime: k, Value: saveFloat, CreateTime: time.Now(), ModifyTime: time.Now(), DataTimestamp: timestamp, }) } // 批量新增/更新 if len(insertData) > 0 { if e = dataOb.CreateMulti(insertData); e != nil { err = fmt.Errorf("批量新增指标数据失败, %v", e) return } } if len(updateData) > 0 { if e = dataOb.MultiUpdateValue(updateData); e != nil { err = fmt.Errorf("批量更新指标数据失败, %v", e) return } } // 处理手工数据补充的配置 HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, obj.GetSource(), obj.GetSubSource(), searchExistMap, isFindConfigDateRealData) return } func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) { defer func() { if err != nil { utils.FileLog.Info(fmt.Sprintf("refreshByMongo, err: %v", err)) } }() var realDataMaxDate, edbDataInsertConfigDate time.Time var edbDataInsertConfig *EdbDataInsertConfig var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值 { insertConfig, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId) if e != nil && e.Error() != utils.ErrNoRow() { err = fmt.Errorf("GetEdbDataInsertConfigByEdbId, err: %v", e) return } edbDataInsertConfig = insertConfig if edbDataInsertConfig != nil { edbDataInsertConfigDate = edbDataInsertConfig.Date } } // 查询时间为开始时间-3d var queryDate string if startDate != "" { st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local) if e != nil { err = fmt.Errorf("刷新开始时间有误, %v", e) return } queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate) } // 获取源指标数据 baseDataList, e := obj.getBaseIndexDataByMongo(edbBaseMapping.BaseIndexCode, queryDate) if e != nil { err = fmt.Errorf("getBaseIndexDataByMongo, err: %v", e) return } // 转换数据 convertRule := new(ThsHfIndexConvert2EdbRule) if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil { err = fmt.Errorf("转换规则有误, %v", e) return } convertOriginData := make([]*ThsHfConvertOriginData, 0) for _, v := range baseDataList { convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{ DataTime: v.DataTime, Value: v.Value, }) } convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule) if e != nil { err = fmt.Errorf("转换数据失败, %v", e) return } if len(convertData) == 0 { utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode) return } //获取指标所有数据 existDataList := make([]*mgo.EdbDataThsHf, 0) mogDataObj := new(mgo.EdbDataThsHf) { // 构建查询条件 queryConditions := bson.M{ "edb_code": edbInfo.EdbCode, } if queryDate != `` { //获取已存在的所有数据 startDateTime, e := time.ParseInLocation(utils.FormatDate, queryDate, time.Local) if e != nil { err = fmt.Errorf("startDateTime parse err: %v", e) return } queryConditions["data_time"] = bson.M{"$gte": startDateTime} } existDataList, e = mogDataObj.GetAllDataList(queryConditions, []string{"data_time"}) if e != nil { err = fmt.Errorf("GetAllDataList, err: %v", e) return } } existDataMap := make(map[string]*mgo.EdbDataThsHf) //removeDataTimeMap := make(map[string]bool) //需要移除的日期数据 for _, v := range existDataList { tmpDate := v.DataTime.Format(utils.FormatDate) existDataMap[tmpDate] = v //removeDataTimeMap[tmpDate] = true } // 待添加的数据集 addDataList := make([]interface{}, 0) updateDataList := make([]mgo.EdbDataThsHf, 0) insertExist := make(map[string]bool) for k, v := range convertData { strDate := k.Format(utils.FormatDate) // 手动插入数据的判断 if realDataMaxDate.IsZero() || k.After(realDataMaxDate) { realDataMaxDate = k } if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) { isFindConfigDateRealData = true } // 入库值 saveVal := decimal.NewFromFloat(v).Round(4).String() d, e := decimal.NewFromString(saveVal) if e != nil { utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e)) continue } saveFloat, _ := d.Float64() // 更新 exists := existDataMap[strDate] if exists != nil { existVal := decimal.NewFromFloat(exists.Value).Round(4).String() if saveVal != existVal { exists.Value = saveFloat updateDataList = append(updateDataList, *exists) } continue } // 新增 if insertExist[strDate] { continue } insertExist[strDate] = true timestamp := k.UnixNano() / 1e6 addDataList = append(addDataList, &mgo.EdbDataThsHf{ EdbInfoId: edbInfo.EdbInfoId, EdbCode: edbInfo.EdbCode, DataTime: k, Value: saveFloat, CreateTime: time.Now(), ModifyTime: time.Now(), DataTimestamp: timestamp, }) } // 入库 { coll := mogDataObj.GetCollection() //删除已经不存在的指标数据(由于该指标当日的数据删除了) //{ // removeDateList := make([]time.Time, 0) // for dateTime := range removeDataTimeMap { // //获取已存在的所有数据 // tmpDateTime, e := time.ParseInLocation(utils.FormatDate, dateTime, time.Local) // if e != nil { // err = fmt.Errorf("tmpDateTime parse err: %v", e) // return // } // removeDateList = append(removeDateList, tmpDateTime) // } // removeNum := len(removeDateList) // if removeNum > 0 { // if e = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}}); e != nil { // err = fmt.Errorf("RemoveManyByColl, err: %v", e) // return // } // } //} // 插入新数据 if len(addDataList) > 0 { if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil { err = fmt.Errorf("BatchInsertDataByColl, err: %v", e) return } } // 修改历史数据 if len(updateDataList) > 0 { for _, v := range updateDataList { if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil { err = fmt.Errorf("UpdateDataByColl, err: %v", e) return } } } } // 处理手工数据补充的配置 obj.HandleConfigInsertEdbDataByMongo(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData) return } type ThsHfConvertOriginData struct { DataTime time.Time `description:"数据日期(至时分秒)"` Value float64 `description:"数据值"` } // ThsHfConvertData2DayByRule 原指标数据转换为日度数据 func ThsHfConvertData2DayByRule(originData []*ThsHfConvertOriginData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) { // PS: originData为期望开始日期前三日(有两天非交易日, 那么周一的前日应当算上周五的)至结束日期的数据 timeData = make(map[time.Time]float64) if len(originData) == 0 || convertRule == nil { return } if !utils.InArrayByInt([]int{1, 2}, convertRule.ConvertType) { err = fmt.Errorf("取值类型有误, ConvertType: %d", convertRule.ConvertType) return } // 升序排序 sort.Slice(originData, func(i, j int) bool { return originData[i].DataTime.Before(originData[j].DataTime) }) // 将数据根据日期进行分组 var sortDates []string groupDateData := make(map[string][]*ThsHfConvertOriginData) for _, v := range originData { d := v.DataTime.Format(utils.FormatDate) if !utils.InArrayByStr(sortDates, d) { sortDates = append(sortDates, d) } if groupDateData[d] == nil { groupDateData[d] = make([]*ThsHfConvertOriginData, 0) } groupDateData[d] = append(groupDateData[d], v) } // 取值方式-指定时间的值 if convertRule.ConvertType == 1 { for k, v := range sortDates { todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local) if e != nil { utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e) continue } var timeTarget time.Time dateData := make([]*ThsHfConvertOriginData, 0) // 当日 if convertRule.ConvertFixed.FixedDay == 1 { tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", v, convertRule.ConvertFixed.FixedTime), time.Local) if e != nil { utils.FileLog.Info(fmt.Sprintf("当日timeTarget转换有误, %v", e)) continue } timeTarget = tg dt := groupDateData[v] if dt == nil { utils.FileLog.Info(fmt.Sprintf("%s当日无数据", v)) continue } if len(dt) == 0 { continue } dateData = dt } // 前一日 if convertRule.ConvertFixed.FixedDay == 2 { if k < 1 { utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v)) continue } preDate := sortDates[k-1] tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local) if e != nil { utils.FileLog.Info(fmt.Sprintf("前日timeTarget转换有误, %v", e)) continue } timeTarget = tg dt := groupDateData[preDate] if dt == nil { utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v)) continue } if len(dt) == 0 { continue } dateData = dt } if len(dateData) == 0 { utils.FileLog.Info("日期%s无数据序列", v) continue } // 重新获取数据序列中, 时间在目标时间点之后的 newDateData := make([]*ThsHfConvertOriginData, 0) for kv, dv := range dateData { if dv.DataTime.Before(timeTarget) { continue } // 由于升序排列, 直接取之后所有的数据 newDateData = append(newDateData, dateData[kv:]...) break } // 取重组后当日数据中的第一个(有可能目标时间点无值, 那么取之后时间最近的值) if len(newDateData) == 0 { utils.FileLog.Info("日期%s无有效数据", v) continue } timeData[todayTime] = newDateData[0].Value } return } // 取值方式-区间计算值 for k, v := range sortDates { todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local) if e != nil { utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e) continue } var thisDate, preDate string thisDate = v if k > 1 { preDate = sortDates[k-1] } var startTimeTarget, endTimeTarget time.Time // 起始时间-当日/前一日 if convertRule.ConvertArea.StartDay == 1 { tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.StartTime), time.Local) if e != nil { utils.FileLog.Info(fmt.Sprintf("当日startTimeTarget转换有误, %v", e)) continue } startTimeTarget = tg } if convertRule.ConvertArea.StartDay == 2 { if preDate == "" { utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v)) continue } tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.StartTime), time.Local) if e != nil { utils.FileLog.Info(fmt.Sprintf("前日startTimeTarget转换有误, %v", e)) continue } startTimeTarget = tg } // 截止时间-当日/前一日 if convertRule.ConvertArea.EndDay == 1 { tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.EndTime), time.Local) if e != nil { utils.FileLog.Info(fmt.Sprintf("当日endTimeTarget转换有误, %v", e)) continue } endTimeTarget = tg } if convertRule.ConvertArea.EndDay == 2 { if preDate == "" { utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v)) continue } tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.EndTime), time.Local) if e != nil { utils.FileLog.Info(fmt.Sprintf("前日endTimeTarget转换有误, %v", e)) continue } endTimeTarget = tg } if startTimeTarget.IsZero() || endTimeTarget.IsZero() { utils.FileLog.Info(fmt.Sprintf("起始截止时间有误, start: %v, end: %v", startTimeTarget, endTimeTarget)) continue } // 合并前日当日数据 dateData := make([]*ThsHfConvertOriginData, 0) if convertRule.ConvertArea.StartDay == 1 && convertRule.ConvertArea.EndDay == 1 { // 起始截止均为当日 dateData = groupDateData[thisDate] if dateData == nil { utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate)) continue } if len(dateData) == 0 { utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate)) continue } } else { if preDate == "" { continue } // 起始截止时间含前日 preData := groupDateData[preDate] if preData == nil { utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate)) continue } if len(preData) == 0 { utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate)) continue } thisData := groupDateData[thisDate] if thisData == nil { utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate)) continue } if len(thisData) == 0 { utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate)) continue } dateData = append(dateData, preData...) dateData = append(dateData, thisData...) } if len(dateData) == 0 { utils.FileLog.Info("日期%s无数据序列", v) continue } // 重组时间区间内的数据 newDateData := make([]*ThsHfConvertOriginData, 0) for _, dv := range dateData { if dv.DataTime.Before(startTimeTarget) || dv.DataTime.After(endTimeTarget) { continue } newDateData = append(newDateData, dv) } if len(newDateData) == 0 { utils.FileLog.Info(fmt.Sprintf("时间区间内无数据, start: %v, end: %v", startTimeTarget, endTimeTarget)) continue } // 取出区间内的均值/最值 var avgVal, minVal, maxVal, sumVal float64 minVal, maxVal = newDateData[0].Value, newDateData[0].Value for _, nv := range newDateData { sumVal += nv.Value if nv.Value > maxVal { maxVal = nv.Value } if nv.Value < minVal { minVal = nv.Value } } avgVal = sumVal / float64(len(newDateData)) switch convertRule.ConvertArea.CalculateType { case 1: timeData[todayTime] = avgVal case 2: timeData[todayTime] = maxVal case 3: timeData[todayTime] = minVal default: utils.FileLog.Info(fmt.Sprintf("计算方式有误, CalculateType: %d", convertRule.ConvertArea.CalculateType)) } } return } func (obj EdbThsHf) getBaseIndexDataByMongo(baseIndexCode, startDate string) (newDataList []EdbInfoMgoData, err error) { newDataList = make([]EdbInfoMgoData, 0) // 获取数据源的指标数据 mogDataObj := new(mgo.BaseFromThsHfData) // 构建查询条件 queryConditions := bson.M{ "index_code": baseIndexCode, } if startDate != `` { //获取已存在的所有数据 startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local) if tmpErr != nil { err = tmpErr return } queryConditions["data_time"] = bson.M{"$gte": startDateTime} } baseDataList, err := mogDataObj.GetAllDataList(queryConditions, []string{"data_time"}) if err != nil { fmt.Println("GetAllDataList Err:" + err.Error()) return } for _, v := range baseDataList { newDataList = append(newDataList, EdbInfoMgoData{ //EdbDataId: v.ID, DataTime: v.DataTime, Value: v.Value, EdbCode: v.IndexCode, }) } return } func (obj EdbThsHf) HandleConfigInsertEdbDataByMongo(realDataMaxDate time.Time, edbDataInsertConfig *EdbDataInsertConfig, edbInfoId, source, subSource int, existMap map[string]*mgo.EdbDataThsHf, isFindConfigDateRealData bool) { if edbDataInsertConfig == nil { return } var err error defer func() { if err != nil { utils.FileLog.Info(fmt.Sprintf("ThsHf-HandleConfigInsertEdbDataByMongo, err: %v", err)) } }() edbDataInsertConfigDate := edbDataInsertConfig.Date // 配置的日期 // 如果存在真实数据的最大日期 && 存在配置插入数据的最大日期 && 真实数据的最大日期 晚于/等于 配置插入数据的最大日期 if realDataMaxDate.After(edbDataInsertConfigDate) || realDataMaxDate.Equal(edbDataInsertConfigDate) { go DeleteEdbDataInsertConfigByEdbId(edbInfoId) mogDataObj := mgo.EdbDataThsHf{} coll := mogDataObj.GetCollection() edbDataInsertConfigDateStr := edbDataInsertConfigDate.Format(utils.FormatDate) // 如果没有找到找到配置日期的实际数据,那么就直接删除 if item, ok := existMap[edbDataInsertConfigDateStr]; ok && !isFindConfigDateRealData { mogDataObj.RemoveManyByColl(coll, bson.M{"_id": item.ID}) } } else { o := orm.NewOrm() edbDataInsertConfig.RealDate = realDataMaxDate _, err = o.Update(edbDataInsertConfig, "RealDate") } return } func (obj EdbThsHf) UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo *EdbInfo) (err error) { if utils.UseMongo { edbInfoMaxAndMinInfo, tmpErr := obj.getEdbInfoMaxAndMinInfoByMongo(edbInfo.EdbCode) // 如果正常获取到了,那就去修改指标的最大最小值 if tmpErr == nil && edbInfoMaxAndMinInfo != nil { err = ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, edbInfoMaxAndMinInfo) } else { // 清空的目的是为了避免异常返回 err = nil } } else { err, _ = UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo) } return } func (obj EdbThsHf) getEdbInfoMaxAndMinInfoByMongo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) { mogDataObj := new(mgo.EdbDataThsHf) pipeline := []bson.M{ {"$match": bson.M{"edb_code": edbCode}}, {"$group": bson.M{ "_id": nil, "min_date": bson.M{"$min": "$data_time"}, "max_date": bson.M{"$max": "$data_time"}, "min_value": bson.M{"$min": "$value"}, "max_value": bson.M{"$max": "$value"}, }}, {"$project": bson.M{"_id": 0}}, // 可选,如果不需要_id字段 } result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline) if err != nil { fmt.Println("EdbDataBusiness getEdbDataBusinessList Err:" + err.Error()) return } if !result.MaxDate.IsZero() { whereQuery := bson.M{"edb_code": edbCode, "data_time": result.MaxDate} selectParam := bson.D{{"value", 1}, {"_id", 0}} latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam) if tmpErr != nil { err = tmpErr return } result.LatestValue = latestValue.Value result.EndValue = latestValue.Value } item = &EdbInfoMaxAndMinInfo{ MinDate: result.MinDate.Format(utils.FormatDate), MaxDate: result.MaxDate.Format(utils.FormatDate), MinValue: result.MinValue, MaxValue: result.MaxValue, LatestValue: result.LatestValue, LatestDate: result.LatestDate.Format(utils.FormatDate), EndValue: result.EndValue, } return }