123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581 |
- package models
- import (
- "encoding/json"
- "eta/eta_index_lib/utils"
- "fmt"
- "github.com/beego/beego/v2/client/orm"
- "github.com/shopspring/decimal"
- "reflect"
- "sort"
- "time"
- )
// EdbThsHf is the stateless handler for indicators (EDBs) built on the THS
// high-frequency data source. Its methods report the source identity and
// create or refresh indicators of that source.
type EdbThsHf struct{}
- // GetSource 获取来源编码id
- func (obj EdbThsHf) GetSource() int {
- return utils.DATA_SOURCE_THS
- }
- // GetSubSource 获取子来源编码id
- func (obj EdbThsHf) GetSubSource() int {
- return utils.DATA_SUB_SOURCE_HIGH_FREQUENCY
- }
- // GetSourceName 获取来源名称
- func (obj EdbThsHf) GetSourceName() string {
- return utils.DATA_SOURCE_NAME_THS
- }
- // GetEdbType 获取指标类型
- func (obj EdbThsHf) GetEdbType() int {
- return utils.DEFAULT_EDB_TYPE
- }
// ThsHfAddBaseParams carries the inputs for creating a base indicator.
//
// The `description` tags are kept verbatim because they are readable at
// runtime through reflection; the trailing comments are their English
// equivalents.
type ThsHfAddBaseParams struct {
	EdbCode         string `description:"指标编码"`   // indicator code
	EdbName         string `description:"指标名称"`   // indicator name
	Unit            string `description:"单位"`     // unit
	Frequency       string `description:"频度"`     // frequency
	Sort            int    `description:"排序"`     // sort order
	ClassifyId      int    `description:"所属分类"`   // id of the owning classification
	SysUserId       int    `description:"用户id"`   // user id
	SysUserRealName string `description:"用户真实名称"` // user's real name
	UniqueCode      string `description:"唯一编码"`   // unique code
	ConvertRule     string `description:"转换规则"`   // conversion rule; Refresh decodes it as JSON
}
- // ThsHfEditBaseParams
- // @Description: 基础指标的修改参数
- type ThsHfEditBaseParams struct {
- EdbCode string `description:"指标编码"`
- EdbName string `description:"指标名称"`
- EdbNameEn string `description:"指标名称(英文)"`
- Unit string `description:"单位"`
- UnitEn string `description:"单位(英文)"`
- ClassifyId int `description:"所属分类"`
- SysUserId int `description:"用户id"`
- SysUserRealName string `description:"用户真实名称"`
- UniqueCode string `description:"编码"`
- Lang string `description:"语言版本"`
- EdbInfo *EdbInfo `description:"指标信息"`
- }
- type ThsHfRefreshBaseParams struct {
- EdbInfo *EdbInfo
- StartDate string
- EndDate string
- }
- // Add
- // @Description: 添加指标
- // @author: Roc
- // @receiver obj
- // @datetime 2024-04-30 17:35:14
- // @param params ThsHfAddBaseParams
- // @param businessIndexItem *BaseFromBusinessIndex
- // @return edbInfo *EdbInfo
- // @return err error
- // @return errMsg string
- func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex) (edbInfo *EdbInfo, err error) {
- o := orm.NewOrm()
- tx, e := o.Begin()
- if e != nil {
- err = fmt.Errorf("orm begin err: %v", e)
- return
- }
- defer func() {
- if err != nil {
- _ = tx.Rollback()
- utils.FileLog.Info(fmt.Sprintf("%s err: %v", reflect.TypeOf(obj).Name(), err))
- return
- }
- _ = tx.Commit()
- }()
- // 新增指标
- edbInfo = new(EdbInfo)
- edbInfo.Source = obj.GetSource()
- edbInfo.SubSource = obj.GetSubSource()
- edbInfo.SourceName = obj.GetSourceName()
- edbInfo.EdbType = obj.GetEdbType()
- edbInfo.EdbCode = params.EdbCode
- edbInfo.EdbName = params.EdbName
- edbInfo.EdbNameEn = params.EdbName
- edbInfo.EdbNameSource = params.EdbName
- edbInfo.Frequency = params.Frequency
- edbInfo.Unit = params.Unit
- edbInfo.UnitEn = params.Unit
- edbInfo.StartDate = baseIndex.StartDate.Format(utils.FormatDate) // 默认取源指标的时间, 刷新完成后更新
- edbInfo.EndDate = baseIndex.EndDate.Format(utils.FormatDate)
- edbInfo.ClassifyId = params.ClassifyId
- edbInfo.SysUserId = params.SysUserId
- edbInfo.SysUserRealName = params.SysUserRealName
- edbInfo.Sort = params.Sort
- edbInfo.TerminalCode = baseIndex.TerminalCode
- edbInfo.UniqueCode = params.UniqueCode
- edbInfo.CreateTime = time.Now()
- edbInfo.ModifyTime = time.Now()
- edbInfoId, e := tx.Insert(edbInfo)
- if e != nil {
- err = fmt.Errorf("insert edb err: %v", e)
- return
- }
- edbInfo.EdbInfoId = int(edbInfoId)
- // 新增指标关联
- edbMapping := new(BaseFromEdbMapping)
- edbMapping.BaseFromIndexId = baseIndex.BaseFromThsHfIndexId
- edbMapping.BaseIndexCode = baseIndex.IndexCode
- edbMapping.EdbInfoId = edbInfo.EdbInfoId
- edbMapping.EdbCode = edbInfo.EdbCode
- edbMapping.Source = obj.GetSource()
- edbMapping.SubSource = obj.GetSubSource()
- edbMapping.ConvertRule = params.ConvertRule
- edbMapping.CreateTime = time.Now().Local()
- edbMapping.ModifyTime = time.Now().Local()
- edbMappingId, e := tx.Insert(edbMapping)
- if e != nil {
- err = fmt.Errorf("insert base edb mapping err: %v", e)
- return
- }
- edbMapping.Id = int(edbMappingId)
- // 刷新数据
- err = obj.Refresh(edbInfo, edbMapping, "")
- return
- }
- func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
- if edbInfo == nil || edbBaseMapping == nil {
- err = fmt.Errorf("指标信息/关联信息有误, EdbInfo: %v, EdbBaseMapping: %v", edbInfo, edbBaseMapping)
- return
- }
- // 真实数据的最大日期, 插入规则配置的日期
- var realDataMaxDate, edbDataInsertConfigDate time.Time
- var edbDataInsertConfig *EdbDataInsertConfig
- var isFindConfigDateRealData bool
- {
- conf, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
- if e != nil && e.Error() != utils.ErrNoRow() {
- err = fmt.Errorf("GetEdbDataInsertConfigByEdbId err: %v", e)
- return
- }
- edbDataInsertConfig = conf
- if edbDataInsertConfig != nil {
- edbDataInsertConfigDate = edbDataInsertConfig.Date
- }
- }
- // 查询时间为开始时间-3d
- var queryDate string
- if startDate != "" {
- st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
- if e != nil {
- err = fmt.Errorf("刷新开始时间有误, %v", e)
- return
- }
- queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
- }
- // 源指标数据
- baseDataList := make([]*BaseFromThsHfData, 0)
- {
- ob := new(BaseFromThsHfData)
- cond := fmt.Sprintf(" AND %s = ?", ob.Cols().IndexCode)
- pars := make([]interface{}, 0)
- pars = append(pars, edbBaseMapping.BaseIndexCode)
- if queryDate != "" {
- cond += fmt.Sprintf(" AND %s >= ?", ob.Cols().DataTime)
- pars = append(pars, queryDate)
- }
- list, e := ob.GetItemsByCondition(cond, pars, []string{}, fmt.Sprintf("%s ASC", ob.Cols().DataTime))
- if e != nil {
- err = fmt.Errorf("获取数据源数据失败, %v", e)
- return
- }
- baseDataList = list
- }
- // 转换数据
- convertRule := new(ThsHfIndexConvert2EdbRule)
- if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
- err = fmt.Errorf("转换规则有误, %v", e)
- return
- }
- convertData, e := ThsHfConvertData2DayByRule(baseDataList, convertRule)
- if e != nil {
- err = fmt.Errorf("转换数据失败, %v", e)
- return
- }
- if len(convertData) == 0 {
- utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
- return
- }
- // 获取已有数据
- dataOb := new(EdbDataThsHf)
- dataExists := make(map[string]*EdbDataThsHf)
- searchExistMap := make(map[string]*EdbInfoSearchData)
- {
- cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().EdbInfoId)
- pars := make([]interface{}, 0)
- pars = append(pars, edbInfo.EdbInfoId)
- if queryDate != "" {
- cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
- pars = append(pars, queryDate)
- }
- list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
- if e != nil {
- err = fmt.Errorf("获取指标数据失败, %v", e)
- return
- }
- for _, v := range list {
- dataExists[v.DataTime.Format(utils.FormatDate)] = v
- searchExistMap[v.DataTime.Format(utils.FormatDate)] = &EdbInfoSearchData{
- EdbDataId: v.EdbDataId,
- EdbInfoId: v.EdbInfoId,
- DataTime: v.DataTime.Format(utils.FormatDate),
- Value: v.Value,
- EdbCode: v.EdbCode,
- DataTimestamp: v.DataTimestamp,
- }
- }
- }
- // 比对数据
- insertExist := make(map[string]bool)
- insertData := make([]*EdbDataThsHf, 0)
- updateData := make([]*EdbDataThsHf, 0)
- for k, v := range convertData {
- strDate := k.Format(utils.FormatDate)
- // 手动插入数据的判断
- if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
- realDataMaxDate = k
- }
- if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
- isFindConfigDateRealData = true
- }
- // 入库值
- saveVal := decimal.NewFromFloat(v).Round(4).String()
- d, e := decimal.NewFromString(saveVal)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
- continue
- }
- saveFloat, _ := d.Float64()
- // 更新
- exists := dataExists[strDate]
- if exists != nil {
- existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
- if saveVal != existVal {
- exists.Value = saveFloat
- updateData = append(updateData, exists)
- }
- continue
- }
- // 新增
- if insertExist[strDate] {
- continue
- }
- insertExist[strDate] = true
- timestamp := k.UnixNano() / 1e6
- insertData = append(insertData, &EdbDataThsHf{
- EdbInfoId: edbInfo.EdbInfoId,
- EdbCode: edbInfo.EdbCode,
- DataTime: k,
- Value: saveFloat,
- CreateTime: time.Now(),
- ModifyTime: time.Now(),
- DataTimestamp: timestamp,
- })
- }
- // 批量新增/更新
- if len(insertData) > 0 {
- if e = dataOb.CreateMulti(insertData); e != nil {
- err = fmt.Errorf("批量新增指标数据失败, %v", e)
- return
- }
- }
- if len(updateData) > 0 {
- if e = dataOb.MultiUpdateValue(updateData); e != nil {
- err = fmt.Errorf("批量更新指标数据失败, %v", e)
- return
- }
- }
- // 处理手工数据补充的配置
- HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, obj.GetSource(), obj.GetSubSource(), searchExistMap, isFindConfigDateRealData)
- return
- }
- // ThsHfConvertData2DayByRule 原指标数据转换为日度数据
- func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) {
- // PS: originData为期望开始日期前三日(有两天非交易日, 那么周一的前日应当算上周五的)至结束日期的数据
- timeData = make(map[time.Time]float64)
- if len(originData) == 0 || convertRule == nil {
- return
- }
- if !utils.InArrayByInt([]int{1, 2}, convertRule.ConvertType) {
- err = fmt.Errorf("取值类型有误, ConvertType: %d", convertRule.ConvertType)
- return
- }
- // 升序排序
- sort.Slice(originData, func(i, j int) bool {
- return originData[i].DataTimestamp < originData[j].DataTimestamp
- })
- // 将数据根据日期进行分组
- var sortDates []string
- groupDateData := make(map[string][]*BaseFromThsHfData)
- for _, v := range originData {
- d := v.DataTime.Format(utils.FormatDate)
- if !utils.InArrayByStr(sortDates, d) {
- sortDates = append(sortDates, d)
- }
- if groupDateData[d] == nil {
- groupDateData[d] = make([]*BaseFromThsHfData, 0)
- }
- groupDateData[d] = append(groupDateData[d], v)
- }
- // 取值方式-指定时间的值
- if convertRule.ConvertType == 1 {
- for k, v := range sortDates {
- todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
- if e != nil {
- utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
- continue
- }
- var timeTarget time.Time
- dateData := make([]*BaseFromThsHfData, 0)
- // 当日
- if convertRule.ConvertFixed.FixedDay == 1 {
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", v, convertRule.ConvertFixed.FixedTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("当日timeTarget转换有误, %v", e))
- continue
- }
- timeTarget = tg
- dt := groupDateData[v]
- if dt == nil {
- utils.FileLog.Info(fmt.Sprintf("%s当日无数据", v))
- continue
- }
- if len(dt) == 0 {
- continue
- }
- dateData = dt
- }
- // 前一日
- if convertRule.ConvertFixed.FixedDay == 2 {
- if k < 1 {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
- continue
- }
- preDate := sortDates[k-1]
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("前日timeTarget转换有误, %v", e))
- continue
- }
- timeTarget = tg
- dt := groupDateData[preDate]
- if dt == nil {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
- continue
- }
- if len(dt) == 0 {
- continue
- }
- dateData = dt
- }
- if len(dateData) == 0 {
- utils.FileLog.Info("日期%s无数据序列", v)
- continue
- }
- // 重新获取数据序列中, 时间在目标时间点之后的
- newDateData := make([]*BaseFromThsHfData, 0)
- for kv, dv := range dateData {
- if dv.DataTime.Before(timeTarget) {
- continue
- }
- // 由于升序排列, 直接取之后所有的数据
- newDateData = append(newDateData, dateData[kv:]...)
- break
- }
- // 取重组后当日数据中的第一个(有可能目标时间点无值, 那么取之后时间最近的值)
- if len(newDateData) == 0 {
- utils.FileLog.Info("日期%s无有效数据", v)
- continue
- }
- timeData[todayTime] = newDateData[0].Value
- }
- return
- }
- // 取值方式-区间计算值
- for k, v := range sortDates {
- todayTime, e := time.ParseInLocation(utils.FormatDate, v, time.Local)
- if e != nil {
- utils.FileLog.Info("当日日期转换有误, date: %s, err: %v", v, e)
- continue
- }
- var thisDate, preDate string
- thisDate = v
- if k > 1 {
- preDate = sortDates[k-1]
- }
- var startTimeTarget, endTimeTarget time.Time
- // 起始时间-当日/前一日
- if convertRule.ConvertArea.StartDay == 1 {
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.StartTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("当日startTimeTarget转换有误, %v", e))
- continue
- }
- startTimeTarget = tg
- }
- if convertRule.ConvertArea.StartDay == 2 {
- if preDate == "" {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
- continue
- }
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.StartTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("前日startTimeTarget转换有误, %v", e))
- continue
- }
- startTimeTarget = tg
- }
- // 截止时间-当日/前一日
- if convertRule.ConvertArea.EndDay == 1 {
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.EndTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("当日endTimeTarget转换有误, %v", e))
- continue
- }
- endTimeTarget = tg
- }
- if convertRule.ConvertArea.EndDay == 2 {
- if preDate == "" {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
- continue
- }
- tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.EndTime), time.Local)
- if e != nil {
- utils.FileLog.Info(fmt.Sprintf("前日endTimeTarget转换有误, %v", e))
- continue
- }
- endTimeTarget = tg
- }
- if startTimeTarget.IsZero() || endTimeTarget.IsZero() {
- utils.FileLog.Info(fmt.Sprintf("起始截止时间有误, start: %v, end: %v", startTimeTarget, endTimeTarget))
- continue
- }
- // 合并前日当日数据
- dateData := make([]*BaseFromThsHfData, 0)
- if convertRule.ConvertArea.StartDay == 1 && convertRule.ConvertArea.EndDay == 1 {
- // 起始截止均为当日
- dateData = groupDateData[thisDate]
- if dateData == nil {
- utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
- continue
- }
- if len(dateData) == 0 {
- utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
- continue
- }
- } else {
- if preDate == "" {
- continue
- }
- // 起始截止时间含前日
- preData := groupDateData[preDate]
- if preData == nil {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
- continue
- }
- if len(preData) == 0 {
- utils.FileLog.Info(fmt.Sprintf("%s前日无数据", thisDate))
- continue
- }
- thisData := groupDateData[thisDate]
- if thisData == nil {
- utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
- continue
- }
- if len(thisData) == 0 {
- utils.FileLog.Info(fmt.Sprintf("%s当日无数据", thisDate))
- continue
- }
- dateData = append(dateData, preData...)
- dateData = append(dateData, thisData...)
- }
- if len(dateData) == 0 {
- utils.FileLog.Info("日期%s无数据序列", v)
- continue
- }
- // 重组时间区间内的数据
- newDateData := make([]*BaseFromThsHfData, 0)
- for _, dv := range dateData {
- if dv.DataTime.Before(startTimeTarget) || dv.DataTime.After(endTimeTarget) {
- continue
- }
- newDateData = append(newDateData, dv)
- }
- if len(newDateData) == 0 {
- utils.FileLog.Info(fmt.Sprintf("时间区间内无数据, start: %v, end: %v", startTimeTarget, endTimeTarget))
- continue
- }
- // 取出区间内的均值/最值
- var avgVal, minVal, maxVal, sumVal float64
- minVal, maxVal = newDateData[0].Value, newDateData[0].Value
- for _, nv := range newDateData {
- sumVal += nv.Value
- if nv.Value > maxVal {
- maxVal = nv.Value
- }
- if nv.Value < minVal {
- minVal = nv.Value
- }
- }
- avgVal = sumVal / float64(len(newDateData))
- switch convertRule.ConvertArea.CalculateType {
- case 1:
- timeData[todayTime] = avgVal
- case 2:
- timeData[todayTime] = maxVal
- case 3:
- timeData[todayTime] = minVal
- default:
- utils.FileLog.Info(fmt.Sprintf("计算方式有误, CalculateType: %d", convertRule.ConvertArea.CalculateType))
- }
- }
- return
- }
|