123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682 |
- package models
- import (
- "eta/eta_index_lib/models/mgo"
- "eta/eta_index_lib/utils"
- "fmt"
- "github.com/beego/beego/v2/client/orm"
- "github.com/shopspring/decimal"
- "go.mongodb.org/mongo-driver/bson"
- "reflect"
- "strings"
- "time"
- )
- type Business struct {
- }
- type AddBaseParams struct {
- EdbCode string `description:"指标编码"`
- EdbName string `description:"指标名称"`
- Unit string `description:"单位"`
- ClassifyId int `description:"所属分类"`
- SysUserId int `description:"用户id"`
- SysUserRealName string `description:"用户真实名称"`
- UniqueCode string `description:"编码"`
- }
- type EditBaseParams struct {
- EdbCode string `description:"指标编码"`
- EdbName string `description:"指标名称"`
- EdbNameEn string `description:"指标名称(英文)"`
- Unit string `description:"单位"`
- UnitEn string `description:"单位(英文)"`
- ClassifyId int `description:"所属分类"`
- SysUserId int `description:"用户id"`
- SysUserRealName string `description:"用户真实名称"`
- UniqueCode string `description:"编码"`
- Lang string `description:"语言版本"`
- EdbInfo *EdbInfo `description:"指标信息"`
- }
- type RefreshBaseParams struct {
- EdbInfo *EdbInfo
- StartDate string
- EndDate string
- }
- func (obj Business) Add(params AddBaseParams, businessIndexItem *BaseFromBusinessIndex) (edbInfo *EdbInfo, err error, errMsg string) {
- o := orm.NewOrm()
- to, err := o.Begin()
- if err != nil {
- return
- }
- defer func() {
- if err != nil {
- _ = to.Rollback()
- fmt.Println(reflect.TypeOf(obj).Name(), ";Add,Err:"+err.Error())
- } else {
- _ = to.Commit()
- }
- }()
- edbInfo = new(EdbInfo)
- edbInfo.Source = obj.GetSource()
- edbInfo.SourceName = obj.GetSourceName()
- edbInfo.EdbCode = params.EdbCode
- edbInfo.EdbName = params.EdbName
- edbInfo.EdbNameSource = params.EdbName
- edbInfo.Frequency = businessIndexItem.Frequency
- edbInfo.Unit = params.Unit
- edbInfo.ClassifyId = params.ClassifyId
- edbInfo.SysUserId = params.SysUserId
- edbInfo.SysUserRealName = params.SysUserRealName
- edbInfo.CreateTime = time.Now()
- edbInfo.ModifyTime = time.Now()
- edbInfo.UniqueCode = params.UniqueCode
- edbInfo.CalculateFormula = ``
- edbInfo.EdbNameEn = params.EdbName
- edbInfo.UnitEn = params.Unit
- edbInfo.EdbType = obj.GetEdbType()
- edbInfo.SubSource = businessIndexItem.Source
- edbInfo.SubSourceName = businessIndexItem.SourceName
- edbInfo.Sort = GetAddEdbMaxSortByClassifyId(params.ClassifyId, utils.EDB_INFO_TYPE)
- newEdbInfoId, tmpErr := to.Insert(edbInfo)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- edbInfo.EdbInfoId = int(newEdbInfoId)
-
- err = obj.refresh(to, edbInfo, "")
- return
- }
- func (obj Business) Edit(params EditBaseParams, businessIndexItem *BaseFromBusinessIndex) (err error, errMsg string) {
- edbInfo := params.EdbInfo
- o := orm.NewOrm()
- to, err := o.Begin()
- if err != nil {
- return
- }
- defer func() {
- if err != nil {
- _ = to.Rollback()
- fmt.Println(reflect.TypeOf(obj).Name(), ";Edit,Err:"+err.Error())
- } else {
- _ = to.Commit()
- }
- }()
-
-
- edbInfo.EdbName = params.EdbName
- edbInfo.EdbNameSource = params.EdbName
- edbInfo.Frequency = businessIndexItem.Frequency
- edbInfo.Unit = params.Unit
- edbInfo.ClassifyId = params.ClassifyId
- edbInfo.EdbNameEn = params.EdbNameEn
- edbInfo.UnitEn = params.UnitEn
- edbInfo.SubSource = businessIndexItem.Source
- edbInfo.SubSourceName = businessIndexItem.SourceName
- edbInfo.ModifyTime = time.Now()
- _, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "CalculateFormula", "ModifyTime", "EdbNameEn", "UnitEn", "SubSource", "SubSourceName")
- if err != nil {
- return
- }
-
- err = obj.refresh(to, edbInfo, "")
- return
- }
- func (obj Business) Refresh(params RefreshBaseParams) (err error, errMsg string) {
- to, err := orm.NewOrm().Begin()
- if err != nil {
- return
- }
- defer func() {
- if err != nil {
- _ = to.Rollback()
- fmt.Println(reflect.TypeOf(obj).Name(), ";Refresh,Err:"+err.Error())
- } else {
- _ = to.Commit()
- }
- }()
-
- err = obj.refresh(to, params.EdbInfo, params.StartDate)
- return
- }
- func (obj Business) GetSource() int {
- return utils.DATA_SOURCE_BUSINESS
- }
- func (obj Business) GetSourceName() string {
- return utils.DATA_SOURCE_NAME_BUSINESS
- }
- func (obj Business) GetEdbType() int {
- return utils.DEFAULT_EDB_TYPE
- }
- func (obj Business) refresh(to orm.TxOrmer, edbInfo *EdbInfo, startDate string) (err error) {
- if utils.UseMongo {
- return obj.refreshByMongo(edbInfo, startDate)
- } else {
- return obj.refreshByMysql(to, edbInfo, startDate)
- }
- return
- }
- func (obj Business) refreshByMongo(edbInfo *EdbInfo, startDate string) (err error) {
-
- var realDataMaxDate, edbDataInsertConfigDate time.Time
- var edbDataInsertConfig *EdbDataInsertConfig
- var isFindConfigDateRealData bool
- {
- edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
- if err != nil && err.Error() != utils.ErrNoRow() {
- return
- }
- if edbDataInsertConfig != nil {
- edbDataInsertConfigDate = edbDataInsertConfig.Date
- }
- }
-
- baseDataList, err := obj.getBaseBusinessDataByMongo(edbInfo, startDate)
-
- existDataList := make([]*mgo.EdbDataBusiness, 0)
- mogDataObj := new(mgo.EdbDataBusiness)
- {
-
- queryConditions := bson.M{
- "edb_code": edbInfo.EdbCode,
- }
- if startDate != `` {
-
- startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- queryConditions["data_time"] = bson.M{"$gte": startDateTime}
- }
- existDataList, err = mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
- if err != nil {
- fmt.Println(obj.GetSourceName() + ",refresh err;getEdbDataBusinessList Err:" + err.Error())
- return
- }
- }
- existDataMap := make(map[string]*mgo.EdbDataBusiness)
- removeDataTimeMap := make(map[string]bool)
- for _, v := range existDataList {
- tmpDate := v.DataTime.Format(utils.FormatDate)
- existDataMap[tmpDate] = v
- removeDataTimeMap[tmpDate] = true
- }
- needAddDateMap := make(map[time.Time]int)
-
- addDataList := make([]interface{}, 0)
-
- updateDataList := make([]mgo.EdbDataBusiness, 0)
- for _, tmpData := range baseDataList {
- currDate := tmpData.DataTime
- currDateStr := currDate.Format(utils.FormatDate)
-
- saveValue := decimal.NewFromFloat(tmpData.Value).Round(4).String()
-
- {
- if realDataMaxDate.IsZero() || currDate.After(realDataMaxDate) {
- realDataMaxDate = currDate
- }
- if edbDataInsertConfigDate.IsZero() || currDate.Equal(edbDataInsertConfigDate) {
- isFindConfigDateRealData = true
- }
- }
- existData, ok := existDataMap[currDateStr]
-
- if ok {
-
- existValStr := decimal.NewFromFloat(existData.Value).Round(4).String()
-
- delete(removeDataTimeMap, currDateStr)
- if existValStr != saveValue {
- existData.Value = tmpData.Value
- updateDataList = append(updateDataList, *existData)
- }
- continue
- }
-
- timestamp := currDate.UnixNano() / 1e6
- addDataList = append(addDataList, mgo.EdbDataBusiness{
- EdbInfoId: edbInfo.EdbInfoId,
- EdbCode: edbInfo.EdbCode,
- DataTime: currDate,
- Value: tmpData.Value,
- CreateTime: time.Now(),
- ModifyTime: time.Now(),
- DataTimestamp: timestamp,
- })
- needAddDateMap[currDate] = 1
- }
-
- {
- coll := mogDataObj.GetCollection()
-
- {
- removeDateList := make([]time.Time, 0)
- for dateTime := range removeDataTimeMap {
-
- tmpDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- removeDateList = append(removeDateList, tmpDateTime)
- }
- removeNum := len(removeDateList)
- if removeNum > 0 {
- err = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}})
- if err != nil {
- fmt.Println("mogDataObj.RemoveMany() Err:" + err.Error())
- return
- }
- }
- }
-
- if len(addDataList) > 0 {
- err = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList)
- if err != nil {
- fmt.Println("mogDataObj.BatchInsertData() Err:" + err.Error())
- return
- }
- }
-
- if len(updateDataList) > 0 {
- for _, v := range updateDataList {
- err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
- if err != nil {
- fmt.Println("mogDataObj.UpdateDataByColl:Err:" + err.Error())
- return
- }
- }
- }
- }
-
- HandleConfigInsertEdbDataByMongo(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData)
- return
- }
- func (obj Business) refreshByMysql(to orm.TxOrmer, edbInfo *EdbInfo, startDate string) (err error) {
- dataTableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
- edbInfoIdStr := fmt.Sprint(edbInfo.EdbInfoId)
-
- var realDataMaxDate, edbDataInsertConfigDate time.Time
- var edbDataInsertConfig *EdbDataInsertConfig
- var isFindConfigDateRealData bool
- {
- edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
- if err != nil && err.Error() != utils.ErrNoRow() {
- return
- }
- if edbDataInsertConfig != nil {
- edbDataInsertConfigDate = edbDataInsertConfig.Date
- }
- }
-
- baseDataList, err := obj.getBaseBusinessDataByMysql(edbInfo, startDate)
-
- var existCondition string
- var existPars []interface{}
- existCondition += " AND edb_info_id=? "
- existPars = append(existPars, edbInfo.EdbInfoId)
- if startDate != "" {
- existCondition += " AND data_time>=? "
- existPars = append(existPars, startDate)
- }
- existList, err := GetEdbDataByCondition(edbInfo.Source, edbInfo.SubSource, existCondition, existPars)
- if err != nil {
- fmt.Println(obj.GetSourceName() + ",refreshByMysql err;getEdbDataBusinessList Err:" + err.Error())
- return err
- }
- existDataMap := make(map[string]*EdbInfoSearchData)
- removeDataTimeMap := make(map[string]bool)
- for _, v := range existList {
- existDataMap[v.DataTime] = v
- removeDataTimeMap[v.DataTime] = true
- }
- needAddDateMap := make(map[time.Time]int)
-
- addSql := ` INSERT INTO ` + dataTableName + ` (edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
- var isAdd bool
-
- updateDataList := make([]*EdbInfoSearchData, 0)
- for _, tmpData := range baseDataList {
- currDate := tmpData.DataTime
- currDateStr := currDate.Format(utils.FormatDate)
-
- saveValue := decimal.NewFromFloat(tmpData.Value).Round(4).String()
-
- {
- if realDataMaxDate.IsZero() || currDate.After(realDataMaxDate) {
- realDataMaxDate = currDate
- }
- if edbDataInsertConfigDate.IsZero() || currDate.Equal(edbDataInsertConfigDate) {
- isFindConfigDateRealData = true
- }
- }
- existData, ok := existDataMap[currDateStr]
-
- if ok {
-
- existValStr := decimal.NewFromFloat(existData.Value).Round(4).String()
-
- delete(removeDataTimeMap, currDateStr)
- if existValStr != saveValue {
- existData.Value = tmpData.Value
- updateDataList = append(updateDataList, existData)
- }
- continue
- }
-
- timestamp := currDate.UnixNano() / 1e6
- needAddDateMap[currDate] = 1
- addSql += GetAddSql(edbInfoIdStr, edbInfo.EdbCode, currDateStr, fmt.Sprint(timestamp), saveValue)
- isAdd = true
- }
-
- {
- removeDateList := make([]string, 0)
- for dateTime := range removeDataTimeMap {
- removeDateList = append(removeDateList, dateTime)
- }
- removeNum := len(removeDateList)
- if removeNum > 0 {
-
- sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (`+utils.GetOrmInReplace(removeNum)+`) `, dataTableName)
- _, err = to.Raw(sql, edbInfo.EdbInfoId, removeDateList).Exec()
- if err != nil {
- err = fmt.Errorf("删除自有数据的明细数据失败,Err:" + err.Error())
- return
- }
- }
- }
- if isAdd {
- addSql = strings.TrimRight(addSql, ",")
- _, err = to.Raw(addSql).Exec()
- if err != nil {
- fmt.Println("RefreshAllCalculate add Err", err.Error())
- return
- }
- }
-
- if len(updateDataList) > 0 {
- for _, v := range updateDataList {
- err = ModifyEdbDataById(edbInfo.Source, edbInfo.SubSource, v.EdbDataId, fmt.Sprint(v.Value))
- if err != nil {
- fmt.Println(obj.GetSourceName() + ",refreshByMysql:Err:" + err.Error())
- return err
- }
- }
- }
-
- HandleConfigInsertEdbData(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData)
- return
- }
- func (obj Business) getEdbInfoMaxAndMinInfoByMongo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
- mogDataObj := new(mgo.EdbDataBusiness)
- pipeline := []bson.M{
- {"$match": bson.M{"edb_code": edbCode}},
- {"$group": bson.M{
- "_id": nil,
- "min_date": bson.M{"$min": "$data_time"},
- "max_date": bson.M{"$max": "$data_time"},
- "min_value": bson.M{"$min": "$value"},
- "max_value": bson.M{"$max": "$value"},
- }},
- {"$project": bson.M{"_id": 0}},
- }
- result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
- if err != nil {
- fmt.Println("EdbDataBusiness getEdbDataBusinessList Err:" + err.Error())
- return
- }
- if !result.MaxDate.IsZero() {
- whereQuery := bson.M{"edb_code": edbCode, "data_time": result.MaxDate}
- selectParam := bson.D{{"value", 1}, {"_id", 0}}
- latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- result.LatestValue = latestValue.Value
- result.EndValue = latestValue.Value
- }
- item = &EdbInfoMaxAndMinInfo{
- MinDate: result.MinDate.Format(utils.FormatDate),
- MaxDate: result.MaxDate.Format(utils.FormatDate),
- MinValue: result.MinValue,
- MaxValue: result.MaxValue,
- LatestValue: result.LatestValue,
- LatestDate: result.LatestDate.Format(utils.FormatDate),
- EndValue: result.EndValue,
- }
- return
- }
- func (obj Business) UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo *EdbInfo) (err error) {
- if utils.UseMongo {
- edbInfoMaxAndMinInfo, tmpErr := obj.getEdbInfoMaxAndMinInfoByMongo(edbInfo.EdbCode)
-
- if tmpErr == nil && edbInfoMaxAndMinInfo != nil {
- err = ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, edbInfoMaxAndMinInfo)
- } else {
-
- err = nil
- }
- } else {
- err, _ = UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
- }
- return
- }
- type EdbInfoMgoData struct {
-
- DataTime time.Time `description:"数据日期"`
- Value float64 `description:"数据"`
- EdbCode string `description:"指标编码"`
- }
- func (obj Business) getBaseBusinessData(edbInfo *EdbInfo, startDate string) (newDataList []EdbInfoMgoData, err error) {
- return obj.getBaseBusinessDataByMongo(edbInfo, startDate)
- }
- func (obj Business) getBaseBusinessDataByMongo(edbInfo *EdbInfo, startDate string) (newDataList []EdbInfoMgoData, err error) {
- newDataList = make([]EdbInfoMgoData, 0)
-
- mogDataObj := new(mgo.BaseFromBusinessData)
-
- queryConditions := bson.M{
- "index_code": edbInfo.EdbCode,
- }
- if startDate != `` {
-
- startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
- if tmpErr != nil {
- err = tmpErr
- return
- }
- queryConditions["data_time"] = bson.M{"$gte": startDateTime}
- }
- baseDataList, err := mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
- if err != nil {
- fmt.Println("getBaseBusinessData Err:" + err.Error())
- return
- }
- for _, v := range baseDataList {
- newDataList = append(newDataList, EdbInfoMgoData{
-
- DataTime: v.DataTime,
- Value: v.Value,
- EdbCode: v.IndexCode,
- })
- }
- return
- }
- func (obj Business) getBaseBusinessDataByMysql(edbInfo *EdbInfo, startDate string) (newDataList []EdbInfoMgoData, err error) {
- newDataList = make([]EdbInfoMgoData, 0)
-
- baseBusinessDataObj := new(BaseFromBusinessData)
-
- var condition []string
- var pars []interface{}
- condition = append(condition, "index_code = ? ")
- pars = append(pars, edbInfo.EdbCode)
- if startDate != `` {
- condition = append(condition, " data_time >= ? ")
- pars = append(pars, startDate)
- }
- baseDataList, err := baseBusinessDataObj.GetAllDataList(condition, pars, " data_time ASC ")
- if err != nil {
- fmt.Println("getBaseBusinessData Err:" + err.Error())
- return
- }
- for _, v := range baseDataList {
- newDataList = append(newDataList, EdbInfoMgoData{
-
- DataTime: v.DataTime,
- Value: v.Value,
- EdbCode: v.IndexCode,
- })
- }
- return
- }
|