Browse Source

自动设置停止刷新功能

xyxie 9 months ago
parent
commit
8eb52fed07

+ 13 - 0
models/business_conf.go

@@ -165,3 +165,16 @@ func UpdateBusinessConfMulti(items []BusinessConfUpdate) (err error) {
 	}
 	return
 }
+
+func (m *BusinessConf) GetItemByConfKey(key string) (item *BusinessConf, err error) {
+	o := orm.NewOrmUsingDB("eta")
+	sql := fmt.Sprintf(`SELECT * FROM %s WHERE conf_key = ? LIMIT 1`, m.TableName())
+	err = o.Raw(sql, key).QueryRow(&item)
+	return
+}
+
+type EdbStopRefreshRule struct {
+	IsOpen            int `description:"是否开启自动禁用1,开启,0未开启"`
+	BaseIndexStopDays int `description:"数据源间隔天数未加入指标库则停用"`
+	EdbStopDays       int `description:"指标库间隔天数未引用则停用"`
+}

+ 27 - 1
models/data_manage/base_from_mysteel_chemical_index.go

@@ -1,6 +1,9 @@
 package data_manage
 
-import "github.com/beego/beego/v2/client/orm"
+import (
+	"eta/eta_task/utils"
+	"github.com/beego/beego/v2/client/orm"
+)
 
 type BaseFromMysteelChemicalIndexItem struct {
 	BaseFromMysteelChemicalIndexId    int32   `json:"base_from_mysteel_chemical_index_id"`
@@ -47,3 +50,26 @@ func GetBaseFromMysteelChemicalIndexItemByCode(edbCode string) (item *BaseFromMy
 
 	return
 }
+
+// GetRefreshBaseFromMysteelChemicalIndexItemByCreateTime 获取正常刷新的钢联化工指标
+func GetRefreshBaseFromMysteelChemicalIndexItemByCreateTime(endDate string, startPage, pageSize int) (items []*BaseFromMysteelChemicalIndexItem, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := ` SELECT * FROM base_from_mysteel_chemical_index WHERE is_stop = 0 and create_time < ? Limit ?,?`
+	_, err = o.Raw(sql, endDate, startPage, pageSize).QueryRows(&items)
+	return
+}
+
+// GetCountRefreshBaseFromMysteelChemicalIndexItemByCreateTime 获取正常刷新的钢联化工指标
+func GetCountRefreshBaseFromMysteelChemicalIndexItemByCreateTime(endDate string) (total int64, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := ` SELECT count(*) FROM base_from_mysteel_chemical_index WHERE is_stop = 0 and create_time < ?`
+	err = o.Raw(sql, endDate).QueryRow(&total)
+	return
+}
+
+func SetStopRefreshMysteelChemicalIndex(ids []int32) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := ` UPDATE base_from_mysteel_chemical_index SET is_stop = 1 WHERE base_from_mysteel_chemical_index_id IN (` + utils.GetOrmInReplace(len(ids)) + `) and is_stop=0`
+	_, err = o.Raw(sql, ids).Exec()
+	return
+}

+ 68 - 0
models/data_manage/edb_info.go

@@ -2,6 +2,7 @@ package data_manage
 
 import (
 	"errors"
+	"eta/eta_task/utils"
 	"fmt"
 	"github.com/beego/beego/v2/client/orm"
 	"strconv"
@@ -282,3 +283,70 @@ func GetEdbInfoMaxModifyTime(source, subSource int, edbCode string) (modifyTime
 
 	return
 }
+
+func GetEdbInfoPageByCondition(condition string, pars []interface{}, startPage, pageSize int) (item []*EdbInfoList, err error) {
+	o := orm.NewOrm()
+	sql := ` SELECT * FROM edb_info WHERE 1=1 `
+	if condition != "" {
+		sql += condition
+	}
+	sql += ` LIMIT ?,? `
+	_, err = o.Raw(sql, pars, startPage, pageSize).QueryRows(&item)
+	return
+}
+
+func GetEdbInfoCountByCondition(condition string, pars []interface{}) (total int64, err error) {
+	o := orm.NewOrm()
+	sql := ` SELECT count(*) FROM edb_info WHERE 1=1 `
+	if condition != "" {
+		sql += condition
+	}
+	err = o.Raw(sql, pars).QueryRow(&total)
+	return
+}
+
+func ModifyEdbUpdateStatus(edbIdList []int, indexCodeList []string, calculateEdbInfoIds []int) (err error) {
+	idNum := len(edbIdList)
+	if idNum <= 0 {
+		return
+	}
+	o, err := orm.NewOrmUsingDB("data").Begin()
+	if err != nil {
+		return
+	}
+	defer func() {
+		if err != nil {
+			_ = o.Rollback()
+			return
+		}
+		_ = o.Commit()
+	}()
+
+	// 更改指标的更新状态
+	sql := ` UPDATE edb_info SET no_update = 1 WHERE source in (?, ?) AND edb_info_id IN (` + utils.GetOrmInReplace(idNum) + `) AND  no_update = 0`
+	_, err = o.Raw(sql, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, edbIdList).Exec()
+	if err != nil {
+		return
+	}
+
+	// 更改钢联化工指标更新状态
+	if len(indexCodeList) > 0 {
+		// 更改数据源的更新状态
+		sql = ` UPDATE base_from_mysteel_chemical_index SET is_stop = 1 WHERE index_code IN (` + utils.GetOrmInReplace(len(indexCodeList)) + `) and is_stop=0`
+		_, err = o.Raw(sql, indexCodeList).Exec()
+		if err != nil {
+			return
+		}
+	}
+
+	// 更新相关的计算指标状态
+	if len(calculateEdbInfoIds) > 0 {
+		// 批量更新相关联的指标ID
+		sql = ` UPDATE edb_info SET no_update = 1 WHERE edb_info_id IN (` + utils.GetOrmInReplace(len(calculateEdbInfoIds)) + `) AND  no_update = 0`
+		_, err = o.Raw(sql, calculateEdbInfoIds).Exec()
+		if err != nil {
+			return
+		}
+	}
+	return
+}

+ 12 - 0
models/data_manage/edb_info_calculate_mapping.go

@@ -1,6 +1,7 @@
 package data_manage
 
 import (
+	"eta/eta_task/utils"
 	"fmt"
 	"github.com/beego/beego/v2/client/orm"
 	"time"
@@ -111,3 +112,14 @@ func GetEdbInfoCalculateMappingDetail(edbInfoId int) (item *EdbInfoCalculateMapp
 	err = o.Raw(sql, edbInfoId).QueryRow(&item)
 	return
 }
+
+// GetAllCalculateEdbIdsByEdbInfoIds 所依赖计算指标
+func GetAllCalculateEdbIdsByEdbInfoIds(edbInfoIds []int) (edbIds []int, err error) {
+	o := orm.NewOrmUsingDB("data")
+	msql := ` SELECT edb_info_id FROM edb_info_calculate_mapping WHERE from_edb_info_id in (` + utils.GetOrmInReplace(len(edbInfoIds)) + `)  GROUP BY edb_info_id `
+	_, err = o.Raw(msql, edbInfoIds).QueryRows(&edbIds)
+	if err != nil {
+		return
+	}
+	return
+}

+ 33 - 0
models/data_manage/edb_info_relation.go

@@ -0,0 +1,33 @@
+package data_manage
+
+import (
+	"eta/eta_task/utils"
+	"github.com/beego/beego/v2/client/orm"
+	"time"
+)
+
+type EdbInfoRelation struct {
+	EdbInfoRelationId int       `orm:"column(edb_info_relation_id);pk"`
+	EdbInfoId         int       `description:"指标id"`
+	Source            int       `description:"来源:1:同花顺,2:wind,3:彭博,4:指标运算,5:累计值转月,6:同比值,7:同差值,8:N数值移动平均计算,9:手工指标,10:隆众"`
+	EdbName           string    `description:"指标名称"`
+	ReferObjectId     int       `description:"引用对象ID(图表ID,ETA逻辑ID等)"`
+	ReferObjectType   int       `description:"引用对象ID类型(1.图表,2.ETA逻辑)"`
+	CreateTime        time.Time `description:"创建时间"`
+	ModifyTime        time.Time `description:"修改时间"`
+}
+
+func (e *EdbInfoRelation) TableName() string {
+	return "edb_info_relation"
+}
+
+// GetEdbInfoRelationByEdbInfoIds 查询引用的指标ID
+func GetEdbInfoRelationByEdbInfoIds(edbInfoIds []int) (edbIds []int, err error) {
+	o := orm.NewOrmUsingDB("data")
+	msql := ` SELECT edb_info_id FROM edb_info_relation WHERE edb_info_id in (` + utils.GetOrmInReplace(len(edbInfoIds)) + `)  GROUP BY edb_info_id `
+	_, err = o.Raw(msql, edbInfoIds).QueryRows(&edbIds)
+	if err != nil {
+		return
+	}
+	return
+}

+ 194 - 0
services/edb_refresh.go

@@ -2,6 +2,9 @@ package services
 
 import (
 	"context"
+	"encoding/json"
+	"eta/eta_task/models"
+	"eta/eta_task/models/data_manage"
 	"eta/eta_task/models/data_manage/edb_refresh"
 	"eta/eta_task/services/alarm_msg"
 	"eta/eta_task/services/data"
@@ -581,3 +584,194 @@ func getPreviousHalfHour(now time.Time) string {
 	}
 	return fmt.Sprintf("%02d:%02d", now.Hour(), 0)
 }
+
+// 根据配置把钢联化工和wind指标设置成禁止刷新
+func DisableEdbRefresh() (err error) {
+	//设置刷新key,如果没有执行完 报错提示
+	cacheKey := "eta_task:DisableEdbRefresh"
+	deleteCache := true
+	defer func() {
+		if deleteCache {
+			utils.Rc.Delete(cacheKey)
+		}
+		if err != nil {
+			tips := "DisableEdbRefresh-钢联化工和wind指标设置成禁止刷新失败, ErrMsg:\n" + err.Error()
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+	if !utils.Rc.SetNX(cacheKey, 1, 2*time.Minute) {
+		deleteCache = false
+		err = fmt.Errorf("系统处理中,请稍后重试!")
+		return
+	}
+	//查询配置,如果未开启自动设置禁止刷新,则无需处理
+	obj := new(models.BusinessConf)
+	conf, err := obj.GetItemByConfKey("EdbStopRefreshRule")
+	if err != nil {
+		return err
+	}
+	//将json转为结构体
+	rule := new(models.EdbStopRefreshRule)
+	err = json.Unmarshal([]byte(conf.ConfVal), rule)
+	if err != nil {
+		return
+	}
+	//判断是否开启自动设置禁止刷新
+	if rule.IsOpen == 0 {
+		return
+	}
+
+	//获取当前时间
+	now := time.Now()
+	if rule.BaseIndexStopDays > 0 { //设置数据源钢联化工指标禁止更新
+		baseIndexEndDate := now.AddDate(0, 0, -rule.BaseIndexStopDays+1).Format(utils.FormatDate)
+
+		// 查询钢联化工指标,查询创建时间在baseIndexStartDate前,的所有钢联化工指标,分批查询,先查总数,再查列表
+		totalCount, e := data_manage.GetCountRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate)
+		if e != nil {
+			err = fmt.Errorf("查询钢联化工指标总数失败:%v", e)
+			return
+		}
+
+		//分页查询
+		pageSize := 100
+		pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数
+		stopRefreshIds := make([]int32, 0)
+		for i := 0; i < pageNum; i++ {
+			start := i * pageSize
+			indexItems, e := data_manage.GetRefreshBaseFromMysteelChemicalIndexItemByCreateTime(baseIndexEndDate, start, pageSize)
+			if e != nil {
+				err = fmt.Errorf("分页查询钢联化工指标失败:%v", e)
+				return
+			}
+			if len(indexItems) == 0 {
+				continue
+			}
+			indexCodeList := make([]string, 0)
+			for _, indexItem := range indexItems {
+				indexCodeList = append(indexCodeList, indexItem.IndexCode)
+			}
+			condition := ` AND source=? AND edb_code in (` + utils.GetOrmInReplace(len(indexCodeList)) + `)`
+			var pars []interface{}
+			pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexCodeList)
+
+			// 查询指标库里这些指标是否已创建
+			edbList, e := data_manage.GetEdbInfoByCondition(condition, pars, 0)
+			if e != nil {
+				err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e)
+				return
+			}
+			edbMap := make(map[string]bool)
+			for _, edb := range edbList {
+				edbMap[edb.EdbCode] = true
+			}
+			for _, indexItem := range indexItems {
+				// 判断指标是否被创建
+				if _, ok := edbMap[indexItem.IndexCode]; !ok {
+					stopRefreshIds = append(stopRefreshIds, indexItem.BaseFromMysteelChemicalIndexId)
+					if len(stopRefreshIds) > 100 {
+						//	err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds)
+						if err != nil {
+							err = fmt.Errorf("设置禁止刷新失败:%v", err)
+							return
+						}
+						stopRefreshIds = make([]int32, 0)
+					}
+				}
+			}
+		}
+		// 未被创建,则设置禁止刷新
+		if len(stopRefreshIds) > 0 {
+			//err = data_manage.SetStopRefreshMysteelChemicalIndex(stopRefreshIds)
+			if err != nil {
+				err = fmt.Errorf("设置禁止刷新失败:%v", err)
+				return
+			}
+		}
+	}
+
+	if rule.EdbStopDays > 0 {
+		// 查询钢联和wind来源的指标
+		edbEndDate := now.AddDate(0, 0, -rule.EdbStopDays+1).Format(utils.FormatDate)
+
+		condition := ` AND no_update=0 AND source in (?,?) AND create_time < ?`
+		var pars []interface{}
+		pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL, utils.DATA_SOURCE_WIND, edbEndDate)
+		// 查询钢联化工指标和wind指标 分批查询,先查总数,再查列表
+		totalCount, e := data_manage.GetEdbInfoCountByCondition(condition, pars)
+		if e != nil {
+			err = fmt.Errorf("查询钢联化工指标总数失败:%v", e)
+			return
+		}
+
+		//分页查询
+		pageSize := 100
+		pageNum := (int(totalCount) + 99) / pageSize // 使用整数除法,并添加一页以防有余数
+		stopRefreshIds := make([]int, 0)
+		stopRefreshMysteelCode := make([]string, 0)
+		for i := 0; i < pageNum; i++ {
+			start := i * pageSize
+			edbItems, e := data_manage.GetEdbInfoPageByCondition(condition, pars, start, pageSize)
+			if e != nil {
+				err = fmt.Errorf("分页查询钢联化工指标失败:%v", e)
+				return
+			}
+			if len(edbItems) == 0 {
+				continue
+			}
+			edbInfoIds := make([]int, 0)
+			for _, item := range edbItems {
+				edbInfoIds = append(edbInfoIds, item.EdbInfoId)
+			}
+			// 查询指标库里这些指标 引用情况
+			relationList, e := data_manage.GetEdbInfoRelationByEdbInfoIds(edbInfoIds)
+			if e != nil {
+				err = fmt.Errorf("查询指标库里这些指标是否被创建失败:%v", e)
+				return
+			}
+			edbMap := make(map[int]struct{})
+			for _, item := range relationList {
+				edbMap[item] = struct{}{}
+			}
+			for _, item := range edbItems {
+				if _, ok := edbMap[item.EdbInfoId]; !ok {
+					stopRefreshIds = append(stopRefreshIds, item.EdbInfoId)
+					if item.Source == utils.DATA_SOURCE_MYSTEEL_CHEMICAL {
+						stopRefreshMysteelCode = append(stopRefreshMysteelCode, item.EdbCode)
+					}
+					// 更新指标禁止刷新状态
+					if len(stopRefreshIds) > 100 {
+						_, tmpErr := data_manage.GetAllCalculateEdbIdsByEdbInfoIds(stopRefreshIds)
+						if tmpErr != nil {
+							err = fmt.Errorf("查询计算指标信息失败:%v", tmpErr)
+							return
+						}
+						//err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIds)
+						if err != nil {
+							err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err)
+							return
+						}
+						stopRefreshIds = []int{}
+						stopRefreshMysteelCode = []string{}
+					}
+				}
+			}
+		}
+
+		// 更新指标禁止刷新状态
+		if len(stopRefreshIds) > 0 {
+			_, tmpErr := data_manage.GetAllCalculateEdbIdsByEdbInfoIds(stopRefreshIds)
+			if tmpErr != nil {
+				err = fmt.Errorf("查询计算指标信息失败:%v", tmpErr)
+				return
+			}
+			//	err = data_manage.ModifyEdbUpdateStatus(stopRefreshIds, stopRefreshMysteelCode, calculateEdbIds)
+			if err != nil {
+				err = fmt.Errorf("更新指标禁止刷新状态失败:%v", err)
+				return
+			}
+		}
+	}
+	return
+}