浏览代码

Merge branch 'debug' of http://8.136.199.33:3000/eta_server/eta_index_lib into debug

hongze 8 月之前
父节点
当前提交
646571e22c

+ 2 - 1
.gitignore

@@ -13,4 +13,5 @@ eta_index_lib
 /etalogs
 *.gz
 *.exe
-/.vscode/
+test/
+/.vscode

+ 25 - 2
controllers/base_from_bloomberg.go

@@ -184,7 +184,7 @@ func (this *BloombergController) PCSGImportHistoryData() {
 	indexes = append(indexes, index)
 
 	// 写入数据
-	if e := services.PCSGWrite2BaseBloomberg(indexes, req.IsVCode); e != nil {
+	if e := services.PCSGWrite2BaseBloomberg(indexes, req.IsVCode, req.ExtraLetter, ""); e != nil {
 		br.Msg = "刷新失败"
 		br.ErrMsg = "PCSG-写入Bloomberg数据源失败, Err: " + e.Error()
 		return
@@ -246,8 +246,31 @@ func (this *BloombergController) PCSGRefreshTask() {
 			return
 		}
 
+		// 此处重新整理一下, 将同一指标的数据合并, 否则会使刷新时间变得很长
+		var newIndexes []models.BaseFromBloombergApiIndexAndData
+		codeMap := make(map[string]models.BaseFromBloombergApiIndexAndData)
+		indexCodeData := make(map[string][]models.BaseFromBloombergApiIndexData)
+		for _, iv := range indexes {
+			if codeMap[iv.IndexCode].IndexCode == "" {
+				var indexItem models.BaseFromBloombergApiIndexAndData
+				indexItem.IndexCode = iv.IndexCode
+				indexItem.IndexName = iv.IndexName
+				indexItem.Unit = iv.Unit
+				indexItem.Source = iv.Source
+				indexItem.Frequency = iv.Frequency
+				indexItem.CreateTime = iv.CreateTime
+				indexItem.ModifyTime = iv.ModifyTime
+				codeMap[iv.IndexCode] = indexItem
+			}
+			indexCodeData[iv.IndexCode] = append(indexCodeData[iv.IndexCode], iv.Data...)
+		}
+		for _, mv := range codeMap {
+			mv.Data = indexCodeData[mv.IndexCode]
+			newIndexes = append(newIndexes, mv)
+		}
+
 		// 写入数据
-		if e = services.PCSGWrite2BaseBloomberg(indexes, v.VCode); e != nil {
+		if e = services.PCSGWrite2BaseBloomberg(newIndexes, v.VCode, v.ExtraLetter, v.IndexNamePrefix); e != nil {
 			br.Msg = "刷新失败"
 			br.ErrMsg = "PCSG-写入Bloomberg数据源失败, Err: " + e.Error()
 			return

+ 1 - 1
controllers/base_from_calculate.go

@@ -1509,7 +1509,7 @@ func (this *CalculateController) Refresh() {
 		}
 		//startDate = edbInfo.StartDate
 		endDate = time.Now().Format(utils.FormatDate)
-		err = models.RefreshAllCalculateBp(edbInfoId, source, subSource, fromEdbInfo, calculateTbz.EdbCode, startDate, endDate)
+		err = models.RefreshAllCalculateBp(edbInfoId, source, subSource, fromEdbInfo, calculateTbz.EdbCode, startDate, endDate, edbInfo.EmptyType)
 		if err != nil && err.Error() != utils.ErrNoRow() {
 			errMsg = "RefreshAllCalculateBp Err:" + err.Error()
 			break

+ 1 - 1
controllers/base_from_predict_calculate.go

@@ -1103,7 +1103,7 @@ func (this *PredictCalculateController) Refresh() {
 			break
 		}
 		endDate = time.Now().Format(utils.FormatDate)
-		latestDateStr, latestValue, err = models.RefreshAllPredictCalculateBp(edbInfoId, source, edbInfo.SubSource, fromEdbInfo, calculateTbz.EdbCode, startDate, endDate)
+		latestDateStr, latestValue, err = models.RefreshAllPredictCalculateBp(edbInfoId, source, edbInfo.SubSource, fromEdbInfo, calculateTbz.EdbCode, startDate, endDate, edbInfo.EmptyType)
 		if err != nil && err.Error() != utils.ErrNoRow() {
 			errMsg = "RefreshAllPredictCalculateBp Err:" + err.Error()
 			break

+ 83 - 31
controllers/base_from_ths_hf.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"eta/eta_index_lib/logic"
 	"eta/eta_index_lib/models"
+	"eta/eta_index_lib/models/mgo"
 	"eta/eta_index_lib/services"
 	"eta/eta_index_lib/utils"
 	"fmt"
@@ -280,10 +281,6 @@ func (this *ThsHfController) BaseAdd() {
 	indexItem.EndDate = endTime
 	indexItem.SysUserId = params.SysAdminId
 	indexItem.SysUserRealName = params.SysAdminName
-	if params.EndTime == "" {
-		indexItem.EndDate = time.Now().Local()
-	}
-	// TODO:指定终端号
 	terminal, e := services.GetFirstTerminal(utils.DATA_SOURCE_THS, "")
 	if e != nil {
 		br.Msg = "终端未配置"
@@ -301,32 +298,81 @@ func (this *ThsHfController) BaseAdd() {
 	}
 	indexItem.ApiPars = string(b)
 	if len(indexWithData.IndexData) > 0 {
-		indexItem.LatestValue = indexWithData.IndexData[0].Value
+		indexItem.StartDate = indexWithData.IndexData[0].DataTime
+		indexItem.EndDate = indexWithData.IndexData[len(indexWithData.IndexData)-1].DataTime
+		lastVal, e := utils.FormatFloatPlaces(indexWithData.IndexData[0].Value, 4)
+		if e != nil {
+			br.Msg = "操作失败"
+			br.ErrMsg = fmt.Sprintf("格式化最新值失败, val: %v, err: %v", indexWithData.IndexData[0].Value, e)
+			return
+		}
+		indexItem.LatestValue = lastVal
 	}
 	indexItem.CreateTime = time.Now().Local()
 	indexItem.ModifyTime = time.Now().Local()
 
-	// 新增至数据源和数据
-	itemData := make([]*models.BaseFromThsHfData, 0)
-	for _, v := range indexWithData.IndexData {
-		t := new(models.BaseFromThsHfData)
-		t.IndexCode = indexItem.IndexCode
-		t.DataTime = v.DataTime
-		t.Value = v.Value
-		t.UniqueCode = utils.MD5(fmt.Sprint(indexItem.IndexCode, v.DataTime.Format("2006-01-02 15:04")))
-		t.CreateTime = time.Now().Local()
-		t.ModifyTime = time.Now().Local()
-		t.DataTimestamp = v.DataTime.UnixNano() / 1e6
-		itemData = append(itemData, t)
-	}
-
-	// 新增指标和数据
-	if e = indexOb.CreateIndexAndData(indexItem, itemData); e != nil {
+	// 新增指标
+	if e := indexItem.Create(); e != nil {
 		br.Msg = "操作失败"
-		br.ErrMsg = fmt.Sprintf("新增指标和数据失败, %v", e)
+		br.ErrMsg = fmt.Sprintf("新增指标失败, %v", e)
 		return
 	}
 
+	// 新增数据
+	if utils.UseMongo {
+		dataList := make([]interface{}, 0)
+		for _, v := range indexWithData.IndexData {
+			newVal, e := utils.FormatFloatPlaces(v.Value, 4)
+			if e != nil {
+				utils.FileLog.Info(fmt.Sprintf("FormatFloatPlaces err: %v", e))
+				continue
+			}
+
+			dataList = append(dataList, &mgo.BaseFromThsHfData{
+				BaseFromThsHfIndexId: int64(indexItem.BaseFromThsHfIndexId),
+				IndexCode:            indexItem.IndexCode,
+				DataTime:             v.DataTime,
+				Value:                newVal,
+				UniqueCode:           utils.MD5(fmt.Sprint(indexItem.IndexCode, v.DataTime.Format(utils.FormatDateTimeMinute))),
+				CreateTime:           time.Now().Local(),
+				ModifyTime:           time.Now().Local(),
+				DataTimestamp:        v.DataTime.UnixNano() / 1e6,
+			})
+		}
+		dataOb := new(mgo.BaseFromThsHfData)
+		if e = dataOb.BatchInsertData(500, dataList); e != nil {
+			br.Msg = "操作失败"
+			br.ErrMsg = fmt.Sprintf("批量新增数据失败-Mongo, %v", e)
+			return
+		}
+	} else {
+		dataOb := new(models.BaseFromThsHfData)
+		itemData := make([]*models.BaseFromThsHfData, 0)
+		for _, v := range indexWithData.IndexData {
+			newVal, e := utils.FormatFloatPlaces(v.Value, 4)
+			if e != nil {
+				utils.FileLog.Info(fmt.Sprintf("FormatFloatPlaces err: %v", e))
+				continue
+			}
+
+			t := new(models.BaseFromThsHfData)
+			t.BaseFromThsHfIndexId = indexItem.BaseFromThsHfIndexId
+			t.IndexCode = indexItem.IndexCode
+			t.DataTime = v.DataTime
+			t.Value = newVal
+			t.UniqueCode = utils.MD5(fmt.Sprint(indexItem.IndexCode, v.DataTime.Format(utils.FormatDateTimeMinute)))
+			t.CreateTime = time.Now().Local()
+			t.ModifyTime = time.Now().Local()
+			t.DataTimestamp = v.DataTime.UnixNano() / 1e6
+			itemData = append(itemData, t)
+		}
+		if e = dataOb.CreateMulti(itemData); e != nil {
+			br.Msg = "操作失败"
+			br.ErrMsg = fmt.Sprintf("批量新增数据失败-MySQL, %v", e)
+			return
+		}
+	}
+
 	br.Ret = 200
 	br.Success = true
 	br.Msg = "操作成功"
@@ -422,7 +468,7 @@ func (this *ThsHfController) BaseRefresh() {
 		}
 	}
 
-	// 获取指标数据
+	// API-获取指标数据
 	indexes, e := services.GetEdbDataFromThsHf(apiPars, indexItem.TerminalCode)
 	if e != nil {
 		br.Msg = "操作失败"
@@ -437,10 +483,18 @@ func (this *ThsHfController) BaseRefresh() {
 	indexWithData := indexes[0]
 
 	// 写入指标数据
-	if e = services.WriteRefreshBaseThsHfIndex(indexItem, indexWithData, apiPars.StartTime); e != nil {
-		br.Msg = "操作失败"
-		br.ErrMsg = fmt.Sprintf("写入源指标数据失败, %v", e)
-		return
+	if utils.UseMongo {
+		if e = services.RefreshThsHfBaseIndexMgo(indexItem, indexWithData, apiPars.StartTime); e != nil {
+			br.Msg = "操作失败"
+			br.ErrMsg = fmt.Sprintf("写入源指标数据失败-Mongo, %v", e)
+			return
+		}
+	} else {
+		if e = services.RefreshThsHfBaseIndex(indexItem, indexWithData, apiPars.StartTime); e != nil {
+			br.Msg = "操作失败"
+			br.ErrMsg = fmt.Sprintf("写入源指标数据失败, %v", e)
+			return
+		}
 	}
 
 	br.Ret = 200
@@ -657,8 +711,7 @@ func (this *ThsHfController) EdbAdd() {
 	}
 
 	// 更新指标最值
-	e, _ = models.UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
-	if e != nil {
+	if e = thsOb.UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo); e != nil {
 		br.Msg = "刷新指标失败"
 		br.ErrMsg = fmt.Sprintf("更新指标最值失败, %v", e)
 		return
@@ -750,8 +803,7 @@ func (this *ThsHfController) EdbRefresh() {
 	}
 
 	// 更新指标最值
-	e, _ = models.UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
-	if e != nil {
+	if e = thsOb.UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo); e != nil {
 		br.Msg = "刷新指标失败"
 		br.ErrMsg = fmt.Sprintf("更新指标最值失败, %v", e)
 		return

+ 53 - 5
controllers/shanghai_smm.go

@@ -216,6 +216,9 @@ func (this *ShanghaiSmmController) RefreshExcel() {
 	}
 	updateIndexList := make([]*models.BaseFromSmmIndex, 0)
 	addDataList := make([]*models.BaseFromSmmData, 0)
+	updateDataList := make([]*models.BaseFromSmmData, 0)
+
+	queryIndexIds := make([]int, 0)
 	for _, v := range req {
 		indexCode := strings.Replace(v.IndexCode, " ", "", -1)
 		indexInfo := ShanghaiSmmCodeToIndexMap[indexCode]
@@ -228,6 +231,29 @@ func (this *ShanghaiSmmController) RefreshExcel() {
 			fmt.Printf("指标名称或指标id有误v.IndexName:%s, v.IndexCode:%s", v.IndexName, v.IndexCode)
 			return
 		}
+		queryIndexIds = append(queryIndexIds, int(indexInfo.BaseFromSmmIndexId))
+	}
+	list, err := models.GetBaseFromSmmDataByIds(queryIndexIds)
+	if err != nil {
+		br.Msg = "查询指标数据失败"
+		br.ErrMsg = "查询指标数据失败,Err:" + err.Error()
+		return
+	}
+	dateDataMap := make(map[int]map[string]string)
+	for _, v := range list {
+		dataMap, ok := dateDataMap[v.BaseFromSmmIndexId]
+		if ok {
+			dataMap[v.DataTime] = v.Value
+		} else {
+			dateDataMap[v.BaseFromSmmIndexId] = map[string]string{
+				v.DataTime: v.Value,
+			}
+		}
+	}
+	for _, v := range req {
+		indexCode := strings.Replace(v.IndexCode, " ", "", -1)
+		indexInfo := ShanghaiSmmCodeToIndexMap[indexCode]
+
 		isAdd := false
 		indexItem := new(models.BaseFromSmmIndex)
 		indexItem.BaseFromSmmIndexId = indexInfo.BaseFromSmmIndexId
@@ -245,21 +271,33 @@ func (this *ShanghaiSmmController) RefreshExcel() {
 		} else {
 			indexItem.StartDate = indexInfo.StartDate
 		}
-		// 如果指标index有更新,那么需要添加指标数据
 		if isAdd {
 			indexItem.IndexName = v.IndexName
 			indexItem.ModifyTime = time.Now()
 			updateIndexList = append(updateIndexList, indexItem)
-			for k, dv := range v.Data {
+		}
+		for k, dv := range v.Data {
+			valueStr := strconv.FormatFloat(dv, 'f', -1, 64)
+			valueRaw := dateDataMap[int(indexInfo.BaseFromSmmIndexId)][k]
+			if valueRaw == "" {
 				dataItem := new(models.BaseFromSmmData)
 				dataItem.BaseFromSmmIndexId = int(indexInfo.BaseFromSmmIndexId)
 				dataItem.IndexCode = indexInfo.IndexCode
 				dataItem.DataTime = k
-				dataItem.Value = strconv.FormatFloat(dv, 'f', -1, 64)
+				dataItem.Value = valueStr
 				dataItem.CreateTime = time.Now()
 				dataItem.ModifyTime = time.Now()
 				dataItem.DataTimestamp = time.Now().UnixMilli()
 				addDataList = append(addDataList, dataItem)
+			} else if valueStr != "" && valueRaw != valueStr {
+				dataItem := new(models.BaseFromSmmData)
+				dataItem.BaseFromSmmIndexId = int(indexInfo.BaseFromSmmIndexId)
+				dataItem.IndexCode = indexInfo.IndexCode
+				dataItem.DataTime = k
+				dataItem.Value = valueStr
+				dataItem.ModifyTime = time.Now()
+				dataItem.DataTimestamp = time.Now().UnixMilli()
+				updateDataList = append(updateDataList, dataItem)
 			}
 		}
 	}
@@ -275,12 +313,22 @@ func (this *ShanghaiSmmController) RefreshExcel() {
 		for _, v := range updateIndexList {
 			err = v.UpdateCols([]string{"end_date", "start_date", "modify_time"})
 			if err != nil {
-				br.Msg = "更新指标数据失败"
-				br.ErrMsg = "更新指标失败,Err:" + err.Error()
+				br.Msg = "更新指标索引失败"
+				br.ErrMsg = "更新指标索引失败,Err:" + err.Error()
 				return
 			}
 		}
 	}
+	// 不修改
+	fmt.Println(len(updateDataList))
+	// if len(updateDataList) > 0 {
+	// 	err = models.UpdateBaseFromSmmDataListByIndexCode(updateDataList)
+	// 	if err != nil {
+	// 		br.Msg = "更新指标数据失败"
+	// 		br.ErrMsg = "更新指标失败,Err:" + err.Error()
+	// 		return
+	// 	}
+	// }
 	br.Ret = 200
 	br.Success = true
 	br.Msg = "数据刷新成功"

+ 2 - 2
models/base_calculate.go

@@ -670,7 +670,7 @@ func (obj BaseCalculate) Hcz() (dateDataMap map[time.Time]float64, err error, er
 	}
 
 	dataLen := len(dataList)
-	fmt.Println("dataLen:", dataLen)
+	//fmt.Println("dataLen:", dataLen)
 	for i := 0; i < dataLen; i++ {
 		j := i + formulaInt
 		if j < dataLen {
@@ -765,7 +765,7 @@ func (obj BaseCalculate) UpFrequency() (dateDataMap map[time.Time]float64, err e
 		preDate = preItem.DataTime
 		day = int(currentDate.Sub(preItem.DataTime).Hours() / float64(24))
 		//utils.FileLog.Info("preItem.DataTime:" + preItem.DataTime.Format(utils.FormatDate) + ";currentItem.DataTime" + currentItem.DataTime.Format(utils.FormatDate))
-		fmt.Println("preItem.DataTime:" + preItem.DataTime.Format(utils.FormatDate) + ";currentItem.DataTime:" + currentItem.DataTime.Format(utils.FormatDate))
+		//fmt.Println("preItem.DataTime:" + preItem.DataTime.Format(utils.FormatDate) + ";currentItem.DataTime:" + currentItem.DataTime.Format(utils.FormatDate))
 
 		for k := 1; k < day; k++ {
 			needDay := preDate.AddDate(0, 0, k)

+ 5 - 3
models/base_from_bloomberg.go

@@ -354,7 +354,9 @@ type BaseFromBloombergApiIndexData struct {
 
 // PCSGImportHistoryDataReq 导入历史数据
 type PCSGImportHistoryDataReq struct {
-	IndexCode string                `description:"指标编码"`
-	DataMap   map[time.Time]float64 `description:"数据日期/值"`
-	IsVCode   bool                  `description:"是否指标编码中间加V"`
+	IndexCode       string                `description:"指标编码"`
+	DataMap         map[time.Time]float64 `description:"数据日期/值"`
+	IsVCode         bool                  `description:"是否指标编码中间加V"`
+	ExtraLetter     string                `description:"指标编码中间额外加的字母...比如V"`
+	IndexNamePrefix string                `description:"指标名称前缀"`
 }

+ 27 - 0
models/base_from_smm.go

@@ -37,6 +37,33 @@ func GetBaseFromSmmDataByCondition(condition string, pars []interface{}) (list [
 	return
 }
 
+func UpdateBaseFromSmmDataListByIndexCode(items []*BaseFromSmmData) (err error) {
+	o := orm.NewOrm()
+	sql := `UPDATE base_from_smm_data SET value=? WHERE index_code=? AND data_time=? `
+	stmt, err := o.Raw(sql).Prepare()
+	if err != nil {
+		return
+	}
+	defer stmt.Close()
+	for _, item := range items {
+		_, err = stmt.Exec(item.Value, item.IndexCode, item.DataTime)
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+func GetBaseFromSmmDataByIds(smmDataIds []int) (list []*BaseFromSmmData, err error) {
+	if len(smmDataIds) == 0 {
+		return
+	}
+	o := orm.NewOrm()
+	sql := `SELECT * FROM base_from_smm_data WHERE 1=1 AND base_from_smm_index_id in (` + utils.GetOrmInReplace(len(smmDataIds)) + `)`
+	_, err = o.Raw(sql, smmDataIds).QueryRows(&list)
+	return
+}
+
 // 新增有色指标数据
 func AddEdbDataFromSmm(edbCode string, smmBaseDataAll []*BaseFromSmmData) (err error) {
 	o := orm.NewOrm()

+ 124 - 35
models/base_from_ths_hf.go

@@ -1,9 +1,11 @@
 package models
 
 import (
+	"eta/eta_index_lib/models/mgo"
 	"eta/eta_index_lib/utils"
 	"fmt"
 	"github.com/beego/beego/v2/client/orm"
+	"go.mongodb.org/mongo-driver/bson"
 	"strings"
 	"time"
 )
@@ -305,41 +307,41 @@ type ThsHfBaseAddReq struct {
 }
 
 // CreateIndexAndData 新增指标和数据
-func (m *BaseFromThsHfIndex) CreateIndexAndData(indexItem *BaseFromThsHfIndex, indexData []*BaseFromThsHfData) (err error) {
-	o := orm.NewOrm()
-	tx, e := o.Begin()
-	if e != nil {
-		err = fmt.Errorf("tx begin err: %v", e)
-		return
-	}
-	defer func() {
-		if err != nil {
-			_ = tx.Rollback()
-			return
-		}
-		_ = tx.Commit()
-	}()
-
-	lastId, e := tx.Insert(indexItem)
-	if e != nil {
-		err = fmt.Errorf("insert index err: %v", e)
-		return
-	}
-	indexId := int(lastId)
-	indexItem.BaseFromThsHfIndexId = indexId
-
-	if len(indexData) == 0 {
-		return
-	}
-	for _, v := range indexData {
-		v.BaseFromThsHfIndexId = indexId
-	}
-	if _, e = tx.InsertMulti(200, indexData); e != nil {
-		err = fmt.Errorf("insert index data err: %v", e)
-		return
-	}
-	return
-}
+//func (m *BaseFromThsHfIndex) CreateIndexAndData(indexItem *BaseFromThsHfIndex, indexData []*BaseFromThsHfData) (err error) {
+//	o := orm.NewOrm()
+//	tx, e := o.Begin()
+//	if e != nil {
+//		err = fmt.Errorf("tx begin err: %v", e)
+//		return
+//	}
+//	defer func() {
+//		if err != nil {
+//			_ = tx.Rollback()
+//			return
+//		}
+//		_ = tx.Commit()
+//	}()
+//
+//	lastId, e := tx.Insert(indexItem)
+//	if e != nil {
+//		err = fmt.Errorf("insert index err: %v", e)
+//		return
+//	}
+//	indexId := int(lastId)
+//	indexItem.BaseFromThsHfIndexId = indexId
+//
+//	if len(indexData) == 0 {
+//		return
+//	}
+//	for _, v := range indexData {
+//		v.BaseFromThsHfIndexId = indexId
+//	}
+//	if _, e = tx.InsertMulti(200, indexData); e != nil {
+//		err = fmt.Errorf("insert index data err: %v", e)
+//		return
+//	}
+//	return
+//}
 
 // ThsHfBaseRefreshReq 数据源刷新请求
 type ThsHfBaseRefreshReq struct {
@@ -385,3 +387,90 @@ type ThsHfIndexMultiSave2EdbPreItem struct {
 	Tips         string `description:"提示信息"`
 	ErrMsg       string `description:"错误信息"`
 }
+
+// GetEdbInfoMaxAndMinInfo
+// @Description: 获取指标的最新数据记录信息
+// @author: Roc
+// @receiver m
+// @datetime 2024-07-02 14:50:50
+// @param edbCode string
+// @return item *EdbInfoMaxAndMinInfo
+// @return err error
+func (m BaseFromThsHfIndex) GetEdbInfoMaxAndMinInfo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
+	if utils.UseMongo {
+		return m.getEdbInfoMaxAndMinInfoByMongo(edbCode)
+	}
+	return m.getEdbInfoMaxAndMinInfoByMysql(edbCode)
+}
+
+// getEdbInfoMaxAndMinInfoByMongo
+// @Description: 获取指标的最新数据记录信息(从mongo中获取)
+// @author: Roc
+// @receiver m
+// @datetime 2024-07-02 14:41:20
+// @param edbCode string
+// @return item *EdbInfoMaxAndMinInfo
+// @return err error
+func (m BaseFromThsHfIndex) getEdbInfoMaxAndMinInfoByMongo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
+	mogDataObj := new(mgo.BaseFromThsHfData)
+	pipeline := []bson.M{
+		{"$match": bson.M{"index_code": edbCode}},
+		{"$group": bson.M{
+			"_id":       nil,
+			"min_date":  bson.M{"$min": "$data_time"},
+			"max_date":  bson.M{"$max": "$data_time"},
+			"min_value": bson.M{"$min": "$value"},
+			"max_value": bson.M{"$max": "$value"},
+		}},
+		{"$project": bson.M{"_id": 0}}, // 可选,如果不需要_id字段
+	}
+	result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
+	if err != nil {
+		fmt.Println("BaseFromThsHfIndex GetEdbInfoMaxAndMinInfo Err:" + err.Error())
+		return
+	}
+
+	if !result.MaxDate.IsZero() {
+		whereQuery := bson.M{"index_code": edbCode, "data_time": result.MaxDate}
+		selectParam := bson.D{{"value", 1}, {"_id", 0}}
+		latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+		result.LatestValue = latestValue.Value
+		result.EndValue = latestValue.Value
+	}
+
+	item = &EdbInfoMaxAndMinInfo{
+		MinDate:     result.MinDate.Format(utils.FormatDate),
+		MaxDate:     result.MaxDate.Format(utils.FormatDate),
+		MinValue:    result.MinValue,
+		MaxValue:    result.MaxValue,
+		LatestValue: result.LatestValue,
+		LatestDate:  result.LatestDate.Format(utils.FormatDate),
+		EndValue:    result.EndValue,
+	}
+	return
+}
+
+// getEdbInfoMaxAndMinInfoByMysql
+// @Description: 获取指标的最新数据记录信息(从mysql中获取)
+func (m BaseFromThsHfIndex) getEdbInfoMaxAndMinInfoByMysql(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
+	dataObj := BaseFromThsHfData{}
+	result, err := dataObj.GetIndexMinMax(edbCode)
+	if err != nil {
+		return
+	}
+
+	item = &EdbInfoMaxAndMinInfo{
+		MinDate:     result.MinDate,
+		MaxDate:     result.MaxDate,
+		MinValue:    result.MinValue,
+		MaxValue:    result.MaxValue,
+		LatestValue: result.LatestValue,
+		LatestDate:  result.LatestDate,
+		EndValue:    result.EndValue,
+	}
+	return
+}

+ 1 - 1
models/base_from_ths_hf_data.go

@@ -66,7 +66,7 @@ func (m *BaseFromThsHfData) CreateMulti(items []*BaseFromThsHfData) (err error)
 		return
 	}
 	o := orm.NewOrm()
-	_, err = o.InsertMulti(len(items), items)
+	_, err = o.InsertMulti(500, items)
 	return
 }
 

+ 122 - 94
models/edb_data_calculate_bp.go

@@ -46,6 +46,7 @@ func AddCalculateBp(req *EdbInfoCalculateBatchSaveReq, fromEdbInfo *EdbInfo, edb
 		edbInfo.UnitEn = req.Unit
 		edbInfo.EdbType = 2
 		edbInfo.Sort = GetAddEdbMaxSortByClassifyId(req.ClassifyId, utils.EDB_INFO_TYPE)
+		edbInfo.EmptyType = req.EmptyType
 		newEdbInfoId, tmpErr := to.Insert(edbInfo)
 		if tmpErr != nil {
 			err = tmpErr
@@ -90,7 +91,7 @@ func AddCalculateBp(req *EdbInfoCalculateBatchSaveReq, fromEdbInfo *EdbInfo, edb
 	}
 
 	//计算数据
-	err = refreshAllCalculateBp(to, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, fromEdbInfo, edbInfo.EdbCode, "", "", 0)
+	err = refreshAllCalculateBp(to, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, fromEdbInfo, edbInfo.EdbCode, "", "", 0, edbInfo.EmptyType)
 
 	return
 }
@@ -111,6 +112,7 @@ func EditCalculateBp(edbInfo *EdbInfo, req *EdbInfoCalculateBatchEditReq, fromEd
 		}
 	}()
 
+	oldEmptyType := edbInfo.EmptyType
 	//修改指标信息
 	edbInfo.EdbName = req.EdbName
 	edbInfo.EdbNameSource = req.EdbName
@@ -120,7 +122,8 @@ func EditCalculateBp(edbInfo *EdbInfo, req *EdbInfoCalculateBatchEditReq, fromEd
 	edbInfo.EdbNameEn = req.EdbNameEn
 	edbInfo.UnitEn = req.UnitEn
 	edbInfo.ModifyTime = time.Now()
-	_, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "ModifyTime", "EdbNameEn", "UnitEn")
+	edbInfo.EmptyType = req.EmptyType
+	_, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "ModifyTime", "EdbNameEn", "UnitEn", "EmptyType")
 	if err != nil {
 		return
 	}
@@ -136,7 +139,7 @@ func EditCalculateBp(edbInfo *EdbInfo, req *EdbInfoCalculateBatchEditReq, fromEd
 		err = errors.New("判断指标是否改变失败,Err:" + err.Error())
 		return
 	}
-	if count > 0 { // 指标未被替换,无需重新计算
+	if count > 0 && oldEmptyType == req.EmptyType { // 指标未被替换,无需重新计算
 		return
 	}
 
@@ -179,7 +182,7 @@ func EditCalculateBp(edbInfo *EdbInfo, req *EdbInfoCalculateBatchEditReq, fromEd
 	}
 
 	//计算数据
-	err = refreshAllCalculateBp(to, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, fromEdbInfo, edbInfo.EdbCode, "", "", 0)
+	err = refreshAllCalculateBp(to, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, fromEdbInfo, edbInfo.EdbCode, "", "", 0, edbInfo.EmptyType)
 
 	return
 }
@@ -316,7 +319,7 @@ func RefreshAllCalculateBpBak(edbInfoId, source, subSource int, fromEdbInfo *Edb
 	return
 }
 
-func RefreshAllCalculateBp(edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string) (err error) {
+func RefreshAllCalculateBp(edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string, emptyType int) (err error) {
 	o := orm.NewOrm()
 	to, err := o.Begin()
 	if err != nil {
@@ -332,13 +335,13 @@ func RefreshAllCalculateBp(edbInfoId, source, subSource int, fromEdbInfo *EdbInf
 	}()
 
 	// 计算数据
-	err = refreshAllCalculateBp(to, edbInfoId, source, subSource, fromEdbInfo, edbCode, startDate, endDate, 1)
+	err = refreshAllCalculateBp(to, edbInfoId, source, subSource, fromEdbInfo, edbCode, startDate, endDate, 0, emptyType)
 
 	return
 }
 
 // refreshAllCalculateBp 刷新升频数据
-func refreshAllCalculateBp(to orm.TxOrmer, edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string, order int) (err error) {
+func refreshAllCalculateBp(to orm.TxOrmer, edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string, order int, emptyType int) (err error) {
 	edbInfoIdStr := strconv.Itoa(edbInfoId)
 	//计算数据
 
@@ -347,127 +350,152 @@ func refreshAllCalculateBp(to orm.TxOrmer, edbInfoId, source, subSource int, fro
 		EdbInfoId: fromEdbInfo.EdbInfoId,
 	}, order)
 	if err != nil {
-		return err
-	}
-
-	// 来源指标没有数据,那么需要删除所有的计算指标数据
-	if len(dataList) <= 0 {
-		// todo 删除所有的计算指标数据
 		return
 	}
-	// 来源指标的第一个日期
-	fromFirstDate, err := time.ParseInLocation(utils.FormatDate, dataList[0].DataTime, time.Local)
-	if err != nil {
-		return
-	}
-	fromFirstDate = time.Date(fromFirstDate.Year(), fromFirstDate.Month(), fromFirstDate.Day(), 0, 0, 0, 0, time.Local)
-
-	// 变频计算
-	newDataList, err := EdbInfoSearchDataToData(dataList)
-	if err != nil {
-		return
-	}
-
-	baseCalculate := BaseCalculate{
-		DataList:      newDataList,
-		Frequency:     "",
-		Formula:       nil,
-		Calendar:      "",
-		MoveType:      0,
-		MoveFrequency: "",
-		FromFrequency: "",
-		Source:        source,
-	}
-	dateDataMap, err, _ := baseCalculate.UpFrequency()
-	if err != nil {
-		return
+	var dateArr []string
+	dataMap := make(map[string]*EdbInfoSearchData)
+	fromDataMap := make(map[string]float64)
+	//来源指指标数据
+	for _, v := range dataList {
+		dateArr = append(dateArr, v.DataTime)
+		dataMap[v.DataTime] = v
+		fromDataMap[v.DataTime] = v.Value
 	}
+	fmt.Println("source:", source)
 
-	// 获取升频指所有已经存在的计算指标数据
+	//获取升频指标所有数据
 	existDataList, err := GetAllEdbDataListByTo(to, edbInfoId, source, subSource)
 	if err != nil {
 		return
 	}
 	//计算指标的map
 	existDataMap := make(map[string]*EdbData, 0)
-	for _, v := range existDataList {
-		existDataMap[v.DataTime] = v
-	}
-
+	removeDateMap := make(map[string]struct{})
 	addSql := ` INSERT INTO edb_data_calculate_bp(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
 	var isAdd bool
-
-	now := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local)
-
-	for currDate := fromFirstDate; !currDate.After(now); currDate = currDate.AddDate(0, 0, 1) {
-		currDateStr := currDate.Format(utils.FormatDate)
-		timestamp := currDate.UnixNano() / 1e6
-		timestampStr := fmt.Sprintf("%d", timestamp)
-
-		// 当前计算的值
-		currValue, ok := dateDataMap[currDate]
-		if !ok {
-			// 没有计算成功就过滤
-			continue
+	//待删除的日期
+	removeDateList := make([]string, 0)
+	if len(existDataList) > 0 && len(dateArr) == 0 {
+		//如果没有来源指标数据,那么已经入库的计算指标数据需要全部删除
+		tableName := GetEdbDataTableName(source, subSource)
+		sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ?`, tableName)
+		_, err = to.Raw(sql, edbInfoId).Exec()
+		if err != nil {
+			err = fmt.Errorf("删除所有的升频指标数据失败,Err:" + err.Error())
+			return
 		}
-		lastValueStr := decimal.NewFromFloat(currValue).Round(4).String()
+		return
+	}
 
-		// 已经入库的值
-		existData, ok := existDataMap[currDateStr]
-		if !ok {
-			// 没有入库那么就插入添加
-			isAdd = true
-			addSql += GetAddSql(edbInfoIdStr, edbCode, currDateStr, timestampStr, lastValueStr)
-			continue
-		}
+	existMap := make(map[string]string)
 
-		// 将已经入库的值转换为decimal类型,然后再保留4位小数,目的是为了做匹配,要不然取出来的数据与计算的数据不一致
-		existDataValueDec, tmpErr := decimal.NewFromString(existData.Value)
-		if tmpErr != nil {
-			err = tmpErr
-			return
+	dataLen := len(dataList)
+	//第三步: 已经入库的数据处理
+	for _, v := range existDataList {
+		existDataMap[v.DataTime] = v
+		removeDateMap[v.DataTime] = struct{}{}
+	}
+	for i := 0; i < dataLen; i++ {
+		//当期
+		currentItem := dataList[i]
+		var prevItem *EdbInfoSearchData
+		if emptyType == 3 { //3后值填充,其余前值填充
+			if i >= 1 {
+				prevItem = dataList[i-1]
+			}
 		}
-		existDataValueStr := existDataValueDec.Round(4).String()
-
-		// 如果该日期已经入库了,且两个值不匹配,那么就更新
-		if lastValueStr != existDataValueStr {
-			err = ModifyEdbDataById(source, subSource, existData.EdbDataId, lastValueStr)
-			if err != nil {
-				return err
+		currentDate, _ := time.ParseInLocation(utils.FormatDate, currentItem.DataTime, time.Local)
+		var day int
+		var preItem *EdbInfoSearchData
+		var preDate time.Time
+		if i == 0 {
+			if emptyType == 3 { //后值填充
+				day = 0 //最新的时间就是来源指标的最新日期
+				preDate = currentDate
+			} else {
+				day = int(time.Now().Sub(currentDate).Hours() / float64(24))
+				preDate = time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local)
+			}
+		} else {
+			j := i - 1
+			if j < dataLen {
+				preItem = dataList[j]
+				preDate, _ = time.ParseInLocation(utils.FormatDate, preItem.DataTime, time.Local)
+				day = int(preDate.Sub(currentDate).Hours() / float64(24))
+				utils.FileLog.Info("preItem.DataTime:" + preItem.DataTime + ";currentItem.DataTime" + currentItem.DataTime)
 			}
 		}
+		for k := 0; k <= day; k++ {
+			needDay := preDate.AddDate(0, 0, -k)
+			needDayStr := needDay.Format(utils.FormatDate)
+			delete(removeDateMap, needDayStr)
+			existKey := edbCode + needDayStr
+			if _, ok := existMap[existKey]; !ok {
+				timestamp := needDay.UnixNano() / 1e6
+				timestampStr := fmt.Sprintf("%d", timestamp)
+				valStr := decimal.NewFromFloat(currentItem.Value).String()
+				if prevItem != nil && needDayStr != currentItem.DataTime {
+					valStr = decimal.NewFromFloat(prevItem.Value).String()
+				}
+				tmpExistData, ok2 := existDataMap[needDayStr]
+				if !ok2 {
+					addSql += GetAddSql(edbInfoIdStr, edbCode, needDayStr, timestampStr, valStr)
+					isAdd = true
+				} else {
+					//如果对应的值不匹配
+					if tmpExistData.Value != valStr {
+						err = ModifyEdbDataById(source, subSource, tmpExistData.EdbDataId, valStr)
+						if err != nil {
+							return err
+						}
+					}
+				}
 
-		// 该日期已经处理过了,所以需要移除,如果后面该map还有数据,那么需要删除该map里面的日期数据
-		delete(existDataMap, currDateStr)
+			}
+			existMap[existKey] = needDayStr
+		}
+		existKey := edbCode + currentItem.DataTime
+		if _, ok := existMap[existKey]; !ok {
+			currentDate, _ := time.ParseInLocation(utils.FormatDate, currentItem.DataTime, time.Local)
+			timestamp := currentDate.UnixNano() / 1e6
+			timestampStr := fmt.Sprintf("%d", timestamp)
+			valStr := decimal.NewFromFloat(currentItem.Value).String()
+			tmpExistData, ok2 := existDataMap[currentItem.DataTime]
+			if !ok2 {
+				addSql += GetAddSql(edbInfoIdStr, edbCode, currentItem.DataTime, timestampStr, valStr)
+				isAdd = true
+			} else {
+				//如果对应的值不匹配
+				if tmpExistData.Value != valStr {
+					err = ModifyEdbDataById(source, subSource, tmpExistData.EdbDataId, valStr)
+					if err != nil {
+						return err
+					}
+				}
+			}
 
+		}
+		existMap[existKey] = currentItem.DataTime
 	}
 
+	for k, _ := range removeDateMap {
+		removeDateList = append(removeDateList, k)
+	}
 	// 删除不需要的指标数据
-	if len(existDataMap) > 0 {
-		//待删除的日期
-		removeDateList := make([]string, 0)
-		for date := range existDataMap {
-			removeDateList = append(removeDateList, date)
-		}
-
-		removeDateStr := strings.Join(removeDateList, `","`)
-		removeDateStr = `"` + removeDateStr + `"`
+	if len(removeDateList) > 0 {
 		//如果拼接指标变更了,那么需要删除所有的指标数据
 		tableName := GetEdbDataTableName(source, subSource)
-		sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr)
-
-		_, err = to.Raw(sql, edbInfoId).Exec()
+		sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (`+utils.GetOrmInReplace(len(removeDateList))+`) `, tableName)
+		_, err = to.Raw(sql, edbInfoId, removeDateList).Exec()
 		if err != nil {
 			err = fmt.Errorf("删除不存在的升频指标数据失败,Err:" + err.Error())
 			return
 		}
 	}
 
-	// 新增的数据值
 	if isAdd {
 		addSql = strings.TrimRight(addSql, ",")
 		_, err = to.Raw(addSql).Exec()
 	}
-
 	return
 }

+ 10 - 6
models/edb_data_calculate_percentile.go

@@ -443,21 +443,25 @@ func (obj Percentile) getPercentileData(fromEdbInfo *EdbInfo, calculateValue int
 	// 百分位数据个数算法
 	// 数据区间第一个和最后一个数据点的时间和数据分别为(T1,S1)(T2,S2); N=T1到T2指标数据个数, n=小于等于S2的数据个数
 	// 个数百分位=(n-1)/(N-1)
-	maxDay := len(dataList) // 往前找数据的边界
+	var firstDate time.Time
+	if len(dataList) > 0 {
+		d, _ := time.ParseInLocation(utils.FormatDate, dataList[0].DataTime, time.Local)
+		firstDate = d
+	}
 	if percentType == utils.PercentCalculateTypeNum {
 		for i, d := range dataList {
 			// T2为当前日期
 			s2 := decimal.NewFromFloat(d.Value)
 			t2, _ := time.ParseInLocation(utils.FormatDate, d.DataTime, time.Local)
 
-			// 计算N和n
+			// 往前找(时间长度)个有数据的
 			var bigN, tinyN int
-			for k := 0; k < maxDay; k++ {
-				// 往前找(时间长度)个有数据的, N理论上只有最前面几个日期<calculateDay, 后面的N=calculateDay
-				if bigN >= calculateDay {
+			for k := 0; k < calculateDay; k++ {
+				tp := t2.AddDate(0, 0, -k)
+				if tp.Before(firstDate) {
 					break
 				}
-				preVal, preOk := dataMap[t2.AddDate(0, 0, -k)]
+				preVal, preOk := dataMap[tp]
 				if !preOk {
 					continue
 				}

+ 1 - 1
models/edb_data_ths_hf.go

@@ -63,7 +63,7 @@ func (m *EdbDataThsHf) CreateMulti(items []*EdbDataThsHf) (err error) {
 		return
 	}
 	o := orm.NewOrm()
-	_, err = o.InsertMulti(200, items)
+	_, err = o.InsertMulti(500, items)
 	return
 }
 

+ 5 - 5
models/edb_info.go

@@ -1518,6 +1518,11 @@ func GetEdbInfoByEdbCodeList(source int, edbCodeList []string) (items []*EdbInfo
 	return
 }
 
+// EdbInfoExtra 指标额外数据-extra字段
+type EdbInfoExtra struct {
+	ApiExtraPars string `description:"API-额外参数(如同花顺日期序列)"`
+}
+
 // GetEdbInfoNoUpdateTotalByIdList 根据指标id列表获取指标信息
 func GetEdbInfoNoUpdateTotalByIdList(edbInfoIdList []int) (total int, err error) {
 	num := len(edbInfoIdList)
@@ -1577,8 +1582,3 @@ func GetAddEdbMaxSortByClassifyId(classifyId int, classifyType uint8) (sort int)
 
 	return
 }
-
-// EdbInfoExtra 指标额外数据-extra字段
-type EdbInfoExtra struct {
-	ApiExtraPars string `description:"API-额外参数(如同花顺日期序列)"`
-}

+ 393 - 36
models/edb_ths_hf.go

@@ -2,10 +2,12 @@ package models
 
 import (
 	"encoding/json"
+	"eta/eta_index_lib/models/mgo"
 	"eta/eta_index_lib/utils"
 	"fmt"
 	"github.com/beego/beego/v2/client/orm"
 	"github.com/shopspring/decimal"
+	"go.mongodb.org/mongo-driver/bson"
 	"reflect"
 	"sort"
 	"time"
@@ -73,14 +75,6 @@ type ThsHfRefreshBaseParams struct {
 
 // Add
 // @Description: 添加指标
-// @author: Roc
-// @receiver obj
-// @datetime 2024-04-30 17:35:14
-// @param params ThsHfAddBaseParams
-// @param businessIndexItem *BaseFromBusinessIndex
-// @return edbInfo *EdbInfo
-// @return err error
-// @return errMsg string
 func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex) (edbInfo *EdbInfo, err error) {
 	o := orm.NewOrm()
 	tx, e := o.Begin()
@@ -151,6 +145,13 @@ func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex
 }
 
 func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
+	if utils.UseMongo {
+		return obj.refreshByMongo(edbInfo, edbBaseMapping, startDate)
+	}
+	return obj.refreshByMysql(edbInfo, edbBaseMapping, startDate)
+}
+
+func (obj EdbThsHf) refreshByMysql(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
 	if edbInfo == nil || edbBaseMapping == nil {
 		err = fmt.Errorf("指标信息/关联信息有误, EdbInfo: %v, EdbBaseMapping: %v", edbInfo, edbBaseMapping)
 		return
@@ -172,6 +173,17 @@ func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping
 		}
 	}
 
+	// 查询时间为开始时间-3d
+	var queryDate string
+	if startDate != "" {
+		st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
+		if e != nil {
+			err = fmt.Errorf("刷新开始时间有误, %v", e)
+			return
+		}
+		queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
+	}
+
 	// 源指标数据
 	baseDataList := make([]*BaseFromThsHfData, 0)
 	{
@@ -179,15 +191,9 @@ func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping
 		cond := fmt.Sprintf(" AND %s = ?", ob.Cols().IndexCode)
 		pars := make([]interface{}, 0)
 		pars = append(pars, edbBaseMapping.BaseIndexCode)
-		// 开始时间-取-3d
-		if startDate != "" {
-			st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
-			if e != nil {
-				err = fmt.Errorf("刷新开始时间有误, %v", e)
-				return
-			}
+		if queryDate != "" {
 			cond += fmt.Sprintf(" AND %s >= ?", ob.Cols().DataTime)
-			pars = append(pars, st.AddDate(0, 0, -3).Format(utils.FormatDate))
+			pars = append(pars, queryDate)
 		}
 		list, e := ob.GetItemsByCondition(cond, pars, []string{}, fmt.Sprintf("%s ASC", ob.Cols().DataTime))
 		if e != nil {
@@ -196,14 +202,21 @@ func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping
 		}
 		baseDataList = list
 	}
+
+	// 转换数据
 	convertRule := new(ThsHfIndexConvert2EdbRule)
 	if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
 		err = fmt.Errorf("转换规则有误, %v", e)
 		return
 	}
-
-	// 转换数据
-	convertData, e := ThsHfConvertData2DayByRule(baseDataList, convertRule)
+	convertOriginData := make([]*ThsHfConvertOriginData, 0)
+	for _, v := range baseDataList {
+		convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
+			DataTime: v.DataTime,
+			Value:    v.Value,
+		})
+	}
+	convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
 	if e != nil {
 		err = fmt.Errorf("转换数据失败, %v", e)
 		return
@@ -221,9 +234,9 @@ func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping
 		cond := fmt.Sprintf(" AND %s = ?", dataOb.Cols().EdbInfoId)
 		pars := make([]interface{}, 0)
 		pars = append(pars, edbInfo.EdbInfoId)
-		if startDate != "" {
+		if queryDate != "" {
 			cond += fmt.Sprintf(" AND %s >= ?", dataOb.Cols().DataTime)
-			pars = append(pars, startDate)
+			pars = append(pars, queryDate)
 		}
 		list, e := dataOb.GetItemsByCondition(cond, pars, []string{}, "")
 		if e != nil {
@@ -315,8 +328,212 @@ func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping
 	return
 }
 
+func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Info(fmt.Sprintf("refreshByMongo, err: %v", err))
+		}
+	}()
+
+	var realDataMaxDate, edbDataInsertConfigDate time.Time
+	var edbDataInsertConfig *EdbDataInsertConfig
+	var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
+	{
+		insertConfig, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
+		if e != nil && e.Error() != utils.ErrNoRow() {
+			err = fmt.Errorf("GetEdbDataInsertConfigByEdbId, err: %v", e)
+			return
+		}
+		edbDataInsertConfig = insertConfig
+		if edbDataInsertConfig != nil {
+			edbDataInsertConfigDate = edbDataInsertConfig.Date
+		}
+	}
+
+	// 查询时间为开始时间-3d
+	var queryDate string
+	if startDate != "" {
+		st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
+		if e != nil {
+			err = fmt.Errorf("刷新开始时间有误, %v", e)
+			return
+		}
+		queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
+	}
+
+	// 获取源指标数据
+	baseDataList, e := obj.getBaseIndexDataByMongo(edbBaseMapping.BaseIndexCode, queryDate)
+	if e != nil {
+		err = fmt.Errorf("getBaseIndexDataByMongo, err: %v", e)
+		return
+	}
+
+	// 转换数据
+	convertRule := new(ThsHfIndexConvert2EdbRule)
+	if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
+		err = fmt.Errorf("转换规则有误, %v", e)
+		return
+	}
+	convertOriginData := make([]*ThsHfConvertOriginData, 0)
+	for _, v := range baseDataList {
+		convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
+			DataTime: v.DataTime,
+			Value:    v.Value,
+		})
+	}
+	convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
+	if e != nil {
+		err = fmt.Errorf("转换数据失败, %v", e)
+		return
+	}
+	if len(convertData) == 0 {
+		utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
+		return
+	}
+
+	//获取指标所有数据
+	existDataList := make([]*mgo.EdbDataThsHf, 0)
+	mogDataObj := new(mgo.EdbDataThsHf)
+	{
+		// 构建查询条件
+		queryConditions := bson.M{
+			"edb_code": edbInfo.EdbCode,
+		}
+
+		if queryDate != `` {
+			//获取已存在的所有数据
+			startDateTime, e := time.ParseInLocation(utils.FormatDate, queryDate, time.Local)
+			if e != nil {
+				err = fmt.Errorf("startDateTime parse err: %v", e)
+				return
+			}
+			queryConditions["data_time"] = bson.M{"$gte": startDateTime}
+		}
+		existDataList, e = mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
+		if e != nil {
+			err = fmt.Errorf("GetAllDataList, err: %v", e)
+			return
+		}
+	}
+
+	existDataMap := make(map[string]*mgo.EdbDataThsHf)
+	removeDataTimeMap := make(map[string]bool) //需要移除的日期数据
+	for _, v := range existDataList {
+		tmpDate := v.DataTime.Format(utils.FormatDate)
+		existDataMap[tmpDate] = v
+		removeDataTimeMap[tmpDate] = true
+	}
+
+	// 待添加的数据集
+	addDataList := make([]interface{}, 0)
+	updateDataList := make([]mgo.EdbDataThsHf, 0)
+
+	insertExist := make(map[string]bool)
+	for k, v := range convertData {
+		strDate := k.Format(utils.FormatDate)
+
+		// 手动插入数据的判断
+		if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
+			realDataMaxDate = k
+		}
+		if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
+			isFindConfigDateRealData = true
+		}
+
+		// 入库值
+		saveVal := decimal.NewFromFloat(v).Round(4).String()
+		d, e := decimal.NewFromString(saveVal)
+		if e != nil {
+			utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
+			continue
+		}
+		saveFloat, _ := d.Float64()
+
+		// 更新
+		exists := existDataMap[strDate]
+		if exists != nil {
+			existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
+			if saveVal != existVal {
+				exists.Value = saveFloat
+				updateDataList = append(updateDataList, *exists)
+			}
+			continue
+		}
+
+		// 新增
+		if insertExist[strDate] {
+			continue
+		}
+		insertExist[strDate] = true
+
+		timestamp := k.UnixNano() / 1e6
+		addDataList = append(addDataList, &mgo.EdbDataThsHf{
+			EdbInfoId:     edbInfo.EdbInfoId,
+			EdbCode:       edbInfo.EdbCode,
+			DataTime:      k,
+			Value:         saveFloat,
+			CreateTime:    time.Now(),
+			ModifyTime:    time.Now(),
+			DataTimestamp: timestamp,
+		})
+	}
+
+	// 入库
+	{
+		coll := mogDataObj.GetCollection()
+
+		//删除已经不存在的指标数据(由于该指标当日的数据删除了)
+		{
+			removeDateList := make([]time.Time, 0)
+			for dateTime := range removeDataTimeMap {
+				//获取已存在的所有数据
+				tmpDateTime, e := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
+				if e != nil {
+					err = fmt.Errorf("tmpDateTime parse err: %v", e)
+					return
+				}
+				removeDateList = append(removeDateList, tmpDateTime)
+			}
+			removeNum := len(removeDateList)
+			if removeNum > 0 {
+				if e = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}}); e != nil {
+					err = fmt.Errorf("RemoveManyByColl, err: %v", e)
+					return
+				}
+			}
+		}
+
+		// 插入新数据
+		if len(addDataList) > 0 {
+			if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil {
+				err = fmt.Errorf("BatchInsertDataByColl, err: %v", e)
+				return
+			}
+		}
+
+		// 修改历史数据
+		if len(updateDataList) > 0 {
+			for _, v := range updateDataList {
+				if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil {
+					err = fmt.Errorf("UpdateDataByColl, err: %v", e)
+					return
+				}
+			}
+		}
+	}
+
+	// 处理手工数据补充的配置
+	obj.HandleConfigInsertEdbDataByMongo(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData)
+	return
+}
+
+type ThsHfConvertOriginData struct {
+	DataTime time.Time `description:"数据日期(至时分秒)"`
+	Value    float64   `description:"数据值"`
+}
+
 // ThsHfConvertData2DayByRule 原指标数据转换为日度数据
-func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) {
+func ThsHfConvertData2DayByRule(originData []*ThsHfConvertOriginData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) {
 	// PS: originData为期望开始日期前三日(有两天非交易日, 那么周一的前日应当算上周五的)至结束日期的数据
 	timeData = make(map[time.Time]float64)
 	if len(originData) == 0 || convertRule == nil {
@@ -329,19 +546,19 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 
 	// 升序排序
 	sort.Slice(originData, func(i, j int) bool {
-		return originData[i].DataTimestamp < originData[j].DataTimestamp
+		return originData[i].DataTime.Before(originData[j].DataTime)
 	})
 
 	// 将数据根据日期进行分组
 	var sortDates []string
-	groupDateData := make(map[string][]*BaseFromThsHfData)
+	groupDateData := make(map[string][]*ThsHfConvertOriginData)
 	for _, v := range originData {
 		d := v.DataTime.Format(utils.FormatDate)
 		if !utils.InArrayByStr(sortDates, d) {
 			sortDates = append(sortDates, d)
 		}
 		if groupDateData[d] == nil {
-			groupDateData[d] = make([]*BaseFromThsHfData, 0)
+			groupDateData[d] = make([]*ThsHfConvertOriginData, 0)
 		}
 		groupDateData[d] = append(groupDateData[d], v)
 	}
@@ -355,7 +572,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 				continue
 			}
 			var timeTarget time.Time
-			dateData := make([]*BaseFromThsHfData, 0)
+			dateData := make([]*ThsHfConvertOriginData, 0)
 
 			// 当日
 			if convertRule.ConvertFixed.FixedDay == 1 {
@@ -379,7 +596,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 
 			// 前一日
 			if convertRule.ConvertFixed.FixedDay == 2 {
-				if k < 0 {
+				if k < 1 {
 					utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
 					continue
 				}
@@ -402,9 +619,13 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 				}
 				dateData = dt
 			}
+			if len(dateData) == 0 {
+				utils.FileLog.Info("日期%s无数据序列", v)
+				continue
+			}
 
 			// 重新获取数据序列中, 时间在目标时间点之后的
-			newDateData := make([]*BaseFromThsHfData, 0)
+			newDateData := make([]*ThsHfConvertOriginData, 0)
 			for kv, dv := range dateData {
 				if dv.DataTime.Before(timeTarget) {
 					continue
@@ -416,7 +637,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 
 			// 取重组后当日数据中的第一个(有可能目标时间点无值, 那么取之后时间最近的值)
 			if len(newDateData) == 0 {
-				utils.FileLog.Info("%s当日无有效数据", v)
+				utils.FileLog.Info("日期%s无有效数据", v)
 				continue
 			}
 			timeData[todayTime] = newDateData[0].Value
@@ -433,14 +654,14 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 		}
 		var thisDate, preDate string
 		thisDate = v
-		if k > 0 {
+		if k > 1 {
 			preDate = sortDates[k-1]
 		}
 		var startTimeTarget, endTimeTarget time.Time
 
 		// 起始时间-当日/前一日
 		if convertRule.ConvertArea.StartDay == 1 {
-			tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertFixed.FixedTime), time.Local)
+			tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.StartTime), time.Local)
 			if e != nil {
 				utils.FileLog.Info(fmt.Sprintf("当日startTimeTarget转换有误, %v", e))
 				continue
@@ -452,7 +673,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 				utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
 				continue
 			}
-			tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local)
+			tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.StartTime), time.Local)
 			if e != nil {
 				utils.FileLog.Info(fmt.Sprintf("前日startTimeTarget转换有误, %v", e))
 				continue
@@ -462,7 +683,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 
 		// 截止时间-当日/前一日
 		if convertRule.ConvertArea.EndDay == 1 {
-			tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertFixed.FixedTime), time.Local)
+			tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", thisDate, convertRule.ConvertArea.EndTime), time.Local)
 			if e != nil {
 				utils.FileLog.Info(fmt.Sprintf("当日endTimeTarget转换有误, %v", e))
 				continue
@@ -474,7 +695,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 				utils.FileLog.Info(fmt.Sprintf("%s前日无数据", v))
 				continue
 			}
-			tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertFixed.FixedTime), time.Local)
+			tg, e := time.ParseInLocation(utils.FormatDateTime, fmt.Sprintf("%s %s", preDate, convertRule.ConvertArea.EndTime), time.Local)
 			if e != nil {
 				utils.FileLog.Info(fmt.Sprintf("前日endTimeTarget转换有误, %v", e))
 				continue
@@ -487,7 +708,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 		}
 
 		// 合并前日当日数据
-		dateData := make([]*BaseFromThsHfData, 0)
+		dateData := make([]*ThsHfConvertOriginData, 0)
 		if convertRule.ConvertArea.StartDay == 1 && convertRule.ConvertArea.EndDay == 1 {
 			// 起始截止均为当日
 			dateData = groupDateData[thisDate]
@@ -525,9 +746,13 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 			dateData = append(dateData, preData...)
 			dateData = append(dateData, thisData...)
 		}
+		if len(dateData) == 0 {
+			utils.FileLog.Info("日期%s无数据序列", v)
+			continue
+		}
 
 		// 重组时间区间内的数据
-		newDateData := make([]*BaseFromThsHfData, 0)
+		newDateData := make([]*ThsHfConvertOriginData, 0)
 		for _, dv := range dateData {
 			if dv.DataTime.Before(startTimeTarget) || dv.DataTime.After(endTimeTarget) {
 				continue
@@ -541,6 +766,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 
 		// 取出区间内的均值/最值
 		var avgVal, minVal, maxVal, sumVal float64
+		minVal, maxVal = newDateData[0].Value, newDateData[0].Value
 		for _, nv := range newDateData {
 			sumVal += nv.Value
 			if nv.Value > maxVal {
@@ -565,3 +791,134 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 	}
 	return
 }
+
+func (obj EdbThsHf) getBaseIndexDataByMongo(baseIndexCode, startDate string) (newDataList []EdbInfoMgoData, err error) {
+	newDataList = make([]EdbInfoMgoData, 0)
+
+	// 获取数据源的指标数据
+	mogDataObj := new(mgo.BaseFromThsHfData)
+
+	// 构建查询条件
+	queryConditions := bson.M{
+		"index_code": baseIndexCode,
+	}
+
+	if startDate != `` {
+		//获取已存在的所有数据
+		startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+		queryConditions["data_time"] = bson.M{"$gte": startDateTime}
+	}
+
+	baseDataList, err := mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
+	if err != nil {
+		fmt.Println("GetAllDataList Err:" + err.Error())
+		return
+	}
+
+	for _, v := range baseDataList {
+		newDataList = append(newDataList, EdbInfoMgoData{
+			//EdbDataId: v.ID,
+			DataTime: v.DataTime,
+			Value:    v.Value,
+			EdbCode:  v.IndexCode,
+		})
+	}
+	return
+}
+
+func (obj EdbThsHf) HandleConfigInsertEdbDataByMongo(realDataMaxDate time.Time, edbDataInsertConfig *EdbDataInsertConfig, edbInfoId, source, subSource int, existMap map[string]*mgo.EdbDataThsHf, isFindConfigDateRealData bool) {
+	if edbDataInsertConfig == nil {
+		return
+	}
+	var err error
+	defer func() {
+		if err != nil {
+			utils.FileLog.Info(fmt.Sprintf("ThsHf-HandleConfigInsertEdbDataByMongo, err: %v", err))
+		}
+	}()
+
+	edbDataInsertConfigDate := edbDataInsertConfig.Date // 配置的日期
+
+	// 如果存在真实数据的最大日期  && 存在配置插入数据的最大日期  && 真实数据的最大日期 晚于/等于 配置插入数据的最大日期
+	if realDataMaxDate.After(edbDataInsertConfigDate) || realDataMaxDate.Equal(edbDataInsertConfigDate) {
+		go DeleteEdbDataInsertConfigByEdbId(edbInfoId)
+
+		mogDataObj := mgo.EdbDataThsHf{}
+		coll := mogDataObj.GetCollection()
+		edbDataInsertConfigDateStr := edbDataInsertConfigDate.Format(utils.FormatDate)
+		// 如果没有找到找到配置日期的实际数据,那么就直接删除
+		if item, ok := existMap[edbDataInsertConfigDateStr]; ok && !isFindConfigDateRealData {
+			mogDataObj.RemoveManyByColl(coll, bson.M{"_id": item.ID})
+		}
+	} else {
+		o := orm.NewOrm()
+		edbDataInsertConfig.RealDate = realDataMaxDate
+		_, err = o.Update(edbDataInsertConfig, "RealDate")
+	}
+	return
+}
+
+func (obj EdbThsHf) UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo *EdbInfo) (err error) {
+	if utils.UseMongo {
+		edbInfoMaxAndMinInfo, tmpErr := obj.getEdbInfoMaxAndMinInfoByMongo(edbInfo.EdbCode)
+		// 如果正常获取到了,那就去修改指标的最大最小值
+		if tmpErr == nil && edbInfoMaxAndMinInfo != nil {
+			err = ModifyEdbInfoMaxAndMinInfo(edbInfo.EdbInfoId, edbInfoMaxAndMinInfo)
+		} else {
+			// 清空的目的是为了避免异常返回
+			err = nil
+		}
+	} else {
+		err, _ = UnifiedModifyEdbInfoMaxAndMinInfo(edbInfo)
+	}
+
+	return
+}
+
+func (obj EdbThsHf) getEdbInfoMaxAndMinInfoByMongo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
+	mogDataObj := new(mgo.EdbDataThsHf)
+	pipeline := []bson.M{
+		{"$match": bson.M{"edb_code": edbCode}},
+		{"$group": bson.M{
+			"_id":       nil,
+			"min_date":  bson.M{"$min": "$data_time"},
+			"max_date":  bson.M{"$max": "$data_time"},
+			"min_value": bson.M{"$min": "$value"},
+			"max_value": bson.M{"$max": "$value"},
+		}},
+		{"$project": bson.M{"_id": 0}}, // 可选,如果不需要_id字段
+	}
+	result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
+	if err != nil {
+		fmt.Println("EdbDataBusiness getEdbDataBusinessList Err:" + err.Error())
+		return
+	}
+
+	if !result.MaxDate.IsZero() {
+		whereQuery := bson.M{"edb_code": edbCode, "data_time": result.MaxDate}
+		selectParam := bson.D{{"value", 1}, {"_id", 0}}
+		latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+		result.LatestValue = latestValue.Value
+		result.EndValue = latestValue.Value
+	}
+
+	item = &EdbInfoMaxAndMinInfo{
+		MinDate:     result.MinDate.Format(utils.FormatDate),
+		MaxDate:     result.MaxDate.Format(utils.FormatDate),
+		MinValue:    result.MinValue,
+		MaxValue:    result.MaxValue,
+		LatestValue: result.LatestValue,
+		LatestDate:  result.LatestDate.Format(utils.FormatDate),
+		EndValue:    result.EndValue,
+	}
+
+	return
+}

+ 405 - 0
models/mgo/base_from_ths_hf_data.go

@@ -1 +1,406 @@
 package mgo
+
+import (
+	"context"
+	"errors"
+	"eta/eta_index_lib/utils"
+	"fmt"
+	"github.com/qiniu/qmgo"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"time"
+)
+
+// BaseFromThsHfData
+// @Description: 同花顺高频集合
+type BaseFromThsHfData struct {
+	ID                   primitive.ObjectID `json:"_id" bson:"_id,omitempty"`                                   // 文档id
+	BaseFromThsHfDataId  int64              `json:"base_from_ths_hf_data_id" bson:"base_from_ths_hf_data_id"`   // 指标数据ID
+	BaseFromThsHfIndexId int64              `json:"base_from_ths_hf_index_id" bson:"base_from_ths_hf_index_id"` // 指标ID
+	IndexCode            string             `json:"index_code" bson:"index_code"`                               // 指标编码
+	DataTime             time.Time          `json:"data_time" bson:"data_time"`                                 // 数据日期
+	Value                float64            `json:"value" bson:"value"`                                         // 数据值
+	UniqueCode           string             `json:"unique_code" bson:"unique_code"`                             // 唯一编码
+	CreateTime           time.Time          `json:"create_time" bson:"create_time"`                             // 创建时间
+	ModifyTime           time.Time          `json:"modify_time" bson:"modify_time"`                             // 修改时间
+	DataTimestamp        int64              `json:"data_timestamp" bson:"data_timestamp"`                       // 数据日期时间戳
+}
+
+// CollectionName
+// @Description:  获取集合名称
+func (m *BaseFromThsHfData) CollectionName() string {
+	return "base_from_ths_hf_data"
+}
+
+// DataBaseName
+// @Description: 获取数据库名称
+func (m *BaseFromThsHfData) DataBaseName() string {
+	return utils.MgoDataDbName
+}
+
+// GetCollection
+// @Description: 获取mongodb集合的句柄
+func (m *BaseFromThsHfData) GetCollection() *qmgo.Collection {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	return db.Collection(m.CollectionName())
+}
+
+// GetAllDataList
+// @Description: 根据条件获取所有数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 13:42:19
+// @param sort []string
+// @param whereParams interface{}
+// @return result []BaseFromThsHfData
+// @return err error
+func (m *BaseFromThsHfData) GetAllDataList(whereParams interface{}, sort []string) (result []*BaseFromThsHfData, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+	return
+}
+
+// GetLimitDataList
+// @Description: 根据条件获取指定数量数据列表
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-06 17:08:32
+// @param whereParams interface{}
+// @param size int64
+// @return result []*BaseFromThsHfData
+// @return err error
+func (m *BaseFromThsHfData) GetLimitDataList(whereParams interface{}, size int64, sort []string) (result []*BaseFromThsHfData, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).Limit(size).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+
+	return
+}
+
+// GetPageDataList
+// @Description: 根据条件获取分页数据列表
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-07 10:21:07
+// @param whereParams interface{}
+// @param startSize int64
+// @param size int64
+// @param sort []string
+// @return result []*BaseFromThsHfData
+// @return err error
+func (m *BaseFromThsHfData) GetPageDataList(whereParams interface{}, startSize, size int64, sort []string) (result []*BaseFromThsHfData, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).Skip(startSize).Limit(size).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+
+	return
+}
+
+// GetCountDataList
+// @Description:  根据条件获取数据列表总数
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-07 10:29:00
+// @param whereParams interface{}
+// @return count int64
+// @return err error
+func (m *BaseFromThsHfData) GetCountDataList(whereParams interface{}) (count int64, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	count, err = coll.Find(ctx, whereParams).Count()
+
+	return
+}
+
+// InsertDataByColl
+// @Description: 写入单条数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param addData interface{}
+// @return err error
+func (m *BaseFromThsHfData) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) {
+	ctx := context.TODO()
+	_, err = coll.InsertOne(ctx, addData)
+	if err != nil {
+		fmt.Println("InsertDataByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
+// BatchInsertData
+// @Description: 批量写入数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
+// @return err error
+func (m *BaseFromThsHfData) BatchInsertData(bulk int, dataList []interface{}) (err error) {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+
+	return m.BatchInsertDataByColl(coll, bulk, dataList)
+}
+
+// BatchInsertDataByColl
+// @Description: 批量写入数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param coll *qmgo.Collection
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
+// @return err error
+func (m *BaseFromThsHfData) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) {
+	ctx := context.TODO()
+	dataNum := len(dataList)
+	if dataNum <= 0 {
+		return
+	}
+
+	// 不设置每次保存切片数量大小,或者实际数据量小于设置的切片数量大小,那么就直接保存吧
+	if bulk <= 0 || dataNum <= bulk {
+		_, err = coll.InsertMany(ctx, dataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+		return
+	}
+
+	// 分批保存
+	i := 0
+	tmpAddDataList := make([]interface{}, 0)
+	for _, v := range dataList {
+		tmpAddDataList = append(tmpAddDataList, v)
+		i++
+		if i >= bulk {
+			_, err = coll.InsertMany(ctx, tmpAddDataList)
+			if err != nil {
+				fmt.Println("BatchInsertData:Err:" + err.Error())
+				return
+			}
+			i = 0
+			tmpAddDataList = make([]interface{}, 0)
+		}
+	}
+
+	if len(tmpAddDataList) > 0 {
+		_, err = coll.InsertMany(ctx, tmpAddDataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+	}
+
+	return
+}
+
+// UpdateDataByColl
+// @Description: 单条数据修改
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 15:01:51
+// @param whereParams interface{}
+// @param updateParams interface{}
+// @return err error
+func (m *BaseFromThsHfData) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) {
+	ctx := context.TODO()
+	err = coll.UpdateOne(ctx, whereParams, updateParams)
+	if err != nil {
+		fmt.Println("UpdateDataByColl:Err:" + err.Error())
+		return
+	}
+	return
+}
+
+// UpdateData
+// @Description: 单条数据修改
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 15:01:51
+// @param whereParams interface{}
+// @param updateParams interface{}
+// @return err error
+func (m *BaseFromThsHfData) UpdateData(whereParams, updateParams interface{}) (err error) {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	err = coll.UpdateOne(ctx, whereParams, updateParams)
+	if err != nil {
+		fmt.Println("UpdateData:Err:" + err.Error())
+		return
+	}
+	return
+}
+
+// HandleData
+// @Description: 事务处理数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 10:40:20
+// @param addDataList []BaseAddFromBusinessData
+// @param updateDataList []BaseFromThsHfData
+// @return result interface{}
+// @return err error
+func (m *BaseFromThsHfData) HandleData(addDataList, updateDataList []BaseFromThsHfData) (result interface{}, err error) {
+
+	ctx := context.TODO()
+
+	callback := func(sessCtx context.Context) (interface{}, error) {
+		// 重要:确保事务中的每一个操作,都使用传入的sessCtx参数
+
+		db := utils.MgoDataCli.Database(m.DataBaseName())
+		coll := db.Collection(m.CollectionName())
+
+		// 插入数据
+		if len(addDataList) > 0 {
+			_, err = coll.InsertMany(sessCtx, addDataList)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		// 修改
+
+		if len(updateDataList) > 0 {
+			for _, v := range updateDataList {
+				err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
+				if err != nil {
+					fmt.Println("BatchInsertData:Err:" + err.Error())
+					return nil, err
+				}
+			}
+		}
+
+		return nil, nil
+	}
+	result, err = utils.MgoDataCli.DoTransaction(ctx, callback)
+
+	return
+}
+
+// GetEdbInfoMaxAndMinInfo
+// @Description: 获取当前指标的最大最小值
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 17:15:39
+// @param whereParams interface{}
+// @return result EdbInfoMaxAndMinInfo
+// @return err error
+func (m *BaseFromThsHfData) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Aggregate(ctx, whereParams).One(&result)
+	if err != nil {
+		return
+	}
+	result.MinDate = result.MinDate.In(time.Local)
+	result.MaxDate = result.MaxDate.In(time.Local)
+	result.LatestDate = result.LatestDate.In(time.Local)
+	return
+}
+
+// GetLatestValue
+// @Description: 获取当前指标的最新数据记录
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 17:16:15
+// @param whereParams interface{}
+// @param selectParam interface{}
+// @return latestValue LatestValue
+// @return err error
+func (m *BaseFromThsHfData) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+
+	//var result interface{}
+	//err = coll.Find(ctx, whereParams).Select(selectParam).One(&result)
+	err = coll.Find(ctx, whereParams).Select(selectParam).One(&latestValue)
+	return
+}

+ 509 - 0
models/mgo/edb_data_ths_hf.go

@@ -1 +1,510 @@
 package mgo
+
+import (
+	"context"
+	"errors"
+	"eta/eta_index_lib/utils"
+	"fmt"
+	"github.com/qiniu/qmgo"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"time"
+)
+
+// EdbDataThsHf
+// @Description: 同花顺高频集合(指标库)
+type EdbDataThsHf struct {
+	ID            primitive.ObjectID `json:"_id" bson:"_id,omitempty" `            // 文档id
+	EdbInfoId     int                `json:"edb_info_id" bson:"edb_info_id"`       // 指标ID
+	EdbCode       string             `json:"edb_code" bson:"edb_code"`             // 指标编码
+	DataTime      time.Time          `json:"data_time" bson:"data_time"`           // 数据日期
+	Value         float64            `json:"value" bson:"value"`                   // 数据值
+	CreateTime    time.Time          `json:"create_time" bson:"create_time"`       // 创建时间
+	ModifyTime    time.Time          `json:"modify_time" bson:"modify_time"`       // 修改时间
+	DataTimestamp int64              `json:"data_timestamp" bson:"data_timestamp"` // 数据日期时间戳
+}
+
+// CollectionName
+// @Description:  获取集合名称
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 13:41:36
+// @return string
+func (m *EdbDataThsHf) CollectionName() string {
+	return "edb_data_ths_hf"
+}
+
+// DataBaseName
+// @Description: 获取数据库名称
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 13:41:33
+// @return string
+func (m *EdbDataThsHf) DataBaseName() string {
+	return utils.MgoDataDbName
+}
+
+// GetCollection
+// @Description: 获取mongodb集合的句柄
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 13:41:33
+// @return string
+func (m *EdbDataThsHf) GetCollection() *qmgo.Collection {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	return db.Collection(m.CollectionName())
+}
+
+// GetItem
+// @Description: 根据条件获取单条数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-09 10:00:49
+// @param whereParams interface{}
+// @return item *EdbDataThsHf
+// @return err error
+func (m *EdbDataThsHf) GetItem(whereParams interface{}) (item *EdbDataThsHf, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+
+	return m.GetItemByColl(coll, whereParams)
+}
+
+// GetItemByColl
+// @Description: 根据条件获取单条数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-09 13:22:06
+// @param coll *qmgo.Collection
+// @param whereParams interface{}
+// @return item *EdbDataThsHf
+// @return err error
+func (m *EdbDataThsHf) GetItemByColl(coll *qmgo.Collection, whereParams interface{}) (item *EdbDataThsHf, err error) {
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).One(&item)
+	if err != nil {
+		return
+	}
+
+	item.DataTime = item.DataTime.In(time.Local)
+	item.CreateTime = item.CreateTime.In(time.Local)
+	item.ModifyTime = item.ModifyTime.In(time.Local)
+
+	return
+}
+
+// GetAllDataList
+// @Description: 根据条件获取所有数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 13:42:19
+// @param whereParams interface{}
+// @param sort []string
+// @return result []EdbDataThsHf
+// @return err error
+func (m *EdbDataThsHf) GetAllDataList(whereParams interface{}, sort []string) (result []*EdbDataThsHf, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+
+	return
+}
+
+// GetLimitDataList
+// @Description: 根据条件获取指定数量数据列表
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-06 17:08:32
+// @param whereParams interface{}
+// @param size int64
+// @param sort []string
+// @return result []*BaseFromBusinessData
+// @return err error
+func (m *EdbDataThsHf) GetLimitDataList(whereParams interface{}, size int64, sort []string) (result []*EdbDataThsHf, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).Limit(size).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+
+	return
+}
+
+// GetPageDataList
+// @Description: 根据条件获取分页数据列表
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-07 10:21:07
+// @param whereParams interface{}
+// @param startSize int64
+// @param size int64
+// @param sort []string
+// @return result []*EdbDataThsHf
+// @return err error
+func (m *EdbDataThsHf) GetPageDataList(whereParams interface{}, startSize, size int64, sort []string) (result []*EdbDataThsHf, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).Skip(startSize).Limit(size).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+
+	return
+}
+
+// GetCountDataList
+// @Description:  根据条件获取数据列表总数
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-07 10:29:00
+// @param whereParams interface{}
+// @return count int64
+// @return err error
+func (m *EdbDataThsHf) GetCountDataList(whereParams interface{}) (count int64, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	count, err = coll.Find(ctx, whereParams).Count()
+
+	return
+}
+
+// InsertDataByColl
+// @Description: 写入单条数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param addData interface{}
+// @return err error
+func (m *EdbDataThsHf) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) {
+	ctx := context.TODO()
+	_, err = coll.InsertOne(ctx, addData)
+	if err != nil {
+		fmt.Println("InsertDataByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
+// BatchInsertData
+// @Description: 批量写入数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
+// @return err error
+func (m *EdbDataThsHf) BatchInsertData(bulk int, dataList []interface{}) (err error) {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+
+	return m.BatchInsertDataByColl(coll, bulk, dataList)
+}
+
+// BatchInsertDataByColl
+// @Description: 批量写入数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param coll *qmgo.Collection
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
+// @return err error
+func (m *EdbDataThsHf) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) {
+	ctx := context.TODO()
+	dataNum := len(dataList)
+	if dataNum <= 0 {
+		return
+	}
+
+	// 不设置每次保存切片数量大小,或者实际数据量小于设置的切片数量大小,那么就直接保存吧
+	if bulk <= 0 || dataNum <= bulk {
+		_, err = coll.InsertMany(ctx, dataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+		return
+	}
+
+	// 分批保存
+	i := 0
+	tmpAddDataList := make([]interface{}, 0)
+	for _, v := range dataList {
+		tmpAddDataList = append(tmpAddDataList, v)
+		i++
+		if i >= bulk {
+			_, err = coll.InsertMany(ctx, tmpAddDataList)
+			if err != nil {
+				fmt.Println("BatchInsertData:Err:" + err.Error())
+				return
+			}
+			i = 0
+			tmpAddDataList = make([]interface{}, 0)
+		}
+	}
+
+	if len(tmpAddDataList) > 0 {
+		_, err = coll.InsertMany(ctx, tmpAddDataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+	}
+
+	return
+}
+
+// UpdateData
+// @Description: 单条数据修改
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 15:01:51
+// @param whereParams interface{}
+// @param updateParams interface{}
+// @return err error
+func (m *EdbDataThsHf) UpdateData(whereParams, updateParams interface{}) (err error) {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+
+	return m.UpdateDataByColl(coll, whereParams, updateParams)
+}
+
+// UpdateDataByColl
+// @Description: 单条数据修改(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 15:01:51
+// @param whereParams interface{}
+// @param updateParams interface{}
+// @return err error
+func (m *EdbDataThsHf) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) {
+	ctx := context.TODO()
+	err = coll.UpdateOne(ctx, whereParams, updateParams)
+	if err != nil {
+		fmt.Println("UpdateDataByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
+// RemoveMany
+// @Description: 根据条件删除多条数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 13:17:02
+// @param whereParams interface{}
+// @return err error
+func (m *EdbDataThsHf) RemoveMany(whereParams interface{}) (err error) {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+
+	return m.RemoveManyByColl(coll, whereParams)
+}
+
+// RemoveManyByColl
+// @Description: 根据条件删除多条数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 13:18:42
+// @param coll *qmgo.Collection
+// @param whereParams interface{}
+// @return err error
+func (m *EdbDataThsHf) RemoveManyByColl(coll *qmgo.Collection, whereParams interface{}) (err error) {
+	ctx := context.TODO()
+	_, err = coll.RemoveAll(ctx, whereParams)
+	if err != nil {
+		fmt.Println("RemoveManyByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
+// HandleData
+// @Description: 事务处理数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 10:39:01
+// @param addDataList []AddEdbDataThsHf
+// @param updateDataList []EdbDataThsHf
+// @return result interface{}
+// @return err error
+func (m *EdbDataThsHf) HandleData(addDataList, updateDataList []EdbDataThsHf) (result interface{}, err error) {
+
+	ctx := context.TODO()
+
+	callback := func(sessCtx context.Context) (interface{}, error) {
+		// 重要:确保事务中的每一个操作,都使用传入的sessCtx参数
+
+		db := utils.MgoDataCli.Database(m.DataBaseName())
+		coll := db.Collection(m.CollectionName())
+
+		// 插入数据
+		if len(addDataList) > 0 {
+			_, err = coll.InsertMany(sessCtx, addDataList)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		// 修改
+
+		if len(updateDataList) > 0 {
+			for _, v := range updateDataList {
+				err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
+				if err != nil {
+					fmt.Println("BatchInsertData:Err:" + err.Error())
+					return nil, err
+				}
+			}
+		}
+
+		return nil, nil
+	}
+	result, err = utils.MgoDataCli.DoTransaction(ctx, callback)
+
+	return
+}
+
+// EdbInfoMaxAndMinInfo 指标最新数据记录结构体
+//type EdbInfoMaxAndMinInfo struct {
+//	MinDate     time.Time `description:"最小日期" bson:"min_date"`
+//	MaxDate     time.Time `description:"最大日期" bson:"max_date"`
+//	MinValue    float64   `description:"最小值" bson:"min_value"`
+//	MaxValue    float64   `description:"最大值" bson:"max_value"`
+//	LatestValue float64   `description:"最新值" bson:"latest_value"`
+//	LatestDate  time.Time `description:"实际数据最新日期" bson:"latest_date"`
+//	EndValue    float64   `description:"最新值" bson:"end_value"`
+//}
+
+// GetEdbInfoMaxAndMinInfo
+// @Description: 获取当前指标的最大最小值
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 17:15:39
+// @param whereParams interface{}
+// @return result EdbInfoMaxAndMinInfo
+// @return err error
+func (m *EdbDataThsHf) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Aggregate(ctx, whereParams).One(&result)
+	if err != nil {
+		return
+	}
+	result.MinDate = result.MinDate.In(time.Local)
+	result.MaxDate = result.MaxDate.In(time.Local)
+	result.LatestDate = result.LatestDate.In(time.Local)
+
+	return
+}
+
+// LatestValue 指标最新数据记录结构体
+//type LatestValue struct {
+//	Value float64 `description:"值" bson:"value"`
+//}
+
+// GetLatestValue
+// @Description: 获取当前指标的最新数据记录
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 17:16:15
+// @param whereParams interface{}
+// @param selectParam interface{}
+// @return latestValue LatestValue
+// @return err error
+func (m *EdbDataThsHf) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+
+	//var result interface{}
+	//err = coll.Find(ctx, whereParams).Select(selectParam).One(&result)
+	err = coll.Find(ctx, whereParams).Select(selectParam).One(&latestValue)
+	return
+}

+ 114 - 156
models/predict_edb_data_calculate_bp.go

@@ -47,6 +47,7 @@ func SavePredictCalculateBp(req *EdbInfoCalculateBatchSaveReq, fromEdbInfo *EdbI
 		edbInfo.CalculateFormula = req.Formula
 		edbInfo.EdbType = 2
 		edbInfo.Sort = GetAddEdbMaxSortByClassifyId(req.ClassifyId, utils.PREDICT_EDB_INFO_TYPE)
+		edbInfo.EmptyType = req.EmptyType
 		newEdbInfoId, tmpErr := to.Insert(edbInfo)
 		if tmpErr != nil {
 			err = tmpErr
@@ -91,11 +92,12 @@ func SavePredictCalculateBp(req *EdbInfoCalculateBatchSaveReq, fromEdbInfo *EdbI
 			edbInfo.Unit = req.Unit
 			edbInfo.EdbNameSource = req.EdbName
 		}
-
+		oldEmptyType := edbInfo.EmptyType
 		edbInfo.Frequency = req.Frequency
 		edbInfo.ClassifyId = req.ClassifyId
+		edbInfo.EmptyType = req.EmptyType
 		edbInfo.ModifyTime = time.Now()
-		_, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "ModifyTime", "EdbNameEn", "UnitEn")
+		_, err = to.Update(edbInfo, "EdbName", "EdbNameSource", "Frequency", "Unit", "ClassifyId", "ModifyTime", "EdbNameEn", "UnitEn", "EmptyType")
 		if err != nil {
 			return
 		}
@@ -111,7 +113,7 @@ func SavePredictCalculateBp(req *EdbInfoCalculateBatchSaveReq, fromEdbInfo *EdbI
 			err = errors.New("判断指标是否改变失败,Err:" + e.Error())
 			return
 		}
-		if count > 0 { // 指标未被替换,无需重新计算
+		if count > 0 && oldEmptyType == edbInfo.EmptyType { // 指标未被替换,无需重新计算
 			return
 		}
 
@@ -154,12 +156,12 @@ func SavePredictCalculateBp(req *EdbInfoCalculateBatchSaveReq, fromEdbInfo *EdbI
 	}
 
 	//计算数据
-	latestDateStr, latestValue, err = refreshAllPredictCalculateBp(to, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, fromEdbInfo, edbInfo.EdbCode, "", "", 0)
+	latestDateStr, latestValue, err = refreshAllPredictCalculateBp(to, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, fromEdbInfo, edbInfo.EdbCode, "", "", 0, edbInfo.EmptyType)
 
 	return
 }
 
-func RefreshAllPredictCalculateBp(edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string) (latestDateStr string, latestValue float64, err error) {
+func RefreshAllPredictCalculateBp(edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string, emptyType int) (latestDateStr string, latestValue float64, err error) {
 	o := orm.NewOrm()
 	to, err := o.Begin()
 	if err != nil {
@@ -175,14 +177,16 @@ func RefreshAllPredictCalculateBp(edbInfoId, source, subSource int, fromEdbInfo
 	}()
 
 	// 计算数据
-	latestDateStr, latestValue, err = refreshAllPredictCalculateBp(to, edbInfoId, source, subSource, fromEdbInfo, edbCode, startDate, endDate, 1)
+	latestDateStr, latestValue, err = refreshAllPredictCalculateBp(to, edbInfoId, source, subSource, fromEdbInfo, edbCode, startDate, endDate, 0, emptyType)
 	return
 }
 
 // refreshAllPredictCalculateBp 刷新变频数据
-func refreshAllPredictCalculateBp(to orm.TxOrmer, edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string, order int) (latestDateStr string, latestValue float64, err error) {
+func refreshAllPredictCalculateBp(to orm.TxOrmer, edbInfoId, source, subSource int, fromEdbInfo *EdbInfo, edbCode, startDate, endDate string, order, emptyType int) (latestDateStr string, latestValue float64, err error) {
 	edbInfoIdStr := strconv.Itoa(edbInfoId)
-	// 获取关联指标数据
+	//计算数据
+
+	//获取来源指标的数据
 	dataList, err := GetPredictEdbDataListAllByStartDate(fromEdbInfo, order, "")
 	if err != nil {
 		return
@@ -200,190 +204,144 @@ func refreshAllPredictCalculateBp(to orm.TxOrmer, edbInfoId, source, subSource i
 	}
 	fmt.Println("source:", source)
 
-	//获取频指标所有数据
+	//获取频指标所有数据
 	existDataList, err := GetAllEdbDataListByTo(to, edbInfoId, source, subSource)
 	if err != nil {
 		return
 	}
 	//计算指标的map
 	existDataMap := make(map[string]*EdbData, 0)
-
+	removeDateMap := make(map[string]struct{})
 	addSql := ` INSERT INTO edb_data_predict_calculate_bp(edb_info_id,edb_code,data_time,value,create_time,modify_time,data_timestamp) values `
-	var isAdd bool
 
-	var lastValue float64   //最后数据的值(float64)
-	var lastValueStr string //最后数据的值(string)
+	var isAdd bool
 	//待删除的日期
 	removeDateList := make([]string, 0)
-	if len(existDataList) > 0 {
-		//第一个已经入库的日期
-		firstExistDataTimeStr := existDataList[0].DataTime //计算指标数据第一条的日期字符串
-		lenDateArr := len(dateArr)
-		if lenDateArr > 0 {
-			firstFromDataTimeStr := dateArr[0]                                                                     //来源数据第一条的日期字符串
-			firstExistDataTime, _ := time.ParseInLocation(utils.FormatDate, firstExistDataTimeStr, time.Local)     //计算指标数据第一条的日期(time类型)
-			firstFromDataTime, _ := time.ParseInLocation(utils.FormatDate, firstFromDataTimeStr, time.Local)       //来源数据第一条的日期(time类型)
-			fromEndDateStr := dateArr[lenDateArr-1]                                                                //当天日期字符串
-			fromEndDate, _ := time.ParseInLocation(utils.FormatDate, fromEndDateStr, firstFromDataTime.Location()) //当天日期(time类型)
+	if len(existDataList) > 0 && len(dateArr) == 0 {
+		//如果没有来源指标数据,那么已经入库的计算指标数据需要全部删除
+		tableName := GetEdbDataTableName(source, subSource)
+		sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ?`, tableName)
+		_, err = to.Raw(sql, edbInfoId).Exec()
+		if err != nil {
+			err = fmt.Errorf("删除所有的升频指标数据失败,Err:" + err.Error())
+			return
+		}
+		return
+	}
 
-			lastValue = fromDataMap[firstFromDataTimeStr]
-			lastValueStr = decimal.NewFromFloat(lastValue).String()
-			//第一步: 判断来源指标的开始时间与计算指标的开始时间是否相等,相等的话,那么就不需要对两个时间之间的数据做处理
-			if firstExistDataTimeStr != firstFromDataTimeStr {
-				if firstExistDataTime.Before(firstFromDataTime) { //如果计算指标第一条数据的开始时间 早于 来源指标的第一条开始时间,那么需要对两个时间之间的计算指标数据做 删除处理
-					for _, v := range existDataList {
-						if v.DataTime == firstFromDataTimeStr {
-							if tmpLastValue, ok := fromDataMap[firstFromDataTimeStr]; ok { //来源指标当天的数据
-								lastValue = tmpLastValue
-								lastValueStr = decimal.NewFromFloat(lastValue).String()
-							}
-							break
-						}
-						removeDateList = append(removeDateList, v.DataTime)
-					}
-				} else {
-					for _, v := range dateArr { //如果计算指标第一条数据的开始时间 晚于 来源指标的第一条开始时间,那么需要对两个时间之间的计算指标数据做 新增处理
-						vDataTime, _ := time.ParseInLocation(utils.FormatDate, v, time.Local) //当前日期(time类型)
-						if firstExistDataTime.Equal(vDataTime) || firstExistDataTime.Before(vDataTime) {
-							if tmpLastValue, ok := fromDataMap[v]; ok { //来源指标当天的数据
-								lastValue = tmpLastValue
-								lastValueStr = decimal.NewFromFloat(lastValue).String()
-							}
-							break
-						}
+	existMap := make(map[string]string)
 
-						currentDate, _ := time.ParseInLocation(utils.FormatDate, v, time.Local)
-						timestamp := currentDate.UnixNano() / 1e6
-						timestampStr := fmt.Sprintf("%d", timestamp)
-						addSql += GetAddSql(edbInfoIdStr, edbCode, v, timestampStr, lastValueStr)
-						// 实际数据的值
-						if fromEdbInfo.LatestDate == v {
-							latestValue = lastValue
-						}
-						isAdd = true
-					}
-				}
+	dataLen := len(dataList)
+	//第三步: 已经入库的数据处理
+	for _, v := range existDataList {
+		existDataMap[v.DataTime] = v
+		removeDateMap[v.DataTime] = struct{}{}
+	}
+	for i := 0; i < dataLen; i++ {
+		//当期
+		currentItem := dataList[i]
+		var prevItem *EdbInfoSearchData
+		if emptyType == 3 { //3后值填充,其余前值填充
+			if i >= 1 {
+				prevItem = dataList[i-1]
 			}
-
-			//第二步 剩余数据每天修改
-
-			day := int(fromEndDate.Sub(firstExistDataTime).Hours() / float64(24))
-
-			//第三步: 已经入库的数据处理
-			for _, v := range existDataList {
-				existDataMap[v.DataTime] = v
+		}
+		currentDate, _ := time.ParseInLocation(utils.FormatDate, currentItem.DataTime, time.Local)
+		var day int
+		var preItem *EdbInfoSearchData
+		var preDate time.Time
+		if i == 0 {
+			if emptyType == 3 { //后值填充
+				day = 0 //最新的时间就是来源指标的最新日期
+				preDate = currentDate
+			} else {
+				day = int(time.Now().Sub(currentDate).Hours() / float64(24))
+				preDate = time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), 0, 0, 0, 0, time.Local)
 			}
-
-			for k := day; k >= 0; k-- {
-				needDay := fromEndDate.AddDate(0, 0, -k)
-				needDayStr := needDay.Format(utils.FormatDate)
-				tmpExistData, ok := existDataMap[needDayStr]
-				if ok {
-					if tmpLastValue, ok := fromDataMap[tmpExistData.DataTime]; ok { //来源指标当天的数据
-						lastValue = tmpLastValue
-						//lastValueStr = decimal.NewFromFloat(lastValue).String()
-						lastValueStr = fmt.Sprintf("%.4f", lastValue)
-					}
-					if fromEdbInfo.LatestDate == tmpExistData.DataTime {
-						latestValue = lastValue
+		} else {
+			j := i - 1
+			if j < dataLen {
+				preItem = dataList[j]
+				preDate, _ = time.ParseInLocation(utils.FormatDate, preItem.DataTime, time.Local)
+				day = int(preDate.Sub(currentDate).Hours() / float64(24))
+				utils.FileLog.Info("preItem.DataTime:" + preItem.DataTime + ";currentItem.DataTime" + currentItem.DataTime)
+			}
+		}
+		for k := 0; k <= day; k++ {
+			needDay := preDate.AddDate(0, 0, -k)
+			needDayStr := needDay.Format(utils.FormatDate)
+			delete(removeDateMap, needDayStr)
+			existKey := edbCode + needDayStr
+			if _, ok := existMap[existKey]; !ok {
+				timestamp := needDay.UnixNano() / 1e6
+				timestampStr := fmt.Sprintf("%d", timestamp)
+				valStr := decimal.NewFromFloat(currentItem.Value).String()
+				if prevItem != nil && needDayStr != currentItem.DataTime {
+					valStr = decimal.NewFromFloat(prevItem.Value).String()
+				}
+				if fromEdbInfo.LatestDate == needDayStr {
+					if prevItem != nil && needDayStr != currentItem.DataTime {
+						latestValue = prevItem.Value
+					} else {
+						latestValue = currentItem.Value
 					}
+				}
+
+				tmpExistData, ok2 := existDataMap[needDayStr]
+				if !ok2 {
+					addSql += GetAddSql(edbInfoIdStr, edbCode, needDayStr, timestampStr, valStr)
+					isAdd = true
+				} else {
 					//如果对应的值不匹配
-					if tmpExistData.Value != lastValueStr {
-						err = ModifyEdbDataById(source, subSource, tmpExistData.EdbDataId, lastValueStr)
+					if tmpExistData.Value != valStr {
+						err = ModifyEdbDataById(source, subSource, tmpExistData.EdbDataId, valStr)
 						if err != nil {
 							return
 						}
 					}
-				} else {
-					timestamp := needDay.UnixNano() / 1e6
-					timestampStr := fmt.Sprintf("%d", timestamp)
-					addSql += GetAddSql(edbInfoIdStr, edbCode, needDayStr, timestampStr, lastValueStr)
-					if fromEdbInfo.LatestDate == needDayStr {
-						latestValue = lastValue
-					}
-					isAdd = true
 				}
-			}
-		} else {
-			//如果没有来源指标数据,那么已经入库的计算指标数据需要全部删除
-			tableName := GetEdbDataTableName(source, subSource)
-			sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ?`, tableName)
-			_, err = to.Raw(sql, edbInfoId).Exec()
-			if err != nil {
-				err = fmt.Errorf("删除所有的变频指标数据失败,Err:" + err.Error())
-				return
-			}
 
-			//for _, v := range existDataList {
-			//	removeDateList = append(removeDateList, v.DataTime)
-			//}
+			}
+			existMap[existKey] = needDayStr
 		}
-	} else {
-		existMap := make(map[string]string)
-		dataLen := len(dataList)
-
-		for i := 0; i < dataLen; i++ {
-			//当期
-			currentItem := dataList[i]
+		existKey := edbCode + currentItem.DataTime
+		if _, ok := existMap[existKey]; !ok {
 			currentDate, _ := time.ParseInLocation(utils.FormatDate, currentItem.DataTime, time.Local)
-			var day int
-			var preItem *EdbInfoSearchData
-			var preDate time.Time
-			if i == 0 {
-				day = int(time.Now().Sub(currentDate).Hours() / float64(24))
-				preDate = time.Now()
-			} else {
-				j := i - 1
-				if j < dataLen {
-					preItem = dataList[j]
-					preDate, _ = time.ParseInLocation(utils.FormatDate, preItem.DataTime, time.Local)
-					day = int(preDate.Sub(currentDate).Hours() / float64(24))
-					utils.FileLog.Info("preItem.DataTime:" + preItem.DataTime + ";currentItem.DataTime" + currentItem.DataTime)
-				}
+			timestamp := currentDate.UnixNano() / 1e6
+			timestampStr := fmt.Sprintf("%d", timestamp)
+			valStr := decimal.NewFromFloat(currentItem.Value).String()
+			if fromEdbInfo.LatestDate == currentItem.DataTime {
+				latestValue = currentItem.Value
 			}
-			for k := 0; k <= day; k++ {
-				needDay := preDate.AddDate(0, 0, -k)
-				needDayStr := needDay.Format(utils.FormatDate)
-				existKey := edbCode + needDayStr
-				if _, ok := existMap[existKey]; !ok {
-					timestamp := needDay.UnixNano() / 1e6
-					timestampStr := fmt.Sprintf("%d", timestamp)
-					valStr := decimal.NewFromFloat(currentItem.Value).String()
-					addSql += GetAddSql(edbInfoIdStr, edbCode, needDayStr, timestampStr, valStr)
-					if fromEdbInfo.LatestDate == needDayStr {
-						latestValue = currentItem.Value
-					}
-					isAdd = true
-				}
-				existMap[existKey] = needDayStr
-			}
-			existKey := edbCode + currentItem.DataTime
-			if _, ok := existMap[existKey]; !ok {
-				currentDate, _ := time.ParseInLocation(utils.FormatDate, currentItem.DataTime, time.Local)
-				timestamp := currentDate.UnixNano() / 1e6
-				timestampStr := fmt.Sprintf("%d", timestamp)
-				valStr := decimal.NewFromFloat(currentItem.Value).String()
+			tmpExistData, ok2 := existDataMap[currentItem.DataTime]
+			if !ok2 {
 				addSql += GetAddSql(edbInfoIdStr, edbCode, currentItem.DataTime, timestampStr, valStr)
-				if fromEdbInfo.LatestDate == currentItem.DataTime {
-					latestValue = currentItem.Value
-				}
 				isAdd = true
+			} else {
+				//如果对应的值不匹配
+				if tmpExistData.Value != valStr {
+					err = ModifyEdbDataById(source, subSource, tmpExistData.EdbDataId, valStr)
+					if err != nil {
+						return
+					}
+				}
 			}
-			existMap[existKey] = currentItem.DataTime
+
 		}
+		existMap[existKey] = currentItem.DataTime
 	}
 
+	for k, _ := range removeDateMap {
+		removeDateList = append(removeDateList, k)
+	}
 	// 删除不需要的指标数据
 	if len(removeDateList) > 0 {
-		removeDateStr := strings.Join(removeDateList, `","`)
-		removeDateStr = `"` + removeDateStr + `"`
 		//如果拼接指标变更了,那么需要删除所有的指标数据
 		tableName := GetEdbDataTableName(source, subSource)
-		sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (%s) `, tableName, removeDateStr)
-
-		_, err = to.Raw(sql, edbInfoId).Exec()
+		sql := fmt.Sprintf(` DELETE FROM %s WHERE edb_info_id = ? and data_time in (`+utils.GetOrmInReplace(len(removeDateList))+`) `, tableName)
+		_, err = to.Raw(sql, edbInfoId, removeDateList).Exec()
 		if err != nil {
-			err = fmt.Errorf("删除不存在的频指标数据失败,Err:" + err.Error())
+			err = fmt.Errorf("删除不存在的频指标数据失败,Err:" + err.Error())
 			return
 		}
 	}

+ 6 - 5
services/base_from_baiinfo.go

@@ -91,12 +91,13 @@ func HandleBaiinfoIndex(baseFilePath, terminalCode, renameFilePath, indexName, i
 
 	// 遍历excel数据,然后跟现有的数据做校验,不存在则入库
 	for date, value := range excelDataMap {
+		dateT, tmpErr := utils.DealExcelDate(date)
+		if tmpErr != nil {
+			fmt.Println("time.ParseInLocation Err:" + tmpErr.Error())
+			return
+		}
+		date = dateT.Format(utils.FormatDate)
 		if findData, ok := exitDataMap[date]; !ok {
-			_, err := time.ParseInLocation(utils.FormatDate, date, time.Local)
-			if err != nil {
-				fmt.Println("time.ParseInLocation Err:" + err.Error())
-				return
-			}
 			if !strings.Contains(value, "#N/A") {
 				var saveDataTime time.Time
 				if strings.Contains(date, "00:00:00") {

+ 4 - 1
services/base_from_mysteel_chemical.go

@@ -628,12 +628,12 @@ func MySteelChemicalPost(postUrl, hType string, postData []byte) (body []byte, e
 }
 
 func RefreshDataFromMysteelChemical(edbCode, startDate, endDate string) (err error) {
-
 	indexObj := &models.BaseFromMysteelChemicalIndex{}
 	tmpIndex, err := indexObj.GetIndexItem(edbCode)
 	if err != nil {
 		return
 	}
+	alarm_msg.SendAlarmMsg(fmt.Sprintf("开始刷新钢联化工接口数据, %s", edbCode), 1)
 
 	terminal, err := GetTerminal(utils.DATA_SOURCE_MYSTEEL_CHEMICAL, tmpIndex.TerminalCode)
 	if err != nil {
@@ -658,6 +658,7 @@ func RefreshDataFromMysteelChemical(edbCode, startDate, endDate string) (err err
 			err = errors.New(resp.Message)
 			return
 		}
+		alarm_msg.SendAlarmMsg("获取钢联化工接口数据", 1)
 
 		dataObj := new(models.BaseFromMysteelChemicalData)
 		exitDataList, er := dataObj.GetIndexDataList(edbCode)
@@ -689,6 +690,7 @@ func RefreshDataFromMysteelChemical(edbCode, startDate, endDate string) (err err
 		}
 		// 因为只有一个指标,所以取第一个就可以了
 		items := mysteelChemicalDatas[0]
+		alarm_msg.SendAlarmMsg("钢联化工接口数据转化成功", 1)
 		for _, v := range items {
 			dateStr := v.DataTime.Format(utils.FormatDate)
 			if findData, ok := existDataMap[dateStr]; !ok {
@@ -707,6 +709,7 @@ func RefreshDataFromMysteelChemical(edbCode, startDate, endDate string) (err err
 				}
 			}
 		}
+		alarm_msg.SendAlarmMsg("开始添加数据", 1)
 		err = dataObj.AddV2(addItems)
 		if err != nil {
 			return

+ 25 - 8
services/base_from_pcsg.go

@@ -23,9 +23,11 @@ type PCSGBloombergApiReq struct {
 }
 
 type PCSGBloombergTask struct {
-	TaskKey   string `json:"TaskKey"`
-	Frequency string `json:"Frequency"`
-	VCode     bool   `json:"VCode"`
+	TaskKey         string `json:"TaskKey"`
+	Frequency       string `json:"Frequency"`
+	VCode           bool   `json:"VCode"`
+	ExtraLetter     string `json:"ExtraLetter"`
+	IndexNamePrefix string `json:"IndexNamePrefix"`
 }
 
 // LoadPCSGBloombergTask 加载配置
@@ -110,7 +112,7 @@ func GetPCSGBloombergGeneralIndexFromBridge(params PCSGBloombergApiReq) (indexes
 }
 
 // PCSGWrite2BaseBloomberg 写入彭博数据源
-func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData, isVCode bool) (err error) {
+func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData, isVCode bool, extraLetter, namePrefix string) (err error) {
 	defer func() {
 		if err != nil {
 			tips := fmt.Sprintf("PCSGWrite2BaseBloomberg-写入彭博数据源失败, err: %s", err.Error())
@@ -118,6 +120,11 @@ func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData,
 			go alarm_msg.SendAlarmMsg(tips, 3)
 		}
 	}()
+	// 这里挡一下...万一没限制加进库了不好删...
+	if isVCode && extraLetter == "" {
+		err = fmt.Errorf("中间字母有误")
+		return
+	}
 
 	for _, v := range indexes {
 		if v.IndexCode == "" {
@@ -128,7 +135,7 @@ func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData,
 			continue
 		}
 		if isVCode {
-			v.IndexCode = utils.InsertStr2StrIdx(v.IndexCode, " ", 1, "V")
+			v.IndexCode = utils.InsertStr2StrIdx(v.IndexCode, " ", 1, extraLetter)
 		}
 
 		// 指标是否存在
@@ -138,11 +145,17 @@ func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData,
 			return
 		}
 
+		// 指标名称+前缀
+		indexName := v.IndexName
+		if indexName != "" && namePrefix != "" {
+			indexName = fmt.Sprint(namePrefix, indexName)
+		}
+
 		// 新增指标
 		if index == nil {
 			newIndex := new(models.BaseFromBloombergIndex)
 			newIndex.IndexCode = v.IndexCode
-			newIndex.IndexName = v.IndexName
+			newIndex.IndexName = indexName
 			newIndex.Unit = v.Unit
 			newIndex.Source = utils.DATA_SOURCE_BLOOMBERG
 			newIndex.Frequency = v.Frequency
@@ -156,7 +169,7 @@ func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData,
 		} else {
 			// 无指标名称的情况下更新指标基础信息
 			if index.IndexName == "" {
-				index.IndexName = v.IndexName
+				index.IndexName = indexName
 				index.Unit = v.Unit
 				index.Frequency = v.Frequency
 				index.ModifyTime = time.Now().Local()
@@ -240,7 +253,11 @@ func PCSGWrite2BaseBloomberg(indexes []models.BaseFromBloombergApiIndexAndData,
 				return
 			}
 			if edb != nil {
-				logic.RefreshBaseEdbInfo(edb, ``)
+				_, _, e = logic.RefreshBaseEdbInfo(edb, ``)
+				if e != nil {
+					utils.FileLog.Info(fmt.Sprintf("Bloomberg RefreshBaseEdbInfo, edbCode: %s, err: %v", index.IndexCode, e))
+					return
+				}
 			}
 		}()
 	}

+ 4 - 4
services/base_from_ths_ds_http.go

@@ -20,15 +20,15 @@ func getEdbDataFromThsDsHttp(stockCode, edbCode, startDate, endDate, thsRefreshT
 	// 分割字符串
 	paramList := strings.Split(edbCode, ",")
 
-	// 创建一个包含映射的切片
-	var indipara []map[string]interface{}
-
-	// 额外参数
 	var extraArr []string
 	if extraPars != "" {
+		extraPars = strings.TrimSpace(extraPars)
 		extraArr = strings.Split(extraPars, ",")
 	}
 
+	// 创建一个包含映射的切片
+	var indipara []map[string]interface{}
+
 	// 遍历分割后的参数列表
 	for _, param := range paramList {
 		// 创建一个映射

+ 146 - 13
services/base_from_ths_hf.go

@@ -4,11 +4,13 @@ import (
 	"encoding/json"
 	"eta/eta_index_lib/logic"
 	"eta/eta_index_lib/models"
+	"eta/eta_index_lib/models/mgo"
 	"eta/eta_index_lib/services/alarm_msg"
 	"eta/eta_index_lib/utils"
 	"fmt"
 	"github.com/rdlucklib/rdluck_tools/http"
 	"github.com/shopspring/decimal"
+	"go.mongodb.org/mongo-driver/bson"
 	"net/url"
 	"strings"
 	"time"
@@ -309,11 +311,11 @@ func getEdbDataFromThsHfApp(thsParams models.ThsHfSearchEdbReq, num int, serverU
 	return
 }
 
-// WriteRefreshBaseThsHfIndex 源指标刷新
-func WriteRefreshBaseThsHfIndex(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
+// RefreshThsHfBaseIndex 源指标刷新
+func RefreshThsHfBaseIndex(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
 	defer func() {
 		if err != nil {
-			tips := fmt.Sprintf("WriteRefreshBaseThsHfIndex-更新失败, %v", err)
+			tips := fmt.Sprintf("RefreshThsHfBaseIndex-更新失败, %v", err)
 			utils.FileLog.Info(tips)
 			go alarm_msg.SendAlarmMsg(tips, 3)
 		}
@@ -351,8 +353,7 @@ func WriteRefreshBaseThsHfIndex(indexItem *models.BaseFromThsHfIndex, codeWithDa
 	if len(originData) > 0 {
 		// unicode去重
 		for _, d := range originData {
-			uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format("2006-01-02 15:04")))
-			dateExist[uni] = d
+			dateExist[d.UniqueCode] = d
 		}
 	}
 
@@ -360,7 +361,7 @@ func WriteRefreshBaseThsHfIndex(indexItem *models.BaseFromThsHfIndex, codeWithDa
 	updateData := make([]*models.BaseFromThsHfData, 0)
 	insertData := make([]*models.BaseFromThsHfData, 0)
 	for _, d := range codeWithData.IndexData {
-		uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format("2006-01-02 15:04")))
+		uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format(utils.FormatDateTimeMinute)))
 		origin := dateExist[uni]
 
 		// unicode检验是否存在
@@ -389,6 +390,7 @@ func WriteRefreshBaseThsHfIndex(indexItem *models.BaseFromThsHfIndex, codeWithDa
 		newData.Value = newVal
 		newData.CreateTime = time.Now()
 		newData.ModifyTime = time.Now()
+		newData.UniqueCode = uni
 		newData.DataTimestamp = d.DataTime.UnixNano() / 1e6
 		insertData = append(insertData, newData)
 	}
@@ -422,16 +424,147 @@ func WriteRefreshBaseThsHfIndex(indexItem *models.BaseFromThsHfIndex, codeWithDa
 
 	// 同步刷新指标库
 	go func() {
-		_ = RefreshThsHfIndexFromBase(indexItem.IndexCode, startTime)
+		_ = RefreshEdbFromThsHfBaseIndex(indexItem.IndexCode, startTime)
 	}()
 	return
 }
 
-// RefreshThsHfIndexFromBase 根据源指标刷新指标库
-func RefreshThsHfIndexFromBase(baseCode, startTime string) (err error) {
+// RefreshThsHfBaseIndexMgo 源指标刷新-Mongo
+func RefreshThsHfBaseIndexMgo(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
 	defer func() {
 		if err != nil {
-			tips := fmt.Sprintf("RefreshThsHfIndexFromBase-刷新指标库失败, %v", err)
+			tips := fmt.Sprintf("RefreshThsHfBaseIndexMgo-更新失败, %v", err)
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+	if indexItem == nil {
+		err = fmt.Errorf("指标不存在")
+		return
+	}
+	if len(codeWithData.IndexData) == 0 {
+		return
+	}
+	mogDataObj := new(mgo.BaseFromThsHfData)
+
+	// 获取已存在的所有数据
+	existCond := bson.M{
+		"index_code": indexItem.IndexCode,
+	}
+	if startTime != "" {
+		st, e := time.ParseInLocation(utils.FormatDateTime, startTime, time.Local)
+		if e != nil {
+			err = fmt.Errorf("start time parse err: %v", e)
+			return
+		}
+		existCond["data_time"] = bson.M{
+			"$gte": st,
+		}
+	}
+	exitDataList, e := mogDataObj.GetAllDataList(existCond, []string{"data_time"})
+	if e != nil {
+		err = fmt.Errorf("GetAllDataList err: %v", e)
+		return
+	}
+
+	// 已经存在的数据集
+	exitDataMap := make(map[string]*mgo.BaseFromThsHfData)
+	for _, v := range exitDataList {
+		exitDataMap[v.UniqueCode] = v
+	}
+
+	// 待添加的数据集
+	addDataList := make([]interface{}, 0)
+	updateDataList := make([]mgo.BaseFromThsHfData, 0)
+	for _, data := range codeWithData.IndexData {
+		strNewVal := decimal.NewFromFloat(data.Value).Round(4).String()
+		di, _ := decimal.NewFromString(strNewVal)
+		newVal, _ := di.Float64()
+
+		// unicode检验是否存在
+		uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, data.DataTime.Format(utils.FormatDateTimeMinute)))
+		findData, ok := exitDataMap[uni]
+		if !ok {
+			addDataList = append(addDataList, mgo.BaseFromThsHfData{
+				BaseFromThsHfIndexId: int64(indexItem.BaseFromThsHfIndexId),
+				IndexCode:            indexItem.IndexCode,
+				DataTime:             data.DataTime,
+				Value:                newVal,
+				UniqueCode:           uni,
+				CreateTime:           time.Now(),
+				ModifyTime:           time.Now(),
+				DataTimestamp:        data.DataTime.UnixNano() / 1e6,
+			})
+			continue
+		}
+
+		// 值不匹配,修改数据
+		strExistVal := decimal.NewFromFloat(findData.Value).Round(4).String()
+		if strNewVal == strExistVal {
+			continue
+		}
+		findData.Value = newVal
+		updateDataList = append(updateDataList, *findData)
+	}
+
+	// 入库
+	{
+		coll := mogDataObj.GetCollection()
+		if len(addDataList) > 0 {
+			if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil {
+				err = fmt.Errorf("BatchInsertDataByColl, err: %v", e)
+				return
+			}
+		}
+
+		if len(updateDataList) > 0 {
+			for _, v := range updateDataList {
+				if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil {
+					err = fmt.Errorf("UpdateDataByColl, err: %v", e)
+					return
+				}
+			}
+		}
+	}
+
+	// 修改最大最小日期
+	minMax, err := indexItem.GetEdbInfoMaxAndMinInfo(indexItem.IndexCode)
+	if err != nil {
+		return
+	}
+	if err == nil && minMax != nil {
+		minDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MinDate, time.Local)
+		if e != nil {
+			err = fmt.Errorf("源数据最小日期有误, MinDate: %s, %v", minMax.MinDate, e)
+			return
+		}
+		maxDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MaxDate, time.Local)
+		if e != nil {
+			err = fmt.Errorf("源数据最大日期有误, MaxDate: %s, %v", minMax.MaxDate, e)
+			return
+		}
+		indexItem.StartDate = minDate
+		indexItem.EndDate = maxDate
+		indexItem.ModifyTime = time.Now().Local()
+		updateCols := []string{indexItem.Cols().StartDate, indexItem.Cols().EndDate, indexItem.Cols().ModifyTime}
+		if e = indexItem.Update(updateCols); e != nil {
+			err = fmt.Errorf("更新源指标开始结束时间失败, %v", e)
+			return
+		}
+	}
+
+	// 同步刷新指标库
+	go func() {
+		_ = RefreshEdbFromThsHfBaseIndex(indexItem.IndexCode, startTime)
+	}()
+	return
+}
+
+// RefreshEdbFromThsHfBaseIndex 根据源指标刷新指标库
+func RefreshEdbFromThsHfBaseIndex(baseCode, startTime string) (err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-刷新指标库失败, %v", err)
 			utils.FileLog.Info(tips)
 			go alarm_msg.SendAlarmMsg(tips, 3)
 		}
@@ -488,13 +621,13 @@ func RefreshThsHfIndexFromBase(baseCode, startTime string) (err error) {
 
 		edb := codeEdb[v.EdbCode]
 		if edb == nil {
-			utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-指标信息有误, EdbCode: %s", v.EdbCode))
+			utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-指标信息有误, EdbCode: %s", v.EdbCode))
 			continue
 		}
 
 		// 刷新指标
 		if e := thsOb.Refresh(edb, codeMapping[v.EdbCode], startTime); e != nil {
-			utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-刷新指标失败, %v", e))
+			utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-刷新指标失败, %v", e))
 			_ = utils.Rc.Delete(cacheKey)
 			continue
 		}
@@ -502,7 +635,7 @@ func RefreshThsHfIndexFromBase(baseCode, startTime string) (err error) {
 		// 更新指标最值
 		e, _ = models.UnifiedModifyEdbInfoMaxAndMinInfo(edb)
 		if e != nil {
-			utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-更新指标最值失败, %v", e))
+			utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-更新指标最值失败, %v", e))
 			_ = utils.Rc.Delete(cacheKey)
 			continue
 		}

+ 52 - 9
static/pcsg_task.json

@@ -1,22 +1,65 @@
 [
   {
-    "TaskKey": "IDpcsgDailyRun4",
+    "TaskKey": "IDpcsgDailyRunHistU2",
     "Frequency": "日度",
-    "VCode": false
+    "VCode": false,
+    "ExtraLetter": "",
+    "IndexNamePrefix": ""
   },
   {
-    "TaskKey": "IDpcsgDailyRun5",
+    "TaskKey": "IDpcsgDailyRunHist4",
     "Frequency": "日度",
-    "VCode": true
+    "VCode": false,
+    "ExtraLetter": "",
+    "IndexNamePrefix": ""
   },
   {
-    "TaskKey": "IDpcsgMonthRun2",
-    "Frequency": "月度",
-    "VCode": false
+    "TaskKey": "IDpcsgDailyRunHist1",
+    "Frequency": "日度",
+    "VCode": false,
+    "ExtraLetter": "",
+    "IndexNamePrefix": ""
   },
   {
-    "TaskKey": "IDpcsgDailyRunHist1",
+    "TaskKey": "IDpcsgDailyRunHist2",
     "Frequency": "日度",
-    "VCode": false
+    "VCode": false,
+    "ExtraLetter": "",
+    "IndexNamePrefix": ""
+  },
+  {
+    "TaskKey": "IDpcsgDailyRunHistV1",
+    "Frequency": "日度",
+    "VCode": false,
+    "ExtraLetter": "",
+    "IndexNamePrefix": ""
+  },
+  {
+    "TaskKey": "IDpcsgDailyRun4",
+    "Frequency": "日度",
+    "VCode": false,
+    "ExtraLetter": "",
+    "IndexNamePrefix": ""
+  },
+  {
+    "TaskKey": "IDpcsgDailyRun6",
+    "Frequency": "日度",
+    "VCode": false,
+    "ExtraLetter": "",
+    "IndexNamePrefix": ""
+  },
+  {
+    "TaskKey": "IDpcsgDailyRun7",
+    "Frequency": "日度",
+    "VCode": true,
+    "ExtraLetter": "O",
+    "IndexNamePrefix": "Open Interest -"
+  },
+  {
+    "TaskKey": "IDpcsgMonthRun2",
+    "Frequency": "月度",
+    "VCode": false,
+    "ExtraLetter": "",
+    "IndexNamePrefix": ""
   }
 ]

+ 15 - 0
utils/common.go

@@ -1310,3 +1310,18 @@ func InsertStr2StrIdx(str, sep string, idx int, value string) string {
 	slice = append(slice[:idx], append([]string{value}, slice[idx:]...)...)
 	return strings.Join(slice, sep)
 }
+
+// FormatFloatPlaces 格式化浮点数位数
+func FormatFloatPlaces(val float64, places int32) (newVal float64, err error) {
+	if places <= 0 {
+		places = 4
+	}
+	strNewVal := decimal.NewFromFloat(val).Round(places).String()
+	di, e := decimal.NewFromString(strNewVal)
+	if e != nil {
+		err = fmt.Errorf("NewFromString err: %v", e)
+		return
+	}
+	newVal, _ = di.Float64()
+	return
+}

+ 5 - 4
utils/constants.go

@@ -8,6 +8,7 @@ const (
 	FormatDate                 = "2006-01-02"              //日期格式
 	FormatDateUnSpace          = "20060102"                //日期格式
 	FormatDateTime             = "2006-01-02 15:04:05"     //完整时间格式
+	FormatDateTimeMinute       = "2006-01-02 15:04"        //完整时间格式
 	HlbFormatDateTime          = "2006-01-02_15:04:05.999" //完整时间格式
 	FormatDateTimeUnSpace      = "20060102150405"          //完整时间格式
 	FormatShortDateTimeUnSpace = "060102150405"            //省去开头两位年份的时间格式
@@ -198,10 +199,10 @@ const (
 
 // 基础数据初始化日期
 var (
-	BASE_START_DATE         = `1900-01-01`                                            //基础数据开始日期
-	BASE_END_DATE           = time.Now().AddDate(4, 0, 0).Format(FormatDate)          //基础数据结束日期
-	BASE_START_DATE_UnSpace = time.Now().AddDate(-30, 0, 0).Format(FormatDateUnSpace) //基础数据开始日期
-	BASE_END_DATE_UnSpace   = time.Now().AddDate(4, 0, 0).Format(FormatDateUnSpace)   //基础数据结束日期
+	BASE_START_DATE         = `1900-01-01`                                          //基础数据开始日期
+	BASE_END_DATE           = time.Now().AddDate(4, 0, 0).Format(FormatDate)        //基础数据结束日期
+	BASE_START_DATE_UnSpace = "19000101"                                            //基础数据开始日期
+	BASE_END_DATE_UnSpace   = time.Now().AddDate(4, 0, 0).Format(FormatDateUnSpace) //基础数据结束日期
 )
 
 const (