Browse Source

指标替换 重置指标引用关系

xyxie 9 months ago
parent
commit
c8fd7c0b70

+ 181 - 7
models/data_manage/edb_info_relation.go

@@ -22,17 +22,19 @@ type EdbInfoRelation struct {
 	RelationType       int       `description:"引用类型,0:直接饮用,1间接引用"`
 	RootEdbInfoId      int       `description:"间接引用时,关联的直接引用的指标ID"`
 	ChildEdbInfoId     int       `description:"间接引用时,关联的计算指标ID"`
+	RelationCode       string    `description:"引用标识"`
+	ParentRelationId   int       `description:"间接引用关联的直接引用的ID"`
 }
 
 func (e *EdbInfoRelation) TableName() string {
 	return "edb_info_relation"
 }
 
-// GetEdbInfoRelationByEdbInfoIds 查询引用的指标ID
-func GetEdbInfoRelationByEdbInfoIds(edbInfoIds []int) (edbIds []int, err error) {
+// GetEdbInfoRelationByRelationIds 查询引用的指标ID
+func GetEdbInfoRelationByRelationIds(ids []int) (items []*EdbInfoRelation, err error) {
 	o := orm.NewOrmUsingDB("data")
-	msql := ` SELECT edb_info_id FROM edb_info_relation WHERE edb_info_id in (` + utils.GetOrmInReplace(len(edbInfoIds)) + `)  GROUP BY edb_info_id `
-	_, err = o.Raw(msql, edbInfoIds).QueryRows(&edbIds)
+	msql := ` SELECT * FROM edb_info_relation WHERE edb_info_relation_id in (` + utils.GetOrmInReplace(len(ids)) + `) `
+	_, err = o.Raw(msql, ids).QueryRows(&items)
 	return
 }
 
@@ -87,7 +89,13 @@ func AddOrUpdateEdbInfoRelation(objectId, objectType int, relationList []*EdbInf
 			return
 		}
 	}
+	relationCodesMap := make(map[string]struct{}, 0)
 	if len(relationList) > 0 {
+		for _, relation := range relationList {
+			if relation.RelationType == 1 {
+				relationCodesMap[relation.RelationCode] = struct{}{}
+			}
+		}
 		_, err = o.InsertMulti(len(relationList), relationList)
 		if err != nil {
 			return
@@ -113,12 +121,30 @@ func AddOrUpdateEdbInfoRelation(objectId, objectType int, relationList []*EdbInf
 		}
 	}
 
-	// todo 由此被禁用的计算指标是否能恢复刷新
+	if len(relationList) > 0 {
+		// 更新间接引用指标的关联ID
+		relationCodes := make([]string, 0)
+		for relationCode := range relationCodesMap {
+			relationCodes = append(relationCodes, relationCode)
+		}
+		if len(relationCodes) > 0 {
+			sql := ` UPDATE edb_info_relation e1  
+JOIN edb_info_relation e2 ON e1.relation_code = e2.relation_code   
+SET e1.parent_relation_id = e2.edb_info_relation_id  
+WHERE  
+    e1.relation_type = 1   
+    AND e2.relation_type = 0 AND e1.parent_relation_id !=e2.edb_info_relation_id AND e1.relation_code in (` + utils.GetOrmInReplace(len(relationCodes)) + `)`
+			_, err = o.Raw(sql, relationCodes).Exec()
+			if err != nil {
+				return
+			}
+		}
+	}
 	return
 }
 
 // 新增记录
-func AddOrUpdateEdbInfoRelationFeMatter(relationList []*EdbInfoRelation, refreshEdbInfoIds []int, indexCodeList []string) (err error) {
+func AddOrUpdateEdbInfoRelationMulti(relationList []*EdbInfoRelation, refreshEdbInfoIds []int, indexCodeList []string) (err error) {
 	o, err := orm.NewOrmUsingDB("data").Begin()
 	if err != nil {
 		return
@@ -131,7 +157,13 @@ func AddOrUpdateEdbInfoRelationFeMatter(relationList []*EdbInfoRelation, refresh
 		_ = o.Commit()
 	}()
 
+	relationCodesMap := make(map[string]struct{}, 0)
 	if len(relationList) > 0 {
+		for _, relation := range relationList {
+			if relation.RelationType == 1 {
+				relationCodesMap[relation.RelationCode] = struct{}{}
+			}
+		}
 		_, err = o.InsertMulti(len(relationList), relationList)
 		if err != nil {
 			return
@@ -157,7 +189,25 @@ func AddOrUpdateEdbInfoRelationFeMatter(relationList []*EdbInfoRelation, refresh
 		}
 	}
 
-	// todo 由此被禁用的计算指标是否能恢复刷新
+	if len(relationList) > 0 {
+		// 更新间接引用指标的关联ID
+		relationCodes := make([]string, 0)
+		for relationCode := range relationCodesMap {
+			relationCodes = append(relationCodes, relationCode)
+		}
+		if len(relationCodes) > 0 {
+			sql := ` UPDATE edb_info_relation e1  
+JOIN edb_info_relation e2 ON e1.relation_code = e2.relation_code   
+SET e1.parent_relation_id = e2.edb_info_relation_id  
+WHERE  
+    e1.relation_type = 1   
+    AND e2.relation_type = 0 AND e1.parent_relation_id !=e2.edb_info_relation_id AND e1.relation_code in (` + utils.GetOrmInReplace(len(relationCodes)) + `)`
+			_, err = o.Raw(sql, relationCodes).Exec()
+			if err != nil {
+				return
+			}
+		}
+	}
 	return
 }
 
@@ -251,3 +301,127 @@ func GetEdbInfoRelationDetailList(edbInfoId int, startSize, pageSize int) (total
 
 	return
 }
+
+// 查询相关的指标记录总数
+func GetReplaceChildEdbInfoRelationTotal(edbInfoId int) (total int, err error) {
+	o := orm.NewOrmUsingDB("data")
+	// 数量汇总
+	totalSql := ` SELECT count(*) FROM edb_info_relation where  child_edb_info_id=? and root_edb_info_id !=?  group by parent_relation_id`
+	err = o.Raw(totalSql, edbInfoId, edbInfoId).QueryRow(&total)
+	if err != nil {
+		return
+	}
+	return
+}
+
+// 查询相关的指标记录列表
+func GetReplaceChildEdbInfoRelationList(edbInfoId int, startSize, pageSize int) (items []*EdbInfoRelation, err error) {
+	o := orm.NewOrmUsingDB("data")
+	// 列表数据
+	sql := ` SELECT * FROM edb_info_relation where  child_edb_info_id=? and root_edb_info_id !=? ORDER BY edb_info_relation_id ASC group by parent_relation_id  LIMIT ?,? `
+	_, err = o.Raw(sql, edbInfoId, edbInfoId, startSize, pageSize).QueryRows(&items)
+	return
+}
+
+// 查询相关的指标记录总数
+func GetReplaceEdbInfoRelationTotal(edbInfoId int) (total int, err error) {
+	o := orm.NewOrmUsingDB("data")
+	// 数量汇总
+	totalSql := ` SELECT count(*) FROM edb_info_relation where edb_info_id=? and relation_type = 0`
+	err = o.Raw(totalSql, edbInfoId).QueryRow(&total)
+	if err != nil {
+		return
+	}
+	return
+}
+
+// 查询相关的指标记录列表
+func GetReplaceEdbInfoRelationList(edbInfoId int, startSize, pageSize int) (items []*EdbInfoRelation, err error) {
+	o := orm.NewOrmUsingDB("data")
+	// 列表数据
+	sql := ` SELECT * FROM edb_info_relation where edb_info_id=? and relation_type = 0 ORDER BY edb_info_relation_id ASC LIMIT ?,? `
+	_, err = o.Raw(sql, edbInfoId, startSize, pageSize).QueryRows(&items)
+	return
+}
+
+// 替换指标引用表中的指标
+func ReplaceRelationEdbInfoId(oldEdbInfo, newEdbInfo *EdbInfo, edbRelationIds []int, relationList []*EdbInfoRelation, refreshEdbInfoIds []int, indexCodeList []string) (err error) {
+	o, err := orm.NewOrmUsingDB("data").Begin()
+	if err != nil {
+		return
+	}
+	defer func() {
+		if err != nil {
+			_ = o.Rollback()
+			return
+		}
+		_ = o.Commit()
+	}()
+
+	now := time.Now()
+	// 删除相关的间接引用
+	sql := ` DELETE FROM edb_info_relation WHERE root_edb_info_id=?  and relation_type=1 and parent_relation_id in (` + utils.GetOrmInReplace(len(edbRelationIds)) + `)`
+	_, err = o.Raw(sql, oldEdbInfo.EdbInfoId, edbRelationIds).Exec()
+	if err != nil {
+		return
+	}
+
+	// 替换edb_info_id
+	sql = ` UPDATE edb_info_relation SET edb_info_id=?, source=?, edb_name=?, edb_code=?, modify_time=?, relation_time=?  WHERE edb_info_id=? and edb_info_relation_id in (` + utils.GetOrmInReplace(len(edbRelationIds)) + `)`
+	_, err = o.Raw(sql, newEdbInfo.EdbInfoId, newEdbInfo.Source, newEdbInfo.EdbName, newEdbInfo.EdbCode, now, now, oldEdbInfo.EdbInfoId, edbRelationIds).Exec()
+	if err != nil {
+		return
+	}
+	// 新增间接引用
+	relationCodesMap := make(map[string]struct{}, 0)
+	if len(relationList) > 0 {
+		for _, relation := range relationList {
+			if relation.RelationType == 1 {
+				relationCodesMap[relation.RelationCode] = struct{}{}
+			}
+		}
+		_, err = o.InsertMulti(len(relationList), relationList)
+		if err != nil {
+			return
+		}
+	}
+
+	if len(refreshEdbInfoIds) > 0 {
+		// todo 更新指标的刷新状态
+		sql := ` UPDATE edb_info SET no_update = 0 WHERE source in (?, ?) AND edb_info_id IN (` + utils.GetOrmInReplace(len(refreshEdbInfoIds)) + `) AND no_update = 1`
+		_, err = o.Raw(sql, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, refreshEdbInfoIds).Exec()
+		if err != nil {
+			return
+		}
+	}
+
+	//更新数据源钢联化工指标
+	if len(indexCodeList) > 0 {
+		// 更改数据源的更新状态
+		sql := ` UPDATE base_from_mysteel_chemical_index SET is_stop = 0 WHERE index_code IN (` + utils.GetOrmInReplace(len(indexCodeList)) + `) and is_stop=1`
+		_, err = o.Raw(sql, indexCodeList).Exec()
+		if err != nil {
+			return
+		}
+	}
+	if len(relationList) > 0 {
+		// 更新间接引用指标的关联ID
+		relationCodes := make([]string, 0)
+		for relationCode := range relationCodesMap {
+			relationCodes = append(relationCodes, relationCode)
+		}
+		if len(relationCodes) > 0 {
+			sql := ` UPDATE edb_info_relation e1  
+JOIN edb_info_relation e2 ON e1.relation_code = e2.relation_code   
+SET e1.parent_relation_id = e2.edb_info_relation_id  
+WHERE  
+    e1.relation_type = 1   
+    AND e2.relation_type = 0 AND e1.parent_relation_id !=e2.edb_info_relation_id AND e1.relation_code in (` + utils.GetOrmInReplace(len(relationCodes)) + `)`
+			_, err = o.Raw(sql, relationCodes).Exec()
+			if err != nil {
+				return
+			}
+		}
+	}
+	return
+}

+ 6 - 1
services/data/edb_info_relation.go

@@ -95,6 +95,7 @@ func saveEdbInfoRelation(edbInfoIds []int, objectId, objectType, objectSubType i
 				ModifyTime:         nowTime,
 				RelationTime:       nowTime,
 			}
+			tmp.RelationCode = fmt.Sprintf("%d_%d_%d_%d", tmp.EdbInfoId, tmp.ReferObjectId, tmp.ReferObjectType, tmp.ReferObjectSubType)
 			addList = append(addList, tmp)
 			if edbInfo.EdbType == 2 || edbInfo.EdbInfoType == 1 {
 				childEdbMappingIds, ok1 := calculateEdbMappingIdsMap[edbInfo.EdbInfoId]
@@ -124,6 +125,7 @@ func saveEdbInfoRelation(edbInfoIds []int, objectId, objectType, objectSubType i
 						RelationType:       1,
 						RootEdbInfoId:      edbInfo.EdbInfoId,
 						ChildEdbInfoId:     childEdbMapping.EdbInfoId,
+						RelationCode:       tmp.RelationCode,
 					}
 					addList = append(addList, tmp1)
 					refreshIds = append(refreshIds, childEdbMapping.FromEdbInfoId)
@@ -270,6 +272,7 @@ func SaveCalendarEdbInfoRelation(chartPermissionId int, matterDate string, editM
 		_, ok1 := relationMap[matter.FeCalendarMatterId]
 		edbInfo, ok2 := addEdbInfoIdMap[matter.EdbInfoId]
 		if !ok1 && ok2 {
+
 			tmp := &data_manage.EdbInfoRelation{
 				ReferObjectId:      matter.FeCalendarMatterId,
 				ReferObjectType:    utils.EDB_RELATION_CALENDAR,
@@ -281,6 +284,7 @@ func SaveCalendarEdbInfoRelation(chartPermissionId int, matterDate string, editM
 				CreateTime:         nowTime,
 				ModifyTime:         nowTime,
 			}
+			tmp.RelationCode = fmt.Sprintf("%d_%d_%d_%d", tmp.EdbInfoId, tmp.ReferObjectId, tmp.ReferObjectType, tmp.ReferObjectSubType)
 			addList = append(addList, tmp)
 			//添加指标间接引用
 			if edbInfo.EdbType == 2 || edbInfo.EdbInfoType == 1 {
@@ -310,6 +314,7 @@ func SaveCalendarEdbInfoRelation(chartPermissionId int, matterDate string, editM
 						RelationType:       1,
 						RootEdbInfoId:      edbInfo.EdbInfoId,
 						ChildEdbInfoId:     childEdbMapping.EdbInfoId,
+						RelationCode:       tmp.RelationCode,
 					}
 					addList = append(addList, tmp1)
 					refreshIds = append(refreshIds, childEdbMapping.FromEdbInfoId)
@@ -319,7 +324,7 @@ func SaveCalendarEdbInfoRelation(chartPermissionId int, matterDate string, editM
 		}
 	}
 	//更新指标刷新状态为启用
-	err = data_manage.AddOrUpdateEdbInfoRelationFeMatter(addList, refreshIds, indexCodeList)
+	err = data_manage.AddOrUpdateEdbInfoRelationMulti(addList, refreshIds, indexCodeList)
 	if err != nil {
 		err = fmt.Errorf("添加指标引用,%s", err.Error())
 		return

+ 189 - 0
services/edb_info_replace.go

@@ -7,9 +7,11 @@ import (
 	excelModel "eta/eta_api/models/data_manage/excel"
 	"eta/eta_api/models/data_manage/excel/request"
 	"eta/eta_api/services/alarm_msg"
+	"eta/eta_api/services/data"
 	"eta/eta_api/services/sandbox"
 	"eta/eta_api/utils"
 	"fmt"
+	"strconv"
 	"time"
 )
 
@@ -70,6 +72,9 @@ func DealReplaceEdbCache() {
 				err = fmt.Errorf("替换逻辑图中的指标失败,errmsg:%s", err.Error())
 				return
 			}
+
+			// todo 重置指标引用表
+			ReplaceEdbInRelation(oldEdbInfo, newEdbInfo)
 		})
 	}
 }
@@ -252,3 +257,187 @@ func replaceEdbInTimeExcel(oldEdbInfo, newEdbInfo *data_manage.EdbInfo, excelInf
 	newExcelInfo = excelInfo
 	return
 }
+
+func ReplaceEdbInRelation(oldEdbInfo, newEdbInfo *data_manage.EdbInfo) {
+	var err error
+	var logMsg string
+	var replaceTotal int
+	defer func() {
+		if err != nil {
+			msg := fmt.Sprintf(" 替换指标引用表中的指标,并修改引用时间 replaceEdbInRelation  err: %v", err)
+			utils.FileLog.Info(msg)
+			fmt.Println(msg)
+			go alarm_msg.SendAlarmMsg(msg, 3)
+		}
+		if logMsg != `` {
+			utils.FileLog.Info(fmt.Sprintf("替换指标引用表中的指标记录 替换总数%d,旧的指标id:%d,新的指标id:%d;涉及到的引用id:%s", replaceTotal, oldEdbInfo.EdbInfoId, newEdbInfo.EdbInfoId, logMsg))
+		}
+	}()
+
+	calculateEdbMappingListMap := make(map[int]*data_manage.EdbInfoCalculateMapping)
+	calculateEdbMappingIdsMap := make(map[int][]int)
+	childEdbMappingIds := make([]int, 0)
+	//indexCodeList := make([]string, 0)
+	//refreshIds := make([]int, 0)
+
+	if newEdbInfo.EdbType == 2 || newEdbInfo.EdbInfoType == 1 {
+		//需要添加间接引用
+		//查询出所有关联的指标id
+		edbInfoList := make([]*data_manage.EdbInfo, 0)
+		edbInfoList = append(edbInfoList, newEdbInfo)
+		calculateEdbMappingListMap, calculateEdbMappingIdsMap, err = data.GetEdbListByEdbInfoId(edbInfoList)
+		if err != nil {
+			err = fmt.Errorf("查询指标关联指标列表失败 Err:%s", err)
+			return
+		}
+		var ok bool
+		childEdbMappingIds, ok = calculateEdbMappingIdsMap[newEdbInfo.EdbInfoId]
+		if !ok {
+			err = fmt.Errorf("查询%d指标关联指标列表为空", newEdbInfo.EdbInfoId)
+			return
+		}
+	} /*else if newEdbInfo.Source == utils.DATA_SOURCE_MYSTEEL_CHEMICAL {
+		indexCodeList = append(indexCodeList, newEdbInfo.EdbCode)
+	}
+	// todo 新指标本身没有设置成启用
+	refreshIds = append(refreshIds, newEdbInfo.EdbInfoId)*/
+	//分页查询,每次处理500条记录
+	pageSize := 500
+	// 替换间接引用中的指标
+	//查询相关的记录总数
+	total, err := data_manage.GetReplaceChildEdbInfoRelationTotal(oldEdbInfo.EdbInfoId)
+	if err != nil {
+		if err.Error() == utils.ErrNoRow() {
+			err = nil
+		} else {
+			err = fmt.Errorf("查询引用表中关联的指标总数失败 err: %v", err)
+			return
+		}
+	}
+	if total > 0 {
+		totalPage := (total + pageSize - 1) / pageSize // 使用整数除法,并添加一页以防有余数
+		//查询列表
+		for i := 0; i < totalPage; i += 1 {
+			startSize := i * pageSize
+			tmpList, e := data_manage.GetReplaceChildEdbInfoRelationList(oldEdbInfo.EdbInfoId, startSize, pageSize)
+			if e != nil {
+				err = fmt.Errorf("查询图表关联指标列表失败 Err:%s", e)
+				return
+			}
+			// 查询直接引用
+			relationIds := make([]int, 0)
+			for _, v := range tmpList {
+				relationIds = append(relationIds, v.EdbInfoRelationId)
+			}
+			if len(relationIds) > 0 {
+				list, e := data_manage.GetEdbInfoRelationByRelationIds(relationIds)
+				if e != nil {
+					err = fmt.Errorf("查询图表关联指标列表失败 Err:%s", e)
+					return
+				}
+				//如何过滤掉只有间接引用,没有直接引用的
+				replaceTotal1, logMsg1, e := replaceEdbInRelation(oldEdbInfo, newEdbInfo, list, childEdbMappingIds, calculateEdbMappingListMap)
+				if e != nil {
+					err = e
+					return
+				}
+				replaceTotal += replaceTotal1
+				logMsg += logMsg1
+			}
+		}
+	}
+	// 替换直接引用中的指标
+	total, err = data_manage.GetReplaceEdbInfoRelationTotal(oldEdbInfo.EdbInfoId)
+	if err != nil {
+		err = fmt.Errorf("查询引用表中关联的指标总数失败 err: %v", err)
+		return
+	}
+	if total == 0 {
+		return
+	}
+
+	totalPage := (total + pageSize - 1) / pageSize // 使用整数除法,并添加一页以防有余数
+	//查询图表列表
+	for i := 0; i < totalPage; i += 1 {
+		startSize := i * pageSize
+		list, e := data_manage.GetReplaceEdbInfoRelationList(oldEdbInfo.EdbInfoId, startSize, pageSize)
+		if e != nil {
+			err = fmt.Errorf("查询图表关联指标列表失败 Err:%s", e)
+			return
+		}
+		if len(list) == 0 {
+			break
+		}
+		replaceTotal1, logMsg1, e := replaceEdbInRelation(oldEdbInfo, newEdbInfo, list, childEdbMappingIds, calculateEdbMappingListMap)
+		if e != nil {
+			err = e
+			return
+		}
+		replaceTotal += replaceTotal1
+		logMsg += logMsg1
+	}
+	return
+}
+
+func replaceEdbInRelation(oldEdbInfo, newEdbInfo *data_manage.EdbInfo, list []*data_manage.EdbInfoRelation, childEdbMappingIds []int, calculateEdbMappingListMap map[int]*data_manage.EdbInfoCalculateMapping) (replaceTotal int, logMsg string, err error) {
+	nowTime := time.Now()
+	replaceEdbIds := make([]int, 0)
+	//calculateEdbMappingListMap := make(map[int]*data_manage.EdbInfoCalculateMapping)
+	//calculateEdbMappingIdsMap := make(map[int][]int)
+	//childEdbMappingIds := make([]int, 0)
+	indexCodeList := make([]string, 0)
+	addList := make([]*data_manage.EdbInfoRelation, 0)
+	refreshIds := make([]int, 0)
+	for _, v := range list {
+		if v.RelationType == 0 && (v.ReferObjectType == utils.EDB_RELATION_CALENDAR ||
+			(v.ReferObjectType == utils.EDB_RELATION_TABLE && v.ReferObjectSubType == utils.BALANCE_TABLE)) {
+			//平衡表和事件日历中的直接引用无需替换,
+		} else {
+			replaceEdbIds = append(replaceEdbIds, v.EdbInfoRelationId)
+			// 添加间接引用数据
+			if newEdbInfo.EdbType == 2 || newEdbInfo.EdbInfoType == 1 {
+				for _, childEdbMappingId := range childEdbMappingIds {
+					childEdbMapping, ok2 := calculateEdbMappingListMap[childEdbMappingId]
+					if !ok2 {
+						continue
+					}
+
+					if childEdbMapping.FromSource == utils.DATA_SOURCE_MYSTEEL_CHEMICAL {
+						indexCodeList = append(indexCodeList, childEdbMapping.FromEdbCode)
+					}
+					tmp1 := &data_manage.EdbInfoRelation{
+						ReferObjectId:      v.ReferObjectId,
+						ReferObjectType:    v.ReferObjectType,
+						ReferObjectSubType: v.ReferObjectSubType,
+						EdbInfoId:          childEdbMapping.FromEdbInfoId,
+						EdbName:            childEdbMapping.FromEdbName,
+						Source:             childEdbMapping.FromSource,
+						EdbCode:            childEdbMapping.FromEdbCode,
+						CreateTime:         nowTime,
+						ModifyTime:         nowTime,
+						RelationTime:       nowTime,
+						RelationType:       1,
+						RootEdbInfoId:      newEdbInfo.EdbInfoId,
+						ChildEdbInfoId:     childEdbMapping.EdbInfoId,
+					}
+					tmp1.RelationCode = fmt.Sprintf("%d_%d_%d_%d", tmp1.RootEdbInfoId, tmp1.ReferObjectId, tmp1.ReferObjectType, tmp1.ReferObjectSubType)
+					addList = append(addList, tmp1)
+					refreshIds = append(refreshIds, childEdbMapping.FromEdbInfoId)
+					// todo 防止重复
+				}
+			}
+			logMsg += strconv.Itoa(v.EdbInfoRelationId) + ";"
+		}
+
+	}
+	if len(replaceEdbIds) > 0 {
+		err = data_manage.ReplaceRelationEdbInfoId(oldEdbInfo, newEdbInfo, replaceEdbIds, addList, refreshIds, indexCodeList)
+		if err != nil {
+			logMsg = ""
+			err = fmt.Errorf("替换指标引用表中的指标ID失败 Err:%s", err)
+			return
+		}
+		replaceTotal = len(replaceEdbIds)
+	}
+	return
+}