Browse Source

fix:外部指标

Roc 10 months ago
parent
commit
4388f0305f

+ 68 - 0
models/base_from_business.go

@@ -1,7 +1,10 @@
 package models
 
 import (
+	"eta/eta_index_lib/models/mgo"
 	"eta/eta_index_lib/utils"
+	"fmt"
+	"go.mongodb.org/mongo-driver/bson"
 	"time"
 
 	"github.com/beego/beego/v2/client/orm"
@@ -124,3 +127,68 @@ func (m *EdbBusinessSource) Add() (err error) {
 
 	return
 }
+
+// GetEdbInfoMaxAndMinInfo 获取指标的最新数据记录信息
+func (m BaseFromBusinessIndex) GetEdbInfoMaxAndMinInfo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
+	mogDataObj := new(mgo.BaseFromBusinessData)
+	pipeline := []bson.M{
+		{"$match": bson.M{"index_code": edbCode}},
+		{"$group": bson.M{
+			"_id":       nil,
+			"min_date":  bson.M{"$min": "$data_time"},
+			"max_date":  bson.M{"$max": "$data_time"},
+			"min_value": bson.M{"$min": "$value"},
+			"max_value": bson.M{"$max": "$value"},
+		}},
+		{"$project": bson.M{"_id": 0}}, // 可选,如果不需要_id字段
+	}
+	result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
+	if err != nil {
+		fmt.Println("BaseFromBusinessIndex GetEdbInfoMaxAndMinInfo Err:" + err.Error())
+		return
+	}
+
+	if !result.MaxDate.IsZero() {
+		whereQuery := bson.M{"index_code": edbCode, "data_time": result.MaxDate}
+		selectParam := bson.D{{"value", 1}, {"_id", 0}}
+		latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+		result.LatestValue = latestValue.Value
+		result.EndValue = latestValue.Value
+	}
+
+	item = &EdbInfoMaxAndMinInfo{
+		MinDate:     result.MinDate.Format(utils.FormatDate),
+		MaxDate:     result.MaxDate.Format(utils.FormatDate),
+		MinValue:    result.MinValue,
+		MaxValue:    result.MaxValue,
+		LatestValue: result.LatestValue,
+		LatestDate:  result.LatestDate.Format(utils.FormatDate),
+		EndValue:    result.EndValue,
+	}
+
+	return
+}
+
+// ModifyIndexMaxAndMinInfo
+// @Description: 修改最大值和最小值信息
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-06 14:07:46
+// @param indexCode string
+// @param item *EdbInfoMaxAndMinInfo
+// @param isIndexUpdateOrAdd bool
+// @return err error
+func (m *BaseFromBusinessIndex) ModifyIndexMaxAndMinInfo(indexCode string, item *EdbInfoMaxAndMinInfo, isIndexUpdateOrAdd bool) (err error) {
+	o := orm.NewOrm()
+	sql := ` UPDATE base_from_business_index SET start_date=?,end_date=?,modify_time=NOW() `
+	if isIndexUpdateOrAdd {
+		sql += `,data_update_time=NOW() `
+	}
+	sql += ` WHERE index_code=?`
+	_, err = o.Raw(sql, item.MinDate, item.MaxDate, indexCode).Exec()
+	return
+}

+ 2 - 2
models/edb_data_business.go

@@ -219,7 +219,7 @@ func (obj Business) refresh(edbInfo *EdbInfo, startDate string) (err error) {
 
 		existDataList, err = mogDataObj.GetAllDataList(queryConditions)
 		if err != nil {
-			fmt.Println("getEdbDataBusinessList Err:" + err.Error())
+			fmt.Println(obj.GetSourceName() + ",refresh err;getEdbDataBusinessList Err:" + err.Error())
 			return
 		}
 	}
@@ -348,7 +348,7 @@ func (obj Business) GetEdbInfoMaxAndMinInfo(edbCode string) (item *EdbInfoMaxAnd
 	}
 	result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
 	if err != nil {
-		fmt.Println("getEdbDataBusinessList Err:" + err.Error())
+		fmt.Println("EdbDataBusiness getEdbDataBusinessList Err:" + err.Error())
 		return
 	}
 

+ 60 - 0
models/mgo/base_from_business_data.go

@@ -191,3 +191,63 @@ func (m *BaseFromBusinessData) HandleData(addDataList, updateDataList []BaseFrom
 
 	return
 }
+
+// GetEdbInfoMaxAndMinInfo
+// @Description: 获取当前指标的最大最小值
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 17:15:39
+// @param whereParams interface{}
+// @return result EdbInfoMaxAndMinInfo
+// @return err error
+func (m *BaseFromBusinessData) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Aggregate(ctx, whereParams).One(&result)
+	if err != nil {
+		return
+	}
+	result.MinDate = result.MinDate.In(time.Local)
+	result.MaxDate = result.MaxDate.In(time.Local)
+	result.LatestDate = result.LatestDate.In(time.Local)
+
+	return
+}
+
+// GetLatestValue
+// @Description: 获取当前指标的最新数据记录
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 17:16:15
+// @param whereParams interface{}
+// @param selectParam interface{}
+// @return latestValue LatestValue
+// @return err error
+func (m *BaseFromBusinessData) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+
+	//var result interface{}
+	//err = coll.Find(ctx, whereParams).Select(selectParam).One(&result)
+	err = coll.Find(ctx, whereParams).Select(selectParam).One(&latestValue)
+
+	return
+}

+ 74 - 39
services/base_from_business.go

@@ -2,8 +2,10 @@ package services
 
 import (
 	"errors"
+	"eta/eta_index_lib/logic"
 	"eta/eta_index_lib/models"
 	"eta/eta_index_lib/models/mgo"
+	"eta/eta_index_lib/services/alarm_msg"
 	"eta/eta_index_lib/utils"
 	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
@@ -136,6 +138,8 @@ func HandleBusinessIndex(indexItem *models.AddBusinessIndexReq) (err error) {
 		exitDataMap[v.DataTime.Format(utils.FormatDate)] = v
 	}
 
+	// 当前传入的最小日期
+	var reqMinDate time.Time
 	// 待添加的数据集
 	addDataList := make([]mgo.BaseFromBusinessData, 0)
 	updateDataList := make([]mgo.BaseFromBusinessData, 0)
@@ -147,6 +151,12 @@ func HandleBusinessIndex(indexItem *models.AddBusinessIndexReq) (err error) {
 			fmt.Println("time.ParseInLocation Err:" + err.Error())
 			return err
 		}
+
+		// 调整最小日期
+		if reqMinDate.IsZero() || reqMinDate.After(dateTime) {
+			reqMinDate = dateTime
+		}
+
 		date := dateTime.Format(utils.FormatDate)
 
 		findData, ok := exitDataMap[date]
@@ -170,9 +180,13 @@ func HandleBusinessIndex(indexItem *models.AddBusinessIndexReq) (err error) {
 		}
 	}
 
+	// 指标数据是否新增或修改
+	var isIndexUpdateOrAdd bool
+
 	// 入库
 	{
 		if len(addDataList) > 0 {
+			isIndexUpdateOrAdd = true
 			err = mogDataObj.BatchInsertData(addDataList)
 			if err != nil {
 				fmt.Println("mogDataObj.HandleData() Err:" + err.Error())
@@ -181,6 +195,7 @@ func HandleBusinessIndex(indexItem *models.AddBusinessIndexReq) (err error) {
 		}
 
 		if len(updateDataList) > 0 {
+			isIndexUpdateOrAdd = true
 			coll := mogDataObj.GetCollection()
 			for _, v := range updateDataList {
 				err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
@@ -200,45 +215,65 @@ func HandleBusinessIndex(indexItem *models.AddBusinessIndexReq) (err error) {
 	//}
 	//fmt.Println("result", result)
 
-	////修改最大最小日期
-	//mysteelIndexMaxItem, err := dataObj.GetMysteelIndexInfoMaxAndMinInfo(indexItem.IndexCode)
-	//if err == nil && mysteelIndexMaxItem != nil {
-	//	e := dataObj.ModifyMysteelIndexMaxAndMinInfo(indexItem.IndexCode, mysteelIndexMaxItem)
-	//	if e != nil {
-	//		fmt.Println("ModifyMysteelIndexMaxAndMinInfo Err:" + e.Error())
-	//	}
-	//}
-	//// 同步刷新图库钢联的指标
-	////go func() {
-	//var indexErr error
-	//var lErr error
-	//defer func() {
-	//	if indexErr != nil {
-	//		tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s", indexObj.IndexCode, indexErr.Error())
-	//		alarm_msg.SendAlarmMsg(tips, 3)
-	//	}
-	//
-	//	if lErr != nil {
-	//		tips := fmt.Sprintf("钢联数据刷新-ETA指标刷新统计异常, 指标编码: %s, err: %s", indexObj.IndexCode, lErr.Error())
-	//		alarm_msg.SendAlarmMsg(tips, 3)
-	//	}
-	//}()
-	//
-	//edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, indexObj.IndexCode)
-	//if e != nil && e.Error() != utils.ErrNoRow() {
-	//	indexErr = e
-	//	return
-	//}
-	//
-	//if edbInfo != nil {
-	//	dataUpdateResult := 2
-	//	dataUpdateFailedReason := "服务异常"
-	//	_, logErrMsg, logErr := logic.RefreshBaseEdbInfo(edbInfo, ``)
-	//	if logErr != nil {
-	//		lErr = AddEdbInfoUpdateLog(edbInfo.EdbInfoId, 2, logErrMsg+logErr.Error(), dataUpdateResult, dataUpdateFailedReason, 1, 0)
-	//		return
-	//	}
-	//}
+	//修改最大最小日期
+	indexMaxAndMinInfo, err := item.GetEdbInfoMaxAndMinInfo(indexItem.IndexCode)
+	if err != nil {
+		return
+	}
+	if err == nil && indexMaxAndMinInfo != nil {
+		e := item.ModifyIndexMaxAndMinInfo(indexItem.IndexCode, indexMaxAndMinInfo, isIndexUpdateOrAdd)
+		if e != nil {
+			fmt.Println("ModifyIndexMaxAndMinInfo Err:" + e.Error())
+		}
+	}
+
+	// 同步刷新指标库的指标
+	go refreshEdbBusiness(item.IndexCode, reqMinDate)
 
 	return
 }
+
+func refreshEdbBusiness(indexCode string, reqMinDate time.Time) {
+	var indexErr error
+	var errMsg string
+	defer func() {
+		if indexErr != nil {
+			tips := fmt.Sprintf("外部数据刷新-ETA指标刷新异常, 指标编码: %s, err: %s, 错误信息:%s", indexCode, indexErr.Error(), errMsg)
+			alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+
+	edbInfo, e := models.GetEdbInfoByEdbCode(utils.DATA_SOURCE_BUSINESS, indexCode)
+	if e != nil && e.Error() != utils.ErrNoRow() {
+		indexErr = e
+		return
+	}
+
+	if edbInfo != nil {
+		startDate := ``
+		if reqMinDate.IsZero() {
+			startDate = edbInfo.EndDate
+		} else {
+			startDate = reqMinDate.Format(utils.FormatDate)
+		}
+		params := models.RefreshBaseParams{
+			EdbInfo:   edbInfo,
+			StartDate: startDate,
+		}
+		obj := models.Business{}
+
+		indexErr, errMsg = obj.Refresh(params)
+		if indexErr != nil {
+			return
+		}
+
+		// 更新指标最大最小值
+		indexErr = obj.UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
+		if indexErr != nil {
+			return
+		}
+
+		// 更新ES
+		go logic.UpdateEs(edbInfo.EdbInfoId)
+	}
+}