package models

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"

	"eta/eta_index_lib/utils"

	"github.com/beego/beego/v2/client/orm"
)

// EdbDataAdjust is the row model for data belonging to a data-adjustment indicator.
type EdbDataAdjust struct {
	EdbDataId     int `orm:"column(edb_data_id);pk"`
	EdbInfoId     int
	EdbCode       string
	DataTime      string
	Value         float64
	CreateTime    time.Time
	ModifyTime    time.Time
	DataTimestamp int64
}

// AddAdjustEdbData is one de-duplicated data point prepared for insertion by SaveAdjustEdb.
type AddAdjustEdbData struct {
	Date         string  `description:"数据日期"`
	TimestampStr string  `description:"日期时间戳"`
	Value        float64 `description:"数据值"`
}

// SaveAdjustEdb creates or updates a data-adjustment indicator and replaces its data
// with req.DataList, all inside a single transaction.
//
// When req.EdbInfoId <= 0 a new indicator is inserted together with its mapping to the
// source indicator (req.FromEdbInfoId) and its adjust configuration. Otherwise the
// existing indicator is updated, its stored data rows are deleted, and the adjust
// configuration's SourceEndDate is refreshed from the mapped source indicator.
//
// Returns the indicator, an error, and errMsg, a short message describing the failure
// (it defaults to the generic "add indicator failed" text and is narrowed on some paths).
// The transaction is rolled back when err is non-nil; a failed commit is reported
// through err as well.
func SaveAdjustEdb(req SaveAdjustEdbReq) (edbInfo *EdbInfo, err error, errMsg string) {
	errMsg = `添加指标失败`

	o := orm.NewOrm()
	to, err := o.Begin()
	if err != nil {
		return
	}
	defer func() {
		if err != nil {
			_ = to.Rollback()
			return
		}
		// Surface commit failures instead of reporting success for data that was never persisted.
		if commitErr := to.Commit(); commitErr != nil {
			err = commitErr
		}
	}()

	var edbCode string
	if req.EdbInfoId <= 0 {
		// Generate the indicator code.
		edbCode, err = utils.GenerateEdbCode(1, "")
		if err != nil {
			err = errors.New("指标编码生成失败,Err:" + err.Error())
			return
		}

		timestamp := strconv.FormatInt(time.Now().UnixNano(), 10)
		uniqueCode := utils.MD5(utils.DATA_PREFIX + "_" + timestamp)

		edbInfo = &EdbInfo{
			Source:          utils.DATA_SOURCE_CALCULATE_ADJUST,
			SourceName:      "数据调整",
			EdbCode:         edbCode,
			EdbName:         utils.TrimStr(req.EdbName),
			EdbNameSource:   utils.TrimStr(req.EdbName),
			Frequency:       utils.TrimStr(req.Frequency),
			Unit:            utils.TrimStr(req.Unit),
			ClassifyId:      req.ClassifyId,
			SysUserId:       req.AdminId,
			SysUserRealName: req.AdminName,
			CreateTime:      time.Now(),
			ModifyTime:      time.Now(),
			UniqueCode:      uniqueCode,
			EdbType:         2,
			Sort:            GetAddEdbMaxSortByClassifyId(req.ClassifyId, utils.EDB_INFO_TYPE),
		}
		newEdbInfoId, tmpErr := to.Insert(edbInfo)
		if tmpErr != nil {
			err = tmpErr
			return
		}
		edbInfo.EdbInfoId = int(newEdbInfoId)

		// Load the source indicator.
		fromEdbInfo, tmpErr := GetEdbInfoById(req.FromEdbInfoId)
		if tmpErr != nil {
			err = tmpErr
			errMsg = "获取来源指标信息失败"
			return
		}

		// Mapping between the new indicator and its source indicator.
		{
			calculateMappingItem := new(EdbInfoCalculateMapping)
			calculateMappingItem.CreateTime = time.Now()
			calculateMappingItem.ModifyTime = time.Now()
			calculateMappingItem.Sort = 1
			calculateMappingItem.EdbCode = edbCode
			calculateMappingItem.EdbInfoId = edbInfo.EdbInfoId
			calculateMappingItem.FromEdbInfoId = fromEdbInfo.EdbInfoId
			calculateMappingItem.FromEdbCode = fromEdbInfo.EdbCode
			calculateMappingItem.FromEdbName = fromEdbInfo.EdbName
			calculateMappingItem.FromSource = fromEdbInfo.Source
			calculateMappingItem.FromSourceName = fromEdbInfo.SourceName
			calculateMappingItem.FromTag = ""
			calculateMappingItem.Source = edbInfo.Source
			calculateMappingItem.SourceName = edbInfo.SourceName
			_, err = to.Insert(calculateMappingItem)
			if err != nil {
				return
			}
		}

		// Adjust configuration.
		{
			// A parse failure deliberately leaves the zero time in place.
			var fromEdbEndDate time.Time
			if fromEdbInfo.EndDate != `0000-00-00` {
				fromEdbEndDate, _ = time.Parse(utils.FormatDate, fromEdbInfo.EndDate)
			}
			edbAdjustConf := &EdbAdjustConf{
				EdbInfoId:     edbInfo.EdbInfoId,
				SourceEndDate: fromEdbEndDate,
				ModifyTime:    time.Now(),
				CreateTime:    time.Now(),
			}
			_, err = to.Insert(edbAdjustConf)
			if err != nil {
				return
			}
		}
	} else {
		edbInfo, err = GetEdbInfoById(req.EdbInfoId)
		if err != nil {
			return
		}

		// Update the indicator's basic information.
		edbInfo.EdbName = utils.TrimStr(req.EdbName)
		edbInfo.EdbNameSource = utils.TrimStr(req.EdbName)
		edbInfo.Frequency = utils.TrimStr(req.Frequency)
		edbInfo.Unit = utils.TrimStr(req.Unit)
		edbInfo.ClassifyId = req.ClassifyId
		edbInfo.ModifyTime = time.Now()
		_, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "ModifyTime")
		if err != nil {
			return
		}

		// Indicator code.
		edbCode = edbInfo.EdbCode

		// Clear the previously stored data; it is re-inserted from req.DataList below.
		dataTableName := GetEdbDataTableName(utils.DATA_SOURCE_CALCULATE_ADJUST, edbInfo.SubSource)
		deleteSql := ` DELETE FROM %s WHERE edb_info_id=? `
		deleteSql = fmt.Sprintf(deleteSql, dataTableName)
		_, err = to.Raw(deleteSql, req.EdbInfoId).Exec()
		if err != nil {
			return
		}

		// Adjust configuration.
		{
			// Resolve the source indicator through the stored mapping.
			calculateMapping, tmpErr := GetEdbInfoCalculateMappingDetail(edbInfo.EdbInfoId)
			if tmpErr != nil {
				errMsg = `关联指标获取异常`
				err = tmpErr
				return
			}
			fromEdbInfo, tmpErr := GetEdbInfoById(calculateMapping.FromEdbInfoId)
			if tmpErr != nil {
				errMsg = `来源指标不存在`
				err = tmpErr
				return
			}

			// A parse failure deliberately leaves the zero time in place.
			var fromEdbEndDate time.Time
			if fromEdbInfo.EndDate != `0000-00-00` {
				fromEdbEndDate, _ = time.Parse(utils.FormatDate, fromEdbInfo.EndDate)
			}

			// Load the existing configuration.
			var edbAdjustConf *EdbAdjustConf
			sql := ` SELECT * FROM edb_adjust_conf WHERE edb_info_id=? `
			err = o.Raw(sql, edbInfo.EdbInfoId).QueryRow(&edbAdjustConf)
			if err != nil {
				errMsg = "获取指标配置失败"
				return
			}

			// Move the configuration's end date to the source indicator's end date.
			edbAdjustConf.SourceEndDate = fromEdbEndDate
			edbAdjustConf.ModifyTime = time.Now()
			_, err = to.Update(edbAdjustConf, "SourceEndDate", "ModifyTime")
			if err != nil {
				errMsg = "更新指标配置失败"
				return
			}
		}
	}

	// De-duplicate the submitted data by date; the last value submitted for a date wins.
	dataObj := make(map[string]AddAdjustEdbData, len(req.DataList))
	for _, item := range req.DataList {
		currentDate, tmpErr := time.ParseInLocation(utils.FormatDate, item.Date, time.Local)
		if tmpErr != nil {
			err = tmpErr
			return
		}
		// Millisecond timestamp of the date in local time.
		timestampStr := fmt.Sprintf("%d", currentDate.UnixNano()/1e6)

		tmpVal, ok := dataObj[item.Date]
		if !ok {
			tmpVal = AddAdjustEdbData{
				Date:         item.Date,
				TimestampStr: timestampStr,
				Value:        item.Value,
			}
		} else {
			tmpVal.Value = item.Value
		}
		dataObj[item.Date] = tmpVal
	}

	// Nothing to insert.
	if len(dataObj) == 0 {
		return
	}

	// Build one multi-row INSERT statement. strings.Builder avoids re-copying the
	// growing statement on every appended row.
	dataTableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)
	var addSql strings.Builder
	addSql.WriteString(` INSERT INTO ` + dataTableName + `(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `)
	edbInfoIdStr := fmt.Sprint(edbInfo.EdbInfoId)
	for _, item := range dataObj {
		saveVal := utils.SubFloatToString(item.Value, 20)
		addSql.WriteString(GetAddSql(edbInfoIdStr, edbCode, item.Date, item.TimestampStr, saveVal))
	}
	// Strip trailing commas left after the last row before executing.
	_, err = to.Raw(strings.TrimRight(addSql.String(), ",")).Exec()
	return
}

// RefreshAllAdjustEdb refreshes a data-adjustment indicator from its source indicator.
//
// It reads the source indicator's data dated after the adjust configuration's
// SourceEndDate, inserts the dates the adjusted indicator does not have yet, and updates
// the values that differ for dates it already has. When at least one row was inserted,
// the configuration's SourceEndDate is moved to the source indicator's EndDate.
// Everything runs in one transaction: it is rolled back when err is non-nil, and a
// failed commit is reported through err.
func RefreshAllAdjustEdb(edbInfo *EdbInfo, fromEdbInfo *EdbInfo) (err error) {
	o := orm.NewOrm()
	to, err := o.Begin()
	if err != nil {
		return
	}
	defer func() {
		if err != nil {
			fmt.Println("RefreshAllAdjustEdb,Err:" + err.Error())
			_ = to.Rollback()
			return
		}
		// Surface commit failures instead of reporting success for data that was never persisted.
		if commitErr := to.Commit(); commitErr != nil {
			err = commitErr
		}
	}()

	// Data table of the current indicator.
	dataTableName := GetEdbDataTableName(edbInfo.Source, edbInfo.SubSource)

	// Load the existing configuration.
	var edbAdjustConf *EdbAdjustConf
	sql := ` SELECT * FROM edb_adjust_conf WHERE edb_info_id=? `
	err = o.Raw(sql, edbInfo.EdbInfoId).QueryRow(&edbAdjustConf)
	if err != nil {
		err = errors.New("获取指标配置失败")
		return
	}
	sourceEndDate := edbAdjustConf.SourceEndDate.Format(utils.FormatDate)

	// Source indicator data dated after the configured end date.
	dataList, err := GetEdbDataListAllByTo(to, fromEdbInfo.Source, fromEdbInfo.SubSource, FindEdbDataListAllCond{
		EdbInfoId:         fromEdbInfo.EdbInfoId,
		StartDataTime:     sourceEndDate,
		StartDataTimeCond: ">",
	}, 1)
	if err != nil {
		return err
	}

	// Values already stored for this indicator, keyed by date.
	existDataMap := make(map[string]float64)
	{
		existDataList, tmpErr := GetEdbDataListAllByTo(to, edbInfo.Source, edbInfo.SubSource, FindEdbDataListAllCond{
			EdbInfoId:         edbInfo.EdbInfoId,
			StartDataTime:     sourceEndDate,
			StartDataTimeCond: ">",
		}, 1)
		if tmpErr != nil {
			err = tmpErr
			return
		}
		for _, v := range existDataList {
			existDataMap[v.DataTime] = v.Value
		}
	}

	// The UPDATE statement does not depend on the row, so format it once.
	updateSql := fmt.Sprintf(` UPDATE %s SET value=?,modify_time=NOW() WHERE edb_info_id=? AND data_time=? `, dataTableName)

	var isAdd bool
	var addSql strings.Builder
	addSql.WriteString(` INSERT INTO ` + dataTableName + `(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `)
	edbInfoIdStr := fmt.Sprint(edbInfo.EdbInfoId)

	for _, item := range dataList {
		currDay := item.DataTime
		saveVal := utils.SubFloatToString(item.Value, 20)

		existVal, ok := existDataMap[currDay]
		if !ok {
			// New date: queue it for the batch insert.
			// NOTE(review): time.Parse yields UTC, whereas SaveAdjustEdb parses dates in
			// time.Local, so data_timestamp may differ between the two paths - confirm
			// which one is intended.
			currentDate, tmpErr := time.Parse(utils.FormatDate, item.DataTime)
			if tmpErr != nil {
				err = tmpErr
				return
			}
			timestampStr := fmt.Sprintf("%d", currentDate.UnixNano()/1e6)
			addSql.WriteString(GetAddSql(edbInfoIdStr, edbInfo.EdbCode, item.DataTime, timestampStr, saveVal))
			isAdd = true
			continue
		}

		// Existing date: only touch the row when the value changed.
		if existVal != item.Value {
			_, err = to.Raw(updateSql, saveVal, edbInfo.EdbInfoId, currDay).Exec()
			if err != nil {
				return err
			}
		}
	}

	if !isAdd {
		return
	}

	// Strip trailing commas left after the last row before executing.
	_, err = to.Raw(strings.TrimRight(addSql.String(), ",")).Exec()
	if err != nil {
		return
	}

	// Move the configuration's end date to the source indicator's end date.
	// A parse failure deliberately leaves the zero time in place.
	var fromEdbEndDate time.Time
	if fromEdbInfo.EndDate != `0000-00-00` {
		fromEdbEndDate, _ = time.Parse(utils.FormatDate, fromEdbInfo.EndDate)
	}
	edbAdjustConf.SourceEndDate = fromEdbEndDate
	edbAdjustConf.ModifyTime = time.Now()
	_, err = to.Update(edbAdjustConf, "SourceEndDate", "ModifyTime")
	return
}

// SaveAdjustEdbReq is the request payload for saving a data-adjustment indicator.
type SaveAdjustEdbReq struct {
	AdminId       int                    `description:"添加人id"`
	AdminName     string                 `description:"添加人名称"`
	EdbInfoId     int                    `description:"指标id"`
	FromEdbInfoId int                    `description:"来源指标id"`
	EdbName       string                 `description:"指标名称"`
	Frequency     string                 `description:"频度"`
	Unit          string                 `description:"单位"`
	ClassifyId    int                    `description:"分类id"`
	DataList      []SaveAdjustEdbDataReq `description:"指标对应的数据值"`
}

// SaveAdjustEdbDataReq is a single data point inside SaveAdjustEdbReq.
type SaveAdjustEdbDataReq struct {
	Date  string  `description:"数据日期"`
	Value float64 `description:"数据值"`
}

// FixData inserts an edb_adjust_conf row for every indicator whose source is 40,
// taking SourceEndDate from the mapped source indicator's EndDate. Failures on a single
// indicator are printed and skipped; the function does not use a transaction.
//
// NOTE(review): 40 is presumably the value of utils.DATA_SOURCE_CALCULATE_ADJUST - confirm.
func FixData() {
	var list []*EdbInfo
	o := orm.NewOrm()
	sql := ` SELECT * FROM edb_info WHERE source=? `
	_, err := o.Raw(sql, 40).QueryRows(&list)
	if err != nil {
		fmt.Println("获取指标列表失败;err:", err)
		return
	}

	for _, edbInfo := range list {
		// Resolve the source indicator through the stored mapping.
		calculateMapping, tmpErr := GetEdbInfoCalculateMappingDetail(edbInfo.EdbInfoId)
		if tmpErr != nil {
			fmt.Println(edbInfo.EdbInfoId, "关联指标获取异常;err:", tmpErr)
			continue
		}
		fromEdbInfo, tmpErr := GetEdbInfoById(calculateMapping.FromEdbInfoId)
		if tmpErr != nil {
			fmt.Println(edbInfo.EdbInfoId, "来源指标不存在;err:", tmpErr)
			continue
		}

		// A parse failure deliberately leaves the zero time in place.
		var fromEdbEndDate time.Time
		if fromEdbInfo.EndDate != `0000-00-00` {
			fromEdbEndDate, _ = time.Parse(utils.FormatDate, fromEdbInfo.EndDate)
		}

		// Create the configuration row for this indicator.
		edbAdjustConf := &EdbAdjustConf{
			EdbInfoId:     edbInfo.EdbInfoId,
			SourceEndDate: fromEdbEndDate,
			ModifyTime:    time.Now(),
			CreateTime:    time.Now(),
		}
		_, err = o.Insert(edbAdjustConf)
		if err != nil {
			// Report the insert error itself, not the stale tmpErr from the lookups above.
			fmt.Println(edbInfo.EdbInfoId, "添加失败;err:", err)
			continue
		}
	}
	fmt.Println("end")
}