package models

import (
	"errors"
	"eta/eta_index_lib/utils"
	"fmt"
	"github.com/beego/beego/v2/client/orm"
	"strings"
	"time"
)

// EdbDataPredictCalculateZjpj is one stored data point of a predicted
// "direct splice" indicator; rows are read from the
// edb_data_predict_calculate_zjpj table.
type EdbDataPredictCalculateZjpj struct {
	EdbDataId     int `orm:"column(edb_data_id);pk"`
	EdbInfoId     int
	EdbCode       string
	DataTime      string
	Value         float64
	Status        int
	CreateTime    time.Time
	ModifyTime    time.Time
	DataTimestamp int64
}

// predictZjpjDeleteMappingSQL removes the mapping rows of one indicator that
// point at a given source indicator.
//
// NOTE(review): the statement is not scoped by from_tag, so if both the "A"
// and the "B" mapping point at the same source indicator, both rows are
// removed. Confirm whether that case can occur.
const predictZjpjDeleteMappingSQL = ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? AND from_edb_info_id = ? `

// insertPredictZjpjMapping inserts, inside transaction to, the mapping row
// that links edbInfo to the source indicator fromEdbInfo under the given tag
// and sort position.
func insertPredictZjpjMapping(to orm.TxOrmer, edbInfo, fromEdbInfo *EdbInfo, fromTag string, sort int) error {
	now := time.Now()
	mapping := &EdbInfoCalculateMapping{
		EdbInfoId:      edbInfo.EdbInfoId,
		Source:         edbInfo.Source,
		SourceName:     edbInfo.SourceName,
		EdbCode:        edbInfo.EdbCode,
		FromEdbInfoId:  fromEdbInfo.EdbInfoId,
		FromEdbCode:    fromEdbInfo.EdbCode,
		FromEdbName:    fromEdbInfo.EdbName,
		FromSource:     fromEdbInfo.Source,
		FromSourceName: fromEdbInfo.SourceName,
		FromTag:        fromTag,
		Sort:           sort,
		CreateTime:     now,
		ModifyTime:     now,
	}
	_, err := to.Insert(mapping)
	return err
}

// loadPredictZjpjMappings looks up the mapping rows of the indicator and
// returns the ones tagged "A" (data before the splice date) and "B" (data
// from the splice date on). It fails if the lookup fails or either is missing.
func loadPredictZjpjMappings(edbInfoId int) (itemA, itemB *EdbInfoCalculateMapping, err error) {
	condition := " AND edb_info_id=? "
	pars := []interface{}{edbInfoId}

	mappings, err := GetEdbInfoCalculateListByCondition(condition, pars)
	if err != nil {
		err = fmt.Errorf("判断指标是否改变失败,Err:%w", err)
		return
	}
	for _, m := range mappings {
		switch m.FromTag {
		case "A":
			itemA = m
		case "B":
			itemB = m
		}
	}
	if itemA == nil {
		err = errors.New("原拼接日期之前的指标不存在")
		return
	}
	if itemB == nil {
		err = errors.New("原拼接日期之后的指标不存在")
		return
	}
	return
}

// SavePredictCalculateZjpj creates (req.EdbInfoId <= 0) or edits a predicted
// "direct splice" indicator and (re)builds its data inside one transaction.
//
// firstEdbInfo supplies the data before the splice date and secondEdbInfo the
// data from the splice date on; the splice date itself is carried in
// req.Formula and stored in EdbInfo.CalculateFormula.
//
// When editing, the data is only rebuilt if the splice date or one of the two
// source indicators changed; otherwise the previously stored LatestDate and
// LatestValue of the indicator are returned unchanged.
//
// The transaction is rolled back if err is non-nil on return and committed
// otherwise; a failing commit is reported through err.
func SavePredictCalculateZjpj(req *EdbInfoCalculateBatchSaveReq, firstEdbInfo, secondEdbInfo *EdbInfo, edbCode, uniqueCode string, sysUserId int, sysUserRealName string) (edbInfo *EdbInfo, latestDateStr string, latestValue float64, err error) {
	o := orm.NewOrm()
	to, err := o.Begin()
	if err != nil {
		return
	}
	defer func() {
		if err != nil {
			fmt.Println("SavePredictCalculateZjpj,Err:" + err.Error())
			_ = to.Rollback() // the original error is the one worth reporting
			return
		}
		err = to.Commit()
	}()

	if req.EdbInfoId <= 0 {
		// Create the indicator itself.
		now := time.Now()
		edbInfo = &EdbInfo{
			EdbInfoType:      1,
			SourceName:       "预测直接拼接",
			Source:           utils.DATA_SOURCE_PREDICT_CALCULATE_ZJPJ,
			EdbCode:          edbCode,
			EdbName:          req.EdbName,
			EdbNameSource:    req.EdbName,
			Frequency:        req.Frequency,
			Unit:             req.Unit,
			StartDate:        firstEdbInfo.StartDate,
			EndDate:          firstEdbInfo.EndDate,
			ClassifyId:       req.ClassifyId,
			SysUserId:        sysUserId,
			SysUserRealName:  sysUserRealName,
			UniqueCode:       uniqueCode,
			CreateTime:       now,
			ModifyTime:       now,
			CalculateFormula: req.Formula,
			EdbType:          2,
		}
		newEdbInfoId, tmpErr := to.Insert(edbInfo)
		if tmpErr != nil {
			err = tmpErr
			return
		}
		edbInfo.EdbInfoId = int(newEdbInfoId)

		// Link both source indicators. "B" gets sort position 2, matching
		// what the edit path below writes.
		if err = insertPredictZjpjMapping(to, edbInfo, firstEdbInfo, "A", 1); err != nil {
			return
		}
		if err = insertPredictZjpjMapping(to, edbInfo, secondEdbInfo, "B", 2); err != nil {
			return
		}
	} else {
		edbInfo, err = GetEdbInfoById(req.EdbInfoId)
		if err != nil {
			return
		}
		latestDateStr = edbInfo.LatestDate
		latestValue = edbInfo.LatestValue
		previousFormula := edbInfo.CalculateFormula // splice date before this edit

		// Update the indicator's own attributes.
		edbInfo.EdbName = req.EdbName
		edbInfo.EdbNameSource = req.EdbName
		edbInfo.Frequency = req.Frequency
		edbInfo.Unit = req.Unit
		edbInfo.ClassifyId = req.ClassifyId
		edbInfo.CalculateFormula = req.Formula
		edbInfo.ModifyTime = time.Now()
		_, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "CalculateFormula", "ModifyTime")
		if err != nil {
			return
		}

		var existItemA, existItemB *EdbInfoCalculateMapping
		existItemA, existItemB, err = loadPredictZjpjMappings(edbInfo.EdbInfoId)
		if err != nil {
			return
		}

		firstChanged := existItemA.FromEdbInfoId != firstEdbInfo.EdbInfoId
		secondChanged := existItemB.FromEdbInfoId != secondEdbInfo.EdbInfoId
		// Data must be rebuilt if the splice date or either source changed.
		isNeedCalculateData := req.Formula != previousFormula || firstChanged || secondChanged

		// Remove ALL outdated mappings before inserting any new one: the
		// delete is keyed by from_edb_info_id, so interleaving it with the
		// inserts could remove a freshly inserted row when the two source
		// indicators are swapped.
		if firstChanged {
			_, err = to.Raw(predictZjpjDeleteMappingSQL, edbInfo.EdbInfoId, existItemA.FromEdbInfoId).Exec()
			if err != nil {
				err = fmt.Errorf("删除拼接日期之前的指标关联关系失败,Err:%w", err)
				return
			}
		}
		if secondChanged {
			_, err = to.Raw(predictZjpjDeleteMappingSQL, edbInfo.EdbInfoId, existItemB.FromEdbInfoId).Exec()
			if err != nil {
				err = fmt.Errorf("删除拼接日期之后的指标关联关系失败,Err:%w", err)
				return
			}
		}
		if firstChanged {
			if err = insertPredictZjpjMapping(to, edbInfo, firstEdbInfo, "A", 1); err != nil {
				return
			}
		}
		if secondChanged {
			if err = insertPredictZjpjMapping(to, edbInfo, secondEdbInfo, "B", 2); err != nil {
				return
			}
		}

		if !isNeedCalculateData {
			// Nothing that affects the data changed; keep the stored values.
			return
		}

		// Drop every stored data point so the splice is rebuilt from scratch.
		tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
		deleteSQL := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? `, tableName)
		_, err = to.Raw(deleteSQL, edbInfo.EdbInfoId).Exec()
		if err != nil {
			err = fmt.Errorf("删除历史数据失败,Err:%w", err)
			return
		}
	}

	// Build the spliced data.
	latestDateStr, latestValue, err = refreshAllPredictCalculateZjpj(to, edbInfo, firstEdbInfo, secondEdbInfo)
	return
}

// RefreshAllPredictCalculateZjpj rebuilds all data of an existing "direct
// splice" indicator inside its own transaction. The two source indicators are
// resolved from the indicator's "A" and "B" mapping rows.
//
// It returns the latest date and value determined by the refresh. The
// transaction is rolled back if err is non-nil on return and committed
// otherwise; a failing commit is reported through err.
func RefreshAllPredictCalculateZjpj(edbInfo *EdbInfo) (latestDateStr string, latestValue float64, err error) {
	o := orm.NewOrm()
	to, err := o.Begin()
	if err != nil {
		// Without a transaction there is nothing to roll back; bail out
		// before the deferred handler would touch a nil transaction.
		return
	}
	defer func() {
		if err != nil {
			fmt.Println("refreshAllPredictCalculateZjpj,Err:" + err.Error())
			_ = to.Rollback() // the original error is the one worth reporting
			return
		}
		err = to.Commit()
	}()

	// Resolve the two source indicators from the stored mappings.
	existItemA, existItemB, err := loadPredictZjpjMappings(edbInfo.EdbInfoId)
	if err != nil {
		return
	}

	firstEdbInfo, err := GetEdbInfoById(existItemA.FromEdbInfoId)
	if err != nil {
		err = fmt.Errorf("GetEdbInfoById Err:%w", err)
		return
	}
	secondEdbInfo, err := GetEdbInfoById(existItemB.FromEdbInfoId)
	if err != nil {
		err = fmt.Errorf("GetEdbInfoById Err:%w", err)
		return
	}

	// Refresh the data.
	latestDateStr, latestValue, err = refreshAllPredictCalculateZjpj(to, edbInfo, firstEdbInfo, secondEdbInfo)
	return
}

// refreshAllPredictCalculateZjpj synchronises the stored data of edbInfo with
// the splice of its two sources, using transaction to for every write:
//
//   - rows of firstEdbInfo dated strictly before edbInfo.CalculateFormula and
//     rows of secondEdbInfo fetched with CalculateFormula as start date are
//     the desired data set;
//   - stored rows whose value differs are updated, missing rows are inserted
//     in one batch, and stored rows whose date no longer occurs are deleted.
//
// Dates are compared as strings, so they are presumably all formatted with
// utils.FormatDate — TODO confirm. The caller owns commit/rollback of to.
func refreshAllPredictCalculateZjpj(to orm.TxOrmer, edbInfo, firstEdbInfo, secondEdbInfo *EdbInfo) (latestDateStr string, latestValue float64, err error) {
	// Load the data currently stored for this indicator.
	// NOTE(review): the table name is hard-coded here while the deletes use
	// GetEdbDataTableName; presumably both resolve to the same table.
	var dataList []*EdbDataPredictCalculateZjpj
	querySQL := ` SELECT * FROM edb_data_predict_calculate_zjpj WHERE edb_info_id=? ORDER BY data_time DESC `
	_, err = to.Raw(querySQL, edbInfo.EdbInfoId).QueryRows(&dataList)
	if err != nil {
		return
	}

	// Determine which date counts as the latest one.
	spliceDate := edbInfo.CalculateFormula
	switch {
	case spliceDate <= secondEdbInfo.LatestDate:
		latestDateStr = secondEdbInfo.LatestDate
	case spliceDate >= firstEdbInfo.LatestDate:
		latestDateStr = firstEdbInfo.LatestDate
	default:
		latestDateStr = spliceDate
	}

	dataMap := make(map[string]*EdbDataPredictCalculateZjpj, len(dataList))
	// staleDates starts with every stored date; each date that is still
	// produced by the splice is struck off, the remainder gets deleted.
	staleDates := make(map[string]struct{}, len(dataList))
	for _, row := range dataList {
		dataMap[row.DataTime] = row
		staleDates[row.DataTime] = struct{}{}
	}

	addDataList := make([]*EdbDataPredictCalculateZjpj, 0)

	// syncRow reconciles one source data point with the stored data.
	syncRow := func(v *EdbInfoSearchData) error {
		delete(staleDates, v.DataTime)

		if latestDateStr == v.DataTime {
			latestValue = v.Value
		}

		if stored, ok := dataMap[v.DataTime]; ok {
			if stored.Value == v.Value {
				return nil
			}
			// Value changed: update the stored row.
			stored.Value = v.Value
			stored.ModifyTime = time.Now()
			_, updateErr := to.Update(stored, "Value", "ModifyTime")
			return updateErr
		}

		// New date: queue the row for the batch insert. A parse failure is
		// deliberately ignored (as before); the row then carries the
		// timestamp of the zero time.
		currentDate, _ := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local)
		now := time.Now()
		addDataList = append(addDataList, &EdbDataPredictCalculateZjpj{
			EdbInfoId:     edbInfo.EdbInfoId,
			EdbCode:       edbInfo.EdbCode,
			DataTime:      v.DataTime,
			Value:         v.Value,
			Status:        1,
			CreateTime:    now,
			ModifyTime:    now,
			DataTimestamp: currentDate.UnixNano() / 1e6, // milliseconds
		})
		return nil
	}

	// First indicator: everything strictly before the splice date.
	var firstDataList []*EdbInfoSearchData
	firstDataList, err = GetPredictEdbDataListAllByStartDate(firstEdbInfo, 0, "")
	if err != nil {
		return
	}
	for _, v := range firstDataList {
		if v.DataTime >= spliceDate {
			continue
		}
		if err = syncRow(v); err != nil {
			return
		}
	}

	// Second indicator: fetched with the splice date as start date.
	var secondDataList []*EdbInfoSearchData
	secondDataList, err = GetPredictEdbDataListAllByStartDate(secondEdbInfo, 0, spliceDate)
	if err != nil {
		return
	}
	for _, v := range secondDataList {
		if err = syncRow(v); err != nil {
			return
		}
	}

	// Delete stored rows whose date is no longer produced by the splice.
	if len(staleDates) > 0 {
		removeDateList := make([]string, 0, len(staleDates))
		for dateTime := range staleDates {
			removeDateList = append(removeDateList, dateTime)
		}
		// NOTE(review): the dates are inlined into the statement; they come
		// from rows read above, not from the request.
		removeDateStr := `"` + strings.Join(removeDateList, `","`) + `"`

		tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
		deleteSQL := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr)
		_, err = to.Raw(deleteSQL, edbInfo.EdbInfoId).Exec()
		if err != nil {
			err = fmt.Errorf("删除不存在的直接拼接指标数据失败,Err:%w", err)
			return
		}
	}

	// Persist the new rows in one batch.
	if len(addDataList) > 0 {
		_, tmpErr := to.InsertMulti(len(addDataList), addDataList)
		if tmpErr != nil {
			err = tmpErr
			return
		}
	}
	return
}