瀏覽代碼

fix:兼容mongo

Roc 11 月之前
父節點
當前提交
3919330f04
共有 3 個文件被更改,包括 243 次插入13 次删除
  1. 61 0
      controllers/eia_steo.go
  2. 73 5
      models/mgo/base_from_business_data.go
  3. 109 8
      models/mgo/edb_data_business.go

+ 61 - 0
controllers/eia_steo.go

@@ -136,3 +136,64 @@ func (this *EdbInfoController) EiaSteoData() {
 	br.Data = resultList
 }
 
+//func init() {
+//	for i := 110; i < 115; i++ {
+//		pushData(i)
+//	}
+//	//pushData(110)
+//}
+//func pushData(index int) {
+//	edbOb := new(data_manage.EdbInfo)
+//	dataOb := new(data_manage.EdbData)
+//	cond := ``
+//	pars := make([]interface{}, 0)
+//	//cond += ` AND source in (1,2,3,9,10,11,15,16,17,18,19,20,21,25,26,34,57)`
+//	cond += ` AND source in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,25,26,28,29,31,32,33,34,35,36,37,38,39,40,41,42,42,44,45,46,47,48,57)`
+//	//cond += ` AND edb_code = "C2305235333"`
+//	edbList, e := edbOb.GetItemsByCondition(cond, pars, []string{}, "sort ASC, create_time DESC")
+//	if e != nil {
+//		fmt.Println("EdbList GetItemsByCondition err: " + e.Error())
+//		return
+//	}
+//
+//	count := len(edbList)
+//	for k, edb := range edbList {
+//		fmt.Println("当前第", index, "组,第", k+1, "条,剩余", count-k-1, "条")
+//		req := data_manage.PushBusinessIndexReq{
+//			IndexCode:  fmt.Sprintf("Roc_%d_%s", index, edb.EdbCode),
+//			IndexName:  fmt.Sprintf("自有数据-%d-%s", index, edb.EdbName),
+//			Unit:       edb.Unit,
+//			Frequency:  edb.Frequency,
+//			SourceName: edb.SourceName,
+//			Remark:     "",
+//			DataList:   nil,
+//		}
+//
+//		// 获取指标数据
+//		tmpDataList := make([]data_manage.AddBusinessDataReq, 0)
+//		dataList, e := dataOb.GetItemsBySourceAndCode(edb.Source, edb.SubSource, edb.EdbCode, "", []string{}, "")
+//		if e != nil {
+//			fmt.Println(edb.EdbCode + ";EdbData GetItemsBySourceAndCode err: " + e.Error())
+//			continue
+//		}
+//		for _, v := range dataList {
+//			tmpDataList = append(tmpDataList, data_manage.AddBusinessDataReq{
+//				Value: v.Value,
+//				Date:  v.DataTime.Format(utils.FormatDate),
+//			})
+//		}
+//		req.DataList = tmpDataList
+//		reqJson, _ := json.Marshal(req)
+//
+//		respItem, err := data.PushEdb(string(reqJson))
+//		if err != nil {
+//			fmt.Println(edb.EdbCode + "处理失败,Err:" + err.Error())
+//			continue
+//		}
+//		if respItem.Ret != 200 {
+//			fmt.Println(edb.EdbCode + "处理失败,Err:" + respItem.ErrMsg)
+//		}
+//	}
+//
+//	fmt.Println("end")
+//}

+ 73 - 5
models/mgo/base_from_business_data.go

@@ -187,22 +187,90 @@ func (m *BaseFromBusinessData) GetCountDataList(whereParams interface{}) (count
 	return
 }
 
+// InsertDataByColl
+// @Description: 写入单条数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param addData interface{}
+// @return err error
+func (m *BaseFromBusinessData) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) {
+	ctx := context.TODO()
+	_, err = coll.InsertOne(ctx, addData)
+	if err != nil {
+		fmt.Println("InsertDataByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
 // BatchInsertData
 // @Description: 批量写入数据
 // @author: Roc
 // @receiver m
 // @datetime 2024-04-26 14:22:18
-// @param dataList interface{}
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
 // @return err error
-func (m *BaseFromBusinessData) BatchInsertData(dataList interface{}) (err error) {
+func (m *BaseFromBusinessData) BatchInsertData(bulk int, dataList []interface{}) (err error) {
 	db := utils.MgoDataCli.Database(m.DataBaseName())
 	coll := db.Collection(m.CollectionName())
+
+	return m.BatchInsertDataByColl(coll, bulk, dataList)
+}
+
+// BatchInsertDataByColl
+// @Description: 批量写入数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param coll *qmgo.Collection
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
+// @return err error
+func (m *BaseFromBusinessData) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) {
 	ctx := context.TODO()
-	_, err = coll.InsertMany(ctx, dataList)
-	if err != nil {
-		fmt.Println("BatchInsertData:Err:" + err.Error())
+	dataNum := len(dataList)
+	if dataNum <= 0 {
+		return
+	}
+
+	// 不设置每次保存切片数量大小,或者实际数据量小于设置的切片数量大小,那么就直接保存吧
+	if bulk <= 0 || dataNum <= bulk {
+		_, err = coll.InsertMany(ctx, dataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
 		return
 	}
+
+	// 分批保存
+	i := 0
+	tmpAddDataList := make([]interface{}, 0)
+	for _, v := range dataList {
+		tmpAddDataList = append(tmpAddDataList, v)
+		i++
+		if i >= bulk {
+			_, err = coll.InsertMany(ctx, tmpAddDataList)
+			if err != nil {
+				fmt.Println("BatchInsertData:Err:" + err.Error())
+				return
+			}
+			i = 0
+			tmpAddDataList = make([]interface{}, 0)
+		}
+	}
+
+	if len(tmpAddDataList) > 0 {
+		_, err = coll.InsertMany(ctx, tmpAddDataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+	}
+
 	return
 }
 

+ 109 - 8
models/mgo/edb_data_business.go

@@ -55,6 +55,52 @@ func (m *EdbDataBusiness) GetCollection() *qmgo.Collection {
 	return db.Collection(m.CollectionName())
 }
 
+// GetItem
+// @Description: 根据条件获取单条数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-09 10:00:49
+// @param whereParams interface{}
+// @return item *EdbDataBusiness
+// @return err error
+func (m *EdbDataBusiness) GetItem(whereParams interface{}) (item *EdbDataBusiness, err error) {
+	if utils.MgoDataCli == nil {
+		err = errors.New("mongodb连接失败")
+		return
+	}
+	db := utils.MgoDataCli.Database(m.DataBaseName())
+	coll := db.Collection(m.CollectionName())
+
+	return m.GetItemByColl(coll, whereParams)
+}
+
+// GetItemByColl
+// @Description: 根据条件获取单条数据
+// @author: Roc
+// @receiver m
+// @datetime 2024-05-09 13:22:06
+// @param coll *qmgo.Collection
+// @param whereParams interface{}
+// @return item *EdbDataBusiness
+// @return err error
+func (m *EdbDataBusiness) GetItemByColl(coll *qmgo.Collection, whereParams interface{}) (item *EdbDataBusiness, err error) {
+	ctx := context.TODO()
+	if err != nil {
+		fmt.Println("MgoGetColl Err:", err.Error())
+		return
+	}
+	err = coll.Find(ctx, whereParams).One(&item)
+	if err != nil {
+		return
+	}
+
+	item.DataTime = item.DataTime.In(time.Local)
+	item.CreateTime = item.CreateTime.In(time.Local)
+	item.ModifyTime = item.ModifyTime.In(time.Local)
+
+	return
+}
+
 // GetAllDataList
 // @Description: 根据条件获取所有数据
 // @author: Roc
@@ -188,18 +234,37 @@ func (m *EdbDataBusiness) GetCountDataList(whereParams interface{}) (count int64
 	return
 }
 
+// InsertDataByColl
+// @Description: 写入单条数据(外部传入集合)
+// @author: Roc
+// @receiver m
+// @datetime 2024-04-26 14:22:18
+// @param addData interface{}
+// @return err error
+func (m *EdbDataBusiness) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) {
+	ctx := context.TODO()
+	_, err = coll.InsertOne(ctx, addData)
+	if err != nil {
+		fmt.Println("InsertDataByColl:Err:" + err.Error())
+		return
+	}
+
+	return
+}
+
 // BatchInsertData
 // @Description: 批量写入数据
 // @author: Roc
 // @receiver m
 // @datetime 2024-04-26 14:22:18
-// @param dataList interface{}
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
 // @return err error
-func (m *EdbDataBusiness) BatchInsertData(dataList []EdbDataBusiness) (err error) {
+func (m *EdbDataBusiness) BatchInsertData(bulk int, dataList []interface{}) (err error) {
 	db := utils.MgoDataCli.Database(m.DataBaseName())
 	coll := db.Collection(m.CollectionName())
 
-	return m.BatchInsertDataByColl(coll, dataList)
+	return m.BatchInsertDataByColl(coll, bulk, dataList)
 }
 
 // BatchInsertDataByColl
@@ -207,16 +272,52 @@ func (m *EdbDataBusiness) BatchInsertData(dataList []EdbDataBusiness) (err error
 // @author: Roc
 // @receiver m
 // @datetime 2024-04-26 14:22:18
-// @param dataList interface{}
+// @param coll *qmgo.Collection
+// @param bulk int 每次请求保存的数据量
+// @param dataList []interface{}
 // @return err error
-func (m *EdbDataBusiness) BatchInsertDataByColl(coll *qmgo.Collection, dataList []EdbDataBusiness) (err error) {
+func (m *EdbDataBusiness) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) {
 	ctx := context.TODO()
-	_, err = coll.InsertMany(ctx, dataList)
-	if err != nil {
-		fmt.Println("BatchInsertData:Err:" + err.Error())
+	dataNum := len(dataList)
+	if dataNum <= 0 {
 		return
 	}
 
+	// 不设置每次保存切片数量大小,或者实际数据量小于设置的切片数量大小,那么就直接保存吧
+	if bulk <= 0 || dataNum <= bulk {
+		_, err = coll.InsertMany(ctx, dataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+		return
+	}
+
+	// 分批保存
+	i := 0
+	tmpAddDataList := make([]interface{}, 0)
+	for _, v := range dataList {
+		tmpAddDataList = append(tmpAddDataList, v)
+		i++
+		if i >= bulk {
+			_, err = coll.InsertMany(ctx, tmpAddDataList)
+			if err != nil {
+				fmt.Println("BatchInsertData:Err:" + err.Error())
+				return
+			}
+			i = 0
+			tmpAddDataList = make([]interface{}, 0)
+		}
+	}
+
+	if len(tmpAddDataList) > 0 {
+		_, err = coll.InsertMany(ctx, tmpAddDataList)
+		if err != nil {
+			fmt.Println("BatchInsertData:Err:" + err.Error())
+			return
+		}
+	}
+
 	return
 }