package models

import (
	"eta/eta_index_lib/models/mgo"
	"eta/eta_index_lib/utils"
	"fmt"
	"github.com/beego/beego/v2/client/orm"
	"github.com/shopspring/decimal"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"reflect"
	"time"
)

// Business groups the operations for indicators whose data comes from the
// external "business" data source.
type Business struct {
}

// AddBaseParams
// @Description: parameters for adding a base indicator
type AddBaseParams struct {
	EdbCode         string `description:"指标编码"`
	EdbName         string `description:"指标名称"`
	Unit            string `description:"单位"`
	ClassifyId      int    `description:"所属分类"`
	SysUserId       int    `description:"用户id"`
	SysUserRealName string `description:"用户真实名称"`
	UniqueCode      string `description:"编码"`
}

// EditBaseParams
// @Description: parameters for editing a base indicator
type EditBaseParams struct {
	EdbCode         string   `description:"指标编码"`
	EdbName         string   `description:"指标名称"`
	EdbNameEn       string   `description:"指标名称(英文)"`
	Unit            string   `description:"单位"`
	UnitEn          string   `description:"单位(英文)"`
	ClassifyId      int      `description:"所属分类"`
	SysUserId       int      `description:"用户id"`
	SysUserRealName string   `description:"用户真实名称"`
	UniqueCode      string   `description:"编码"`
	Lang            string   `description:"语言版本"`
	EdbInfo         *EdbInfo `description:"指标信息"`
}

// RefreshBaseParams
// @Description: parameters for refreshing an indicator's data
type RefreshBaseParams struct {
	EdbInfo   *EdbInfo
	StartDate string
	EndDate   string
}

// Add
// @Description: create an indicator record and load its data
// @author: Roc
// @receiver obj
// @datetime 2024-04-30 17:35:14
// @param params AddBaseParams
// @param businessIndexItem *BaseFromBusinessIndex supplies Frequency, Source and SourceName
// @return edbInfo *EdbInfo the newly built indicator (EdbInfoId is set after the insert)
// @return err error
// @return errMsg string never assigned by this method
func (obj Business) Add(params AddBaseParams, businessIndexItem *BaseFromBusinessIndex) (edbInfo *EdbInfo, err error, errMsg string) {
	o := orm.NewOrm()
	to, err := o.Begin()
	if err != nil {
		return
	}
	// Commit or roll back the ORM transaction depending on the final value of err.
	defer func() {
		if err != nil {
			_ = to.Rollback()
			fmt.Println(reflect.TypeOf(obj).Name(), ";Add,Err:"+err.Error())
		} else {
			_ = to.Commit()
		}
	}()

	now := time.Now()
	edbInfo = new(EdbInfo)
	edbInfo.Source = obj.GetSource()
	edbInfo.SourceName = obj.GetSourceName()
	edbInfo.EdbCode = params.EdbCode
	edbInfo.EdbName = params.EdbName
	edbInfo.EdbNameSource = params.EdbName
	edbInfo.Frequency = businessIndexItem.Frequency
	edbInfo.Unit = params.Unit
	edbInfo.ClassifyId = params.ClassifyId
	edbInfo.SysUserId = params.SysUserId
	edbInfo.SysUserRealName = params.SysUserRealName
	edbInfo.CreateTime = now
	edbInfo.ModifyTime = now
	edbInfo.UniqueCode = params.UniqueCode
	edbInfo.CalculateFormula = ``
	// The English name and unit start out as copies of the primary ones.
	edbInfo.EdbNameEn = params.EdbName
	edbInfo.UnitEn = params.Unit
	edbInfo.EdbType = obj.GetEdbType()
	edbInfo.SubSource = businessIndexItem.Source
	edbInfo.SubSourceName = businessIndexItem.SourceName

	newEdbInfoId, tmpErr := to.Insert(edbInfo)
	if tmpErr != nil {
		err = tmpErr
		return
	}
	edbInfo.EdbInfoId = int(newEdbInfoId)

	// Load the data for the whole date range (empty start date).
	err = obj.refresh(edbInfo, "")

	return
}

// Edit
// @Description: update an indicator record and reload its data
// @author: Roc
// @receiver obj
// @datetime 2024-04-30 17:35:05
// @param params EditBaseParams params.EdbInfo is the record that gets modified
// @param businessIndexItem *BaseFromBusinessIndex supplies Frequency, Source and SourceName
// @return err error
// @return errMsg string never assigned by this method
func (obj Business) Edit(params EditBaseParams, businessIndexItem *BaseFromBusinessIndex) (err error, errMsg string) {
	edbInfo := params.EdbInfo

	o := orm.NewOrm()
	to, err := o.Begin()
	if err != nil {
		return
	}
	// Commit or roll back the ORM transaction depending on the final value of err.
	defer func() {
		if err != nil {
			_ = to.Rollback()
			fmt.Println(reflect.TypeOf(obj).Name(), ";Edit,Err:"+err.Error())
		} else {
			_ = to.Commit()
		}
	}()

	// Apply the edited fields to the indicator record.
	edbInfo.EdbName = params.EdbName
	edbInfo.EdbNameSource = params.EdbName
	edbInfo.Frequency = businessIndexItem.Frequency
	edbInfo.Unit = params.Unit
	edbInfo.ClassifyId = params.ClassifyId
	edbInfo.EdbNameEn = params.EdbNameEn
	edbInfo.UnitEn = params.UnitEn
	edbInfo.SubSource = businessIndexItem.Source
	edbInfo.SubSourceName = businessIndexItem.SourceName
	edbInfo.ModifyTime = time.Now()
	// "CalculateFormula" is part of the column list although it is not modified here.
	_, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "CalculateFormula", "ModifyTime", "EdbNameEn", "UnitEn", "SubSource", "SubSourceName")
	if err != nil {
		return
	}

	// Reload the data for the whole date range (empty start date).
	err = obj.refresh(edbInfo, "")

	return
}

// Refresh
// @Description: refresh the data of params.EdbInfo starting from params.StartDate
// @receiver obj
// @param params RefreshBaseParams params.EndDate is not read by this method
// @return err error
// @return errMsg string never assigned by this method
func (obj Business) Refresh(params RefreshBaseParams) (err error, errMsg string) {
	to, err := orm.NewOrm().Begin()
	if err != nil {
		return
	}
	// Commit or roll back the ORM transaction depending on the final value of err.
	defer func() {
		if err != nil {
			_ = to.Rollback()
			fmt.Println(reflect.TypeOf(obj).Name(), ";Refresh,Err:"+err.Error())
		} else {
			_ = to.Commit()
		}
	}()

	err = obj.refresh(params.EdbInfo, params.StartDate)

	return
}

// GetSource returns the data source id of business indicators.
func (obj Business) GetSource() int {
	return utils.DATA_SOURCE_BUSINESS
}

// GetSourceName returns the data source name of business indicators.
func (obj Business) GetSourceName() string {
	return utils.DATA_SOURCE_NAME_BUSINESS
}

// GetEdbType returns the indicator type used for business indicators.
func (obj Business) GetEdbType() int {
	return utils.DEFAULT_EDB_TYPE
}

// refresh
// @Description: bring the stored indicator data in line with the source data:
// dates missing from the store are inserted and dates whose value differs
// (compared after rounding to 4 decimals) are updated. Removal of dates that
// disappeared from the source is not implemented yet.
// @receiver obj
// @param edbInfo *EdbInfo
// @param startDate string when non-empty, only data on or after this date is considered
// @return err error
func (obj Business) refresh(edbInfo *EdbInfo, startDate string) (err error) {
	// Source-side data. The error must be checked here: it used to be
	// overwritten by the next query, so a failed read looked like "no data".
	baseDataList, err := obj.getBaseBusinessData(edbInfo, startDate)
	if err != nil {
		return
	}

	// Data already stored for this indicator.
	mogDataObj := new(mgo.EdbDataBusiness)
	queryConditions := bson.M{
		"edb_code": edbInfo.EdbCode,
	}
	if startDate != `` {
		startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
		if tmpErr != nil {
			err = tmpErr
			return
		}
		queryConditions["data_time"] = bson.M{"$gte": startDateTime}
	}
	var existDataList []*mgo.EdbDataBusiness
	existDataList, err = mogDataObj.GetAllDataList(queryConditions)
	if err != nil {
		fmt.Println("getEdbDataBusinessList Err:" + err.Error())
		return
	}

	// Index the stored rows by date; every stored date starts out as a removal
	// candidate and is struck off once it is seen in the source data.
	existDataMap := make(map[string]*mgo.EdbDataBusiness, len(existDataList))
	removeDataTimeMap := make(map[string]bool, len(existDataList))
	for _, v := range existDataList {
		tmpDate := v.DataTime.Format(utils.FormatDate)
		existDataMap[tmpDate] = v
		removeDataTimeMap[tmpDate] = true
	}

	// Rows to insert
	addDataList := make([]mgo.EdbDataBusiness, 0)
	// Rows to update
	updateDataList := make([]mgo.EdbDataBusiness, 0)

	for _, tmpData := range baseDataList {
		currDate := tmpData.DataTime
		currDateStr := currDate.Format(utils.FormatDate)

		// Source value, rounded for comparison only
		saveValue := decimal.NewFromFloat(tmpData.Value).Round(4).String()

		if existData, ok := existDataMap[currDateStr]; ok {
			// The date is still present in the source, so it must not be removed.
			delete(removeDataTimeMap, currDateStr)

			existValStr := decimal.NewFromFloat(existData.Value).Round(4).String()
			if existValStr != saveValue {
				existData.Value = tmpData.Value
				// Stamp the change; otherwise the update below would just write
				// the old modify_time back.
				existData.ModifyTime = time.Now()
				updateDataList = append(updateDataList, *existData)
			}
			continue
		}

		// The date is not stored yet.
		now := time.Now()
		addDataList = append(addDataList, mgo.EdbDataBusiness{
			EdbInfoId:     edbInfo.EdbInfoId,
			EdbCode:       edbInfo.EdbCode,
			DataTime:      currDate,
			Value:         tmpData.Value,
			CreateTime:    now,
			ModifyTime:    now,
			DataTimestamp: currDate.UnixNano() / 1e6, // milliseconds
		})
	}

	// Stored dates that no longer exist in the source.
	removeDateList := make([]string, 0, len(removeDataTimeMap))
	for dateTime := range removeDataTimeMap {
		removeDateList = append(removeDateList, dateTime)
	}
	if len(removeDateList) > 0 {
		// TODO: delete the stored rows for the dates in removeDateList
	}

	// Persist the changes.
	coll := mogDataObj.GetCollection()

	// Insert new rows
	if len(addDataList) > 0 {
		err = mogDataObj.BatchInsertDataByColl(coll, addDataList)
		if err != nil {
			fmt.Println("mogDataObj.BatchInsertData() Err:" + err.Error())
			return
		}
	}

	// Update changed rows one by one
	for _, v := range updateDataList {
		err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
		if err != nil {
			fmt.Println("mogDataObj.UpdateDataByColl:Err:" + err.Error())
			return
		}
	}

	return
}

// GetEdbInfoMaxAndMinInfo
// @Description: aggregate the min/max date and min/max value of the stored data
// for edbCode, and look up the value stored at the max date as the latest/end value
// @receiver obj
// @param edbCode string
// @return item *EdbInfoMaxAndMinInfo
// @return err error
func (obj Business) GetEdbInfoMaxAndMinInfo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
	mogDataObj := new(mgo.EdbDataBusiness)
	pipeline := []bson.M{
		{"$match": bson.M{"edb_code": edbCode}},
		{"$group": bson.M{
			"_id":       nil,
			"min_date":  bson.M{"$min": "$data_time"},
			"max_date":  bson.M{"$max": "$data_time"},
			"min_value": bson.M{"$min": "$value"},
			"max_value": bson.M{"$max": "$value"},
		}},
		{"$project": bson.M{"_id": 0}}, // optional: drops the _id field from the result
	}
	result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
	if err != nil {
		fmt.Println("getEdbDataBusinessList Err:" + err.Error())
		return
	}

	// Only look up the latest value when the aggregation produced a max date.
	if !result.MaxDate.IsZero() {
		whereQuery := bson.M{"edb_code": edbCode, "data_time": result.MaxDate}
		selectParam := bson.D{{Key: "value", Value: 1}, {Key: "_id", Value: 0}}
		latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
		if tmpErr != nil {
			err = tmpErr
			return
		}
		result.LatestValue = latestValue.Value
		result.EndValue = latestValue.Value
	}

	// NOTE(review): result.LatestDate is never assigned in this function, so it
	// is formatted exactly as the aggregation returned it - confirm this is intended.
	item = &EdbInfoMaxAndMinInfo{
		MinDate:     result.MinDate.Format(utils.FormatDate),
		MaxDate:     result.MaxDate.Format(utils.FormatDate),
		MinValue:    result.MinValue,
		MaxValue:    result.MaxValue,
		LatestValue: result.LatestValue,
		LatestDate:  result.LatestDate.Format(utils.FormatDate),
		EndValue:    result.EndValue,
	}

	return
}

// UnifiedModifyEdbInfoMaxAndMinInfo
// @Description: recompute the indicator's min/max/latest information and store it
// via ModifyEdbInfoMaxAndMinInfo
// @author: Roc
// @receiver obj
// @datetime 2024-04-30 17:07:35
// @param edbInfo *EdbInfo
// @return err error
func (obj Business) UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo *EdbInfo) (err error) {
	edbInfoMaxAndMinInfo, err := obj.GetEdbInfoMaxAndMinInfo(edbInfo.EdbCode)
	if err != nil {
		return
	}

	err = ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, edbInfoMaxAndMinInfo)

	return
}

// EdbInfoMgoData
// @Description: one data point as read from the mgo data store
type EdbInfoMgoData struct {
	EdbDataId primitive.ObjectID `description:"数据ID"`
	DataTime  time.Time          `description:"数据日期"`
	Value     float64            `description:"数据"`
	EdbCode   string             `description:"指标编码"`
}

// getBaseBusinessData
// @Description: read the source data of the indicator (matched by index_code)
// @author: Roc
// @receiver obj
// @datetime 2024-04-30 11:10:46
// @param edbInfo *EdbInfo
// @param startDate string when non-empty, only data on or after this date is returned
// @return newDataList []EdbInfoMgoData
// @return err error
func (obj Business) getBaseBusinessData(edbInfo *EdbInfo, startDate string) (newDataList []EdbInfoMgoData, err error) {
	newDataList = make([]EdbInfoMgoData, 0)

	mogDataObj := new(mgo.BaseFromBusinessData)

	// Build the query
	queryConditions := bson.M{
		"index_code": edbInfo.EdbCode,
	}
	if startDate != `` {
		startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
		if tmpErr != nil {
			err = tmpErr
			return
		}
		queryConditions["data_time"] = bson.M{"$gte": startDateTime}
	}

	baseDataList, err := mogDataObj.GetAllDataList(queryConditions)
	if err != nil {
		fmt.Println("getBaseBusinessData Err:" + err.Error())
		return
	}

	// Convert the source rows to the package-level data point type.
	newDataList = make([]EdbInfoMgoData, 0, len(baseDataList))
	for _, v := range baseDataList {
		newDataList = append(newDataList, EdbInfoMgoData{
			EdbDataId: v.ID,
			DataTime:  v.DataTime,
			Value:     v.Value,
			EdbCode:   v.IndexCode,
		})
	}

	return
}