package models import ( "eta/eta_index_lib/utils" "fmt" "github.com/beego/beego/v2/client/orm" "strings" "time" ) // EdbDataCalculateZjpj 直接拼接数据结构体 type EdbDataCalculateZjpj struct { EdbDataId int `orm:"column(edb_data_id);pk"` EdbInfoId int EdbCode string DataTime string Value float64 Status int CreateTime time.Time ModifyTime time.Time DataTimestamp int64 } // AddCalculateZjpj 新增直接拼接数据 func AddCalculateZjpj(req *EdbInfoCalculateBatchSaveReq, firstEdbInfo, secondEdbInfo *EdbInfo, edbCode, uniqueCode string, sysUserId int, sysUserRealName string) (edbInfo *EdbInfo, err error) { o := orm.NewOrm() to, err := o.Begin() if err != nil { return } defer func() { if err != nil { fmt.Println("AddCalculateZjpj,Err:" + err.Error()) _ = to.Rollback() } else { _ = to.Commit() } }() if req.EdbInfoId <= 0 { edbInfo = &EdbInfo{ SourceName: "直接拼接", Source: utils.DATA_SOURCE_CALCULATE_ZJPJ, EdbCode: edbCode, EdbName: req.EdbName, EdbNameSource: req.EdbName, Frequency: req.Frequency, Unit: req.Unit, StartDate: firstEdbInfo.StartDate, EndDate: firstEdbInfo.EndDate, ClassifyId: req.ClassifyId, SysUserId: sysUserId, SysUserRealName: sysUserRealName, UniqueCode: uniqueCode, CreateTime: time.Now(), ModifyTime: time.Now(), CalculateFormula: req.Formula, EdbNameEn: req.EdbName, UnitEn: req.Unit, EdbType: 2, Sort: GetAddEdbMaxSortByClassifyId(req.ClassifyId, utils.EDB_INFO_TYPE), } newEdbInfoId, tmpErr := to.Insert(edbInfo) if tmpErr != nil { err = tmpErr return } edbInfo.EdbInfoId = int(newEdbInfoId) } else { edbInfo, err = GetEdbInfoById(req.EdbInfoId) if err != nil { return } //查询 tmpEdbInfo, tmpErr := GetEdbInfoById(edbInfo.EdbInfoId) if tmpErr != nil { err = tmpErr return } tmpEdbInfo.EdbName = req.EdbName tmpEdbInfo.ClassifyId = req.ClassifyId tmpEdbInfo.Frequency = req.Frequency tmpEdbInfo.Unit = req.Unit tmpEdbInfo.CalculateFormula = req.Formula edbInfo = tmpEdbInfo //删除指标数据 dataTableName := GetEdbDataTableName(utils.DATA_SOURCE_CALCULATE_ZJPJ, utils.DATA_SUB_SOURCE_EDB) fmt.Println("dataTableName:" + dataTableName) deleteSql := ` DELETE FROM %s WHERE edb_info_id=? ` deleteSql = fmt.Sprintf(deleteSql, dataTableName) _, err = to.Raw(deleteSql, req.EdbInfoId).Exec() if err != nil { return } //删除指标关系 sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id=? ` _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec() } //关联关系 var existItemA, existItemB *EdbInfoCalculateMapping //第一个指标 { existItemA = &EdbInfoCalculateMapping{ EdbInfoCalculateMappingId: 0, EdbInfoId: edbInfo.EdbInfoId, Source: edbInfo.Source, SourceName: edbInfo.SourceName, EdbCode: edbInfo.EdbCode, FromEdbInfoId: firstEdbInfo.EdbInfoId, FromEdbCode: firstEdbInfo.EdbCode, FromEdbName: firstEdbInfo.EdbName, FromSource: firstEdbInfo.Source, FromSourceName: firstEdbInfo.SourceName, FromTag: "A", Sort: 1, CreateTime: time.Now(), ModifyTime: time.Now(), FromSubSource: firstEdbInfo.SubSource, } insertId, tmpErr := to.Insert(existItemA) if tmpErr != nil { err = tmpErr return } existItemA.EdbInfoCalculateMappingId = int(insertId) } //第二个指标 { existItemB = &EdbInfoCalculateMapping{ EdbInfoCalculateMappingId: 0, EdbInfoId: edbInfo.EdbInfoId, Source: edbInfo.Source, SourceName: edbInfo.SourceName, EdbCode: edbInfo.EdbCode, FromEdbInfoId: secondEdbInfo.EdbInfoId, FromEdbCode: secondEdbInfo.EdbCode, FromEdbName: secondEdbInfo.EdbName, FromSource: secondEdbInfo.Source, FromSourceName: secondEdbInfo.SourceName, FromTag: "B", Sort: 1, CreateTime: time.Now(), ModifyTime: time.Now(), FromSubSource: secondEdbInfo.SubSource, } insertId, tmpErr := to.Insert(existItemB) if tmpErr != nil { err = tmpErr return } existItemB.EdbInfoCalculateMappingId = int(insertId) } //拼接数据 err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB) return } // EditCalculateZjpj 编辑直接拼接数据 func EditCalculateZjpj(req *EdbInfoCalculateBatchEditReq, edbInfo, firstEdbInfo, secondEdbInfo *EdbInfo) (err error) { o := orm.NewOrm() to, err := o.Begin() if err != nil { return } defer func() { if err != nil { fmt.Println("EditCalculateZjpj,Err:" + err.Error()) _ = to.Rollback() } else { _ = to.Commit() } }() nowEdbInfo := *edbInfo // 现在的指标信息 //修改指标信息 edbInfo.EdbName = req.EdbName edbInfo.EdbNameSource = req.EdbName edbInfo.Frequency = req.Frequency edbInfo.Unit = req.Unit edbInfo.ClassifyId = req.ClassifyId edbInfo.CalculateFormula = req.Formula edbInfo.EdbNameEn = req.EdbNameEn edbInfo.UnitEn = req.UnitEn edbInfo.ModifyTime = time.Now() _, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "CalculateFormula", "ModifyTime", "EdbNameEn", "UnitEn") if err != nil { return } var existCondition string var existPars []interface{} existCondition += " AND edb_info_id=? " existPars = append(existPars, edbInfo.EdbInfoId) //查询出所有的关联指标 existList, err := GetEdbInfoCalculateListByCondition(existCondition, existPars) if err != nil { err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error()) return } var existItemA, existItemB *EdbInfoCalculateMapping for _, existItem := range existList { if existItem.FromTag == "A" { existItemA = existItem } else if existItem.FromTag == "B" { existItemB = existItem } } // 是否需要删除数据重新计算 isNeedCalculateData := false // 如果截止日期变更,那么需要重新计算 if req.Formula != nowEdbInfo.CalculateFormula { isNeedCalculateData = true } //第一个指标数据 { // 如果指标变了,那么需要删除关系,并重新计算 if existItemA.FromEdbInfoId != firstEdbInfo.EdbInfoId { //删除之前的A指标关联关系 sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id = ?` _, err = to.Raw(sql, edbInfo.EdbInfoId, existItemA.FromEdbInfoId).Exec() if err != nil { err = fmt.Errorf("删除拼接日期之前的指标关联关系失败,Err:" + err.Error()) return } //添加新的指标关系 { existItemA = &EdbInfoCalculateMapping{ EdbInfoCalculateMappingId: 0, EdbInfoId: edbInfo.EdbInfoId, Source: edbInfo.Source, SourceName: edbInfo.SourceName, EdbCode: edbInfo.EdbCode, FromEdbInfoId: firstEdbInfo.EdbInfoId, FromEdbCode: firstEdbInfo.EdbCode, FromEdbName: firstEdbInfo.EdbName, FromSource: firstEdbInfo.Source, FromSourceName: firstEdbInfo.SourceName, FromTag: "A", Sort: 1, CreateTime: time.Now(), ModifyTime: time.Now(), FromSubSource: firstEdbInfo.SubSource, } insertId, tmpErr := to.Insert(existItemA) if tmpErr != nil { err = tmpErr return } existItemA.EdbInfoCalculateMappingId = int(insertId) isNeedCalculateData = true } } } //第二个指标数据 { // 如果指标变了,那么需要删除关系,并重新计算 if existItemB.FromEdbInfoId != secondEdbInfo.EdbInfoId { //删除之前的B指标关联关系 sql := ` DELETE FROM edb_info_calculate_mapping WHERE edb_info_id = ? and from_edb_info_id = ?` _, err = to.Raw(sql, edbInfo.EdbInfoId, existItemB.FromEdbInfoId).Exec() if err != nil { err = fmt.Errorf("删除拼接日期之后的指标关联关系失败,Err:" + err.Error()) return } // 添加新的指标关联关系 existItemB = &EdbInfoCalculateMapping{ EdbInfoCalculateMappingId: 0, EdbInfoId: edbInfo.EdbInfoId, Source: edbInfo.Source, SourceName: edbInfo.SourceName, EdbCode: edbInfo.EdbCode, FromEdbInfoId: secondEdbInfo.EdbInfoId, FromEdbCode: secondEdbInfo.EdbCode, FromEdbName: secondEdbInfo.EdbName, FromSource: secondEdbInfo.Source, FromSourceName: secondEdbInfo.SourceName, FromTag: "B", Sort: 2, CreateTime: time.Now(), ModifyTime: time.Now(), FromSubSource: secondEdbInfo.SubSource, } insertId, tmpErr := to.Insert(existItemB) if tmpErr != nil { err = tmpErr return } existItemB.EdbInfoCalculateMappingId = int(insertId) isNeedCalculateData = true } } // 如果需要重新计算,那么先删除所有的指标数据,然后再重新计算 if isNeedCalculateData { // 删除之前所有的指标数据 tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource) sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? `, tableName) _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec() if err != nil { err = fmt.Errorf("删除历史数据失败,Err:" + err.Error()) return } err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB) } return } // GetAllEdbDataCalculateZjpjByEdbInfoId 根据指标id获取全部的数据 func GetAllEdbDataCalculateZjpjByEdbInfoId(edbInfoId int) (items []*EdbDataCalculateZjpj, err error) { o := orm.NewOrm() sql := ` SELECT * FROM edb_data_calculate_zjpj WHERE edb_info_id=? ORDER BY data_time DESC ` _, err = o.Raw(sql, edbInfoId).QueryRows(&items) return } // RefreshAllCalculateZjpj 刷新所有 直接拼接 数据 func RefreshAllCalculateZjpj(edbInfo *EdbInfo) (err error) { o := orm.NewOrm() to, err := o.Begin() defer func() { if err != nil { fmt.Println("RefreshAllCalculateZjpj,Err:" + err.Error()) _ = to.Rollback() } else { _ = to.Commit() } }() //查询关联指标信息 var existCondition string var existPars []interface{} existCondition += " AND edb_info_id=? " existPars = append(existPars, edbInfo.EdbInfoId) existList, err := GetEdbInfoCalculateListByCondition(existCondition, existPars) if err != nil { err = fmt.Errorf("判断指标是否改变失败,Err:" + err.Error()) return } var existItemA, existItemB *EdbInfoCalculateMapping for _, existItem := range existList { if existItem.FromTag == "A" { existItemA = existItem } else if existItem.FromTag == "B" { existItemB = existItem } } // 刷新数据 err = refreshAllCalculateZjpj(to, edbInfo, existItemA, existItemB) return } // refreshAllCalculateZjpj 刷新所有 直接拼接 数据 func refreshAllCalculateZjpj(to orm.TxOrmer, edbInfo *EdbInfo, existItemA, existItemB *EdbInfoCalculateMapping) (err error) { //查询当前指标现有的数据 var condition string var pars []interface{} condition += " AND edb_info_id=? " pars = append(pars, edbInfo.EdbInfoId) var dataList []*EdbDataCalculateZjpj sql := ` SELECT * FROM edb_data_calculate_zjpj WHERE edb_info_id=? ORDER BY data_time DESC ` _, err = to.Raw(sql, edbInfo.EdbInfoId).QueryRows(&dataList) if err != nil { return err } var dateArr []string dataMap := make(map[string]*EdbDataCalculateZjpj) removeDataTimeMap := make(map[string]int) //需要移除的日期数据 for _, v := range dataList { dateArr = append(dateArr, v.DataTime) dataMap[v.DataTime] = v removeDataTimeMap[v.DataTime] = 1 } addDataList := make([]*EdbDataCalculateZjpj, 0) //第一个指标 { //第一个指标的数据列表 firstDataList, tmpErr := GetEdbDataListAllByTo(to, existItemA.FromSource, existItemA.FromSubSource, FindEdbDataListAllCond{ EdbInfoId: existItemA.FromEdbInfoId, EndDataTime: edbInfo.CalculateFormula, EndDataTimeCond: "<", }, 0) if tmpErr != nil { return tmpErr } for _, v := range firstDataList { //校验待删除日期数据里面是否存在该元素,如果存在的话,那么移除该元素 if _, ok := removeDataTimeMap[v.DataTime]; ok { delete(removeDataTimeMap, v.DataTime) } //时间戳 if edbData, ok := dataMap[v.DataTime]; ok { if edbData.Value != v.Value { //更新指标数据 edbData.Value = v.Value _, _ = to.Update(edbData, "Value") } } else { //时间戳 currentDate, _ := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local) timestamp := currentDate.UnixNano() / 1e6 edbDataZjpj := &EdbDataCalculateZjpj{ EdbInfoId: edbInfo.EdbInfoId, EdbCode: edbInfo.EdbCode, DataTime: v.DataTime, Value: v.Value, Status: 1, CreateTime: time.Now(), ModifyTime: time.Now(), DataTimestamp: timestamp, } addDataList = append(addDataList, edbDataZjpj) } } } //第二个指标 { //第二个指标的数据列表 secondDataList, tmpErr := GetEdbDataListAllByTo(to, existItemB.FromSource, existItemB.FromSubSource, FindEdbDataListAllCond{ EdbInfoId: existItemB.FromEdbInfoId, StartDataTime: edbInfo.CalculateFormula, StartDataTimeCond: ">=", }, 0) if tmpErr != nil { return tmpErr } for _, v := range secondDataList { //校验待删除日期数据里面是否存在该元素,如果存在的话,那么移除该元素 if _, ok := removeDataTimeMap[v.DataTime]; ok { delete(removeDataTimeMap, v.DataTime) } if edbData, ok := dataMap[v.DataTime]; ok { if edbData.Value != v.Value { //更新指标数据 edbData.Value = v.Value edbData.ModifyTime = time.Now() _, tmpErr := to.Update(edbData, "Value", "ModifyTime") if tmpErr != nil { fmt.Println("tmpErr:", tmpErr) } } } else { //时间戳 currentDate, _ := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local) timestamp := currentDate.UnixNano() / 1e6 edbDataZjpj := &EdbDataCalculateZjpj{ EdbInfoId: edbInfo.EdbInfoId, EdbCode: edbInfo.EdbCode, DataTime: v.DataTime, Value: v.Value, Status: 1, CreateTime: time.Now(), ModifyTime: time.Now(), DataTimestamp: timestamp, } addDataList = append(addDataList, edbDataZjpj) } } } //删除已经不存在的累计同比拼接指标数据(由于同比值当日的数据删除了) { removeDateList := make([]string, 0) for dateTime := range removeDataTimeMap { removeDateList = append(removeDateList, dateTime) } if len(removeDateList) > 0 { removeDateStr := strings.Join(removeDateList, `","`) removeDateStr = `"` + removeDateStr + `"` //如果拼接指标变更了,那么需要删除所有的指标数据 tableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource) sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr) _, err = to.Raw(sql, edbInfo.EdbInfoId).Exec() if err != nil { err = fmt.Errorf("删除不存在的直接拼接指标数据失败,Err:" + err.Error()) return } } } //数据入库 if len(addDataList) > 0 { tmpAddDataList := make([]*EdbDataCalculateZjpj, 0) i := 0 for _, v := range addDataList { tmpAddDataList = append(tmpAddDataList, v) i++ if i >= 500 { _, err = to.InsertMulti(len(tmpAddDataList), tmpAddDataList) if err != nil { return } i = 0 tmpAddDataList = make([]*EdbDataCalculateZjpj, 0) } } if len(tmpAddDataList) > 0 { _, err = to.InsertMulti(len(tmpAddDataList), tmpAddDataList) if err != nil { return } } } return }