123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451 |
- package models
- import (
- "eta/eta_index_lib/models/mgo"
- "eta/eta_index_lib/utils"
- "fmt"
- "github.com/beego/beego/v2/client/orm"
- "github.com/shopspring/decimal"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "reflect"
- "time"
- )
// Business groups the operations for indicators that come from an external
// (business) data source. It carries no state; its methods use it only as a
// receiver.
type Business struct{}
// AddBaseParams holds the inputs for creating a base indicator.
type AddBaseParams struct {
	// EdbCode is the indicator code.
	EdbCode string `description:"指标编码"`
	// EdbName is the indicator name.
	EdbName string `description:"指标名称"`
	// Unit is the unit of the indicator values.
	Unit string `description:"单位"`
	// ClassifyId is the ID of the classification the indicator belongs to.
	ClassifyId int `description:"所属分类"`
	// SysUserId is the ID of the user creating the indicator.
	SysUserId int `description:"用户id"`
	// SysUserRealName is the real name of that user.
	SysUserRealName string `description:"用户真实名称"`
	// UniqueCode is the unique code assigned to the indicator.
	UniqueCode string `description:"编码"`
}
- // EditBaseParams
- // @Description: 基础指标的修改参数
- type EditBaseParams struct {
- EdbCode string `description:"指标编码"`
- EdbName string `description:"指标名称"`
- EdbNameEn string `description:"指标名称(英文)"`
- Unit string `description:"单位"`
- UnitEn string `description:"单位(英文)"`
- ClassifyId int `description:"所属分类"`
- SysUserId int `description:"用户id"`
- SysUserRealName string `description:"用户真实名称"`
- UniqueCode string `description:"编码"`
- Lang string `description:"语言版本"`
- EdbInfo *EdbInfo `description:"指标信息"`
- }
- type RefreshBaseParams struct {
- EdbInfo *EdbInfo
- StartDate string
- EndDate string
- }
- // Add
- // @Description: 添加指标
- // @author: Roc
- // @receiver obj
- // @datetime 2024-04-30 17:35:14
- // @param params AddBaseParams
- // @param businessIndexItem *BaseFromBusinessIndex
- // @return edbInfo *EdbInfo
- // @return err error
- // @return errMsg string
- func (obj Business) Add(params AddBaseParams, businessIndexItem *BaseFromBusinessIndex) (edbInfo *EdbInfo, err error, errMsg string) {
- o := orm.NewOrm()
- to, err := o.Begin()
- if err != nil {
- return
- }
- defer func() {
- if err != nil {
- _ = to.Rollback()
- fmt.Println(reflect.TypeOf(obj).Name(), ";Add,Err:"+err.Error())
- } else {
- _ = to.Commit()
- }
- }()
- edbInfo = new(EdbInfo)
- edbInfo.Source = obj.GetSource()
- edbInfo.SourceName = obj.GetSourceName()
- edbInfo.EdbCode = params.EdbCode
- edbInfo.EdbName = params.EdbName
- edbInfo.EdbNameSource = params.EdbName
- edbInfo.Frequency = businessIndexItem.Frequency
- edbInfo.Unit = params.Unit
- edbInfo.ClassifyId = params.ClassifyId
- edbInfo.SysUserId = params.SysUserId
- edbInfo.SysUserRealName = params.SysUserRealName
- edbInfo.CreateTime = time.Now()
- edbInfo.ModifyTime = time.Now()
- edbInfo.UniqueCode = params.UniqueCode
- edbInfo.CalculateFormula = ``
- edbInfo.EdbNameEn = params.EdbName
- edbInfo.UnitEn = params.Unit
- edbInfo.EdbType = obj.GetEdbType()
- edbInfo.SubSource = businessIndexItem.Source
- edbInfo.SubSourceName = businessIndexItem.SourceName
- newEdbInfoId, tmpErr := to.Insert(edbInfo)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- edbInfo.EdbInfoId = int(newEdbInfoId)
- // 更新数据
- err = obj.refresh(edbInfo, "")
- return
- }
- // Edit
- // @Description: 编辑指标
- // @author: Roc
- // @receiver obj
- // @datetime 2024-04-30 17:35:05
- // @param params EditBaseParams
- // @param businessIndexItem *BaseFromBusinessIndex
- // @return err error
- // @return errMsg string
- func (obj Business) Edit(params EditBaseParams, businessIndexItem *BaseFromBusinessIndex) (err error, errMsg string) {
- edbInfo := params.EdbInfo
- o := orm.NewOrm()
- to, err := o.Begin()
- if err != nil {
- return
- }
- defer func() {
- if err != nil {
- _ = to.Rollback()
- fmt.Println(reflect.TypeOf(obj).Name(), ";Edit,Err:"+err.Error())
- } else {
- _ = to.Commit()
- }
- }()
- //oldEdbInfo := *edbInfo
- //修改指标信息
- edbInfo.EdbName = params.EdbName
- edbInfo.EdbNameSource = params.EdbName
- edbInfo.Frequency = businessIndexItem.Frequency
- edbInfo.Unit = params.Unit
- edbInfo.ClassifyId = params.ClassifyId
- edbInfo.EdbNameEn = params.EdbNameEn
- edbInfo.UnitEn = params.UnitEn
- edbInfo.SubSource = businessIndexItem.Source
- edbInfo.SubSourceName = businessIndexItem.SourceName
- edbInfo.ModifyTime = time.Now()
- _, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "CalculateFormula", "ModifyTime", "EdbNameEn", "UnitEn", "SubSource", "SubSourceName")
- if err != nil {
- return
- }
- //计算数据
- err = obj.refresh(edbInfo, "")
- return
- }
- // Refresh 刷新
- func (obj Business) Refresh(params RefreshBaseParams) (err error, errMsg string) {
- to, err := orm.NewOrm().Begin()
- if err != nil {
- return
- }
- defer func() {
- if err != nil {
- _ = to.Rollback()
- fmt.Println(reflect.TypeOf(obj).Name(), ";Refresh,Err:"+err.Error())
- } else {
- _ = to.Commit()
- }
- }()
- // 计算数据
- err = obj.refresh(params.EdbInfo, params.StartDate)
- return
- }
- // GetSource 获取来源编码id
- func (obj Business) GetSource() int {
- return utils.DATA_SOURCE_BUSINESS
- }
- // GetSourceName 获取来源名称
- func (obj Business) GetSourceName() string {
- return utils.DATA_SOURCE_NAME_BUSINESS
- }
- // GetEdbType 获取指标类型
- func (obj Business) GetEdbType() int {
- return utils.DEFAULT_EDB_TYPE
- }
- func (obj Business) refresh(edbInfo *EdbInfo, startDate string) (err error) {
- //获取已存在的所有数据
- baseDataList, err := obj.getBaseBusinessData(edbInfo, startDate)
- //获取指标所有数据
- existDataList := make([]*mgo.EdbDataBusiness, 0)
- mogDataObj := new(mgo.EdbDataBusiness)
- {
- // 构建查询条件
- queryConditions := bson.M{
- "edb_code": edbInfo.EdbCode,
- }
- if startDate != `` {
- //获取已存在的所有数据
- startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- queryConditions["data_time"] = bson.M{"$gte": startDateTime}
- }
- existDataList, err = mogDataObj.GetAllDataList(queryConditions)
- if err != nil {
- fmt.Println("getEdbDataBusinessList Err:" + err.Error())
- return
- }
- }
- existDataMap := make(map[string]*mgo.EdbDataBusiness)
- removeDataTimeMap := make(map[string]bool) //需要移除的日期数据
- for _, v := range existDataList {
- tmpDate := v.DataTime.Format(utils.FormatDate)
- existDataMap[tmpDate] = v
- removeDataTimeMap[tmpDate] = true
- }
- needAddDateMap := make(map[time.Time]int)
- // 待添加的数据集
- addDataList := make([]mgo.EdbDataBusiness, 0)
- // 待更新的数据集
- updateDataList := make([]mgo.EdbDataBusiness, 0)
- for _, tmpData := range baseDataList {
- currDate := tmpData.DataTime
- currDateStr := currDate.Format(utils.FormatDate)
- // 当前的实际值
- saveValue := decimal.NewFromFloat(tmpData.Value).Round(4).String()
- existData, ok := existDataMap[currDateStr]
- // 如果库中已经存在该数据的话,那么就进行值的变更操作
- if ok {
- // 已经入到指标库的值
- existValStr := decimal.NewFromFloat(existData.Value).Round(4).String()
- //校验待删除日期数据里面是否存在该元素,如果存在的话,那么移除该日期
- delete(removeDataTimeMap, currDateStr)
- if existValStr != saveValue {
- existData.Value = tmpData.Value
- updateDataList = append(updateDataList, *existData)
- }
- continue
- }
- // 库中不存在该日期的数据
- timestamp := currDate.UnixNano() / 1e6
- addDataList = append(addDataList, mgo.EdbDataBusiness{
- EdbInfoId: edbInfo.EdbInfoId,
- EdbCode: edbInfo.EdbCode,
- DataTime: currDate,
- Value: tmpData.Value,
- CreateTime: time.Now(),
- ModifyTime: time.Now(),
- DataTimestamp: timestamp,
- })
- needAddDateMap[currDate] = 1
- }
- //删除已经不存在的指标数据(由于该指标当日的数据删除了)
- removeDateList := make([]string, 0)
- {
- for dateTime := range removeDataTimeMap {
- removeDateList = append(removeDateList, dateTime)
- }
- removeNum := len(removeDateList)
- if removeNum > 0 {
- // todo 删除指定日期的数据
- }
- }
- // 入库
- {
- coll := mogDataObj.GetCollection()
- //删除已经不存在的指标数据(由于该指标当日的数据删除了)
- //{
- // for dateTime := range removeDataTimeMap {
- // removeDateList = append(removeDateList, dateTime)
- // }
- // removeNum := len(removeDateList)
- // if removeNum > 0 {
- // err = mogDataObj.RemoveManyByColl(coll, addDataList)
- // if err != nil {
- // fmt.Println("mogDataObj.RemoveMany() Err:" + err.Error())
- // return
- // }
- // }
- //}
- // 插入新数据
- if len(addDataList) > 0 {
- err = mogDataObj.BatchInsertDataByColl(coll, addDataList)
- if err != nil {
- fmt.Println("mogDataObj.BatchInsertData() Err:" + err.Error())
- return
- }
- }
- // 修改历史数据
- if len(updateDataList) > 0 {
- for _, v := range updateDataList {
- err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
- if err != nil {
- fmt.Println("mogDataObj.UpdateDataByColl:Err:" + err.Error())
- return
- }
- }
- }
- }
- return
- }
- // GetEdbInfoMaxAndMinInfo 获取指标的最新数据记录信息
- func (obj Business) GetEdbInfoMaxAndMinInfo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
- mogDataObj := new(mgo.EdbDataBusiness)
- pipeline := []bson.M{
- {"$match": bson.M{"edb_code": edbCode}},
- {"$group": bson.M{
- "_id": nil,
- "min_date": bson.M{"$min": "$data_time"},
- "max_date": bson.M{"$max": "$data_time"},
- "min_value": bson.M{"$min": "$value"},
- "max_value": bson.M{"$max": "$value"},
- }},
- {"$project": bson.M{"_id": 0}}, // 可选,如果不需要_id字段
- }
- result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
- if err != nil {
- fmt.Println("getEdbDataBusinessList Err:" + err.Error())
- return
- }
- if !result.MaxDate.IsZero() {
- whereQuery := bson.M{"edb_code": edbCode, "data_time": result.MaxDate}
- selectParam := bson.D{{"value", 1}, {"_id", 0}}
- latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- result.LatestValue = latestValue.Value
- result.EndValue = latestValue.Value
- }
- item = &EdbInfoMaxAndMinInfo{
- MinDate: result.MinDate.Format(utils.FormatDate),
- MaxDate: result.MaxDate.Format(utils.FormatDate),
- MinValue: result.MinValue,
- MaxValue: result.MaxValue,
- LatestValue: result.LatestValue,
- LatestDate: result.LatestDate.Format(utils.FormatDate),
- EndValue: result.EndValue,
- }
- return
- }
- // UnifiedModifyEdbInfoMaxAndMinInfo
- // @Description: 修改指标的最大最小值和最新值
- // @author: Roc
- // @receiver obj
- // @datetime 2024-04-30 17:07:35
- // @param edbInfo *EdbInfo
- // @return err error
- func (obj Business) UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo *EdbInfo) (err error) {
- edbInfoMaxAndMinInfo, err := obj.GetEdbInfoMaxAndMinInfo(edbInfo.EdbCode)
- if err != nil {
- return
- }
- err = ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, edbInfoMaxAndMinInfo)
- return
- }
- // EdbInfoMgoData
- // @Description: mgo里面的数据
- type EdbInfoMgoData struct {
- EdbDataId primitive.ObjectID `description:"数据ID"`
- DataTime time.Time `description:"数据日期"`
- Value float64 `description:"数据"`
- EdbCode string `description:"指标编码"`
- }
- // getBaseBusinessData
- // @Description: 获取基础数据
- // @author: Roc
- // @receiver obj
- // @datetime 2024-04-30 11:10:46
- // @param edbInfo *EdbInfo
- // @param startDate string
- // @return newDataList []EdbInfoSearchData
- // @return err error
- func (obj Business) getBaseBusinessData(edbInfo *EdbInfo, startDate string) (newDataList []EdbInfoMgoData, err error) {
- newDataList = make([]EdbInfoMgoData, 0)
- // 获取数据源的指标数据
- mogDataObj := new(mgo.BaseFromBusinessData)
- // 构建查询条件
- queryConditions := bson.M{
- "index_code": edbInfo.EdbCode,
- }
- if startDate != `` {
- //获取已存在的所有数据
- startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- queryConditions["data_time"] = bson.M{"$gte": startDateTime}
- }
- baseDataList, err := mogDataObj.GetAllDataList(queryConditions)
- if err != nil {
- fmt.Println("getBaseBusinessData Err:" + err.Error())
- return
- }
- for _, v := range baseDataList {
- newDataList = append(newDataList, EdbInfoMgoData{
- EdbDataId: v.ID,
- DataTime: v.DataTime,
- Value: v.Value,
- EdbCode: v.IndexCode,
- })
- }
- return
- }
|