浏览代码

Merge branch 'feature/custom_pool_edb_calculate_sp' into debug

# Conflicts:
#	models/edb_data_calculate_bp.go
xyxie 9 月之前
父节点
当前提交
76854ae5de
共有 2 个文件被更改,包括 123 次插入95 次删除
  1. 1 1
      controllers/base_from_calculate.go
  2. 122 94
      models/edb_data_calculate_bp.go

+ 1 - 1
controllers/base_from_calculate.go

@@ -1509,7 +1509,7 @@ func (this *CalculateController) Refresh() {
 		}
 		//startDate = edbInfo.StartDate
 		endDate = time.Now().Format(utils.FormatDate)
-		err = models.RefreshAllCalculateBp(edbInfoId, source, subSource, fromEdbInfo, calculateTbz.EdbCode, startDate, endDate)
+		err = models.RefreshAllCalculateBp(edbInfoId, source, subSource, fromEdbInfo, calculateTbz.EdbCode, startDate, endDate, edbInfo.EmptyType)
 		if err != nil && err.Error() != utils.ErrNoRow() {
 			errMsg = "RefreshAllCalculateBp Err:" + err.Error()
 			break

+ 122 - 94
models/edb_data_calculate_bp.go

@@ -46,6 +46,7 @@ func AddCalculateBp(req *EdbInfoCalculateBatchSaveReq, fromEdbInfo *EdbInfo, edb
 		edbInfo.UnitEn = req.Unit
 		edbInfo.EdbType = 2
 		edbInfo.Sort = GetAddEdbMaxSortByClassifyId(req.ClassifyId, utils.EDB_INFO_TYPE)
+		edbInfo.EmptyType = req.EmptyType
 		newEdbInfoId, tmpErr := to.Insert(edbInfo)
 		if tmpErr != nil {
 			err = tmpErr
@@ -90,7 +91,7 @@ func AddCalculateBp(req *EdbInfoCalculateBatchSaveReq, fromEdbInfo *EdbInfo, edb
 	}
 
 	//计算数据
-	err = refreshAllCalculateBp(to, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, fromEdbInfo, edbInfo.EdbCode, "", "", 0)
+	err = refreshAllCalculateBp(to, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, fromEdbInfo, edbInfo.EdbCode, "", "", 0, edbInfo.EmptyType)
 
 	return
 }
@@ -111,6 +112,7 @@ func EditCalculateBp(edbInfo *EdbInfo, req *EdbInfoCalculateBatchEditReq, fromEd
 		}
 	}()
 
+	oldEmptyType := edbInfo.EmptyType
 	//修改指标信息
 	edbInfo.EdbName = req.EdbName
 	edbInfo.EdbNameSource = req.EdbName
@@ -120,7 +122,8 @@ func EditCalculateBp(edbInfo *EdbInfo, req *EdbInfoCalculateBatchEditReq, fromEd
 	edbInfo.EdbNameEn = req.EdbNameEn
 	edbInfo.UnitEn = req.UnitEn
 	edbInfo.ModifyTime = time.Now()
-	_, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "ModifyTime", "EdbNameEn", "UnitEn")
+	edbInfo.EmptyType = req.EmptyType
+	_, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "ModifyTime", "EdbNameEn", "UnitEn", "EmptyType")
 	if err != nil {
 		return
 	}
@@ -136,7 +139,7 @@ func EditCalculateBp(edbInfo *EdbInfo, req *EdbInfoCalculateBatchEditReq, fromEd
 		err = errors.New("判断指标是否改变失败,Err:" + err.Error())
 		return
 	}
-	if count > 0 { // 指标未被替换,无需重新计算
+	if count > 0 && oldEmptyType == req.EmptyType { // 指标未被替换,无需重新计算
 		return
 	}
 
@@ -179,7 +182,7 @@ func EditCalculateBp(edbInfo *EdbInfo, req *EdbInfoCalculateBatchEditReq, fromEd
 	}
 
 	//计算数据
-	err = refreshAllCalculateBp(to, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, fromEdbInfo, edbInfo.EdbCode, "", "", 0)
+	err = refreshAllCalculateBp(to, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, fromEdbInfo, edbInfo.EdbCode, "", "", 0, edbInfo.EmptyType)
 
 	return
 }
@@ -316,7 +319,7 @@ func RefreshAllCalculateBpBak(edbInfoId, source, subSource int, fromEdbInfo *Edb
 	return
 }
 
-func RefreshAllCalculateBp(edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string) (err error) {
+func RefreshAllCalculateBp(edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string, emptyType int) (err error) {
 	o := orm.NewOrm()
 	to, err := o.Begin()
 	if err != nil {
@@ -332,13 +335,13 @@ func RefreshAllCalculateBp(edbInfoId, source, subSource int, fromEdbInfo *EdbInf
 	}()
 
 	// 计算数据
-	err = refreshAllCalculateBp(to, edbInfoId, source, subSource, fromEdbInfo, edbCode, startDate, endDate, 1)
+	err = refreshAllCalculateBp(to, edbInfoId, source, subSource, fromEdbInfo, edbCode, startDate, endDate, 0, emptyType)
 
 	return
 }
 
 // refreshAllCalculateBp 刷新升频数据
-func refreshAllCalculateBp(to orm.TxOrmer, edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string, order int) (err error) {
+func refreshAllCalculateBp(to orm.TxOrmer, edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string, order int, emptyType int) (err error) {
 	edbInfoIdStr := strconv.Itoa(edbInfoId)
 	//计算数据
 
@@ -349,125 +352,150 @@ func refreshAllCalculateBp(to orm.TxOrmer, edbInfoId, source, subSource int, fro
 	if err != nil {
 		return err
 	}
-
-	// 来源指标没有数据,那么需要删除所有的计算指标数据
-	if len(dataList) <= 0 {
-		// todo 删除所有的计算指标数据
-		return
-	}
-	// 来源指标的第一个日期
-	fromFirstDate, err := time.ParseInLocation(utils.FormatDate, dataList[0].DataTime, time.Local)
-	if err != nil {
-		return
-	}
-	fromFirstDate = time.Date(fromFirstDate.Year(), fromFirstDate.Month(), fromFirstDate.Day(), 0, 0, 0, 0, time.Local)
-
-	// 变频计算
-	newDataList, err := EdbInfoSearchDataToData(dataList)
-	if err != nil {
-		return
-	}
-
-	baseCalculate := BaseCalculate{
-		DataList:      newDataList,
-		Frequency:     "",
-		Formula:       nil,
-		Calendar:      "",
-		MoveType:      0,
-		MoveFrequency: "",
-		FromFrequency: "",
-		Source:        source,
-	}
-	dateDataMap, err, _ := baseCalculate.UpFrequency()
-	if err != nil {
-		return
+	var dateArr []string
+	dataMap := make(map[string]*EdbInfoSearchData)
+	fromDataMap := make(map[string]float64)
+	//来源指指标数据
+	for _, v := range dataList {
+		dateArr = append(dateArr, v.DataTime)
+		dataMap[v.DataTime] = v
+		fromDataMap[v.DataTime] = v.Value
 	}
+	fmt.Println("source:", source)
 
-	// 获取升频指所有已经存在的计算指标数据
+	//获取升频指标所有数据
 	existDataList, err := GetAllEdbDataListByTo(to, edbInfoId, source, subSource)
 	if err != nil {
 		return
 	}
 	//计算指标的map
 	existDataMap := make(map[string]*EdbData, 0)
-	for _, v := range existDataList {
-		existDataMap[v.DataTime] = v
-	}
-
+	removeDateMap := make(map[string]struct{})
 	addSql := ` INSERT INTO edb_data_calculate_bp(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
 	var isAdd bool
-
-	now := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local)
-
-	for currDate := fromFirstDate; !currDate.After(now); currDate = currDate.AddDate(0, 0, 1) {
-		currDateStr := currDate.Format(utils.FormatDate)
-		timestamp := currDate.UnixNano() / 1e6
-		timestampStr := fmt.Sprintf("%d", timestamp)
-
-		// 当前计算的值
-		currValue, ok := dateDataMap[currDate]
-		if !ok {
-			// 没有计算成功就过滤
-			continue
+	//待删除的日期
+	removeDateList := make([]string, 0)
+	if len(existDataList) > 0 && len(dateArr) == 0 {
+		//如果没有来源指标数据,那么已经入库的计算指标数据需要全部删除
+		tableName := GetEdbDataTableName(source, subSource)
+		sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ?`, tableName)
+		_, err = to.Raw(sql, edbInfoId).Exec()
+		if err != nil {
+			err = fmt.Errorf("删除所有的升频指标数据失败,Err:" + err.Error())
+			return
 		}
-		lastValueStr := decimal.NewFromFloat(currValue).Round(4).String()
+		return
+	}
 
-		// 已经入库的值
-		existData, ok := existDataMap[currDateStr]
-		if !ok {
-			// 没有入库那么就插入添加
-			isAdd = true
-			addSql += GetAddSql(edbInfoIdStr, edbCode, currDateStr, timestampStr, lastValueStr)
-			continue
-		}
+	existMap := make(map[string]string)
 
-		// 将已经入库的值转换为decimal类型,然后再保留4位小数,目的是为了做匹配,要不然取出来的数据与计算的数据不一致
-		existDataValueDec, tmpErr := decimal.NewFromString(existData.Value)
-		if tmpErr != nil {
-			err = tmpErr
-			return
+	dataLen := len(dataList)
+	//第三步: 已经入库的数据处理
+	for _, v := range existDataList {
+		existDataMap[v.DataTime] = v
+		removeDateMap[v.DataTime] = struct{}{}
+	}
+	for i := 0; i < dataLen; i++ {
+		//当期
+		currentItem := dataList[i]
+		var prevItem *EdbInfoSearchData
+		if emptyType == 3 { //2前值填充,3后值填充
+			if i >= 1 {
+				prevItem = dataList[i-1]
+			}
 		}
-		existDataValueStr := existDataValueDec.Round(4).String()
-
-		// 如果该日期已经入库了,且两个值不匹配,那么就更新
-		if lastValueStr != existDataValueStr {
-			err = ModifyEdbDataById(source, subSource, existData.EdbDataId, lastValueStr)
-			if err != nil {
-				return err
+		currentDate, _ := time.ParseInLocation(utils.FormatDate, currentItem.DataTime, time.Local)
+		var day int
+		var preItem *EdbInfoSearchData
+		var preDate time.Time
+		if i == 0 {
+			if emptyType == 3 { //后值填充
+				day = 0 //最新的时间就是来源指标的最新日期
+				preDate = currentDate
+			} else {
+				day = int(time.Now().Sub(currentDate).Hours() / float64(24))
+				preDate = time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local)
+			}
+		} else {
+			j := i - 1
+			if j < dataLen {
+				preItem = dataList[j]
+				preDate, _ = time.ParseInLocation(utils.FormatDate, preItem.DataTime, time.Local)
+				day = int(preDate.Sub(currentDate).Hours() / float64(24))
+				utils.FileLog.Info("preItem.DataTime:" + preItem.DataTime + ";currentItem.DataTime" + currentItem.DataTime)
 			}
 		}
+		for k := 0; k <= day; k++ {
+			needDay := preDate.AddDate(0, 0, -k)
+			needDayStr := needDay.Format(utils.FormatDate)
+			delete(removeDateMap, needDayStr)
+			existKey := edbCode + needDayStr
+			if _, ok := existMap[existKey]; !ok {
+				timestamp := needDay.UnixNano() / 1e6
+				timestampStr := fmt.Sprintf("%d", timestamp)
+				valStr := decimal.NewFromFloat(currentItem.Value).String()
+				if prevItem != nil && needDayStr != currentItem.DataTime {
+					valStr = decimal.NewFromFloat(prevItem.Value).String()
+				}
+				tmpExistData, ok2 := existDataMap[needDayStr]
+				if !ok2 {
+					addSql += GetAddSql(edbInfoIdStr, edbCode, needDayStr, timestampStr, valStr)
+					isAdd = true
+				} else {
+					//如果对应的值不匹配
+					if tmpExistData.Value != valStr {
+						err = ModifyEdbDataById(source, subSource, tmpExistData.EdbDataId, valStr)
+						if err != nil {
+							return err
+						}
+					}
+				}
 
-		// 该日期已经处理过了,所以需要移除,如果后面该map还有数据,那么需要删除该map里面的日期数据
-		delete(existDataMap, currDateStr)
+			}
+			existMap[existKey] = needDayStr
+		}
+		existKey := edbCode + currentItem.DataTime
+		if _, ok := existMap[existKey]; !ok {
+			currentDate, _ := time.ParseInLocation(utils.FormatDate, currentItem.DataTime, time.Local)
+			timestamp := currentDate.UnixNano() / 1e6
+			timestampStr := fmt.Sprintf("%d", timestamp)
+			valStr := decimal.NewFromFloat(currentItem.Value).String()
+			tmpExistData, ok2 := existDataMap[currentItem.DataTime]
+			if !ok2 {
+				addSql += GetAddSql(edbInfoIdStr, edbCode, currentItem.DataTime, timestampStr, valStr)
+				isAdd = true
+			} else {
+				//如果对应的值不匹配
+				if tmpExistData.Value != valStr {
+					err = ModifyEdbDataById(source, subSource, tmpExistData.EdbDataId, valStr)
+					if err != nil {
+						return err
+					}
+				}
+			}
 
+		}
+		existMap[existKey] = currentItem.DataTime
 	}
 
+	for k, _ := range removeDateMap {
+		removeDateList = append(removeDateList, k)
+	}
 	// 删除不需要的指标数据
-	if len(existDataMap) > 0 {
-		//待删除的日期
-		removeDateList := make([]string, 0)
-		for date := range existDataMap {
-			removeDateList = append(removeDateList, date)
-		}
-
-		removeDateStr := strings.Join(removeDateList, `","`)
-		removeDateStr = `"` + removeDateStr + `"`
+	if len(removeDateList) > 0 {
 		//如果拼接指标变更了,那么需要删除所有的指标数据
 		tableName := GetEdbDataTableName(source, subSource)
-		sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr)
-
-		_, err = to.Raw(sql, edbInfoId).Exec()
+		sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (`+utils.GetOrmInReplace(len(removeDateList))+`) `, tableName)
+		_, err = to.Raw(sql, edbInfoId, removeDateList).Exec()
 		if err != nil {
 			err = fmt.Errorf("删除不存在的升频指标数据失败,Err:" + err.Error())
 			return
 		}
 	}
 
-	// 新增的数据值
 	if isAdd {
 		addSql = strings.TrimRight(addSql, ",")
 		_, err = to.Raw(addSql).Exec()
 	}
-
 	return
 }