123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924 |
- package models
- import (
- "encoding/json"
- "eta/eta_index_lib/models/mgo"
- "eta/eta_index_lib/utils"
- "fmt"
- "github.com/beego/beego/v2/client/orm"
- "github.com/shopspring/decimal"
- "go.mongodb.org/mongo-driver/bson"
- "reflect"
- "sort"
- "time"
- )
- // EdbThsHf 自有数据
- type EdbThsHf struct{}
- // GetSource 获取来源编码id
- func (obj EdbThsHf) GetSource() int {
- return utils.DATA_SOURCE_THS
- }
- // GetSubSource 获取子来源编码id
- func (obj EdbThsHf) GetSubSource() int {
- return utils.DATA_SUB_SOURCE_HIGH_FREQUENCY
- }
- // GetSourceName 获取来源名称
- func (obj EdbThsHf) GetSourceName() string {
- return utils.DATA_SOURCE_NAME_THS
- }
- // GetEdbType 获取指标类型
- func (obj EdbThsHf) GetEdbType() int {
- return utils.DEFAULT_EDB_TYPE
- }
- // ThsHfAddBaseParams
- // @Description: 基础指标的添加参数
- type ThsHfAddBaseParams struct {
- EdbCode string `description:"指标编码"`
- EdbName string `description:"指标名称"`
- Unit string `description:"单位"`
- Frequency string `description:"频度"`
- Sort int `description:"排序"`
- ClassifyId int `description:"所属分类"`
- SysUserId int `description:"用户id"`
- SysUserRealName string `description:"用户真实名称"`
- UniqueCode string `description:"唯一编码"`
- ConvertRule string `description:"转换规则"`
- }
- // ThsHfEditBaseParams
- // @Description: 基础指标的修改参数
- type ThsHfEditBaseParams struct {
- EdbCode string `description:"指标编码"`
- EdbName string `description:"指标名称"`
- EdbNameEn string `description:"指标名称(英文)"`
- Unit string `description:"单位"`
- UnitEn string `description:"单位(英文)"`
- ClassifyId int `description:"所属分类"`
- SysUserId int `description:"用户id"`
- SysUserRealName string `description:"用户真实名称"`
- UniqueCode string `description:"编码"`
- Lang string `description:"语言版本"`
- EdbInfo *EdbInfo `description:"指标信息"`
- }
- type ThsHfRefreshBaseParams struct {
- EdbInfo *EdbInfo
- StartDate string
- EndDate string
- }
- // Add
- // @Description: 添加指标
- func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex) (edbInfo *EdbInfo, err error) {
- o := orm.NewOrm()
- tx, e := o.Begin()
- if e != nil {
- err = fmt.Errorf("orm begin err: %v", e)
- return
- }
- defer func() {
- if err != nil {
- _ = tx.Rollback()
- utils.FileLog.Info(fmt.Sprintf("%s err: %v", reflect.TypeOf(obj).Name(), err))
- return
- }
- _ = tx.Commit()
- }()
- // 新增指标
- edbInfo = new(EdbInfo)
- edbInfo.Source = obj.GetSource()
- edbInfo.SubSource = obj.GetSubSource()
- edbInfo.SourceName = obj.GetSourceName()
- edbInfo.EdbType = obj.GetEdbType()
- edbInfo.EdbCode = params.EdbCode
- edbInfo.EdbName = params.EdbName
- edbInfo.EdbNameEn = params.EdbName
- edbInfo.EdbNameSource = params.EdbName
- edbInfo.Frequency = params.Frequency
- edbInfo.Unit = params.Unit
- edbInfo.UnitEn = params.Unit
- edbInfo.StartDate = baseIndex.StartDate.Format(utils.FormatDate) // 默认取源指标的时间, 刷新完成后更新
- edbInfo.EndDate = baseIndex.EndDate.Format(utils.FormatDate)
- edbInfo.ClassifyId = params.ClassifyId
- edbInfo.SysUserId = params.SysUserId
- edbInfo.SysUserRealName = params.SysUserRealName
- edbInfo.Sort = params.Sort
- edbInfo.TerminalCode = baseIndex.TerminalCode
- edbInfo.UniqueCode = params.UniqueCode
- edbInfo.CreateTime = time.Now()
- edbInfo.ModifyTime = time.Now()
- edbInfoId, e := tx.Insert(edbInfo)
- if e != nil {
- err = fmt.Errorf("insert edb err: %v", e)
- return
- }
- edbInfo.EdbInfoId = int(edbInfoId)
- // 新增指标关联
- edbMapping := new(BaseFromEdbMapping)
- edbMapping.BaseFromIndexId = baseIndex.BaseFromThsHfIndexId
- edbMapping.BaseIndexCode = baseIndex.IndexCode
- edbMapping.EdbInfoId = edbInfo.EdbInfoId
- edbMapping.EdbCode = edbInfo.EdbCode
- edbMapping.Source = obj.GetSource()
- edbMapping.SubSource = obj.GetSubSource()
- edbMapping.ConvertRule = params.ConvertRule
- edbMapping.CreateTime = time.Now().Local()
- edbMapping.ModifyTime = time.Now().Local()
- edbMappingId, e := tx.Insert(edbMapping)
- if e != nil {
- err = fmt.Errorf("insert base edb mapping err: %v", e)
- return
- }
- edbMapping.Id = int(edbMappingId)
- // 刷新数据
- err = obj.Refresh(edbInfo, edbMapping, "")
- return
- }
- func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
- if utils.UseMongo {
- return obj.refreshByMongo(edbInfo, edbBaseMapping, startDate)
- }
- return obj.refreshByMysql(edbInfo, edbBaseMapping, startDate)
- }
- func (obj EdbThsHf) refreshByMysql(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
- if edbInfo == nil || edbBaseMapping == nil {
- err = fmt.Errorf("指标信息/关联信息有误, EdbInfo: %v, EdbBaseMapping: %v", edbInfo, edbBaseMapping)
- return
- }
- // 真实数据的最大日期, 插入规则配置的日期
- var realDataMaxDate, edbDataInsertConfigDate time.Time
- var edbDataInsertConfig *EdbDataInsertConfig
- var isFindConfigDateRealData bool
- {
- conf, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
- if e != nil && e.Error() != utils.ErrNoRow() {
- err = fmt.Errorf("GetEdbDataInsertConfigByEdbId err: %v", e)
- return
- }
- edbDataInsertConfig = conf
- if edbDataInsertConfig != nil {
- edbDataInsertConfigDate = edbDataInsertConfig.Date
- }
- }
- // 查询时间为开始时间-3d
- var queryDate string
- if startDate != "" {
- st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
- if e != nil {
- err = fmt.Errorf("刷新开始时间有误, %v", e)
- return
- }
- queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
- }
- // 源指标数据
- baseDataList := make([]*BaseFromThsHfData, 0)
- {
- ob := new(BaseFromThsHfData)
- cond := fmt.Sprintf(" AND %s = ?", ob.Cols().IndexCode)
- pars := make([]interface{}, 0)
- pars = append(pars, edbBaseMapping.BaseIndexCode)
- if queryDate != "" {
- cond += fmt.Sprintf(" AND %s >= ?", ob.Cols().DataTime)
- pars = append(pars, queryDate)
- }
- list, e := ob.GetItemsByCondition(cond, pars, []string{}, fmt.Sprintf("%s ASC", ob.Cols().DataTime))
- if e != nil {
- err = fmt.Errorf("获取数据源数据失败, %v", e)
- return
- }
- baseDataList = list
- }
- // 转换数据
- convertRule := new(ThsHfIndexConvert2EdbRule)
- if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
- err = fmt.Errorf("转换规则有误, %v", e)
- return
- }
- convertOriginData := make([]*ThsHfConvertOriginData, 0)
- for _, v := range baseDataList {
- convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
- DataTime: v.DataTime,
- Value: v.Value,
- })
- }
- convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
- if e != nil {
- err = fmt.Errorf("转换数据失败, %v", e)
- return
- }
- if len(convertData) == 0 {
- utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
- return
- }
- // 获取已有数据
- dataOb := new(EdbDataThsHf)
- dataExists := make(map[string]*EdbDataThsHf)
- searchExistMap := make(map[string]*EdbInfoSearchData)
- {
- cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().EdbInfoId)
- pars := make([]interface{}, 0)
- pars = append(pars, edbInfo.EdbInfoId)
- if queryDate != "" {
- cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
- pars = append(pars, queryDate)
- }
- list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
- if e != nil {
- err = fmt.Errorf("获取指标数据失败, %v", e)
- return
- }
- for _, v := range list {
- dataExists[v.DataTime.Format(utils.FormatDate)] = v
- searchExistMap[v.DataTime.Format(utils.FormatDate)] = &EdbInfoSearchData{
- EdbDataId: v.EdbDataId,
- EdbInfoId: v.EdbInfoId,
- DataTime: v.DataTime.Format(utils.FormatDate),
- Value: v.Value,
- EdbCode: v.EdbCode,
- DataTimestamp: v.DataTimestamp,
- }
- }
- }
- // 比对数据
- insertExist := make(map[string]bool)
- insertData := make([]*EdbDataThsHf, 0)
- updateData := make([]*EdbDataThsHf, 0)
- for k, v := range convertData {
- strDate := k.Format(utils.FormatDate)
- // 手动插入数据的判断
- if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
- realDataMaxDate = k
- }
- if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
- isFindConfigDateRealData = true
- }
- // 入库值
- saveVal := decimal.NewFromFloat(v).Round(4).String()
- d, e := decimal.NewFromString(saveVal)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
- continue
- }
- saveFloat, _ := d.Float64()
- // 更新
- exists := dataExists[strDate]
- if exists != nil {
- existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
- if saveVal != existVal {
- exists.Value = saveFloat
- updateData = append(updateData, exists)
- }
- continue
- }
- // 新增
- if insertExist[strDate] {
- continue
- }
- insertExist[strDate] = true
- timestamp := k.UnixNano() / 1e6
- insertData = append(insertData, &EdbDataThsHf{
- EdbInfoId: edbInfo.EdbInfoId,
- EdbCode: edbInfo.EdbCode,
- DataTime: k,
- Value: saveFloat,
- CreateTime: time.Now(),
- ModifyTime: time.Now(),
- DataTimestamp: timestamp,
- })
- }
- // 批量新增/更新
- if len(insertData) > 0 {
- if e = dataOb.CreateMulti(insertData); e != nil {
- err = fmt.Errorf("批量新增指标数据失败, %v", e)
- return
- }
- }
- if len(updateData) > 0 {
- if e = dataOb.MultiUpdateValue(updateData); e != nil {
- err = fmt.Errorf("批量更新指标数据失败, %v", e)
- return
- }
- }
- // 处理手工数据补充的配置
- HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, obj.GetSource(), obj.GetSubSource(), searchExistMap, isFindConfigDateRealData)
- return
- }
- func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
- defer func() {
- if err != nil {
- utils.FileLog.Info(fmt.Sprintf("refreshByMongo, err: %v", err))
- }
- }()
- var realDataMaxDate, edbDataInsertConfigDate time.Time
- var edbDataInsertConfig *EdbDataInsertConfig
- var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
- {
- insertConfig, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
- if e != nil && e.Error() != utils.ErrNoRow() {
- err = fmt.Errorf("GetEdbDataInsertConfigByEdbId, err: %v", e)
- return
- }
- edbDataInsertConfig = insertConfig
- if edbDataInsertConfig != nil {
- edbDataInsertConfigDate = edbDataInsertConfig.Date
- }
- }
- // 查询时间为开始时间-3d
- var queryDate string
- if startDate != "" {
- st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
- if e != nil {
- err = fmt.Errorf("刷新开始时间有误, %v", e)
- return
- }
- queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
- }
- // 获取源指标数据
- baseDataList, e := obj.getBaseIndexDataByMongo(edbBaseMapping.BaseIndexCode, queryDate)
- if e != nil {
- err = fmt.Errorf("getBaseIndexDataByMongo, err: %v", e)
- return
- }
- // 转换数据
- convertRule := new(ThsHfIndexConvert2EdbRule)
- if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
- err = fmt.Errorf("转换规则有误, %v", e)
- return
- }
- convertOriginData := make([]*ThsHfConvertOriginData, 0)
- for _, v := range baseDataList {
- convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
- DataTime: v.DataTime,
- Value: v.Value,
- })
- }
- convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
- if e != nil {
- err = fmt.Errorf("转换数据失败, %v", e)
- return
- }
- if len(convertData) == 0 {
- utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
- return
- }
- //获取指标所有数据
- existDataList := make([]*mgo.EdbDataThsHf, 0)
- mogDataObj := new(mgo.EdbDataThsHf)
- {
- // 构建查询条件
- queryConditions := bson.M{
- "edb_code": edbInfo.EdbCode,
- }
- if queryDate != `` {
- //获取已存在的所有数据
- startDateTime, e := time.ParseInLocation(utils.FormatDate, queryDate, time.Local)
- if e != nil {
- err = fmt.Errorf("startDateTime parse err: %v", e)
- return
- }
- queryConditions["data_time"] = bson.M{"$gte": startDateTime}
- }
- existDataList, e = mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
- if e != nil {
- err = fmt.Errorf("GetAllDataList, err: %v", e)
- return
- }
- }
- existDataMap := make(map[string]*mgo.EdbDataThsHf)
- //removeDataTimeMap := make(map[string]bool) //需要移除的日期数据
- for _, v := range existDataList {
- tmpDate := v.DataTime.Format(utils.FormatDate)
- existDataMap[tmpDate] = v
- //removeDataTimeMap[tmpDate] = true
- }
- // 待添加的数据集
- addDataList := make([]interface{}, 0)
- updateDataList := make([]mgo.EdbDataThsHf, 0)
- insertExist := make(map[string]bool)
- for k, v := range convertData {
- strDate := k.Format(utils.FormatDate)
- // 手动插入数据的判断
- if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
- realDataMaxDate = k
- }
- if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
- isFindConfigDateRealData = true
- }
- // 入库值
- saveVal := decimal.NewFromFloat(v).Round(4).String()
- d, e := decimal.NewFromString(saveVal)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
- continue
- }
- saveFloat, _ := d.Float64()
- // 更新
- exists := existDataMap[strDate]
- if exists != nil {
- existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
- if saveVal != existVal {
- exists.Value = saveFloat
- updateDataList = append(updateDataList, *exists)
- }
- continue
- }
- // 新增
- if insertExist[strDate] {
- continue
- }
- insertExist[strDate] = true
- timestamp := k.UnixNano() / 1e6
- addDataList = append(addDataList, &mgo.EdbDataThsHf{
- EdbInfoId: edbInfo.EdbInfoId,
- EdbCode: edbInfo.EdbCode,
- DataTime: k,
- Value: saveFloat,
- CreateTime: time.Now(),
- ModifyTime: time.Now(),
- DataTimestamp: timestamp,
- })
- }
- // 入库
- {
- coll := mogDataObj.GetCollection()
- //删除已经不存在的指标数据(由于该指标当日的数据删除了)
- //{
- // removeDateList := make([]time.Time, 0)
- // for dateTime := range removeDataTimeMap {
- // //获取已存在的所有数据
- // tmpDateTime, e := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
- // if e != nil {
- // err = fmt.Errorf("tmpDateTime parse err: %v", e)
- // return
- // }
- // removeDateList = append(removeDateList, tmpDateTime)
- // }
- // removeNum := len(removeDateList)
- // if removeNum > 0 {
- // if e = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}}); e != nil {
- // err = fmt.Errorf("RemoveManyByColl, err: %v", e)
- // return
- // }
- // }
- //}
- // 插入新数据
- if len(addDataList) > 0 {
- if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil {
- err = fmt.Errorf("BatchInsertDataByColl, err: %v", e)
- return
- }
- }
- // 修改历史数据
- if len(updateDataList) > 0 {
- for _, v := range updateDataList {
- if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil {
- err = fmt.Errorf("UpdateDataByColl, err: %v", e)
- return
- }
- }
- }
- }
- // 处理手工数据补充的配置
- obj.HandleConfigInsertEdbDataByMongo(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData)
- return
- }
- type ThsHfConvertOriginData struct {
- DataTime time.Time `description:"数据日期(至时分秒)"`
- Value float64 `description:"数据值"`
- }
- // ThsHfConvertData2DayByRule 原指标数据转换为日度数据
- func ThsHfConvertData2DayByRule(originData []*ThsHfConvertOriginData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) {
- // PS: originData为期望开始日期前三日(有两天非交易日, 那么周一的前日应当算上周五的)至结束日期的数据
- timeData = make(map[time.Time]float64)
- if len(originData) == 0 || convertRule == nil {
- return
- }
- if !utils.InArrayByInt([]int{1, 2}, convertRule.ConvertType) {
- err = fmt.Errorf("取值类型有误, ConvertType: %d", convertRule.ConvertType)
- return
- }
- // 升序排序
- sort.Slice(originData, func(i, j int) bool {
- return originData[i].DataTime.Before(originData[j].DataTime)
- })
- // 将数据根据日期进行分组
- var sortDates []string
- groupDateData := make(map[string][]*ThsHfConvertOriginData)
- for _, v := range originData {
- d := v.DataTime.Format(utils.FormatDate)
- if !utils.InArrayByStr(sortDates, d) {
- sortDates = append(sortDates, d)
- }
- if groupDateData[d] == nil {
- groupDateData[d] = make([]*ThsHfConvertOriginData, 0)
- }
- groupDateData[d] = append(groupDateData[d], v)
- }
- // 取值方式-指定时间的值
- if convertRule.ConvertType == 1 {
- for k, v := range sortDates {
- todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
- if e != nil {
- utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
- continue
- }
- var timeTarget time.Time
- dateData := make([]*ThsHfConvertOriginData, 0)
- // 当日
- if convertRule.ConvertFixed.FixedDay == 1 {
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", v, convertRule.ConvertFixed.FixedTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("当日timeTarget转换有误, %v", e))
- continue
- }
- timeTarget = tg
- dt := groupDateData[v]
- if dt == nil {
- utils.FileLog.Info(fmt.Sprintf("%s当日无数据", v))
- continue
- }
- if len(dt) == 0 {
- continue
- }
- dateData = dt
- }
- // 前一日
- if convertRule.ConvertFixed.FixedDay == 2 {
- if k < 1 {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
- continue
- }
- preDate := sortDates[k-1]
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("前日timeTarget转换有误, %v", e))
- continue
- }
- timeTarget = tg
- dt := groupDateData[preDate]
- if dt == nil {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
- continue
- }
- if len(dt) == 0 {
- continue
- }
- dateData = dt
- }
- if len(dateData) == 0 {
- utils.FileLog.Info("日期%s无数据序列", v)
- continue
- }
- // 重新获取数据序列中, 时间在目标时间点之后的
- newDateData := make([]*ThsHfConvertOriginData, 0)
- for kv, dv := range dateData {
- if dv.DataTime.Before(timeTarget) {
- continue
- }
- // 由于升序排列, 直接取之后所有的数据
- newDateData = append(newDateData, dateData[kv:]...)
- break
- }
- // 取重组后当日数据中的第一个(有可能目标时间点无值, 那么取之后时间最近的值)
- if len(newDateData) == 0 {
- utils.FileLog.Info("日期%s无有效数据", v)
- continue
- }
- timeData[todayTime] = newDateData[0].Value
- }
- return
- }
- // 取值方式-区间计算值
- for k, v := range sortDates {
- todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
- if e != nil {
- utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
- continue
- }
- var thisDate, preDate string
- thisDate = v
- if k > 1 {
- preDate = sortDates[k-1]
- }
- var startTimeTarget, endTimeTarget time.Time
- // 起始时间-当日/前一日
- if convertRule.ConvertArea.StartDay == 1 {
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.StartTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("当日startTimeTarget转换有误, %v", e))
- continue
- }
- startTimeTarget = tg
- }
- if convertRule.ConvertArea.StartDay == 2 {
- if preDate == "" {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
- continue
- }
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.StartTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("前日startTimeTarget转换有误, %v", e))
- continue
- }
- startTimeTarget = tg
- }
- // 截止时间-当日/前一日
- if convertRule.ConvertArea.EndDay == 1 {
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.EndTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("当日endTimeTarget转换有误, %v", e))
- continue
- }
- endTimeTarget = tg
- }
- if convertRule.ConvertArea.EndDay == 2 {
- if preDate == "" {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
- continue
- }
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.EndTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("前日endTimeTarget转换有误, %v", e))
- continue
- }
- endTimeTarget = tg
- }
- if startTimeTarget.IsZero() || endTimeTarget.IsZero() {
- utils.FileLog.Info(fmt.Sprintf("起始截止时间有误, start: %v, end: %v", startTimeTarget, endTimeTarget))
- continue
- }
- // 合并前日当日数据
- dateData := make([]*ThsHfConvertOriginData, 0)
- if convertRule.ConvertArea.StartDay == 1 && convertRule.ConvertArea.EndDay == 1 {
- // 起始截止均为当日
- dateData = groupDateData[thisDate]
- if dateData == nil {
- utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
- continue
- }
- if len(dateData) == 0 {
- utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
- continue
- }
- } else {
- if preDate == "" {
- continue
- }
- // 起始截止时间含前日
- preData := groupDateData[preDate]
- if preData == nil {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
- continue
- }
- if len(preData) == 0 {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
- continue
- }
- thisData := groupDateData[thisDate]
- if thisData == nil {
- utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
- continue
- }
- if len(thisData) == 0 {
- utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
- continue
- }
- dateData = append(dateData, preData...)
- dateData = append(dateData, thisData...)
- }
- if len(dateData) == 0 {
- utils.FileLog.Info("日期%s无数据序列", v)
- continue
- }
- // 重组时间区间内的数据
- newDateData := make([]*ThsHfConvertOriginData, 0)
- for _, dv := range dateData {
- if dv.DataTime.Before(startTimeTarget) || dv.DataTime.After(endTimeTarget) {
- continue
- }
- newDateData = append(newDateData, dv)
- }
- if len(newDateData) == 0 {
- utils.FileLog.Info(fmt.Sprintf("时间区间内无数据, start: %v, end: %v", startTimeTarget, endTimeTarget))
- continue
- }
- // 取出区间内的均值/最值
- var avgVal, minVal, maxVal, sumVal float64
- minVal, maxVal = newDateData[0].Value, newDateData[0].Value
- for _, nv := range newDateData {
- sumVal += nv.Value
- if nv.Value > maxVal {
- maxVal = nv.Value
- }
- if nv.Value < minVal {
- minVal = nv.Value
- }
- }
- avgVal = sumVal / float64(len(newDateData))
- switch convertRule.ConvertArea.CalculateType {
- case 1:
- timeData[todayTime] = avgVal
- case 2:
- timeData[todayTime] = maxVal
- case 3:
- timeData[todayTime] = minVal
- default:
- utils.FileLog.Info(fmt.Sprintf("计算方式有误, CalculateType: %d", convertRule.ConvertArea.CalculateType))
- }
- }
- return
- }
- func (obj EdbThsHf) getBaseIndexDataByMongo(baseIndexCode, startDate string) (newDataList []EdbInfoMgoData, err error) {
- newDataList = make([]EdbInfoMgoData, 0)
- // 获取数据源的指标数据
- mogDataObj := new(mgo.BaseFromThsHfData)
- // 构建查询条件
- queryConditions := bson.M{
- "index_code": baseIndexCode,
- }
- if startDate != `` {
- //获取已存在的所有数据
- startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- queryConditions["data_time"] = bson.M{"$gte": startDateTime}
- }
- baseDataList, err := mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
- if err != nil {
- fmt.Println("GetAllDataList Err:" + err.Error())
- return
- }
- for _, v := range baseDataList {
- newDataList = append(newDataList, EdbInfoMgoData{
- //EdbDataId: v.ID,
- DataTime: v.DataTime,
- Value: v.Value,
- EdbCode: v.IndexCode,
- })
- }
- return
- }
- func (obj EdbThsHf) HandleConfigInsertEdbDataByMongo(realDataMaxDate time.Time, edbDataInsertConfig *EdbDataInsertConfig, edbInfoId, source, subSource int, existMap map[string]*mgo.EdbDataThsHf, isFindConfigDateRealData bool) {
- if edbDataInsertConfig == nil {
- return
- }
- var err error
- defer func() {
- if err != nil {
- utils.FileLog.Info(fmt.Sprintf("ThsHf-HandleConfigInsertEdbDataByMongo, err: %v", err))
- }
- }()
- edbDataInsertConfigDate := edbDataInsertConfig.Date // 配置的日期
- // 如果存在真实数据的最大日期 && 存在配置插入数据的最大日期 && 真实数据的最大日期 晚于/等于 配置插入数据的最大日期
- if realDataMaxDate.After(edbDataInsertConfigDate) || realDataMaxDate.Equal(edbDataInsertConfigDate) {
- go DeleteEdbDataInsertConfigByEdbId(edbInfoId)
- mogDataObj := mgo.EdbDataThsHf{}
- coll := mogDataObj.GetCollection()
- edbDataInsertConfigDateStr := edbDataInsertConfigDate.Format(utils.FormatDate)
- // 如果没有找到找到配置日期的实际数据,那么就直接删除
- if item, ok := existMap[edbDataInsertConfigDateStr]; ok && !isFindConfigDateRealData {
- mogDataObj.RemoveManyByColl(coll, bson.M{"_id": item.ID})
- }
- } else {
- o := orm.NewOrm()
- edbDataInsertConfig.RealDate = realDataMaxDate
- _, err = o.Update(edbDataInsertConfig, "RealDate")
- }
- return
- }
- func (obj EdbThsHf) UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo *EdbInfo) (err error) {
- if utils.UseMongo {
- edbInfoMaxAndMinInfo, tmpErr := obj.getEdbInfoMaxAndMinInfoByMongo(edbInfo.EdbCode)
- // 如果正常获取到了,那就去修改指标的最大最小值
- if tmpErr == nil && edbInfoMaxAndMinInfo != nil {
- err = ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, edbInfoMaxAndMinInfo)
- } else {
- // 清空的目的是为了避免异常返回
- err = nil
- }
- } else {
- err, _ = UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
- }
- return
- }
- func (obj EdbThsHf) getEdbInfoMaxAndMinInfoByMongo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
- mogDataObj := new(mgo.EdbDataThsHf)
- pipeline := []bson.M{
- {"$match": bson.M{"edb_code": edbCode}},
- {"$group": bson.M{
- "_id": nil,
- "min_date": bson.M{"$min": "$data_time"},
- "max_date": bson.M{"$max": "$data_time"},
- "min_value": bson.M{"$min": "$value"},
- "max_value": bson.M{"$max": "$value"},
- }},
- {"$project": bson.M{"_id": 0}}, // 可选,如果不需要_id字段
- }
- result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
- if err != nil {
- fmt.Println("EdbDataBusiness getEdbDataBusinessList Err:" + err.Error())
- return
- }
- if !result.MaxDate.IsZero() {
- whereQuery := bson.M{"edb_code": edbCode, "data_time": result.MaxDate}
- selectParam := bson.D{{"value", 1}, {"_id", 0}}
- latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- result.LatestValue = latestValue.Value
- result.EndValue = latestValue.Value
- }
- item = &EdbInfoMaxAndMinInfo{
- MinDate: result.MinDate.Format(utils.FormatDate),
- MaxDate: result.MaxDate.Format(utils.FormatDate),
- MinValue: result.MinValue,
- MaxValue: result.MaxValue,
- LatestValue: result.LatestValue,
- LatestDate: result.LatestDate.Format(utils.FormatDate),
- EndValue: result.EndValue,
- }
- return
- }
|