Forráskód Böngészése

同花顺高频-mongo

hsun 7 hónapja
szülő
commit
aaa24b2a68

+ 60 - 23
controllers/base_from_ths_hf.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"eta/eta_index_lib/logic"
 	"eta/eta_index_lib/models"
+	"eta/eta_index_lib/models/mgo"
 	"eta/eta_index_lib/services"
 	"eta/eta_index_lib/utils"
 	"fmt"
@@ -283,7 +284,6 @@ func (this *ThsHfController) BaseAdd() {
 	if params.EndTime == "" {
 		indexItem.EndDate = time.Now().Local()
 	}
-	// TODO:指定终端号
 	terminal, e := services.GetFirstTerminal(utils.DATA_SOURCE_THS, "")
 	if e != nil {
 		br.Msg = "终端未配置"
@@ -306,27 +306,56 @@ func (this *ThsHfController) BaseAdd() {
 	indexItem.CreateTime = time.Now().Local()
 	indexItem.ModifyTime = time.Now().Local()
 
-	// 新增至数据源和数据
-	itemData := make([]*models.BaseFromThsHfData, 0)
-	for _, v := range indexWithData.IndexData {
-		t := new(models.BaseFromThsHfData)
-		t.IndexCode = indexItem.IndexCode
-		t.DataTime = v.DataTime
-		t.Value = v.Value
-		t.UniqueCode = utils.MD5(fmt.Sprint(indexItem.IndexCode, v.DataTime.Format("2006-01-02 15:04")))
-		t.CreateTime = time.Now().Local()
-		t.ModifyTime = time.Now().Local()
-		t.DataTimestamp = v.DataTime.UnixNano() / 1e6
-		itemData = append(itemData, t)
-	}
-
-	// 新增指标和数据
-	if e = indexOb.CreateIndexAndData(indexItem, itemData); e != nil {
+	// 新增指标
+	if e := indexItem.Create(); e != nil {
 		br.Msg = "操作失败"
-		br.ErrMsg = fmt.Sprintf("新增指标和数据失败, %v", e)
+		br.ErrMsg = fmt.Sprintf("新增指标失败, %v", e)
 		return
 	}
 
+	// 新增数据
+	if utils.UseMongo {
+		dataList := make([]interface{}, 0)
+		for _, v := range indexWithData.IndexData {
+			dataList = append(dataList, &mgo.BaseFromThsHfData{
+				BaseFromThsHfIndexId: int64(indexItem.BaseFromThsHfIndexId),
+				IndexCode:            indexItem.IndexCode,
+				DataTime:             v.DataTime,
+				Value:                v.Value,
+				UniqueCode:           utils.MD5(fmt.Sprint(indexItem.IndexCode, v.DataTime.Format(utils.FormatDateTimeMinute))),
+				CreateTime:           time.Now().Local(),
+				ModifyTime:           time.Now().Local(),
+				DataTimestamp:        v.DataTime.UnixNano() / 1e6,
+			})
+		}
+		dataOb := new(mgo.BaseFromThsHfData)
+		if e = dataOb.BatchInsertData(500, dataList); e != nil {
+			br.Msg = "操作失败"
+			br.ErrMsg = fmt.Sprintf("批量新增数据失败-Mongo, %v", e)
+			return
+		}
+	} else {
+		dataOb := new(models.BaseFromThsHfData)
+		itemData := make([]*models.BaseFromThsHfData, 0)
+		for _, v := range indexWithData.IndexData {
+			t := new(models.BaseFromThsHfData)
+			t.BaseFromThsHfIndexId = indexItem.BaseFromThsHfIndexId
+			t.IndexCode = indexItem.IndexCode
+			t.DataTime = v.DataTime
+			t.Value = v.Value
+			t.UniqueCode = utils.MD5(fmt.Sprint(indexItem.IndexCode, v.DataTime.Format(utils.FormatDateTimeMinute)))
+			t.CreateTime = time.Now().Local()
+			t.ModifyTime = time.Now().Local()
+			t.DataTimestamp = v.DataTime.UnixNano() / 1e6
+			itemData = append(itemData, t)
+		}
+		if e = dataOb.CreateMulti(itemData); e != nil {
+			br.Msg = "操作失败"
+			br.ErrMsg = fmt.Sprintf("批量新增数据失败-MySQL, %v", e)
+			return
+		}
+	}
+
 	br.Ret = 200
 	br.Success = true
 	br.Msg = "操作成功"
@@ -422,7 +451,7 @@ func (this *ThsHfController) BaseRefresh() {
 		}
 	}
 
-	// 获取指标数据
+	// API-获取指标数据
 	indexes, e := services.GetEdbDataFromThsHf(apiPars, indexItem.TerminalCode)
 	if e != nil {
 		br.Msg = "操作失败"
@@ -437,10 +466,18 @@ func (this *ThsHfController) BaseRefresh() {
 	indexWithData := indexes[0]
 
 	// 写入指标数据
-	if e = services.WriteRefreshBaseThsHfIndex(indexItem, indexWithData, apiPars.StartTime); e != nil {
-		br.Msg = "操作失败"
-		br.ErrMsg = fmt.Sprintf("写入源指标数据失败, %v", e)
-		return
+	if utils.UseMongo {
+		if e = services.RefreshThsHfBaseIndexMgo(indexItem, indexWithData, apiPars.StartTime); e != nil {
+			br.Msg = "操作失败"
+			br.ErrMsg = fmt.Sprintf("写入源指标数据失败-Mongo, %v", e)
+			return
+		}
+	} else {
+		if e = services.RefreshThsHfBaseIndex(indexItem, indexWithData, apiPars.StartTime); e != nil {
+			br.Msg = "操作失败"
+			br.ErrMsg = fmt.Sprintf("写入源指标数据失败, %v", e)
+			return
+		}
 	}
 
 	br.Ret = 200

+ 124 - 35
models/base_from_ths_hf.go

@@ -1,9 +1,11 @@
 package models
 
 import (
+	"eta/eta_index_lib/models/mgo"
 	"eta/eta_index_lib/utils"
 	"fmt"
 	"github.com/beego/beego/v2/client/orm"
+	"go.mongodb.org/mongo-driver/bson"
 	"strings"
 	"time"
 )
@@ -305,41 +307,41 @@ type ThsHfBaseAddReq struct {
 }
 
 // CreateIndexAndData 新增指标和数据
-func (m *BaseFromThsHfIndex) CreateIndexAndData(indexItem *BaseFromThsHfIndex, indexData []*BaseFromThsHfData) (err error) {
-	o := orm.NewOrm()
-	tx, e := o.Begin()
-	if e != nil {
-		err = fmt.Errorf("tx begin err: %v", e)
-		return
-	}
-	defer func() {
-		if err != nil {
-			_ = tx.Rollback()
-			return
-		}
-		_ = tx.Commit()
-	}()
-
-	lastId, e := tx.Insert(indexItem)
-	if e != nil {
-		err = fmt.Errorf("insert index err: %v", e)
-		return
-	}
-	indexId := int(lastId)
-	indexItem.BaseFromThsHfIndexId = indexId
-
-	if len(indexData) == 0 {
-		return
-	}
-	for _, v := range indexData {
-		v.BaseFromThsHfIndexId = indexId
-	}
-	if _, e = tx.InsertMulti(200, indexData); e != nil {
-		err = fmt.Errorf("insert index data err: %v", e)
-		return
-	}
-	return
-}
+//func (m *BaseFromThsHfIndex) CreateIndexAndData(indexItem *BaseFromThsHfIndex, indexData []*BaseFromThsHfData) (err error) {
+//	o := orm.NewOrm()
+//	tx, e := o.Begin()
+//	if e != nil {
+//		err = fmt.Errorf("tx begin err: %v", e)
+//		return
+//	}
+//	defer func() {
+//		if err != nil {
+//			_ = tx.Rollback()
+//			return
+//		}
+//		_ = tx.Commit()
+//	}()
+//
+//	lastId, e := tx.Insert(indexItem)
+//	if e != nil {
+//		err = fmt.Errorf("insert index err: %v", e)
+//		return
+//	}
+//	indexId := int(lastId)
+//	indexItem.BaseFromThsHfIndexId = indexId
+//
+//	if len(indexData) == 0 {
+//		return
+//	}
+//	for _, v := range indexData {
+//		v.BaseFromThsHfIndexId = indexId
+//	}
+//	if _, e = tx.InsertMulti(200, indexData); e != nil {
+//		err = fmt.Errorf("insert index data err: %v", e)
+//		return
+//	}
+//	return
+//}
 
 // ThsHfBaseRefreshReq 数据源刷新请求
 type ThsHfBaseRefreshReq struct {
@@ -385,3 +387,90 @@ type ThsHfIndexMultiSave2EdbPreItem struct {
 	Tips         string `description:"提示信息"`
 	ErrMsg       string `description:"错误信息"`
 }
+
+// GetEdbInfoMaxAndMinInfo
+// @Description: 获取指标的最新数据记录信息
+// @author: Roc
+// @receiver m
+// @datetime 2024-07-02 14:50:50
+// @param edbCode string
+// @return item *EdbInfoMaxAndMinInfo
+// @return err error
+func (m BaseFromThsHfIndex) GetEdbInfoMaxAndMinInfo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
+	if utils.UseMongo {
+		return m.getEdbInfoMaxAndMinInfoByMongo(edbCode)
+	}
+	return m.getEdbInfoMaxAndMinInfoByMysql(edbCode)
+}
+
+// getEdbInfoMaxAndMinInfoByMongo
+// @Description: 获取指标的最新数据记录信息(从mongo中获取)
+// @author: Roc
+// @receiver m
+// @datetime 2024-07-02 14:41:20
+// @param edbCode string
+// @return item *EdbInfoMaxAndMinInfo
+// @return err error
+func (m BaseFromThsHfIndex) getEdbInfoMaxAndMinInfoByMongo(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
+	mogDataObj := new(mgo.BaseFromThsHfData)
+	pipeline := []bson.M{
+		{"$match": bson.M{"index_code": edbCode}},
+		{"$group": bson.M{
+			"_id":       nil,
+			"min_date":  bson.M{"$min": "$data_time"},
+			"max_date":  bson.M{"$max": "$data_time"},
+			"min_value": bson.M{"$min": "$value"},
+			"max_value": bson.M{"$max": "$value"},
+		}},
+		{"$project": bson.M{"_id": 0}}, // 可选,如果不需要_id字段
+	}
+	result, err := mogDataObj.GetEdbInfoMaxAndMinInfo(pipeline)
+	if err != nil {
+		fmt.Println("BaseFromThsHfIndex GetEdbInfoMaxAndMinInfo Err:" + err.Error())
+		return
+	}
+
+	if !result.MaxDate.IsZero() {
+		whereQuery := bson.M{"index_code": edbCode, "data_time": result.MaxDate}
+		selectParam := bson.D{{"value", 1}, {"_id", 0}}
+		latestValue, tmpErr := mogDataObj.GetLatestValue(whereQuery, selectParam)
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+		result.LatestValue = latestValue.Value
+		result.EndValue = latestValue.Value
+	}
+
+	item = &EdbInfoMaxAndMinInfo{
+		MinDate:     result.MinDate.Format(utils.FormatDate),
+		MaxDate:     result.MaxDate.Format(utils.FormatDate),
+		MinValue:    result.MinValue,
+		MaxValue:    result.MaxValue,
+		LatestValue: result.LatestValue,
+		LatestDate:  result.LatestDate.Format(utils.FormatDate),
+		EndValue:    result.EndValue,
+	}
+	return
+}
+
+// getEdbInfoMaxAndMinInfoByMysql
+// @Description: 获取指标的最新数据记录信息(从mysql中获取)
+func (m BaseFromThsHfIndex) getEdbInfoMaxAndMinInfoByMysql(edbCode string) (item *EdbInfoMaxAndMinInfo, err error) {
+	dataObj := BaseFromThsHfData{}
+	result, err := dataObj.GetIndexMinMax(edbCode)
+	if err != nil {
+		return
+	}
+
+	item = &EdbInfoMaxAndMinInfo{
+		MinDate:     result.MinDate,
+		MaxDate:     result.MaxDate,
+		MinValue:    result.MinValue,
+		MaxValue:    result.MaxValue,
+		LatestValue: result.LatestValue,
+		LatestDate:  result.LatestDate,
+		EndValue:    result.EndValue,
+	}
+	return
+}

+ 294 - 17
models/edb_ths_hf.go

@@ -2,10 +2,12 @@ package models
 
 import (
 	"encoding/json"
+	"eta/eta_index_lib/models/mgo"
 	"eta/eta_index_lib/utils"
 	"fmt"
 	"github.com/beego/beego/v2/client/orm"
 	"github.com/shopspring/decimal"
+	"go.mongodb.org/mongo-driver/bson"
 	"reflect"
 	"sort"
 	"time"
@@ -73,14 +75,6 @@ type ThsHfRefreshBaseParams struct {
 
 // Add
 // @Description: 添加指标
-// @author: Roc
-// @receiver obj
-// @datetime 2024-04-30 17:35:14
-// @param params ThsHfAddBaseParams
-// @param businessIndexItem *BaseFromBusinessIndex
-// @return edbInfo *EdbInfo
-// @return err error
-// @return errMsg string
 func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex) (edbInfo *EdbInfo, err error) {
 	o := orm.NewOrm()
 	tx, e := o.Begin()
@@ -151,6 +145,13 @@ func (obj EdbThsHf) Add(params ThsHfAddBaseParams, baseIndex *BaseFromThsHfIndex
 }
 
 func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
+	if utils.UseMongo {
+		return obj.refreshByMongo(edbInfo, edbBaseMapping, startDate)
+	}
+	return obj.refreshByMysql(edbInfo, edbBaseMapping, startDate)
+}
+
+func (obj EdbThsHf) refreshByMysql(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
 	if edbInfo == nil || edbBaseMapping == nil {
 		err = fmt.Errorf("指标信息/关联信息有误, EdbInfo: %v, EdbBaseMapping: %v", edbInfo, edbBaseMapping)
 		return
@@ -208,7 +209,14 @@ func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping
 		err = fmt.Errorf("转换规则有误, %v", e)
 		return
 	}
-	convertData, e := ThsHfConvertData2DayByRule(baseDataList, convertRule)
+	convertOriginData := make([]*ThsHfConvertOriginData, 0)
+	for _, v := range baseDataList {
+		convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
+			DataTime: v.DataTime,
+			Value:    v.Value,
+		})
+	}
+	convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
 	if e != nil {
 		err = fmt.Errorf("转换数据失败, %v", e)
 		return
@@ -320,8 +328,207 @@ func (obj EdbThsHf) Refresh(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping
 	return
 }
 
+func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
+	var realDataMaxDate, edbDataInsertConfigDate time.Time
+	var edbDataInsertConfig *EdbDataInsertConfig
+	var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
+	{
+		edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
+		if err != nil && err.Error() != utils.ErrNoRow() {
+			return
+		}
+		if edbDataInsertConfig != nil {
+			edbDataInsertConfigDate = edbDataInsertConfig.Date
+		}
+	}
+
+	// 查询时间为开始时间-3d
+	var queryDate string
+	if startDate != "" {
+		st, e := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
+		if e != nil {
+			err = fmt.Errorf("刷新开始时间有误, %v", e)
+			return
+		}
+		queryDate = st.AddDate(0, 0, -3).Format(utils.FormatDate)
+	}
+
+	// 获取源指标数据
+	baseDataList, e := obj.getBaseIndexDataByMongo(edbInfo, queryDate)
+	if e != nil {
+		err = fmt.Errorf("getBaseIndexDataByMongo, err: %v", e)
+		return
+	}
+
+	// 转换数据
+	convertRule := new(ThsHfIndexConvert2EdbRule)
+	if e := json.Unmarshal([]byte(edbBaseMapping.ConvertRule), &convertRule); e != nil {
+		err = fmt.Errorf("转换规则有误, %v", e)
+		return
+	}
+	convertOriginData := make([]*ThsHfConvertOriginData, 0)
+	for _, v := range baseDataList {
+		convertOriginData = append(convertOriginData, &ThsHfConvertOriginData{
+			DataTime: v.DataTime,
+			Value:    v.Value,
+		})
+	}
+	convertData, e := ThsHfConvertData2DayByRule(convertOriginData, convertRule)
+	if e != nil {
+		err = fmt.Errorf("转换数据失败, %v", e)
+		return
+	}
+	if len(convertData) == 0 {
+		utils.FileLog.Info("同花顺高频-转换无数据, EdbCode: %s", edbInfo.EdbCode)
+		return
+	}
+
+	//获取指标所有数据
+	existDataList := make([]*mgo.EdbDataThsHf, 0)
+	mogDataObj := new(mgo.EdbDataThsHf)
+	{
+		// 构建查询条件
+		queryConditions := bson.M{
+			"edb_code": edbInfo.EdbCode,
+		}
+
+		if queryDate != `` {
+			//获取已存在的所有数据
+			startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, queryDate, time.Local)
+			if tmpErr != nil {
+				err = tmpErr
+				return
+			}
+			queryConditions["data_time"] = bson.M{"$gte": startDateTime}
+		}
+		existDataList, err = mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
+		if err != nil {
+			fmt.Println(obj.GetSourceName() + ",refresh err;getEdbDataThsHfList Err:" + err.Error())
+			return
+		}
+	}
+
+	existDataMap := make(map[string]*mgo.EdbDataThsHf)
+	removeDataTimeMap := make(map[string]bool) //需要移除的日期数据
+	for _, v := range existDataList {
+		tmpDate := v.DataTime.Format(utils.FormatDate)
+		existDataMap[tmpDate] = v
+		removeDataTimeMap[tmpDate] = true
+	}
+
+	// 待添加的数据集
+	addDataList := make([]interface{}, 0)
+	updateDataList := make([]mgo.EdbDataThsHf, 0)
+
+	insertExist := make(map[string]bool)
+	for k, v := range convertData {
+		strDate := k.Format(utils.FormatDate)
+
+		// 手动插入数据的判断
+		if realDataMaxDate.IsZero() || k.After(realDataMaxDate) {
+			realDataMaxDate = k
+		}
+		if edbDataInsertConfigDate.IsZero() || k.Equal(edbDataInsertConfigDate) {
+			isFindConfigDateRealData = true
+		}
+
+		// 入库值
+		saveVal := decimal.NewFromFloat(v).Round(4).String()
+		d, e := decimal.NewFromString(saveVal)
+		if e != nil {
+			utils.FileLog.Info(fmt.Sprintf("EdbDataThsHf NewFromString err: %v", e))
+			continue
+		}
+		saveFloat, _ := d.Float64()
+
+		// 更新
+		exists := existDataMap[strDate]
+		if exists != nil {
+			existVal := decimal.NewFromFloat(exists.Value).Round(4).String()
+			if saveVal != existVal {
+				exists.Value = saveFloat
+				updateDataList = append(updateDataList, *exists)
+			}
+			continue
+		}
+
+		// 新增
+		if insertExist[strDate] {
+			continue
+		}
+		insertExist[strDate] = true
+
+		timestamp := k.UnixNano() / 1e6
+		addDataList = append(addDataList, &EdbDataThsHf{
+			EdbInfoId:     edbInfo.EdbInfoId,
+			EdbCode:       edbInfo.EdbCode,
+			DataTime:      k,
+			Value:         saveFloat,
+			CreateTime:    time.Now(),
+			ModifyTime:    time.Now(),
+			DataTimestamp: timestamp,
+		})
+	}
+
+	// 入库
+	{
+		coll := mogDataObj.GetCollection()
+
+		//删除已经不存在的指标数据(由于该指标当日的数据删除了)
+		{
+			removeDateList := make([]time.Time, 0)
+			for dateTime := range removeDataTimeMap {
+				//获取已存在的所有数据
+				tmpDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
+				if tmpErr != nil {
+					err = tmpErr
+					return
+				}
+				removeDateList = append(removeDateList, tmpDateTime)
+			}
+			removeNum := len(removeDateList)
+			if removeNum > 0 {
+				err = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}})
+				if err != nil {
+					fmt.Println("mogDataObj.RemoveMany() Err:" + err.Error())
+					return
+				}
+			}
+		}
+
+		// 插入新数据
+		if len(addDataList) > 0 {
+			err = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList)
+			if err != nil {
+				fmt.Println("mogDataObj.BatchInsertData() Err:" + err.Error())
+				return
+			}
+		}
+
+		// 修改历史数据
+		if len(updateDataList) > 0 {
+			for _, v := range updateDataList {
+				err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
+				if err != nil {
+					fmt.Println("mogDataObj.UpdateDataByColl:Err:" + err.Error())
+					return
+				}
+			}
+		}
+	}
+
+	// 处理手工数据补充的配置
+	obj.HandleConfigInsertEdbDataByMongo(realDataMaxDate, edbDataInsertConfig, edbInfo.EdbInfoId, edbInfo.Source, edbInfo.SubSource, existDataMap, isFindConfigDateRealData)
+	return
+}
+
+type ThsHfConvertOriginData struct {
+	DataTime time.Time `description:"数据日期(至时分秒)"`
+	Value    float64   `description:"数据值"`
+}
+
 // ThsHfConvertData2DayByRule 原指标数据转换为日度数据
-func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) {
+func ThsHfConvertData2DayByRule(originData []*ThsHfConvertOriginData, convertRule *ThsHfIndexConvert2EdbRule) (timeData map[time.Time]float64, err error) {
 	// PS: originData为期望开始日期前三日(有两天非交易日, 那么周一的前日应当算上周五的)至结束日期的数据
 	timeData = make(map[time.Time]float64)
 	if len(originData) == 0 || convertRule == nil {
@@ -334,19 +541,19 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 
 	// 升序排序
 	sort.Slice(originData, func(i, j int) bool {
-		return originData[i].DataTimestamp < originData[j].DataTimestamp
+		return originData[i].DataTime.Before(originData[j].DataTime)
 	})
 
 	// 将数据根据日期进行分组
 	var sortDates []string
-	groupDateData := make(map[string][]*BaseFromThsHfData)
+	groupDateData := make(map[string][]*ThsHfConvertOriginData)
 	for _, v := range originData {
 		d := v.DataTime.Format(utils.FormatDate)
 		if !utils.InArrayByStr(sortDates, d) {
 			sortDates = append(sortDates, d)
 		}
 		if groupDateData[d] == nil {
-			groupDateData[d] = make([]*BaseFromThsHfData, 0)
+			groupDateData[d] = make([]*ThsHfConvertOriginData, 0)
 		}
 		groupDateData[d] = append(groupDateData[d], v)
 	}
@@ -360,7 +567,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 				continue
 			}
 			var timeTarget time.Time
-			dateData := make([]*BaseFromThsHfData, 0)
+			dateData := make([]*ThsHfConvertOriginData, 0)
 
 			// 当日
 			if convertRule.ConvertFixed.FixedDay == 1 {
@@ -413,7 +620,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 			}
 
 			// 重新获取数据序列中, 时间在目标时间点之后的
-			newDateData := make([]*BaseFromThsHfData, 0)
+			newDateData := make([]*ThsHfConvertOriginData, 0)
 			for kv, dv := range dateData {
 				if dv.DataTime.Before(timeTarget) {
 					continue
@@ -496,7 +703,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 		}
 
 		// 合并前日当日数据
-		dateData := make([]*BaseFromThsHfData, 0)
+		dateData := make([]*ThsHfConvertOriginData, 0)
 		if convertRule.ConvertArea.StartDay == 1 && convertRule.ConvertArea.EndDay == 1 {
 			// 起始截止均为当日
 			dateData = groupDateData[thisDate]
@@ -540,7 +747,7 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 		}
 
 		// 重组时间区间内的数据
-		newDateData := make([]*BaseFromThsHfData, 0)
+		newDateData := make([]*ThsHfConvertOriginData, 0)
 		for _, dv := range dateData {
 			if dv.DataTime.Before(startTimeTarget) || dv.DataTime.After(endTimeTarget) {
 				continue
@@ -579,3 +786,73 @@ func ThsHfConvertData2DayByRule(originData []*BaseFromThsHfData, convertRule *Th
 	}
 	return
 }
+
+func (obj EdbThsHf) getBaseIndexDataByMongo(edbInfo *EdbInfo, startDate string) (newDataList []EdbInfoMgoData, err error) {
+	newDataList = make([]EdbInfoMgoData, 0)
+
+	// 获取数据源的指标数据
+	mogDataObj := new(mgo.BaseFromThsHfData)
+
+	// 构建查询条件
+	queryConditions := bson.M{
+		"index_code": edbInfo.EdbCode,
+	}
+
+	if startDate != `` {
+		//获取已存在的所有数据
+		startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, startDate, time.Local)
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+		queryConditions["data_time"] = bson.M{"$gte": startDateTime}
+	}
+
+	baseDataList, err := mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
+	if err != nil {
+		fmt.Println("GetAllDataList Err:" + err.Error())
+		return
+	}
+
+	for _, v := range baseDataList {
+		newDataList = append(newDataList, EdbInfoMgoData{
+			//EdbDataId: v.ID,
+			DataTime: v.DataTime,
+			Value:    v.Value,
+			EdbCode:  v.IndexCode,
+		})
+	}
+	return
+}
+
+func (obj EdbThsHf) HandleConfigInsertEdbDataByMongo(realDataMaxDate time.Time, edbDataInsertConfig *EdbDataInsertConfig, edbInfoId, source, subSource int, existMap map[string]*mgo.EdbDataThsHf, isFindConfigDateRealData bool) {
+	if edbDataInsertConfig == nil {
+		return
+	}
+	var err error
+	defer func() {
+		if err != nil {
+			utils.FileLog.Info(fmt.Sprintf("ThsHf-HandleConfigInsertEdbDataByMongo, err: %v", err))
+		}
+	}()
+
+	edbDataInsertConfigDate := edbDataInsertConfig.Date // 配置的日期
+
+	// 如果存在真实数据的最大日期  && 存在配置插入数据的最大日期  && 真实数据的最大日期 晚于/等于 配置插入数据的最大日期
+	if realDataMaxDate.After(edbDataInsertConfigDate) || realDataMaxDate.Equal(edbDataInsertConfigDate) {
+		go DeleteEdbDataInsertConfigByEdbId(edbInfoId)
+
+		mogDataObj := mgo.EdbDataThsHf{}
+		coll := mogDataObj.GetCollection()
+		edbDataInsertConfigDateStr := edbDataInsertConfigDate.Format(utils.FormatDate)
+		// 如果没有找到找到配置日期的实际数据,那么就直接删除
+		if item, ok := existMap[edbDataInsertConfigDateStr]; ok && !isFindConfigDateRealData {
+			mogDataObj.RemoveManyByColl(coll, bson.M{"_id": item.ID})
+		}
+	} else {
+		o := orm.NewOrm()
+		edbDataInsertConfig.RealDate = realDataMaxDate
+		_, err = o.Update(edbDataInsertConfig, "RealDate")
+	}
+	return
+}

+ 405 - 0
models/mgo/base_from_ths_hf_data.go

@@ -1 +1,406 @@
 package mgo
+
+import (
+	"context"
+	"errors"
+	"eta/eta_index_lib/utils"
+	"fmt"
+	"github.com/qiniu/qmgo"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"time"
+)
+
+// BaseFromThsHfData
+// @Description: 同花顺高频集合
+type BaseFromThsHfData struct {
+	ID                   primitive.ObjectID `json:"_id" bson:"_id,omitempty"`                                   // 文档id
+	BaseFromThsHfDataId  int64              `json:"base_from_ths_hf_data_id" bson:"base_from_ths_hf_data_id"`   // 指标数据ID
+	BaseFromThsHfIndexId int64              `json:"base_from_ths_hf_index_id" bson:"base_from_ths_hf_index_id"` // 指标ID
+	IndexCode            string             `json:"index_code" bson:"index_code"`                               // 指标编码
+	DataTime             time.Time          `json:"data_time" bson:"data_time"`                                 // 数据日期
+	Value                float64            `json:"value" bson:"value"`                                         // 数据值
+	UniqueCode           string             `json:"unique_code" bson:"unique_code"`                             // 唯一编码
+	CreateTime           time.Time          `json:"create_time" bson:"create_time"`                             // 创建时间
+	ModifyTime           time.Time          `json:"modify_time" bson:"modify_time"`                             // 修改时间
+	DataTimestamp        int64              `json:"data_timestamp" bson:"data_timestamp"`                       // 数据日期时间戳
+}
+
+// CollectionName
+// @Description:  获取集合名称
+func (m *BaseFromThsHfData) CollectionName() string {
+	return "base_from_ths_hf_data"
+}
+
+// DataBaseName
+// @Description: 获取数据库名称
+func (m *BaseFromThsHfData) DataBaseName() string {
+	return utils.MgoDataDbName
+}
+
+// GetCollection
+// @Description: 获取mongodb集合的句柄
+func (m *BaseFromThsHfData) GetCollection() *qmgo.Collection {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	return db.Collection(m.CollectionName())
+}
+
+// GetAllDataList
+// @Description: 根据条件获取所有数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 13:42:19
+// @param sort []string
+// @param whereParams interface{}
+// @return result []BaseFromThsHfData
+// @return err error
+func (m *BaseFromThsHfData) GetAllDataList(whereParams interface{}, sort []string) (result []*BaseFromThsHfData, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+	return
+}
+
+// GetLimitDataList
+// @Description: 根据条件获取指定数量数据列表
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-06 17:08:32
+// @param whereParams interface{}
+// @param size int64
+// @return result []*BaseFromThsHfData
+// @return err error
+func (m *BaseFromThsHfData) GetLimitDataList(whereParams interface{}, size int64, sort []string) (result []*BaseFromThsHfData, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).Limit(size).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+
+	return
+}
+
+// GetPageDataList
+// @Description: 根据条件获取分页数据列表
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-07 10:21:07
+// @param whereParams interface{}
+// @param startSize int64
+// @param size int64
+// @param sort []string
+// @return result []*BaseFromThsHfData
+// @return err error
+func (m *BaseFromThsHfData) GetPageDataList(whereParams interface{}, startSize, size int64, sort []string) (result []*BaseFromThsHfData, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).Skip(startSize).Limit(size).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+
+	return
+}
+
+// GetCountDataList
+// @Description:  根据条件获取数据列表总数
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-07 10:29:00
+// @param whereParams interface{}
+// @return count int64
+// @return err error
+func (m *BaseFromThsHfData) GetCountDataList(whereParams interface{}) (count int64, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	count, err = coll.Find(ctx, whereParams).Count()
+
+	return
+}
+
+// InsertDataByColl
+// @Description: 写入单条数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param addData interface{}
+// @return err error
+func (m *BaseFromThsHfData) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) {
+	ctx := context.TODO()
+	_, err = coll.InsertOne(ctx, addData)
+	if err != nil {
+		fmt.Println("InsertDataByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
+// BatchInsertData
+// @Description: 批量写入数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
+// @return err error
+func (m *BaseFromThsHfData) BatchInsertData(bulk int, dataList []interface{}) (err error) {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+
+	return m.BatchInsertDataByColl(coll, bulk, dataList)
+}
+
+// BatchInsertDataByColl
+// @Description: 批量写入数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param coll *qmgo.Collection
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
+// @return err error
+func (m *BaseFromThsHfData) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) {
+	ctx := context.TODO()
+	dataNum := len(dataList)
+	if dataNum <= 0 {
+		return
+	}
+
+	// 不设置每次保存切片数量大小,或者实际数据量小于设置的切片数量大小,那么就直接保存吧
+	if bulk <= 0 || dataNum <= bulk {
+		_, err = coll.InsertMany(ctx, dataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+		return
+	}
+
+	// 分批保存
+	i := 0
+	tmpAddDataList := make([]interface{}, 0)
+	for _, v := range dataList {
+		tmpAddDataList = append(tmpAddDataList, v)
+		i++
+		if i >= bulk {
+			_, err = coll.InsertMany(ctx, tmpAddDataList)
+			if err != nil {
+				fmt.Println("BatchInsertData:Err:" + err.Error())
+				return
+			}
+			i = 0
+			tmpAddDataList = make([]interface{}, 0)
+		}
+	}
+
+	if len(tmpAddDataList) > 0 {
+		_, err = coll.InsertMany(ctx, tmpAddDataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+	}
+
+	return
+}
+
+// UpdateDataByColl
+// @Description: 单条数据修改
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 15:01:51
+// @param whereParams interface{}
+// @param updateParams interface{}
+// @return err error
+func (m *BaseFromThsHfData) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) {
+	ctx := context.TODO()
+	err = coll.UpdateOne(ctx, whereParams, updateParams)
+	if err != nil {
+		fmt.Println("UpdateDataByColl:Err:" + err.Error())
+		return
+	}
+	return
+}
+
+// UpdateData
+// @Description: 单条数据修改
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 15:01:51
+// @param whereParams interface{}
+// @param updateParams interface{}
+// @return err error
+func (m *BaseFromThsHfData) UpdateData(whereParams, updateParams interface{}) (err error) {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	err = coll.UpdateOne(ctx, whereParams, updateParams)
+	if err != nil {
+		fmt.Println("UpdateData:Err:" + err.Error())
+		return
+	}
+	return
+}
+
+// HandleData
+// @Description: 事务处理数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 10:40:20
+// @param addDataList []BaseAddFromBusinessData
+// @param updateDataList []BaseFromThsHfData
+// @return result interface{}
+// @return err error
+func (m *BaseFromThsHfData) HandleData(addDataList, updateDataList []BaseFromThsHfData) (result interface{}, err error) {
+
+	ctx := context.TODO()
+
+	callback := func(sessCtx context.Context) (interface{}, error) {
+		// 重要:确保事务中的每一个操作,都使用传入的sessCtx参数
+
+		db := utils.MgoDataCli.Database(m.DataBaseName())
+		coll := db.Collection(m.CollectionName())
+
+		// 插入数据
+		if len(addDataList) > 0 {
+			_, err = coll.InsertMany(sessCtx, addDataList)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		// 修改
+
+		if len(updateDataList) > 0 {
+			for _, v := range updateDataList {
+				err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
+				if err != nil {
+					fmt.Println("BatchInsertData:Err:" + err.Error())
+					return nil, err
+				}
+			}
+		}
+
+		return nil, nil
+	}
+	result, err = utils.MgoDataCli.DoTransaction(ctx, callback)
+
+	return
+}
+
+// GetEdbInfoMaxAndMinInfo
+// @Description: 获取当前指标的最大最小值
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 17:15:39
+// @param whereParams interface{}
+// @return result EdbInfoMaxAndMinInfo
+// @return err error
+func (m *BaseFromThsHfData) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Aggregate(ctx, whereParams).One(&result)
+	if err != nil {
+		return
+	}
+	result.MinDate = result.MinDate.In(time.Local)
+	result.MaxDate = result.MaxDate.In(time.Local)
+	result.LatestDate = result.LatestDate.In(time.Local)
+	return
+}
+
+// GetLatestValue
+// @Description: 获取当前指标的最新数据记录
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 17:16:15
+// @param whereParams interface{}
+// @param selectParam interface{}
+// @return latestValue LatestValue
+// @return err error
+func (m *BaseFromThsHfData) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+
+	//var result interface{}
+	//err = coll.Find(ctx, whereParams).Select(selectParam).One(&result)
+	err = coll.Find(ctx, whereParams).Select(selectParam).One(&latestValue)
+	return
+}

+ 509 - 0
models/mgo/edb_data_ths_hf.go

@@ -1 +1,510 @@
 package mgo
+
+import (
+	"context"
+	"errors"
+	"eta/eta_index_lib/utils"
+	"fmt"
+	"github.com/qiniu/qmgo"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"time"
+)
+
+// EdbDataThsHf
+// @Description: 同花顺高频集合(指标库)
+type EdbDataThsHf struct {
+	ID            primitive.ObjectID `json:"_id" bson:"_id,omitempty" `            // 文档id
+	EdbInfoId     int                `json:"edb_info_id" bson:"edb_info_id"`       // 指标ID
+	EdbCode       string             `json:"edb_code" bson:"edb_code"`             // 指标编码
+	DataTime      time.Time          `json:"data_time" bson:"data_time"`           // 数据日期
+	Value         float64            `json:"value" bson:"value"`                   // 数据值
+	CreateTime    time.Time          `json:"create_time" bson:"create_time"`       // 创建时间
+	ModifyTime    time.Time          `json:"modify_time" bson:"modify_time"`       // 修改时间
+	DataTimestamp int64              `json:"data_timestamp" bson:"data_timestamp"` // 数据日期时间戳
+}
+
+// CollectionName
+// @Description:  获取集合名称
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 13:41:36
+// @return string
+func (m *EdbDataThsHf) CollectionName() string {
+	return "edb_data_business"
+}
+
+// DataBaseName
+// @Description: 获取数据库名称
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 13:41:33
+// @return string
+func (m *EdbDataThsHf) DataBaseName() string {
+	return utils.MgoDataDbName
+}
+
+// GetCollection
+// @Description: 获取mongodb集合的句柄
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 13:41:33
+// @return string
+func (m *EdbDataThsHf) GetCollection() *qmgo.Collection {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	return db.Collection(m.CollectionName())
+}
+
+// GetItem
+// @Description: 根据条件获取单条数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-09 10:00:49
+// @param whereParams interface{}
+// @return item *EdbDataThsHf
+// @return err error
+func (m *EdbDataThsHf) GetItem(whereParams interface{}) (item *EdbDataThsHf, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+
+	return m.GetItemByColl(coll, whereParams)
+}
+
+// GetItemByColl
+// @Description: 根据条件获取单条数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-09 13:22:06
+// @param coll *qmgo.Collection
+// @param whereParams interface{}
+// @return item *EdbDataThsHf
+// @return err error
+func (m *EdbDataThsHf) GetItemByColl(coll *qmgo.Collection, whereParams interface{}) (item *EdbDataThsHf, err error) {
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).One(&item)
+	if err != nil {
+		return
+	}
+
+	item.DataTime = item.DataTime.In(time.Local)
+	item.CreateTime = item.CreateTime.In(time.Local)
+	item.ModifyTime = item.ModifyTime.In(time.Local)
+
+	return
+}
+
+// GetAllDataList
+// @Description: 根据条件获取所有数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 13:42:19
+// @param whereParams interface{}
+// @param sort []string
+// @return result []EdbDataThsHf
+// @return err error
+func (m *EdbDataThsHf) GetAllDataList(whereParams interface{}, sort []string) (result []*EdbDataThsHf, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+
+	return
+}
+
+// GetLimitDataList
+// @Description: 根据条件获取指定数量数据列表
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-06 17:08:32
+// @param whereParams interface{}
+// @param size int64
+// @param sort []string
+// @return result []*BaseFromBusinessData
+// @return err error
+func (m *EdbDataThsHf) GetLimitDataList(whereParams interface{}, size int64, sort []string) (result []*EdbDataThsHf, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).Limit(size).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+
+	return
+}
+
+// GetPageDataList
+// @Description: 根据条件获取分页数据列表
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-07 10:21:07
+// @param whereParams interface{}
+// @param startSize int64
+// @param size int64
+// @param sort []string
+// @return result []*EdbDataThsHf
+// @return err error
+func (m *EdbDataThsHf) GetPageDataList(whereParams interface{}, startSize, size int64, sort []string) (result []*EdbDataThsHf, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).Sort(sort...).Skip(startSize).Limit(size).All(&result)
+	if err != nil {
+		return
+	}
+
+	for _, v := range result {
+		v.DataTime = v.DataTime.In(time.Local)
+		v.CreateTime = v.CreateTime.In(time.Local)
+		v.ModifyTime = v.ModifyTime.In(time.Local)
+	}
+
+	return
+}
+
+// GetCountDataList
+// @Description:  根据条件获取数据列表总数
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-07 10:29:00
+// @param whereParams interface{}
+// @return count int64
+// @return err error
+func (m *EdbDataThsHf) GetCountDataList(whereParams interface{}) (count int64, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	count, err = coll.Find(ctx, whereParams).Count()
+
+	return
+}
+
+// InsertDataByColl
+// @Description: 写入单条数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param addData interface{}
+// @return err error
+func (m *EdbDataThsHf) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) {
+	ctx := context.TODO()
+	_, err = coll.InsertOne(ctx, addData)
+	if err != nil {
+		fmt.Println("InsertDataByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
+// BatchInsertData
+// @Description: 批量写入数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
+// @return err error
+func (m *EdbDataThsHf) BatchInsertData(bulk int, dataList []interface{}) (err error) {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+
+	return m.BatchInsertDataByColl(coll, bulk, dataList)
+}
+
+// BatchInsertDataByColl
+// @Description: 批量写入数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param coll *qmgo.Collection
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
+// @return err error
+func (m *EdbDataThsHf) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) {
+	ctx := context.TODO()
+	dataNum := len(dataList)
+	if dataNum <= 0 {
+		return
+	}
+
+	// 不设置每次保存切片数量大小,或者实际数据量小于设置的切片数量大小,那么就直接保存吧
+	if bulk <= 0 || dataNum <= bulk {
+		_, err = coll.InsertMany(ctx, dataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+		return
+	}
+
+	// 分批保存
+	i := 0
+	tmpAddDataList := make([]interface{}, 0)
+	for _, v := range dataList {
+		tmpAddDataList = append(tmpAddDataList, v)
+		i++
+		if i >= bulk {
+			_, err = coll.InsertMany(ctx, tmpAddDataList)
+			if err != nil {
+				fmt.Println("BatchInsertData:Err:" + err.Error())
+				return
+			}
+			i = 0
+			tmpAddDataList = make([]interface{}, 0)
+		}
+	}
+
+	if len(tmpAddDataList) > 0 {
+		_, err = coll.InsertMany(ctx, tmpAddDataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+	}
+
+	return
+}
+
+// UpdateData
+// @Description: 单条数据修改
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 15:01:51
+// @param whereParams interface{}
+// @param updateParams interface{}
+// @return err error
+func (m *EdbDataThsHf) UpdateData(whereParams, updateParams interface{}) (err error) {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+
+	return m.UpdateDataByColl(coll, whereParams, updateParams)
+}
+
+// UpdateDataByColl
+// @Description: 单条数据修改(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 15:01:51
+// @param whereParams interface{}
+// @param updateParams interface{}
+// @return err error
+func (m *EdbDataThsHf) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) {
+	ctx := context.TODO()
+	err = coll.UpdateOne(ctx, whereParams, updateParams)
+	if err != nil {
+		fmt.Println("UpdateDataByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
+// RemoveMany
+// @Description: 根据条件删除多条数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 13:17:02
+// @param whereParams interface{}
+// @return err error
+func (m *EdbDataThsHf) RemoveMany(whereParams interface{}) (err error) {
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+
+	return m.RemoveManyByColl(coll, whereParams)
+}
+
+// RemoveManyByColl
+// @Description: 根据条件删除多条数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 13:18:42
+// @param coll *qmgo.Collection
+// @param whereParams interface{}
+// @return err error
+func (m *EdbDataThsHf) RemoveManyByColl(coll *qmgo.Collection, whereParams interface{}) (err error) {
+	ctx := context.TODO()
+	_, err = coll.RemoveAll(ctx, whereParams)
+	if err != nil {
+		fmt.Println("RemoveManyByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
+// HandleData
+// @Description: 事务处理数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 10:39:01
+// @param addDataList []AddEdbDataThsHf
+// @param updateDataList []EdbDataThsHf
+// @return result interface{}
+// @return err error
+func (m *EdbDataThsHf) HandleData(addDataList, updateDataList []EdbDataThsHf) (result interface{}, err error) {
+
+	ctx := context.TODO()
+
+	callback := func(sessCtx context.Context) (interface{}, error) {
+		// 重要:确保事务中的每一个操作,都使用传入的sessCtx参数
+
+		db := utils.MgoDataCli.Database(m.DataBaseName())
+		coll := db.Collection(m.CollectionName())
+
+		// 插入数据
+		if len(addDataList) > 0 {
+			_, err = coll.InsertMany(sessCtx, addDataList)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		// 修改
+
+		if len(updateDataList) > 0 {
+			for _, v := range updateDataList {
+				err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
+				if err != nil {
+					fmt.Println("BatchInsertData:Err:" + err.Error())
+					return nil, err
+				}
+			}
+		}
+
+		return nil, nil
+	}
+	result, err = utils.MgoDataCli.DoTransaction(ctx, callback)
+
+	return
+}
+
+// EdbInfoMaxAndMinInfo 指标最新数据记录结构体
+//type EdbInfoMaxAndMinInfo struct {
+//	MinDate     time.Time `description:"最小日期" bson:"min_date"`
+//	MaxDate     time.Time `description:"最大日期" bson:"max_date"`
+//	MinValue    float64   `description:"最小值" bson:"min_value"`
+//	MaxValue    float64   `description:"最大值" bson:"max_value"`
+//	LatestValue float64   `description:"最新值" bson:"latest_value"`
+//	LatestDate  time.Time `description:"实际数据最新日期" bson:"latest_date"`
+//	EndValue    float64   `description:"最新值" bson:"end_value"`
+//}
+
+// GetEdbInfoMaxAndMinInfo
+// @Description: 获取当前指标的最大最小值
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 17:15:39
+// @param whereParams interface{}
+// @return result EdbInfoMaxAndMinInfo
+// @return err error
+func (m *EdbDataThsHf) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Aggregate(ctx, whereParams).One(&result)
+	if err != nil {
+		return
+	}
+	result.MinDate = result.MinDate.In(time.Local)
+	result.MaxDate = result.MaxDate.In(time.Local)
+	result.LatestDate = result.LatestDate.In(time.Local)
+
+	return
+}
+
+// LatestValue 指标最新数据记录结构体
+//type LatestValue struct {
+//	Value float64 `description:"值" bson:"value"`
+//}
+
+// GetLatestValue
+// @Description: 获取当前指标的最新数据记录
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-30 17:16:15
+// @param whereParams interface{}
+// @param selectParam interface{}
+// @return latestValue LatestValue
+// @return err error
+func (m *EdbDataThsHf) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+
+	//var result interface{}
+	//err = coll.Find(ctx, whereParams).Select(selectParam).One(&result)
+	err = coll.Find(ctx, whereParams).Select(selectParam).One(&latestValue)
+	return
+}

+ 3 - 8
services/base_from_ths_ds_http.go

@@ -23,20 +23,15 @@ func getEdbDataFromThsDsHttp(stockCode, edbCode, startDate, endDate, thsRefreshT
 	// 创建一个包含映射的切片
 	var indipara []map[string]interface{}
 
-	// 额外参数
-	var extraArr []string
-	if extraPars != "" {
-		extraArr = strings.Split(extraPars, ",")
-	}
-
 	// 遍历分割后的参数列表
 	for _, param := range paramList {
 		// 创建一个映射
 		paramMap := map[string]interface{}{
 			"indicator": param,
 		}
-		if len(extraArr) > 0 {
-			paramMap["indiparams"] = extraArr
+		if extraPars != "" {
+			extraPars = strings.TrimSpace(extraPars)
+			paramMap["indiparams"] = extraPars
 		}
 
 		// 将映射添加到切片中

+ 145 - 13
services/base_from_ths_hf.go

@@ -4,11 +4,13 @@ import (
 	"encoding/json"
 	"eta/eta_index_lib/logic"
 	"eta/eta_index_lib/models"
+	"eta/eta_index_lib/models/mgo"
 	"eta/eta_index_lib/services/alarm_msg"
 	"eta/eta_index_lib/utils"
 	"fmt"
 	"github.com/rdlucklib/rdluck_tools/http"
 	"github.com/shopspring/decimal"
+	"go.mongodb.org/mongo-driver/bson"
 	"net/url"
 	"strings"
 	"time"
@@ -309,11 +311,11 @@ func getEdbDataFromThsHfApp(thsParams models.ThsHfSearchEdbReq, num int, serverU
 	return
 }
 
-// WriteRefreshBaseThsHfIndex 源指标刷新
-func WriteRefreshBaseThsHfIndex(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
+// RefreshThsHfBaseIndex 源指标刷新
+func RefreshThsHfBaseIndex(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
 	defer func() {
 		if err != nil {
-			tips := fmt.Sprintf("WriteRefreshBaseThsHfIndex-更新失败, %v", err)
+			tips := fmt.Sprintf("RefreshThsHfBaseIndex-更新失败, %v", err)
 			utils.FileLog.Info(tips)
 			go alarm_msg.SendAlarmMsg(tips, 3)
 		}
@@ -351,8 +353,7 @@ func WriteRefreshBaseThsHfIndex(indexItem *models.BaseFromThsHfIndex, codeWithDa
 	if len(originData) > 0 {
 		// unicode去重
 		for _, d := range originData {
-			uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format("2006-01-02 15:04")))
-			dateExist[uni] = d
+			dateExist[d.UniqueCode] = d
 		}
 	}
 
@@ -360,7 +361,7 @@ func WriteRefreshBaseThsHfIndex(indexItem *models.BaseFromThsHfIndex, codeWithDa
 	updateData := make([]*models.BaseFromThsHfData, 0)
 	insertData := make([]*models.BaseFromThsHfData, 0)
 	for _, d := range codeWithData.IndexData {
-		uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format("2006-01-02 15:04")))
+		uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, d.DataTime.Format(utils.FormatDateTimeMinute)))
 		origin := dateExist[uni]
 
 		// unicode检验是否存在
@@ -423,16 +424,147 @@ func WriteRefreshBaseThsHfIndex(indexItem *models.BaseFromThsHfIndex, codeWithDa
 
 	// 同步刷新指标库
 	go func() {
-		_ = RefreshThsHfIndexFromBase(indexItem.IndexCode, startTime)
+		_ = RefreshEdbFromThsHfBaseIndex(indexItem.IndexCode, startTime)
 	}()
 	return
 }
 
-// RefreshThsHfIndexFromBase 根据源指标刷新指标库
-func RefreshThsHfIndexFromBase(baseCode, startTime string) (err error) {
+// RefreshThsHfBaseIndexMgo 源指标刷新-Mongo
+func RefreshThsHfBaseIndexMgo(indexItem *models.BaseFromThsHfIndex, codeWithData *models.ThsHfIndexWithData, startTime string) (err error) {
 	defer func() {
 		if err != nil {
-			tips := fmt.Sprintf("RefreshThsHfIndexFromBase-刷新指标库失败, %v", err)
+			tips := fmt.Sprintf("RefreshThsHfBaseIndexMgo-更新失败, %v", err)
+			utils.FileLog.Info(tips)
+			go alarm_msg.SendAlarmMsg(tips, 3)
+		}
+	}()
+	if indexItem == nil {
+		err = fmt.Errorf("指标不存在")
+		return
+	}
+	if len(codeWithData.IndexData) == 0 {
+		return
+	}
+	mogDataObj := new(mgo.BaseFromThsHfData)
+
+	// 获取已存在的所有数据
+	existCond := bson.M{
+		"index_code": indexItem.IndexCode,
+	}
+	if startTime != "" {
+		st, e := time.ParseInLocation(utils.FormatDateTime, startTime, time.Local)
+		if e != nil {
+			err = fmt.Errorf("start time parse err: %v", e)
+			return
+		}
+		existCond["data_time"] = bson.M{
+			"$gte": st,
+		}
+	}
+	exitDataList, e := mogDataObj.GetAllDataList(existCond, []string{"data_time"})
+	if e != nil {
+		err = fmt.Errorf("GetAllDataList err: %v", e)
+		return
+	}
+
+	// 已经存在的数据集
+	exitDataMap := make(map[string]*mgo.BaseFromThsHfData)
+	for _, v := range exitDataList {
+		exitDataMap[v.UniqueCode] = v
+	}
+
+	// 待添加的数据集
+	addDataList := make([]interface{}, 0)
+	updateDataList := make([]mgo.BaseFromThsHfData, 0)
+	for _, data := range codeWithData.IndexData {
+		strNewVal := decimal.NewFromFloat(data.Value).Round(4).String()
+		di, _ := decimal.NewFromString(strNewVal)
+		newVal, _ := di.Float64()
+
+		// unicode检验是否存在
+		uni := utils.MD5(fmt.Sprint(indexItem.IndexCode, data.DataTime.Format(utils.FormatDateTimeMinute)))
+		findData, ok := exitDataMap[uni]
+		if !ok {
+			addDataList = append(addDataList, mgo.BaseFromThsHfData{
+				BaseFromThsHfIndexId: int64(indexItem.BaseFromThsHfIndexId),
+				IndexCode:            indexItem.IndexCode,
+				DataTime:             data.DataTime,
+				Value:                newVal,
+				UniqueCode:           uni,
+				CreateTime:           time.Now(),
+				ModifyTime:           time.Now(),
+				DataTimestamp:        data.DataTime.UnixNano() / 1e6,
+			})
+			continue
+		}
+
+		// 值不匹配,修改数据
+		strExistVal := decimal.NewFromFloat(findData.Value).Round(4).String()
+		if strNewVal == strExistVal {
+			continue
+		}
+		findData.Value = newVal
+		updateDataList = append(updateDataList, *findData)
+	}
+
+	// 入库
+	{
+		coll := mogDataObj.GetCollection()
+		if len(addDataList) > 0 {
+			if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil {
+				err = fmt.Errorf("BatchInsertDataByColl, err: %v", e)
+				return
+			}
+		}
+
+		if len(updateDataList) > 0 {
+			for _, v := range updateDataList {
+				if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil {
+					err = fmt.Errorf("UpdateDataByColl, err: %v", e)
+					return
+				}
+			}
+		}
+	}
+
+	// 修改最大最小日期
+	minMax, err := indexItem.GetEdbInfoMaxAndMinInfo(indexItem.IndexCode)
+	if err != nil {
+		return
+	}
+	if err == nil && minMax != nil {
+		minDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MinDate, time.Local)
+		if e != nil {
+			err = fmt.Errorf("源数据最小日期有误, MinDate: %s, %v", minMax.MinDate, e)
+			return
+		}
+		maxDate, e := time.ParseInLocation(utils.FormatDateTime, minMax.MaxDate, time.Local)
+		if e != nil {
+			err = fmt.Errorf("源数据最大日期有误, MaxDate: %s, %v", minMax.MaxDate, e)
+			return
+		}
+		indexItem.StartDate = minDate
+		indexItem.EndDate = maxDate
+		indexItem.ModifyTime = time.Now().Local()
+		updateCols := []string{indexItem.Cols().StartDate, indexItem.Cols().EndDate, indexItem.Cols().ModifyTime}
+		if e = indexItem.Update(updateCols); e != nil {
+			err = fmt.Errorf("更新源指标开始结束时间失败, %v", e)
+			return
+		}
+	}
+
+	// 同步刷新指标库
+	go func() {
+		_ = RefreshEdbFromThsHfBaseIndex(indexItem.IndexCode, startTime)
+	}()
+	return
+}
+
+// RefreshEdbFromThsHfBaseIndex 根据源指标刷新指标库
+func RefreshEdbFromThsHfBaseIndex(baseCode, startTime string) (err error) {
+	defer func() {
+		if err != nil {
+			tips := fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-刷新指标库失败, %v", err)
 			utils.FileLog.Info(tips)
 			go alarm_msg.SendAlarmMsg(tips, 3)
 		}
@@ -489,13 +621,13 @@ func RefreshThsHfIndexFromBase(baseCode, startTime string) (err error) {
 
 		edb := codeEdb[v.EdbCode]
 		if edb == nil {
-			utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-指标信息有误, EdbCode: %s", v.EdbCode))
+			utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-指标信息有误, EdbCode: %s", v.EdbCode))
 			continue
 		}
 
 		// 刷新指标
 		if e := thsOb.Refresh(edb, codeMapping[v.EdbCode], startTime); e != nil {
-			utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-刷新指标失败, %v", e))
+			utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-刷新指标失败, %v", e))
 			_ = utils.Rc.Delete(cacheKey)
 			continue
 		}
@@ -503,7 +635,7 @@ func RefreshThsHfIndexFromBase(baseCode, startTime string) (err error) {
 		// 更新指标最值
 		e, _ = models.UnifiedModifyEdbInfoMaxAndMinInfo(edb)
 		if e != nil {
-			utils.FileLog.Info(fmt.Sprintf("RefreshThsHfIndexFromBase-更新指标最值失败, %v", e))
+			utils.FileLog.Info(fmt.Sprintf("RefreshEdbFromThsHfBaseIndex-更新指标最值失败, %v", e))
 			_ = utils.Rc.Delete(cacheKey)
 			continue
 		}

+ 1 - 0
utils/constants.go

@@ -8,6 +8,7 @@ const (
 	FormatDate                 = "2006-01-02"              //日期格式
 	FormatDateUnSpace          = "20060102"                //日期格式
 	FormatDateTime             = "2006-01-02 15:04:05"     //完整时间格式
+	FormatDateTimeMinute       = "2006-01-02 15:04"        //完整时间格式
 	HlbFormatDateTime          = "2006-01-02_15:04:05.999" //完整时间格式
 	FormatDateTimeUnSpace      = "20060102150405"          //完整时间格式
 	FormatShortDateTimeUnSpace = "060102150405"            //省去开头两位年份的时间格式