فهرست منبع

Merge remote-tracking branch 'origin/eta/1.8.4' into debug

Roc 10 ماه پیش
والد
کامیت
6f323da2eb
4فایلهای تغییر یافته به همراه145 افزوده شده و 18 حذف شده
  1. 2 2
      models/edb_data_business.go
  2. 73 5
      models/mgo/base_from_business_data.go
  3. 63 8
      models/mgo/edb_data_business.go
  4. 7 3
      services/base_from_business.go

+ 2 - 2
models/edb_data_business.go

@@ -248,7 +248,7 @@ func (obj Business) refresh(edbInfo *EdbInfo, startDate string) (err error) {
 	needAddDateMap := make(map[time.Time]int)
 
 	// 待添加的数据集
-	addDataList := make([]mgo.EdbDataBusiness, 0)
+	addDataList := make([]interface{}, 0)
 	// 待更新的数据集
 	updateDataList := make([]mgo.EdbDataBusiness, 0)
 
@@ -334,7 +334,7 @@ func (obj Business) refresh(edbInfo *EdbInfo, startDate string) (err error) {
 
 		// 插入新数据
 		if len(addDataList) > 0 {
-			err = mogDataObj.BatchInsertDataByColl(coll, addDataList)
+			err = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList)
 			if err != nil {
 				fmt.Println("mogDataObj.BatchInsertData() Err:" + err.Error())
 				return

+ 73 - 5
models/mgo/base_from_business_data.go

@@ -187,22 +187,90 @@ func (m *BaseFromBusinessData) GetCountDataList(whereParams interface{}) (count
 	return
 }
 
+// InsertDataByColl
+// @Description: 写入单条数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param addData interface{}
+// @return err error
+func (m *BaseFromBusinessData) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) {
+	ctx := context.TODO()
+	_, err = coll.InsertOne(ctx, addData)
+	if err != nil {
+		fmt.Println("InsertDataByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
 // BatchInsertData
 // @Description: 批量写入数据
 // @author: Roc
 // @receiver m
 // @datetime 2024-04-26 14:22:18
-// @param dataList interface{}
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
 // @return err error
-func (m *BaseFromBusinessData) BatchInsertData(dataList interface{}) (err error) {
+func (m *BaseFromBusinessData) BatchInsertData(bulk int, dataList []interface{}) (err error) {
 	db := utils.MgoDataCli.Database(m.DataBaseName())
 	coll := db.Collection(m.CollectionName())
+
+	return m.BatchInsertDataByColl(coll, bulk, dataList)
+}
+
+// BatchInsertDataByColl
+// @Description: 批量写入数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param coll *qmgo.Collection
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
+// @return err error
+func (m *BaseFromBusinessData) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) {
 	ctx := context.TODO()
-	_, err = coll.InsertMany(ctx, dataList)
-	if err != nil {
-		fmt.Println("BatchInsertData:Err:" + err.Error())
+	dataNum := len(dataList)
+	if dataNum <= 0 {
+		return
+	}
+
+	// 不设置每次保存切片数量大小,或者实际数据量小于设置的切片数量大小,那么就直接保存吧
+	if bulk <= 0 || dataNum <= bulk {
+		_, err = coll.InsertMany(ctx, dataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
 		return
 	}
+
+	// 分批保存
+	i := 0
+	tmpAddDataList := make([]interface{}, 0)
+	for _, v := range dataList {
+		tmpAddDataList = append(tmpAddDataList, v)
+		i++
+		if i >= bulk {
+			_, err = coll.InsertMany(ctx, tmpAddDataList)
+			if err != nil {
+				fmt.Println("BatchInsertData:Err:" + err.Error())
+				return
+			}
+			i = 0
+			tmpAddDataList = make([]interface{}, 0)
+		}
+	}
+
+	if len(tmpAddDataList) > 0 {
+		_, err = coll.InsertMany(ctx, tmpAddDataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+	}
+
 	return
 }
 

+ 63 - 8
models/mgo/edb_data_business.go

@@ -234,18 +234,37 @@ func (m *EdbDataBusiness) GetCountDataList(whereParams interface{}) (count int64
 	return
 }
 
+// InsertDataByColl
+// @Description: 写入单条数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param addData interface{}
+// @return err error
+func (m *EdbDataBusiness) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) {
+	ctx := context.TODO()
+	_, err = coll.InsertOne(ctx, addData)
+	if err != nil {
+		fmt.Println("InsertDataByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
 // BatchInsertData
 // @Description: 批量写入数据
 // @author: Roc
 // @receiver m
 // @datetime 2024-04-26 14:22:18
-// @param dataList interface{}
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
 // @return err error
-func (m *EdbDataBusiness) BatchInsertData(dataList []EdbDataBusiness) (err error) {
+func (m *EdbDataBusiness) BatchInsertData(bulk int, dataList []interface{}) (err error) {
 	db := utils.MgoDataCli.Database(m.DataBaseName())
 	coll := db.Collection(m.CollectionName())
 
-	return m.BatchInsertDataByColl(coll, dataList)
+	return m.BatchInsertDataByColl(coll, bulk, dataList)
 }
 
 // BatchInsertDataByColl
@@ -253,16 +272,52 @@ func (m *EdbDataBusiness) BatchInsertData(dataList []EdbDataBusiness) (err error
 // @author: Roc
 // @receiver m
 // @datetime 2024-04-26 14:22:18
-// @param dataList interface{}
+// @param coll *qmgo.Collection
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
 // @return err error
-func (m *EdbDataBusiness) BatchInsertDataByColl(coll *qmgo.Collection, dataList []EdbDataBusiness) (err error) {
+func (m *EdbDataBusiness) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) {
 	ctx := context.TODO()
-	_, err = coll.InsertMany(ctx, dataList)
-	if err != nil {
-		fmt.Println("BatchInsertData:Err:" + err.Error())
+	dataNum := len(dataList)
+	if dataNum <= 0 {
 		return
 	}
 
+	// 不设置每次保存切片数量大小,或者实际数据量小于设置的切片数量大小,那么就直接保存吧
+	if bulk <= 0 || dataNum <= bulk {
+		_, err = coll.InsertMany(ctx, dataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+		return
+	}
+
+	// 分批保存
+	i := 0
+	tmpAddDataList := make([]interface{}, 0)
+	for _, v := range dataList {
+		tmpAddDataList = append(tmpAddDataList, v)
+		i++
+		if i >= bulk {
+			_, err = coll.InsertMany(ctx, tmpAddDataList)
+			if err != nil {
+				fmt.Println("BatchInsertData:Err:" + err.Error())
+				return
+			}
+			i = 0
+			tmpAddDataList = make([]interface{}, 0)
+		}
+	}
+
+	if len(tmpAddDataList) > 0 {
+		_, err = coll.InsertMany(ctx, tmpAddDataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+	}
+
 	return
 }
 

+ 7 - 3
services/base_from_business.go

@@ -33,6 +33,10 @@ func HandleBusinessIndex(indexItem *models.AddBusinessIndexReq) (err error) {
 		}
 	}()
 
+	// 没有数据就返回
+	if indexItem.DataList == nil || len(indexItem.DataList) <= 0 {
+		return
+	}
 	// 兼容频度缺少度的字段
 	if !strings.Contains(indexItem.Frequency, "度") {
 		indexItem.Frequency = indexItem.Frequency + "度"
@@ -141,7 +145,7 @@ func HandleBusinessIndex(indexItem *models.AddBusinessIndexReq) (err error) {
 	// 当前传入的最小日期
 	var reqMinDate time.Time
 	// 待添加的数据集
-	addDataList := make([]mgo.BaseFromBusinessData, 0)
+	addDataList := make([]interface{}, 0)
 	updateDataList := make([]mgo.BaseFromBusinessData, 0)
 	//var hasUpdate bool
 	// 遍历excel数据,然后跟现有的数据做校验,不存在则入库
@@ -185,9 +189,10 @@ func HandleBusinessIndex(indexItem *models.AddBusinessIndexReq) (err error) {
 
 	// 入库
 	{
+		coll := mogDataObj.GetCollection()
 		if len(addDataList) > 0 {
 			isIndexUpdateOrAdd = true
-			err = mogDataObj.BatchInsertData(addDataList)
+			err = mogDataObj.BatchInsertDataByColl(coll, 500, addDataList)
 			if err != nil {
 				fmt.Println("mogDataObj.HandleData() Err:" + err.Error())
 				return
@@ -196,7 +201,6 @@ func HandleBusinessIndex(indexItem *models.AddBusinessIndexReq) (err error) {
 
 		if len(updateDataList) > 0 {
 			isIndexUpdateOrAdd = true
-			coll := mogDataObj.GetCollection()
 			for _, v := range updateDataList {
 				err = mogDataObj.UpdateDataByColl(coll, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
 				if err != nil {