package models

import (
	"fmt"
	"reflect"
	"strings"
	"time"

	"eta/eta_index_lib/models/mgo"
	"eta/eta_index_lib/utils"

	"github.com/beego/beego/v2/client/orm"
	"github.com/shopspring/decimal"
	"go.mongodb.org/mongo-driver/bson"
)

// Business is the "self-owned data" indicator source. It carries no state;
// its methods add, edit and refresh indicators whose detail rows are copied
// from the business base-data store (MongoDB or MySQL, chosen by
// utils.UseMongo).
type Business struct {
}

// AddBaseParams
// @Description: parameters for adding a base indicator
type AddBaseParams struct {
	EdbCode         string `description:"指标编码"`
	EdbName         string `description:"指标名称"`
	Unit            string `description:"单位"`
	ClassifyId      int    `description:"所属分类"`
	SysUserId       int    `description:"用户id"`
	SysUserRealName string `description:"用户真实名称"`
	UniqueCode      string `description:"编码"`
}

// EditBaseParams
// @Description: parameters for editing a base indicator
type EditBaseParams struct {
	EdbCode         string   `description:"指标编码"`
	EdbName         string   `description:"指标名称"`
	EdbNameEn       string   `description:"指标名称(英文)"`
	Unit            string   `description:"单位"`
	UnitEn          string   `description:"单位(英文)"`
	ClassifyId      int      `description:"所属分类"`
	SysUserId       int      `description:"用户id"`
	SysUserRealName string   `description:"用户真实名称"`
	UniqueCode      string   `description:"编码"`
	Lang            string   `description:"语言版本"`
	EdbInfo         *EdbInfo `description:"指标信息"`
}

// RefreshBaseParams
// @Description: parameters for refreshing an indicator. Only EdbInfo and
// StartDate are read by Business.Refresh; EndDate is currently unused there.
type RefreshBaseParams struct {
	EdbInfo   *EdbInfo
	StartDate string
	EndDate   string
}

// finishTx
// @Description: ends the transaction opened by Add/Edit/Refresh. When *errp is
// set the transaction is rolled back and the error is printed; otherwise the
// transaction is committed and a commit failure is stored into *errp so the
// caller does not report success for data that was never persisted.
// @receiver obj
// @param to orm.TxOrmer
// @param action string name of the calling method, used in the log line
// @param errp *error pointer to the caller's named error result
func (obj Business) finishTx(to orm.TxOrmer, action string, errp *error) {
	if *errp == nil {
		*errp = to.Commit()
		if *errp == nil {
			return
		}
	} else {
		// The original failure is the one worth reporting; a rollback error
		// would only mask it.
		_ = to.Rollback()
	}
	fmt.Println(reflect.TypeOf(obj).Name(), ";"+action+",Err:"+(*errp).Error())
}

// Add
// @Description: inserts a new indicator row and loads its detail data
// @author: Roc
// @receiver obj
// @datetime 2024-04-30 17:35:14
// @param params AddBaseParams
// @param businessIndexItem *BaseFromBusinessIndex supplies frequency and sub-source
// @return edbInfo *EdbInfo the populated indicator (also returned when err is set)
// @return err error
// @return errMsg string never assigned by this method
func (obj Business) Add(params AddBaseParams, businessIndexItem *BaseFromBusinessIndex) (edbInfo *EdbInfo, err error, errMsg string) {
	o := orm.NewOrm()
	to, err := o.Begin()
	if err != nil {
		return
	}
	defer obj.finishTx(to, "Add", &err)

	edbInfo = new(EdbInfo)
	edbInfo.Source = obj.GetSource()
	edbInfo.SourceName = obj.GetSourceName()
	edbInfo.EdbCode = params.EdbCode
	edbInfo.EdbName = params.EdbName
	edbInfo.EdbNameSource = params.EdbName
	edbInfo.Frequency = businessIndexItem.Frequency
	edbInfo.Unit = params.Unit
	edbInfo.ClassifyId = params.ClassifyId
	edbInfo.SysUserId = params.SysUserId
	edbInfo.SysUserRealName = params.SysUserRealName
	edbInfo.CreateTime = time.Now()
	edbInfo.ModifyTime = time.Now()
	edbInfo.UniqueCode = params.UniqueCode
	edbInfo.CalculateFormula = ``
	// The English name/unit start out as copies of the primary ones.
	edbInfo.EdbNameEn = params.EdbName
	edbInfo.UnitEn = params.Unit
	edbInfo.EdbType = obj.GetEdbType()
	edbInfo.SubSource = businessIndexItem.Source
	edbInfo.SubSourceName = businessIndexItem.SourceName
	edbInfo.Sort = GetAddEdbMaxSortByClassifyId(params.ClassifyId, utils.EDB_INFO_TYPE)

	newEdbInfoId, tmpErr := to.Insert(edbInfo)
	if tmpErr != nil {
		err = tmpErr
		return
	}
	edbInfo.EdbInfoId = int(newEdbInfoId)

	// Load the detail data for the new indicator (no start date: everything).
	err = obj.refresh(to, edbInfo, "")

	return
}

// Edit
// @Description: updates an existing indicator row and reloads its detail data
// @author: Roc
// @receiver obj
// @datetime 2024-04-30 17:35:05
// @param params EditBaseParams params.EdbInfo is modified in place
// @param businessIndexItem *BaseFromBusinessIndex supplies frequency and sub-source
// @return err error
// @return errMsg string never assigned by this method
func (obj Business) Edit(params EditBaseParams, businessIndexItem *BaseFromBusinessIndex) (err error, errMsg string) {
	edbInfo := params.EdbInfo

	o := orm.NewOrm()
	to, err := o.Begin()
	if err != nil {
		return
	}
	defer obj.finishTx(to, "Edit", &err)

	// Apply the edited fields to the indicator.
	edbInfo.EdbName = params.EdbName
	edbInfo.EdbNameSource = params.EdbName
	edbInfo.Frequency = businessIndexItem.Frequency
	edbInfo.Unit = params.Unit
	edbInfo.ClassifyId = params.ClassifyId
	edbInfo.EdbNameEn = params.EdbNameEn
	edbInfo.UnitEn = params.UnitEn
	edbInfo.SubSource = businessIndexItem.Source
	edbInfo.SubSourceName = businessIndexItem.SourceName
	edbInfo.ModifyTime = time.Now()
	_, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "CalculateFormula", "ModifyTime", "EdbNameEn", "UnitEn", "SubSource", "SubSourceName")
	if err != nil {
		return
	}

	// Recompute the detail data (no start date: everything).
	err = obj.refresh(to, edbInfo, "")

	return
}

// Refresh
// @Description: reloads the detail data of params.EdbInfo from
// params.StartDate onwards inside a transaction
// @receiver obj
// @param params RefreshBaseParams
// @return err error
// @return errMsg string never assigned by this method
func (obj Business) Refresh(params RefreshBaseParams) (err error, errMsg string) {
	to, err := orm.NewOrm().Begin()
	if err != nil {
		return
	}
	defer obj.finishTx(to, "Refresh", &err)

	err = obj.refresh(to, params.EdbInfo, params.StartDate)

	return
}

// GetSource returns the source id of this indicator kind.
func (obj Business) GetSource() int {
	return utils.DATA_SOURCE_BUSINESS
}

// GetSourceName returns the source name of this indicator kind.
func (obj Business) GetSourceName() string {
	return utils.DATA_SOURCE_NAME_BUSINESS
}

// GetEdbType returns the indicator type used for indicators of this source.
func (obj Business) GetEdbType() int {
	return utils.DEFAULT_EDB_TYPE
}

// refresh
// @Description: dispatches to the MongoDB or MySQL implementation depending on
// utils.UseMongo. The MongoDB path does not receive the transaction.
// @receiver obj
// @param to orm.TxOrmer
// @param edbInfo *EdbInfo
// @param startDate string empty means "all data"
// @return err error
func (obj Business) refresh(to orm.TxOrmer, edbInfo *EdbInfo, startDate string) (err error) {
	if utils.UseMongo {
		return obj.refreshByMongo(edbInfo, startDate)
	}
	return obj.refreshByMysql(to, edbInfo, startDate)
}

// refreshByMongo
// @Description: synchronises the indicator's rows in the mgo.EdbDataBusiness
// collection with the base business data: rows missing from the base data are
// removed, new dates are inserted, and rows whose value differs (compared
// after rounding to 4 decimals) are updated.
// @receiver obj
// @param edbInfo *EdbInfo
// @param startDate string empty means "all data"; otherwise parsed with utils.FormatDate
// @return err error
func (obj Business) refreshByMongo(edbInfo *EdbInfo, startDate string) (err error) {
	// Largest date seen in the real data, and the date of the insert-config rule.
	var realDataMaxDate, edbDataInsertConfigDate time.Time
	var edbDataInsertConfig *EdbDataInsertConfig
	var isFindConfigDateRealData bool // whether real data exists for the config date
	{
		edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
		// A missing config row is not an error.
		if err != nil && err.Error() != utils.ErrNoRow() {
			return
		}
		if edbDataInsertConfig != nil {
			edbDataInsertConfigDate = edbDataInsertConfig.Date
		}
	}

	// Base (source) data. A failure here must abort: continuing with an empty
	// list would mark every stored row as stale and delete it.
	baseDataList, err := obj.getBaseBusinessDataByMongo(edbInfo, startDate)
	if err != nil {
		return
	}

	// Rows already stored for this indicator.
	mogDataObj := new(mgo.EdbDataBusiness)
	queryConditions := bson.M{
		"edb_code": edbInfo.EdbCode,
	}
	if startDate != `` {
		startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
		if tmpErr != nil {
			err = tmpErr
			return
		}
		queryConditions["data_time"] = bson.M{"$gte": startDateTime}
	}
	existDataList, err := mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
	if err != nil {
		fmt.Println(obj.GetSourceName() + ",refresh err;getEdbDataBusinessList Err:" + err.Error())
		return
	}

	existDataMap := make(map[string]*mgo.EdbDataBusiness, len(existDataList))
	// Every stored date starts out as a removal candidate and is struck off
	// once it is found in the base data.
	removeDataTimeMap := make(map[string]bool, len(existDataList))
	for _, v := range existDataList {
		tmpDate := v.DataTime.Format(utils.FormatDate)
		existDataMap[tmpDate] = v
		removeDataTimeMap[tmpDate] = true
	}

	// Rows to insert.
	addDataList := make([]interface{}, 0)
	// Rows to update.
	updateDataList := make([]mgo.EdbDataBusiness, 0)

	for _, tmpData := range baseDataList {
		currDate := tmpData.DataTime
		currDateStr := currDate.Format(utils.FormatDate)

		// Value as it would be stored, rounded to 4 decimals for comparison.
		saveValue := decimal.NewFromFloat(tmpData.Value).Round(4).String()

		// Collect what the manual-insert handling at the end needs.
		if realDataMaxDate.IsZero() || currDate.After(realDataMaxDate) {
			realDataMaxDate = currDate
		}
		if edbDataInsertConfigDate.IsZero() || currDate.Equal(edbDataInsertConfigDate) {
			isFindConfigDateRealData = true
		}

		existData, ok := existDataMap[currDateStr]
		if ok {
			// Already stored: keep the row and update it only if the value changed.
			existValStr := decimal.NewFromFloat(existData.Value).Round(4).String()
			delete(removeDataTimeMap, currDateStr)
			if existValStr != saveValue {
				existData.Value = tmpData.Value
				updateDataList = append(updateDataList, *existData)
			}
			continue
		}

		// Not stored yet: queue for insertion. The timestamp is in milliseconds.
		timestamp := currDate.UnixNano() / 1e6
		addDataList = append(addDataList, mgo.EdbDataBusiness{
			EdbInfoId:     edbInfo.EdbInfoId,
			EdbCode:       edbInfo.EdbCode,
			DataTime:      currDate,
			Value:         tmpData.Value,
			CreateTime:    time.Now(),
			ModifyTime:    time.Now(),
			DataTimestamp: timestamp,
		})
	}

	// Persist.
	{
		coll := mogDataObj.GetCollection()

		// Remove rows whose date no longer exists in the base data.
		if len(removeDataTimeMap) > 0 {
			removeDateList := make([]time.Time, 0, len(removeDataTimeMap))
			for dateTime := range removeDataTimeMap {
				tmpDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
				if tmpErr != nil {
					err = tmpErr
					return
				}
				removeDateList = append(removeDateList, tmpDateTime)
			}
			err = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}})
			if err != nil {
				fmt.Println("mogDataObj.RemoveMany() Err:" + err.Error())
				return
			}
		}

		// Insert new rows in batches of 500.
		if len(addDataList) > 0 {
			err = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList)
			if err != nil {
				fmt.Println("mogDataObj.BatchInsertData() Err:" + err.Error())
				return
			}
		}

		// Update changed rows. modify_time is stamped with the current time;
		// writing back the stored ModifyTime would leave it unchanged.
		for _, v := range updateDataList {
			err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": time.Now()}})
			if err != nil {
				fmt.Println("mogDataObj.UpdateDataByColl:Err:" + err.Error())
				return
			}
		}
	}

	// Apply the manual data-supplement configuration.
	HandleConfigInsertEdbDataByMongo(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData)

	return
}

// refreshByMysql
// @Description: synchronises the indicator's rows in its MySQL data table with
// the base business data: rows missing from the base data are deleted, new
// dates are inserted, and rows whose value differs (compared after rounding to
// 4 decimals) are updated.
// @receiver obj
// @param to orm.TxOrmer transaction used for the delete and insert statements
// @param edbInfo *EdbInfo
// @param startDate string empty means "all data"
// @return err error
func (obj Business) refreshByMysql(to orm.TxOrmer, edbInfo *EdbInfo, startDate string) (err error) {
	dataTableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
	edbInfoIdStr := fmt.Sprint(edbInfo.EdbInfoId)

	// Largest date seen in the real data, and the date of the insert-config rule.
	var realDataMaxDate, edbDataInsertConfigDate time.Time
	var edbDataInsertConfig *EdbDataInsertConfig
	var isFindConfigDateRealData bool // whether real data exists for the config date
	{
		edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
		// A missing config row is not an error.
		if err != nil && err.Error() != utils.ErrNoRow() {
			return
		}
		if edbDataInsertConfig != nil {
			edbDataInsertConfigDate = edbDataInsertConfig.Date
		}
	}

	// Base (source) data. A failure here must abort: continuing with an empty
	// list would mark every stored row as stale and delete it.
	baseDataList, err := obj.getBaseBusinessDataByMysql(edbInfo, startDate)
	if err != nil {
		return
	}

	// Rows already stored for this indicator.
	var existCondition string
	var existPars []interface{}
	existCondition += " AND edb_info_id=? "
	existPars = append(existPars, edbInfo.EdbInfoId)
	if startDate != "" {
		existCondition += " AND data_time>=? "
		existPars = append(existPars, startDate)
	}
	existList, err := GetEdbDataByCondition(edbInfo.Source, edbInfo.SubSource, existCondition, existPars)
	if err != nil {
		fmt.Println(obj.GetSourceName() + ",refreshByMysql err;getEdbDataBusinessList Err:" + err.Error())
		return err
	}

	existDataMap := make(map[string]*EdbInfoSearchData, len(existList))
	// Every stored date starts out as a removal candidate and is struck off
	// once it is found in the base data.
	removeDataTimeMap := make(map[string]bool, len(existList))
	for _, v := range existList {
		existDataMap[v.DataTime] = v
		removeDataTimeMap[v.DataTime] = true
	}

	// Multi-row INSERT statement, built incrementally.
	var addSql strings.Builder
	addSql.WriteString(` INSERT INTO ` + dataTableName + ` (edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `)
	var isAdd bool
	// Rows to update.
	updateDataList := make([]*EdbInfoSearchData, 0)

	for _, tmpData := range baseDataList {
		currDate := tmpData.DataTime
		currDateStr := currDate.Format(utils.FormatDate)

		// Value as it will be stored, rounded to 4 decimals.
		saveValue := decimal.NewFromFloat(tmpData.Value).Round(4).String()

		// Collect what the manual-insert handling at the end needs.
		if realDataMaxDate.IsZero() || currDate.After(realDataMaxDate) {
			realDataMaxDate = currDate
		}
		if edbDataInsertConfigDate.IsZero() || currDate.Equal(edbDataInsertConfigDate) {
			isFindConfigDateRealData = true
		}

		existData, ok := existDataMap[currDateStr]
		if ok {
			// Already stored: keep the row and update it only if the value changed.
			existValStr := decimal.NewFromFloat(existData.Value).Round(4).String()
			delete(removeDataTimeMap, currDateStr)
			if existValStr != saveValue {
				existData.Value = tmpData.Value
				updateDataList = append(updateDataList, existData)
			}
			continue
		}

		// Not stored yet: append a value tuple. The timestamp is in milliseconds.
		timestamp := currDate.UnixNano() / 1e6
		addSql.WriteString(GetAddSql(edbInfoIdStr, edbInfo.EdbCode, currDateStr, fmt.Sprint(timestamp), saveValue))
		isAdd = true
	}

	// Delete rows whose date no longer exists in the base data.
	if removeNum := len(removeDataTimeMap); removeNum > 0 {
		removeDateList := make([]string, 0, removeNum)
		for dateTime := range removeDataTimeMap {
			removeDateList = append(removeDateList, dateTime)
		}
		sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (`+utils.GetOrmInReplace(removeNum)+`) `, dataTableName)
		_, err = to.Raw(sql, edbInfo.EdbInfoId, removeDateList).Exec()
		if err != nil {
			// Constant format string; %w keeps the message text and the chain.
			err = fmt.Errorf("删除自有数据的明细数据失败,Err:%w", err)
			return
		}
	}

	if isAdd {
		// Drop the trailing comma left after the last value tuple.
		_, err = to.Raw(strings.TrimRight(addSql.String(), ",")).Exec()
		if err != nil {
			fmt.Println("RefreshAllCalculate add Err", err.Error())
			return
		}
	}

	// Update changed rows.
	// NOTE(review): ModifyEdbDataById is not given `to`, so these updates
	// presumably run outside the transaction — confirm.
	for _, v := range updateDataList {
		err = ModifyEdbDataById(edbInfo.Source, edbInfo.SubSource, v.EdbDataId, fmt.Sprint(v.Value))
		if err != nil {
			fmt.Println(obj.GetSourceName() + ",refreshByMysql:Err:" + err.Error())
			return err
		}
	}

	// Apply the manual data-supplement configuration.
	HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData)

	return
}

// getEdbInfoMaxAndMinInfoByMongo
// @Description: aggregates the min/max date and value of an indicator from
// the mgo.EdbDataBusiness collection and, when a max date exists, reads the
// value stored at that date as both the latest and the end value.
// @receiver obj
// @param edbCode string
// @return item *EdbInfoMaxAndMinInfo dates formatted with utils.FormatDate
// @return err error
func (obj Business) getEdbInfoMaxAndMinInfoByMongo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
	mogDataObj := new(mgo.EdbDataBusiness)
	pipeline := []bson.M{
		{"$match": bson.M{"edb_code": edbCode}},
		{"$group": bson.M{
			"_id":       nil,
			"min_date":  bson.M{"$min": "$data_time"},
			"max_date":  bson.M{"$max": "$data_time"},
			"min_value": bson.M{"$min": "$value"},
			"max_value": bson.M{"$max": "$value"},
		}},
		{"$project": bson.M{"_id": 0}}, // optional: drops the _id field from the result
	}
	result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
	if err != nil {
		fmt.Println("EdbDataBusiness getEdbDataBusinessList Err:" + err.Error())
		return
	}

	if !result.MaxDate.IsZero() {
		whereQuery := bson.M{"edb_code": edbCode, "data_time": result.MaxDate}
		selectParam := bson.D{{Key: "value", Value: 1}, {Key: "_id", Value: 0}}
		latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
		if tmpErr != nil {
			err = tmpErr
			return
		}
		result.LatestValue = latestValue.Value
		result.EndValue = latestValue.Value
	}

	// NOTE(review): LatestDate is not assigned in this function; it is
	// formatted exactly as the aggregate helper returned it — confirm it is
	// populated there.
	item = &EdbInfoMaxAndMinInfo{
		MinDate:     result.MinDate.Format(utils.FormatDate),
		MaxDate:     result.MaxDate.Format(utils.FormatDate),
		MinValue:    result.MinValue,
		MaxValue:    result.MaxValue,
		LatestValue: result.LatestValue,
		LatestDate:  result.LatestDate.Format(utils.FormatDate),
		EndValue:    result.EndValue,
	}

	return
}

// UnifiedModifyEdbInfoMaxAndMinInfo
// @Description: updates the indicator's max/min/latest values. In MongoDB mode
// this is best effort: a failure to compute the values is deliberately
// swallowed and nil is returned. Otherwise the package-level
// UnifiedModifyEdbInfoMaxAndMinInfo is used.
// @author: Roc
// @receiver obj
// @datetime 2024-04-30 17:07:35
// @param edbInfo *EdbInfo
// @return err error
func (obj Business) UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo *EdbInfo) (err error) {
	if !utils.UseMongo {
		err, _ = UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
		return
	}

	edbInfoMaxAndMinInfo, tmpErr := obj.getEdbInfoMaxAndMinInfoByMongo(edbInfo.EdbCode)
	if tmpErr != nil || edbInfoMaxAndMinInfo == nil {
		// Intentionally not reported, so the caller does not see an error.
		return nil
	}
	err = ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, edbInfoMaxAndMinInfo)

	return
}

// EdbInfoMgoData
// @Description: one base-data point (date, value, indicator code) as consumed
// by the refresh routines, regardless of which store it was read from
type EdbInfoMgoData struct {
	//EdbDataId primitive.ObjectID    `description:"数据ID"`
	DataTime time.Time `description:"数据日期"`
	Value    float64   `description:"数据"`
	EdbCode  string    `description:"指标编码"`
}

// getBaseBusinessData
// @Description: reads the base detail data from the store selected by
// utils.UseMongo, mirroring the dispatch done by refresh
// @author: Roc
// @receiver obj
// @datetime 2024-04-30 11:10:46
// @param edbInfo *EdbInfo
// @param startDate string
// @return newDataList []EdbInfoMgoData
// @return err error
func (obj Business) getBaseBusinessData(edbInfo *EdbInfo, startDate string) (newDataList []EdbInfoMgoData, err error) {
	if utils.UseMongo {
		return obj.getBaseBusinessDataByMongo(edbInfo, startDate)
	}
	return obj.getBaseBusinessDataByMysql(edbInfo, startDate)
}

// getBaseBusinessDataByMongo
// @Description: reads the base detail data for edbInfo.EdbCode from MongoDB,
// sorted by data_time, optionally restricted to data_time >= startDate
// @author: Roc
// @receiver obj
// @datetime 2024-07-02 10:12:02
// @param edbInfo *EdbInfo
// @param startDate string empty means "all data"; otherwise parsed with utils.FormatDate
// @return newDataList []EdbInfoMgoData empty (non-nil) on error
// @return err error
func (obj Business) getBaseBusinessDataByMongo(edbInfo *EdbInfo, startDate string) (newDataList []EdbInfoMgoData, err error) {
	newDataList = make([]EdbInfoMgoData, 0)

	mogDataObj := new(mgo.BaseFromBusinessData)

	queryConditions := bson.M{
		"index_code": edbInfo.EdbCode,
	}
	if startDate != `` {
		startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
		if tmpErr != nil {
			err = tmpErr
			return
		}
		queryConditions["data_time"] = bson.M{"$gte": startDateTime}
	}

	baseDataList, err := mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
	if err != nil {
		fmt.Println("getBaseBusinessData Err:" + err.Error())
		return
	}

	newDataList = make([]EdbInfoMgoData, 0, len(baseDataList))
	for _, v := range baseDataList {
		newDataList = append(newDataList, EdbInfoMgoData{
			//EdbDataId: v.ID,
			DataTime: v.DataTime,
			Value:    v.Value,
			EdbCode:  v.IndexCode,
		})
	}

	return
}

// getBaseBusinessDataByMysql
// @Description: reads the base detail data for edbInfo.EdbCode from MySQL,
// ordered by data_time ascending, optionally restricted to data_time >= startDate
// @author: Roc
// @receiver obj
// @datetime 2024-07-02 10:12:16
// @param edbInfo *EdbInfo
// @param startDate string empty means "all data"
// @return newDataList []EdbInfoMgoData empty (non-nil) on error
// @return err error
func (obj Business) getBaseBusinessDataByMysql(edbInfo *EdbInfo, startDate string) (newDataList []EdbInfoMgoData, err error) {
	newDataList = make([]EdbInfoMgoData, 0)

	baseBusinessDataObj := new(BaseFromBusinessData)

	var condition []string
	var pars []interface{}
	condition = append(condition, "index_code = ? ")
	pars = append(pars, edbInfo.EdbCode)
	if startDate != `` {
		condition = append(condition, " data_time >= ? ")
		pars = append(pars, startDate)
	}

	baseDataList, err := baseBusinessDataObj.GetAllDataList(condition, pars, " data_time ASC ")
	if err != nil {
		fmt.Println("getBaseBusinessData Err:" + err.Error())
		return
	}

	newDataList = make([]EdbInfoMgoData, 0, len(baseDataList))
	for _, v := range baseDataList {
		newDataList = append(newDataList, EdbInfoMgoData{
			//EdbDataId: v.BusinessDataId,
			DataTime: v.DataTime,
			Value:    v.Value,
			EdbCode:  v.IndexCode,
		})
	}

	return
}