Эх сурвалжийг харах

Merge branch 'feature/eta1.5.2_mysteel_stat' of eta_server/eta_index_lib into master

xyxie 1 жил өмнө
parent
commit
ca1e7814f8

+ 2 - 2
controllers/base_from_mysteel_chemical.go

@@ -125,9 +125,9 @@ func (this *MySteelChemicalController) Refresh() {
 	}
 	// 添加指标刷新成功日志
 	if erDataUpdateDate != "" {
-		_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 1, "", 0)
+		_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 1, "", 0, 0)
 	} else {
-		_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 2, "未刷新到数据", 0)
+		_ = services.AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 2, "未刷新到数据", 0, 0)
 	}
 
 	// 更新ES

+ 2 - 2
controllers/edb_info_stat.go

@@ -21,8 +21,8 @@ func (this *EdbInfoStatController) SetEdbSourceStat() {
 		this.ServeJSON()
 	}()
 	// 钢联终端统计汇总
-	_ = services.SetMysteelChemicalEdbInfoUpdateStat()
-	_ = services.SetEdbSourceStat()
+	_ = services.SetMysteelChemicalEdbInfoUpdateStat(true)
+	_ = services.SetEdbSourceStat(true)
 
 	br.Ret = 200
 	br.Success = true

+ 11 - 0
models/base_from_mysteel_chemical.go

@@ -512,3 +512,14 @@ func (d *BaseFromMysteelChemicalData) ModifyMysteelIndexMaxAndMinInfo(indexCode
 	_, err = o.Raw(sql, item.MinDate, item.MaxDate, indexCode).Exec()
 	return
 }
+
+// GetIndexByCondition 获取指标
+func (m *BaseFromMysteelChemicalIndex) GetIndexByCondition(condition string, pars []interface{}) (items []*BaseFromMysteelChemicalIndex, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT * FROM base_from_mysteel_chemical_index WHERE 1=1 `
+	if condition != "" {
+		sql += condition
+	}
+	_, err = o.Raw(sql, pars).QueryRows(&items)
+	return
+}

+ 1 - 0
models/data_stat/edb_info_update_log.go

@@ -36,6 +36,7 @@ type EdbInfoUpdateLog struct {
 	DataUpdateFailedReason string    `description:"数据未正常更新原因"`
 	DataUpdateTime         string    `description:"数据更新时间"`
 	IsSourceRefresh        int       `description:"是否为终端刷新到数据源的刷新操作:0否,1是"`
+	UpdateType             int       `description:"变更类型,0:数据明细变更,1:基础信息变更, 2:新增指标"`
 }
 
 func AddEdbUpdateLog(item *EdbInfoUpdateLog) (lastId int64, err error) {

+ 21 - 8
services/base_from_mysteel_chemical.go

@@ -21,11 +21,8 @@ func HandleMysteelIndex(req *models.HandleMysteelIndexResp) (err error) {
 		}
 	}
 
-	go func() {
-		// 钢联终端统计汇总
-		_ = SetMysteelChemicalEdbInfoUpdateStat()
-		_ = SetEdbSourceStat()
-	}()
+	_ = SetMysteelChemicalEdbInfoUpdateStat(false)
+	_ = SetEdbSourceStat(false)
 	return
 }
 
@@ -38,7 +35,7 @@ func handleIndex(indexItem *models.HandleMysteelIndex) (err error) {
 			edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexItem.IndexCode)
 			if e == nil {
 				//查询指标存在,才添加刷新日志
-				_ = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, err.Error(), dataUpdateResult, dataUpdateFailedReason, 1)
+				_ = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, err.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0)
 			}
 		}
 	}()
@@ -61,8 +58,12 @@ func handleIndex(indexItem *models.HandleMysteelIndex) (err error) {
 			return
 		}
 	}
+	nameChange := false
 	if item != nil && item.BaseFromMysteelChemicalIndexId > 0 {
 		isAdd = 2
+		if item.IndexName != indexItem.IndexName {
+			nameChange = true
+		}
 	} else {
 		isAdd = 1
 	}
@@ -222,7 +223,7 @@ func handleIndex(indexItem *models.HandleMysteelIndex) (err error) {
 		dataUpdateFailedReason := "服务异常"
 		_, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``)
 		if logErr != nil {
-			lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 1)
+			lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0)
 			return
 		}
 
@@ -234,7 +235,19 @@ func handleIndex(indexItem *models.HandleMysteelIndex) (err error) {
 		}
 
 		// 添加刷新成功日志
-		lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", dataUpdateResult, dataUpdateFailedReason, 1)
+		lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", dataUpdateResult, dataUpdateFailedReason, 1, 0)
+		if lErr != nil {
+			return
+		}
+		//如果变更了指标名称,则添加指标信息变更日志
+		if nameChange {
+			edbInfo.SourceIndexName = indexItem.IndexName
+			lErr = edbInfo.Update([]string{"SourceIndexName"})
+			if lErr != nil {
+				return
+			}
+			lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 1, "", 0, "", 0, 1)
+		}
 	}
 	//}()
 

+ 4 - 4
services/base_from_ths.go

@@ -31,10 +31,6 @@ func GetEdbDataFromThs(edbCode, startDate, endDate, edbTerminalCode string) (ite
 		err = fmt.Errorf("获取同花顺接口配置出错 Err: %s", err)
 		return
 	}
-	if terminal.ServerUrl == "" {
-		err = fmt.Errorf("同花顺接口未配置")
-		return
-	}
 
 	if edbTerminalCode == "" {
 		// 设置指标与终端关系的缓存
@@ -44,6 +40,10 @@ func GetEdbDataFromThs(edbCode, startDate, endDate, edbTerminalCode string) (ite
 
 	// 如果没有配置,获取配置的方式是api,那么就走官方接口
 	if utils.ThsDataMethod == "" || utils.ThsDataMethod == "api" {
+		if terminal.Value == "" {
+			err = fmt.Errorf("同花顺接口未配置")
+			return
+		}
 		var token string
 		token, err = GetAccessToken(false, terminal.Value)
 		if err != nil {

+ 73 - 15
services/edb_info_stat.go

@@ -10,7 +10,7 @@ import (
 )
 
 // AddEdbInfoUpdateLog 添加指标编辑/刷新日志
-func AddEdbInfoUpdateLog(edbInfoId int, updateResult int, updateFailedReason string, dataUpdateResult int, dataUpdateFailedReason string, isSourceRefresh int) (err error) {
+func AddEdbInfoUpdateLog(edbInfoId int, updateResult int, updateFailedReason string, dataUpdateResult int, dataUpdateFailedReason string, isSourceRefresh int, updateType int) (err error) {
 	var edbInfo *models.EdbInfo
 	if edbInfoId > 0 {
 		// 获取指标详情
@@ -25,7 +25,7 @@ func AddEdbInfoUpdateLog(edbInfoId int, updateResult int, updateFailedReason str
 		log.Source = edbInfo.Source
 		log.EdbCode = edbInfo.EdbCode
 		log.EdbName = edbInfo.EdbName
-		log.EdbNameSource = edbInfo.EdbNameSource
+		log.EdbNameSource = edbInfo.SourceIndexName
 		log.Frequency = edbInfo.Frequency
 		log.Unit = edbInfo.Unit
 		log.StartDate = edbInfo.StartDate
@@ -46,6 +46,7 @@ func AddEdbInfoUpdateLog(edbInfoId int, updateResult int, updateFailedReason str
 		log.DataUpdateResult = dataUpdateResult
 		log.DataUpdateFailedReason = dataUpdateFailedReason
 		log.IsSourceRefresh = isSourceRefresh
+		log.UpdateType = updateType
 		_, err = data_stat.AddEdbUpdateLog(log)
 		if err != nil {
 			err = fmt.Errorf("新增指标更新日志失败,Err: %s", err)
@@ -56,15 +57,16 @@ func AddEdbInfoUpdateLog(edbInfoId int, updateResult int, updateFailedReason str
 }
 
 // SetMysteelChemicalEdbInfoUpdateStat 定时统计钢联化工的数据源明细表
-func SetMysteelChemicalEdbInfoUpdateStat() (err error) {
+func SetMysteelChemicalEdbInfoUpdateStat(needStat bool) (err error) {
 	defer func() {
 		if err != nil {
 			tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
+			utils.FileLog.Info(tips)
 			alarm_msg.SendAlarmMsg(tips, 3)
 		}
 	}()
-	//查询钢联的所有指标信息
-	condition := " and source = ? "
+	//查询钢联的所有在更新的指标信息
+	condition := " and source = ? and no_update=0"
 	var pars []interface{}
 	pars = append(pars, utils.DATA_SOURCE_MYSTEEL_CHEMICAL)
 	edbList, err := models.GetEdbInfoByCondition(condition, pars, 0)
@@ -82,6 +84,10 @@ func SetMysteelChemicalEdbInfoUpdateStat() (err error) {
 		err = fmt.Errorf("查询钢联化工指标终端更新日志报错,err: %s", err)
 		return
 	}
+	fmt.Println(len(updateLogList))
+	if !needStat && len(updateLogList) == 0 { //如果不存在变更记录 则不进行汇总
+		return
+	}
 	updateLogMap := make(map[int]*data_stat.EdbInfoUpdateLog)
 	if len(updateLogList) > 0 {
 		for _, v := range updateLogList {
@@ -105,12 +111,46 @@ func SetMysteelChemicalEdbInfoUpdateStat() (err error) {
 			updateStatMap[v.EdbInfoId] = v
 		}
 	}
+	indexObj := new(models.BaseFromMysteelChemicalIndex)
+	week := int(nowTime.Weekday())
+	weekNeedRefreshMap := make(map[string]struct{})
+	if week >= 3 && week <= 6 {
+		endDate := utils.GetNowWeekMonday().Format(utils.FormatDate)
+		cond := ` AND frequency = ? AND end_date < ? AND is_stop = 0`
+		var tmpPars []interface{}
+		tmpPars = append(tmpPars, "周度", endDate)
+		//查询所有需要当日刷新的周度指标
+		indexTotal, tErr := indexObj.GetIndexByCondition(cond, tmpPars)
+		if tErr != nil {
+			err = fmt.Errorf("查询钢联化工原始指标报错,err: %s", tErr)
+			return
+		}
+		for _, v := range indexTotal {
+			weekNeedRefreshMap[v.IndexCode] = struct{}{}
+		}
+	}
+	//查询所有停更指标
+	stopRefreshMap := make(map[string]struct{})
+	tmpCond := ` AND is_stop = 1`
+	//查询所有需要当日刷新的周度指标
+	indexStop, tErr := indexObj.GetIndexByCondition(tmpCond, []interface{}{})
+	if tErr != nil {
+		err = fmt.Errorf("查询钢联化工原始指标报错,err: %s", tErr)
+		return
+	}
+	for _, v := range indexStop {
+		stopRefreshMap[v.IndexCode] = struct{}{}
+	}
+
 	logStat := new(data_stat.EdbInfoUpdateStat)
 	//组装新增数据
 	addList := make([]*data_stat.EdbInfoUpdateStat, 0)
 	modifyList := make([]*data_stat.EdbInfoUpdateStat, 0)
 	if len(edbList) > 0 {
 		for _, v := range edbList {
+			if _, ok := stopRefreshMap[v.EdbCode]; ok {
+				continue
+			}
 			tmp := &data_stat.EdbInfoUpdateStat{
 				EdbInfoId:        v.EdbInfoId,
 				SourceName:       v.SourceName,
@@ -134,8 +174,22 @@ func SetMysteelChemicalEdbInfoUpdateStat() (err error) {
 				ErDataUpdateDate: v.ErDataUpdateDate,
 				ModifyTime:       nowTime,
 			}
-			needRefresh, _ := checkMySteelEdbInfoNeedRefresh(v.Frequency)
-			tmp.NeedRefresh = needRefresh
+			exist, existOk := updateStatMap[v.EdbInfoId]
+
+			frequency := v.Frequency
+			if v.Frequency == "旬度" { //特殊处理指标库里和数据源里频度不一致的情况
+				//查询源指标库的频度
+				indexTmp, e := indexObj.GetIndexItem(v.EdbCode)
+				if e == nil {
+					frequency = indexTmp.Frequency
+				}
+			}
+			if existOk {
+				tmp.NeedRefresh = exist.NeedRefresh
+			} else {
+				needRefresh, _ := checkMySteelEdbInfoNeedRefresh(v.EdbCode, frequency, weekNeedRefreshMap)
+				tmp.NeedRefresh = needRefresh
+			}
 
 			// 判断是否当日新增
 			if v.CreateTime.After(todayT) || v.CreateTime == todayT {
@@ -152,14 +206,14 @@ func SetMysteelChemicalEdbInfoUpdateStat() (err error) {
 				tmp.UpdateResult = up.UpdateResult
 				tmp.UpdateFailedReason = up.UpdateFailedReason
 				tmp.UpdateTime = up.CreateTime.Format(utils.FormatDateTime)
-			} else if needRefresh == 1 {
+			} else if tmp.NeedRefresh == 1 {
 				tmp.HasRefresh = 0
 				tmp.DataUpdateResult = 2
 				tmp.DataUpdateFailedReason = "服务异常"
 			}
 
 			// 判断是否需要新增还是更新
-			if exist, ok := updateStatMap[v.EdbInfoId]; ok {
+			if existOk {
 				tmp.Id = exist.Id
 				modifyList = append(modifyList, tmp)
 			} else {
@@ -179,7 +233,7 @@ func SetMysteelChemicalEdbInfoUpdateStat() (err error) {
 	return
 }
 
-func checkMySteelEdbInfoNeedRefresh(frequency string) (needRefresh int, err error) {
+func checkMySteelEdbInfoNeedRefresh(edbCode, frequency string, weekNeedRefreshMap map[string]struct{}) (needRefresh int, err error) {
 	now := time.Now()
 	week := int(now.Weekday())
 	//日度
@@ -191,21 +245,22 @@ func checkMySteelEdbInfoNeedRefresh(frequency string) (needRefresh int, err erro
 	}
 	//周度
 	if week >= 3 && week <= 6 {
-		if frequency == "周度" {
+		_, ok := weekNeedRefreshMap[edbCode]
+		if frequency == "周度" && ok {
 			needRefresh = 1
 			return
 		}
 	}
 
-	day := now.Day() //季度,月度,年度都是每个月1号刷新
-	if day == 1 {
+	//季度,月度,年度都是每个周末刷新
+	if week == 0 {
 		needRefresh = 1
 	}
 	return
 }
 
 // SetEdbSourceStat 定时统计数据源汇总表
-func SetEdbSourceStat() (err error) {
+func SetEdbSourceStat(needStat bool) (err error) {
 	defer func() {
 		if err != nil {
 			tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常 Err: %s", err.Error())
@@ -226,6 +281,9 @@ func SetEdbSourceStat() (err error) {
 		err = fmt.Errorf("查询钢联化工数据源明细记录统计报错,err: %s", err)
 		return
 	}
+	if !needStat && len(updateStatList) == 0 {
+		return
+	}
 	updateStatMap := make(map[string][]*data_stat.EdbInfoUpdateStat)
 	if len(updateStatList) > 0 {
 		for _, v := range updateStatList {
@@ -297,7 +355,7 @@ func SetEdbSourceStat() (err error) {
 
 			if v.UpdateResult == 1 { //刷新结果
 				tmp.RefreshSuccessNum = tmp.RefreshSuccessNum + 1
-			} else if v.NeedRefresh == 1 {
+			} else if v.HasRefresh == 1 {
 				tmp.RefreshFailedNum = tmp.RefreshFailedNum + 1
 			}
 		}