package controllers import ( "encoding/json" "eta/eta_forum_hub/models" "eta/eta_forum_hub/services" "eta/eta_forum_hub/utils" "fmt" "strconv" "time" ) type EdbDataController struct { BaseAuthController } // 根据binlog监听记录,更新指标数据 // SaveByBinlog 根据binlog监听记录,更新指标数据 // @Title 根据binlog监听记录,更新指标数据 // @Description 根据binlog监听记录,更新指标数据 // @Param request body models.EdbDataBinlogReq true "type json string" // @Success 200 {object} models.BaseResponse // @router /save_by_binlog [post] func (this *EdbDataController) SaveByBinlog() { br := new(models.BaseResponse).Init() defer func() { this.Data["json"] = br this.ServeJSON() }() var req models.EdbDataBinlogReq err := json.Unmarshal(this.Ctx.Input.RequestBody, &req) if err != nil { br.Msg = "参数解析异常!" br.ErrMsg = "参数解析失败,Err:" + err.Error() return } if len(req.List) == 0 { br.Msg = "参数异常!" br.ErrMsg = "参数异常,请检查!" return } edbInfoIdMap := make(map[int]*models.AddEdbDataBinlogReq) reqList := make([]*models.EdbDataBinlogItem, 0) // 判断指标库里是否存在这些指标,如果不存在,则丢弃,无需处理 edbInfoIds := make([]int, 0) for _, item := range req.List { // 解析json var dataItem models.EdbDataBinlogItem err := json.Unmarshal([]byte(item.Item), &dataItem) if err != nil { utils.FileLog.Info("解析binlog数据失败,Err:" + err.Error(), "item", item) continue } dataItem.OpType = item.OpType reqList = append(reqList, &dataItem) edbInfoIds = append(edbInfoIds, dataItem.EdbInfoId) } // 查询指标信息 edbInfoList, err := models.GetEdbInfoByIdList(edbInfoIds) if err != nil { br.Msg = "指标信息查询异常!" br.ErrMsg = "指标信息查询失败,Err:" + err.Error() return } existEdbInfoMap := make(map[int]*models.EdbInfo) for _, item := range edbInfoList { existEdbInfoMap[item.EdbInfoId] = item } // edbInfoList := []*models.EdbInfo{ // { // EdbInfoId: 1, // EdbCode: "test_code_1", // EdbType: 1, // }, // { // EdbInfoId: 2, // EdbCode: "test_code_2", // EdbType: 1, // }, // } // existEdbInfoMap := make(map[int]*models.EdbInfo) // for _, item := range edbInfoList { // existEdbInfoMap[item.EdbInfoId] = item // } edbDataIdMap := make(map[string]*models.EdbDataBaseWithOpType) for _, item := range reqList { edbInfo, ok := existEdbInfoMap[item.EdbInfoId] if !ok { continue } if _, ok := edbInfoIdMap[item.EdbInfoId]; !ok { tmp := &models.AddEdbDataBinlogReq{ EdbInfoId: item.EdbInfoId, EdbCode: item.EdbCode, EdbType: edbInfo.EdbType, } edbInfoIdMap[item.EdbInfoId] = tmp } value, ok := item.Value.(float64) if !ok { continue } createTime, _ := time.ParseInLocation("2006-01-02 15:04:05", item.CreateTime, time.Local) modifyTime, _ := time.ParseInLocation("2006-01-02 15:04:05", item.ModifyTime, time.Local) dataItem := &models.EdbDataBaseWithOpType{ EdbDataBase: &models.EdbDataBase{ EdbInfoId: item.EdbInfoId, EdbCode: item.EdbCode, Value: strconv.FormatFloat(value, 'f', -1, 64), DataTimestamp: item.DataTimestamp, DataTime: item.DataTime, CreateTime: createTime, ModifyTime: modifyTime, }, OpType: item.OpType, } if _, ok := edbDataIdMap[fmt.Sprintf("%d_%s", item.EdbInfoId, item.DataTime)]; ok { edbDataIdMap[fmt.Sprintf("%d_%s", item.EdbInfoId, item.DataTime)] = dataItem } else { edbDataIdMap[fmt.Sprintf("%d_%s", item.EdbInfoId, item.DataTime)] = dataItem } //edbInfoIdMap[item.EdbInfoId].DataList = append(edbInfoIdMap[item.EdbInfoId].DataList, dataItem) } edbDataMinDataTimeMap := make(map[int]string) edbDataMaxDataTimeMap := make(map[int]string) for _, item := range edbDataIdMap { edbInfoIdMap[item.EdbInfoId].DataList = append(edbInfoIdMap[item.EdbInfoId].DataList, item) if edbDataMinDataTimeMap[item.EdbInfoId] == "" || edbDataMinDataTimeMap[item.EdbInfoId] > item.DataTime { edbDataMinDataTimeMap[item.EdbInfoId] = item.DataTime } if edbDataMaxDataTimeMap[item.EdbInfoId] == "" || edbDataMaxDataTimeMap[item.EdbInfoId] < item.DataTime { edbDataMaxDataTimeMap[item.EdbInfoId] = item.DataTime } } for _, item := range edbInfoIdMap { minDataTime := edbDataMinDataTimeMap[item.EdbInfoId] maxDataTime := edbDataMaxDataTimeMap[item.EdbInfoId] err := services.AddOrUpdateEdbDataWithOpType(item.EdbInfoId, item.DataList, minDataTime, maxDataTime) if err != nil { br.Msg = "更新指标数据失败!" br.ErrMsg = "更新指标数据失败,Err:" + err.Error() return } } br.Ret = 200 br.Success = true br.Msg = "更新指标数据成功" return } // TestSaveByBinlog 测试SaveByBinlog接口的测试数据 // @Title 测试SaveByBinlog接口的测试数据 // @Description 测试SaveByBinlog接口的测试数据 // @Success 200 {object} models.BaseResponse // @router /test_save_by_binlog [get] func (this *EdbDataController) TestSaveByBinlog() { br := new(models.BaseResponse).Init() defer func() { this.Data["json"] = br this.ServeJSON() }() // 构造测试数据 testData := models.EdbDataBinlogReq{ List: []*models.EdbDataBinlogDataReq{ { OpType: "INSERT", Item: `{"create_time":"2024-10-12 17:03:04","data_time":"2024-09-08","data_timestamp":1725724800000,"edb_code":"test_code_1","edb_data_id":1001,"edb_info_id":1,"modify_time":"2024-10-14 13:37:30","value":71988}`, }, { OpType: "UPDATE", Item: `{"create_time":"2024-10-12 17:03:04","data_time":"2024-09-09","data_timestamp":1725811200000,"edb_code":"test_code_1","edb_data_id":1002,"edb_info_id":1,"modify_time":"2024-10-14 13:37:30","value":72500}`, }, { OpType: "DELETE", Item: `{"create_time":"2024-10-12 17:03:04","data_time":"2024-09-10","data_timestamp":1725897600000,"edb_code":"test_code_2","edb_data_id":2001,"edb_info_id":2,"modify_time":"2024-10-14 13:37:30","value":85600}`, }, }, } br.Ret = 200 br.Success = true br.Msg = "获取测试数据成功" br.Data = testData return }