فهرست منبع

新增指标数据同步方法

xyxie 2 هفته پیش
والد
کامیت
d97463d30e
6فایلهای تغییر یافته به همراه353 افزوده شده و 6 حذف شده
  1. 114 0
      controllers/edb_data.go
  2. 9 2
      models/mgodb/edb_data_base.go
  3. 8 1
      models/mgodb/edb_data_calculate.go
  4. 9 0
      routers/commentsRouter.go
  5. 5 0
      routers/router.go
  6. 208 3
      services/edb_data.go

+ 114 - 0
controllers/edb_data.go

@@ -0,0 +1,114 @@
+package controllers
+
+import (
+	"encoding/json"
+	"eta/eta_forum_hub/models"
+	"fmt"
+	"strconv"
+)
+
+type EdbDataController struct {
+	BaseAuthController
+}
+// 根据binlog监听记录,更新指标数据
+// SaveByBinlog 根据binlog监听记录,更新指标数据
+// @Title 根据binlog监听记录,更新指标数据
+// @Description 根据binlog监听记录,更新指标数据
+// @Param   request	body models.EdbDataBinlogReq true "type json string"
+// @Success 200 {object} models.BaseResponse
+// @router /save_by_binlog [post]
+func (this *EdbDataController) SaveByBinlog() {
+	br := new(models.BaseResponse).Init()
+	defer func() {
+		this.Data["json"] = br
+		this.ServeJSON()
+	}()
+
+	var req models.EdbDataBinlogReq
+	err := json.Unmarshal(this.Ctx.Input.RequestBody, &req)
+	if err != nil {
+		br.Msg = "参数解析异常!"
+		br.ErrMsg = "参数解析失败,Err:" + err.Error()
+		return
+	}
+
+	if len(req.List) == 0 {
+		br.Msg = "参数异常!"
+		br.ErrMsg = "参数异常,请检查!"
+		return
+	}
+
+	edbInfoIdMap := make(map[int]*models.AddEdbDataBinlogReq)
+	reqList := make([]*models.EdbDataBinlogItem, 0)
+	// 判断指标库里是否存在这些指标,如果不存在,则丢弃,无需处理
+	edbInfoIds := make([]int, 0)
+	for _, item := range req.List {
+		// 解析json
+		var dataItem models.EdbDataBinlogItem
+		err := json.Unmarshal([]byte(item.Item), &dataItem)
+		if err != nil {
+			br.Msg = "参数解析异常!"
+			br.ErrMsg = "参数解析失败,Err:" + err.Error()
+			return
+		}
+		dataItem.OpType = item.OpType
+		reqList = append(reqList, &dataItem)
+		edbInfoIds = append(edbInfoIds, dataItem.EdbInfoId)
+	}
+	// 查询指标信息
+	edbInfoList, err := models.GetEdbInfoByIdList(edbInfoIds)
+	if err != nil {
+		br.Msg = "指标信息查询异常!"
+		br.ErrMsg = "指标信息查询失败,Err:" + err.Error()
+		return
+	}
+	existEdbInfoMap := make(map[int]*models.EdbInfo)
+	for _, item := range edbInfoList {
+		existEdbInfoMap[item.EdbInfoId] = item
+	}
+	
+	edbDataIdMap := make(map[string]*models.EdbDataBaseWithOpType)
+	for _, item := range reqList {
+		edbInfo, ok := existEdbInfoMap[item.EdbInfoId]
+		if !ok {
+			continue
+		}
+		if _, ok := edbInfoIdMap[item.EdbInfoId]; !ok {
+			tmp := &models.AddEdbDataBinlogReq{
+				EdbInfoId: item.EdbInfoId,
+				EdbCode: item.EdbCode,
+				EdbType: edbInfo.EdbType,
+			}
+			edbInfoIdMap[item.EdbInfoId] = tmp
+		}
+		value, ok := item.Value.(float64)
+		if !ok {
+			continue
+		}
+		
+		dataItem := &models.EdbDataBaseWithOpType{
+			EdbDataBase: &models.EdbDataBase{
+				EdbInfoId: item.EdbInfoId,
+				EdbCode: item.EdbCode,
+				Value: strconv.FormatFloat(value, 'f', -1, 64),
+				DataTimestamp: item.DataTimestamp,
+				DataTime: item.DataTime.Format("2006-01-02 15:04:05"),
+			},
+			OpType: item.OpType,
+		}
+		if _, ok := edbDataIdMap[fmt.Sprintf("%d_%d", item.EdbInfoId, item.EdbDataId)]; ok {
+			edbDataIdMap[fmt.Sprintf("%d_%d", item.EdbInfoId, item.EdbDataId)] = dataItem
+		} else {
+			edbDataIdMap[fmt.Sprintf("%d_%d", item.EdbInfoId, item.EdbDataId)] = dataItem
+		}
+		//edbInfoIdMap[item.EdbInfoId].DataList = append(edbInfoIdMap[item.EdbInfoId].DataList, dataItem)
+	}
+
+	for _, item := range edbDataIdMap {
+		edbInfoIdMap[item.EdbInfoId].DataList = append(edbInfoIdMap[item.EdbInfoId].DataList, item)
+	}
+
+	return
+}
+
+

+ 9 - 2
models/mgodb/edb_data_base.go

@@ -86,8 +86,15 @@ func DeleteEdbInfoDataByEdbInfoId(edbInfoId int) (err error) {
 }
 
 // 删除
-func DeleteEdbInfoDataByEdbInfoIdAndDate(edbCode string, dataTime []time.Time) (err error) {
-	filter := bson.D{{"edb_code", edbCode}, {"data_time", bson.D{{"$in", dataTime}}}}
+func DeleteEdbInfoDataByEdbInfoIdAndDateList(edbCode string, dataTimeList []time.Time) (err error) {
+	filter := bson.D{{"edb_code", edbCode}, {"data_time", bson.D{{"$in", dataTimeList}}}}
+	db := NewMgo(utils.MgoDataDbName, "edb_data_base", utils.MgoDataCli)
+	_, err = db.DeleteMany(filter)
+	return
+}
+
+func DeleteEdbInfoDataByEdbInfoIdAndDate(edbCode string, dataTime string) (err error) {
+	filter := bson.D{{"edb_code", edbCode}, {"data_time", dataTime}}
 	db := NewMgo(utils.MgoDataDbName, "edb_data_base", utils.MgoDataCli)
 	_, err = db.DeleteMany(filter)
 	return

+ 8 - 1
models/mgodb/edb_data_calculate.go

@@ -76,13 +76,20 @@ func ModifyValueEdbCalculateDataValue(edbDataId primitive.ObjectID, value float6
 	return
 }
 
-func DeleteEdbCalculateDataByEdbInfoIdAndDate(edbCode string, dataTime []time.Time) (err error) {
+func DeleteEdbCalculateDataByEdbInfoIdAndDateList(edbCode string, dataTime []time.Time) (err error) {
 	filter := bson.D{{"edb_code", edbCode}, {"data_time", bson.D{{"$in", dataTime}}}}
 	db := NewMgo(utils.MgoDataDbName, "edb_data_calculate", utils.MgoDataCli)
 	_, err = db.DeleteMany(filter)
 	return
 }
 
+func DeleteEdbCalculateDataByEdbInfoIdAndDate(edbCode string, dataTime string) (err error) {
+	filter := bson.D{{"edb_code", edbCode}, {"data_time", dataTime}}
+	db := NewMgo(utils.MgoDataDbName, "edb_data_calculate", utils.MgoDataCli)
+	_, err = db.DeleteMany(filter)
+	return
+}
+
 // GetEdbCalculateDataList 获取指标的数据(日期正序返回)
 func GetEdbCalculateDataList(endInfoId int, startDate, endDate time.Time) (list []*EdbDataBase, err error) {
 	findOptions := options.Find()

+ 9 - 0
routers/commentsRouter.go

@@ -340,4 +340,13 @@ func init() {
             Filters: nil,
             Params: nil})
 
+    beego.GlobalControllerRouter["eta/eta_forum_hub/controllers:EdbDataController"] = append(beego.GlobalControllerRouter["eta/eta_forum_hub/controllers:EdbDataController"],
+        beego.ControllerComments{
+            Method: "SaveByBinlog",
+            Router: `/save_by_binlog`,
+            AllowHTTPMethods: []string{"post"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
 }

+ 5 - 0
routers/router.go

@@ -55,6 +55,11 @@ func init() {
 				&controllers.ChartClassifyController{},
 			),
 		),
+		web.NSNamespace("/edb_data",
+			web.NSInclude(
+				&controllers.EdbDataController{},
+			),
+		),
 	)
 	web.AddNamespace(ns)
 }

+ 208 - 3
services/edb_data.go

@@ -5,9 +5,10 @@ import (
 	"eta/eta_forum_hub/models/mgodb"
 	"eta/eta_forum_hub/utils"
 	"fmt"
-	"github.com/shopspring/decimal"
 	"strconv"
 	"time"
+
+	"github.com/shopspring/decimal"
 )
 
 func BatchAddOrUpdateEdbData(req []*models.AddEdbDataReq) (err error) {
@@ -28,6 +29,10 @@ func BatchAddOrUpdateEdbData(req []*models.AddEdbDataReq) (err error) {
 }
 
 func AddOrUpdateEdbData(edbCode string, dataList []*models.EdbDataBase) (err error) {
+	// 指标数据为空则不更新
+	if len(dataList) == 0 {
+		return
+	}
 	addList := make([]interface{}, 0)
 	existList, err := mgodb.GetEdbDataBaseByEdbCode(edbCode)
 	if err != nil {
@@ -97,7 +102,7 @@ func AddOrUpdateEdbData(edbCode string, dataList []*models.EdbDataBase) (err err
 		}
 		removeNum := len(removeDateList)
 		if removeNum > 0 {
-			err = mgodb.DeleteEdbInfoDataByEdbInfoIdAndDate(edbCode, removeDateList)
+			err = mgodb.DeleteEdbInfoDataByEdbInfoIdAndDateList(edbCode, removeDateList)
 			if err != nil {
 				err = fmt.Errorf("删除多余日期失败,error, %v", err)
 				return
@@ -113,6 +118,10 @@ func AddOrUpdateEdbData(edbCode string, dataList []*models.EdbDataBase) (err err
 }
 
 func AddOrUpdateEdbDataCalculate(edbCode string, dataList []*models.EdbDataBase) (err error) {
+	// 指标数据为空则不更新
+	if len(dataList) == 0 {
+		return
+	}
 	addList := make([]interface{}, 0)
 	existList, err := mgodb.GetEdbDataCalculateByEdbCode(edbCode)
 	if err != nil {
@@ -182,7 +191,7 @@ func AddOrUpdateEdbDataCalculate(edbCode string, dataList []*models.EdbDataBase)
 		}
 		removeNum := len(removeDateList)
 		if removeNum > 0 {
-			err = mgodb.DeleteEdbCalculateDataByEdbInfoIdAndDate(edbCode, removeDateList)
+			err = mgodb.DeleteEdbCalculateDataByEdbInfoIdAndDateList(edbCode, removeDateList)
 			if err != nil {
 				err = fmt.Errorf("删除多余日期失败,error, %v", err)
 				return
@@ -361,3 +370,199 @@ func GetEdbDataTbzForSeason(frequency string, tmpDataList []*models.EdbDataList,
 
 	return
 }
+
+// AddOrUpdateEdbDataWithOpType 根据操作类型,更新指标数据
+func AddOrUpdateEdbDataWithOpType(edbCode string, dataList []*models.EdbDataBaseWithOpType) (err error) {
+	// 指标数据为空则不更新
+	if len(dataList) == 0 {
+		return
+	}
+	addList := make([]interface{}, 0)
+	existList, err := mgodb.GetEdbDataBaseByEdbCode(edbCode)
+	if err != nil {
+		err = fmt.Errorf("查询指标数据出错 error, %v", err)
+		return
+	}
+	existMap := make(map[string]*mgodb.EdbDataBase, len(existList))
+	deleteMap := make(map[string]struct{}, len(existList))
+	for _, exist := range existList {
+		tmp := exist.DataTime.Format(utils.FormatDate)
+		existMap[tmp] = exist
+		deleteMap[tmp] = struct{}{}
+	}
+	for _, v := range dataList {
+		dataTime, e := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local)
+		if e != nil {
+			err = fmt.Errorf("时间格式化出错 error, %v", e)
+			return
+		}
+		val, e := strconv.ParseFloat(v.Value, 64)
+		if e != nil {
+			err = fmt.Errorf("指标数据格式化出错 error, %v", e)
+			return
+		}
+		if oldObj, ok := existMap[v.DataTime]; !ok {
+			if len(addList) >0 && v.OpType == "delete" {
+				err = mgodb.InsertEdbDataBatch(addList)
+				if err != nil {
+					err = fmt.Errorf("批量新增指标数据失败 error, %v", e)
+					return
+				}
+				addList = make([]interface{}, 0)
+				continue
+			}
+			if v.OpType == "insert"  || v.OpType == "update" {
+				tmp := &mgodb.EdbDataBase{
+					EdbInfoId:     0,
+					EdbCode:       v.EdbCode,
+					DataTime:      dataTime,
+					Value:         val,
+					Status:        v.Status,
+					CreateTime:    time.Now(),
+					ModifyTime:    time.Now(),
+					DataTimestamp: v.DataTimestamp,
+				}
+				addList = append(addList, tmp)
+			}
+		} else {
+			if v.OpType != "delete" {
+				if val != oldObj.Value {
+					err = mgodb.ModifyValueEdbDataValue(oldObj.EdbDataId, val)
+					if err != nil {
+						err = fmt.Errorf("更新指标数据出错 error, %v", err)
+					return
+				}
+			}else if v.OpType == "delete" {
+				// 删除数据
+				err = mgodb.DeleteEdbInfoDataByEdbInfoIdAndDate(v.EdbCode, v.DataTime)
+				if err != nil {
+					err = fmt.Errorf("删除指标数据出错 error, %v", err)
+					return
+				}
+			}
+		}
+	}
+
+	//遍历deletemap,找出需要删除的日期
+	// {
+	// 	removeDateList := make([]time.Time, 0)
+	// 	for dateTime := range deleteMap {
+	// 		dateT, e := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
+	// 		if e != nil {
+	// 			err = fmt.Errorf("日期格式错误 error, %v", e)
+	// 			return
+	// 		}
+	// 		removeDateList = append(removeDateList, dateT)
+	// 	}
+	// 	removeNum := len(removeDateList)
+	// 	if removeNum > 0 {
+	// 		err = mgodb.DeleteEdbInfoDataByEdbInfoIdAndDate(edbCode, removeDateList)
+	// 		if err != nil {
+	// 			err = fmt.Errorf("删除多余日期失败,error, %v", err)
+	// 			return
+	// 		}
+	// 	}
+	}
+
+	if len(addList) > 0 {
+		err = mgodb.InsertEdbDataBatch(addList)
+	}
+
+	return
+}
+
+func AddOrUpdateEdbDataCalculateWithOpType(edbCode string, dataList []*models.EdbDataBaseWithOpType) (err error) {
+	// 指标数据为空则不更新
+	if len(dataList) == 0 {
+		return
+	}
+	addList := make([]interface{}, 0)
+	existList, err := mgodb.GetEdbDataCalculateByEdbCode(edbCode)
+	if err != nil {
+		err = fmt.Errorf("查询指标数据出错 error, %v", err)
+		return
+	}
+	existMap := make(map[string]*mgodb.EdbDataBase, len(existList))
+	deleteMap := make(map[string]struct{}, len(existList))
+	for _, exist := range existList {
+		tmp := exist.DataTime.Format(utils.FormatDate)
+		existMap[tmp] = exist
+		deleteMap[tmp] = struct{}{}
+	}
+	for _, v := range dataList {
+		dataTime, e := time.ParseInLocation(utils.FormatDate, v.DataTime, time.Local)
+		if e != nil {
+			err = fmt.Errorf("时间格式化出错 error, %v", e)
+			return
+		}
+		val, e := strconv.ParseFloat(v.Value, 64)
+		if e != nil {
+			err = fmt.Errorf("指标数据格式化出错 error, %v", e)
+			return
+		}
+		if oldObj, ok := existMap[v.DataTime]; !ok {
+			if len(addList) >0 && v.OpType == "delete" {
+				err = mgodb.InsertEdbCalculateDataBatch(addList)
+				if err != nil {
+					err = fmt.Errorf("批量新增指标数据失败 error, %v", e)
+					return
+				}
+				addList = make([]interface{}, 0)
+				continue
+			}
+			if v.OpType == "insert"  || v.OpType == "update" {
+				tmp := &mgodb.EdbDataBase{
+						EdbInfoId:     0,
+						EdbCode:       v.EdbCode,
+						DataTime:      dataTime,
+						Value:         val,
+						Status:        v.Status,
+						CreateTime:    time.Now(),
+						ModifyTime:    time.Now(),
+						DataTimestamp: v.DataTimestamp,
+					}
+				addList = append(addList, tmp)
+			}
+		} else {
+			if v.OpType != "delete" {
+				if val != oldObj.Value {
+					err = mgodb.ModifyValueEdbCalculateDataValue(oldObj.EdbDataId, val)
+					if err != nil {
+						err = fmt.Errorf("更新指标数据出错 error, %v", err)
+						return
+					}
+				}
+			} else {
+				// 删除数据
+				err = mgodb.DeleteEdbCalculateDataByEdbInfoIdAndDate(v.EdbCode, v.DataTime)
+				if err != nil {
+					err = fmt.Errorf("删除指标数据出错 error, %v", err)
+					return
+				}
+			}
+		}
+	//遍历deleteMap,找出需要删除的日期
+	// {
+	// 	removeDateList := make([]time.Time, 0)
+	// 	for dateTime := range deleteMap {
+	// 		dateT, e := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
+	// 		if e != nil {
+	// 			err = fmt.Errorf("日期格式错误 error, %v", e)
+	// 			return
+	// 		}
+	// 		removeDateList = append(removeDateList, dateT)
+	// 	}
+	// 	removeNum := len(removeDateList)
+	// 	if removeNum > 0 {
+	// 		err = mgodb.DeleteEdbCalculateDataByEdbInfoIdAndDateList(edbCode, removeDateList)
+	// 		if err != nil {
+	// 			err = fmt.Errorf("删除多余日期失败,error, %v", err)
+	// 			return
+	// 		}
+	// 	}
+	}
+	if len(addList) > 0 {
+		err = mgodb.InsertEdbCalculateDataBatch(addList)
+	}
+	return
+}