|
@@ -329,14 +329,22 @@ func (obj EdbThsHf) refreshByMysql(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdb
|
|
|
}
|
|
|
|
|
|
func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdbMapping, startDate string) (err error) {
|
|
|
+ defer func() {
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Info(fmt.Sprintf("refreshByMongo, err: %v", err))
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
var realDataMaxDate, edbDataInsertConfigDate time.Time
|
|
|
var edbDataInsertConfig *EdbDataInsertConfig
|
|
|
var isFindConfigDateRealData bool //是否找到配置日期的实际数据的值
|
|
|
{
|
|
|
- edbDataInsertConfig, err = GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
|
|
|
- if err != nil && err.Error() != utils.ErrNoRow() {
|
|
|
+ insertConfig, e := GetEdbDataInsertConfigByEdbId(edbInfo.EdbInfoId)
|
|
|
+ if e != nil && e.Error() != utils.ErrNoRow() {
|
|
|
+ err = fmt.Errorf("GetEdbDataInsertConfigByEdbId, err: %v", e)
|
|
|
return
|
|
|
}
|
|
|
+ edbDataInsertConfig = insertConfig
|
|
|
if edbDataInsertConfig != nil {
|
|
|
edbDataInsertConfigDate = edbDataInsertConfig.Date
|
|
|
}
|
|
@@ -354,7 +362,7 @@ func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdb
|
|
|
}
|
|
|
|
|
|
// 获取源指标数据
|
|
|
- baseDataList, e := obj.getBaseIndexDataByMongo(edbInfo, queryDate)
|
|
|
+ baseDataList, e := obj.getBaseIndexDataByMongo(edbBaseMapping.BaseIndexCode, queryDate)
|
|
|
if e != nil {
|
|
|
err = fmt.Errorf("getBaseIndexDataByMongo, err: %v", e)
|
|
|
return
|
|
@@ -394,16 +402,16 @@ func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdb
|
|
|
|
|
|
if queryDate != `` {
|
|
|
//获取已存在的所有数据
|
|
|
- startDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, queryDate, time.Local)
|
|
|
- if tmpErr != nil {
|
|
|
- err = tmpErr
|
|
|
+ startDateTime, e := time.ParseInLocation(utils.FormatDate, queryDate, time.Local)
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("startDateTime parse err: %v", e)
|
|
|
return
|
|
|
}
|
|
|
queryConditions["data_time"] = bson.M{"$gte": startDateTime}
|
|
|
}
|
|
|
- existDataList, err = mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
|
|
|
- if err != nil {
|
|
|
- fmt.Println(obj.GetSourceName() + ",refresh err;getEdbDataThsHfList Err:" + err.Error())
|
|
|
+ existDataList, e = mogDataObj.GetAllDataList(queryConditions, []string{"data_time"})
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("GetAllDataList, err: %v", e)
|
|
|
return
|
|
|
}
|
|
|
}
|
|
@@ -459,7 +467,7 @@ func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdb
|
|
|
insertExist[strDate] = true
|
|
|
|
|
|
timestamp := k.UnixNano() / 1e6
|
|
|
- addDataList = append(addDataList, &EdbDataThsHf{
|
|
|
+ addDataList = append(addDataList, &mgo.EdbDataThsHf{
|
|
|
EdbInfoId: edbInfo.EdbInfoId,
|
|
|
EdbCode: edbInfo.EdbCode,
|
|
|
DataTime: k,
|
|
@@ -479,18 +487,17 @@ func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdb
|
|
|
removeDateList := make([]time.Time, 0)
|
|
|
for dateTime := range removeDataTimeMap {
|
|
|
//获取已存在的所有数据
|
|
|
- tmpDateTime, tmpErr := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
|
|
|
- if tmpErr != nil {
|
|
|
- err = tmpErr
|
|
|
+ tmpDateTime, e := time.ParseInLocation(utils.FormatDate, dateTime, time.Local)
|
|
|
+ if e != nil {
|
|
|
+ err = fmt.Errorf("tmpDateTime parse err: %v", e)
|
|
|
return
|
|
|
}
|
|
|
removeDateList = append(removeDateList, tmpDateTime)
|
|
|
}
|
|
|
removeNum := len(removeDateList)
|
|
|
if removeNum > 0 {
|
|
|
- err = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}})
|
|
|
- if err != nil {
|
|
|
- fmt.Println("mogDataObj.RemoveMany() Err:" + err.Error())
|
|
|
+ if e = mogDataObj.RemoveManyByColl(coll, bson.M{"edb_code": edbInfo.EdbCode, "data_time": bson.M{"$in": removeDateList}}); e != nil {
|
|
|
+ err = fmt.Errorf("RemoveManyByColl, err: %v", e)
|
|
|
return
|
|
|
}
|
|
|
}
|
|
@@ -498,9 +505,8 @@ func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdb
|
|
|
|
|
|
// 插入新数据
|
|
|
if len(addDataList) > 0 {
|
|
|
- err = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList)
|
|
|
- if err != nil {
|
|
|
- fmt.Println("mogDataObj.BatchInsertData() Err:" + err.Error())
|
|
|
+ if e = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList); e != nil {
|
|
|
+ err = fmt.Errorf("BatchInsertDataByColl, err: %v", e)
|
|
|
return
|
|
|
}
|
|
|
}
|
|
@@ -508,9 +514,8 @@ func (obj EdbThsHf) refreshByMongo(edbInfo *EdbInfo, edbBaseMapping *BaseFromEdb
|
|
|
// 修改历史数据
|
|
|
if len(updateDataList) > 0 {
|
|
|
for _, v := range updateDataList {
|
|
|
- err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
|
|
|
- if err != nil {
|
|
|
- fmt.Println("mogDataObj.UpdateDataByColl:Err:" + err.Error())
|
|
|
+ if e = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}); e != nil {
|
|
|
+ err = fmt.Errorf("UpdateDataByColl, err: %v", e)
|
|
|
return
|
|
|
}
|
|
|
}
|
|
@@ -787,7 +792,7 @@ func ThsHfConvertData2DayByRule(originData []*ThsHfConvertOriginData, convertRul
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func (obj EdbThsHf) getBaseIndexDataByMongo(edbInfo *EdbInfo, startDate string) (newDataList []EdbInfoMgoData, err error) {
|
|
|
+func (obj EdbThsHf) getBaseIndexDataByMongo(baseIndexCode, startDate string) (newDataList []EdbInfoMgoData, err error) {
|
|
|
newDataList = make([]EdbInfoMgoData, 0)
|
|
|
|
|
|
// 获取数据源的指标数据
|
|
@@ -795,7 +800,7 @@ func (obj EdbThsHf) getBaseIndexDataByMongo(edbInfo *EdbInfo, startDate string)
|
|
|
|
|
|
// 构建查询条件
|
|
|
queryConditions := bson.M{
|
|
|
- "index_code": edbInfo.EdbCode,
|
|
|
+ "index_code": baseIndexCode,
|
|
|
}
|
|
|
|
|
|
if startDate != `` {
|