package mgo import ( "context" "errors" "eta/eta_index_lib/utils" "fmt" "github.com/qiniu/qmgo" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "time" ) // EdbDataBusiness // @Description: 自有数据集合(指标库) type EdbDataBusiness struct { ID primitive.ObjectID `json:"_id" bson:"_id,omitempty" ` // 文档id EdbInfoId int `json:"edb_info_id" bson:"edb_info_id"` // 指标编码 EdbCode string `json:"edb_code" bson:"edb_code"` // 指标编码 DataTime time.Time `json:"data_time" bson:"data_time"` // 数据日期 Value float64 `json:"value" bson:"value"` // 数据值 CreateTime time.Time `json:"create_time" bson:"create_time"` // 创建时间 ModifyTime time.Time `json:"modify_time" bson:"modify_time"` // 修改时间 DataTimestamp int64 `json:"data_timestamp" bson:"data_timestamp"` // 数据日期时间戳 } // CollectionName // @Description: 获取集合名称 // @author: Roc // @receiver m // @datetime 2024-04-26 13:41:36 // @return string func (m *EdbDataBusiness) CollectionName() string { return "edb_data_business" } // DataBaseName // @Description: 获取数据库名称 // @author: Roc // @receiver m // @datetime 2024-04-26 13:41:33 // @return string func (m *EdbDataBusiness) DataBaseName() string { return utils.MgoDataDbName } // GetCollection // @Description: 获取mongodb集合的句柄 // @author: Roc // @receiver m // @datetime 2024-04-26 13:41:33 // @return string func (m *EdbDataBusiness) GetCollection() *qmgo.Collection { db := utils.MgoDataCli.Database(m.DataBaseName()) return db.Collection(m.CollectionName()) } // GetItem // @Description: 根据条件获取单条数据 // @author: Roc // @receiver m // @datetime 2024-05-09 10:00:49 // @param whereParams interface{} // @return item *EdbDataBusiness // @return err error func (m *EdbDataBusiness) GetItem(whereParams interface{}) (item *EdbDataBusiness, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) return m.GetItemByColl(coll, whereParams) } // GetItemByColl // @Description: 根据条件获取单条数据 // @author: Roc // @receiver m // @datetime 2024-05-09 13:22:06 // @param coll *qmgo.Collection // @param whereParams interface{} // @return item *EdbDataBusiness // @return err error func (m *EdbDataBusiness) GetItemByColl(coll *qmgo.Collection, whereParams interface{}) (item *EdbDataBusiness, err error) { ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).One(&item) if err != nil { return } item.DataTime = item.DataTime.In(time.Local) item.CreateTime = item.CreateTime.In(time.Local) item.ModifyTime = item.ModifyTime.In(time.Local) return } // GetAllDataList // @Description: 根据条件获取所有数据 // @author: Roc // @receiver m // @datetime 2024-04-26 13:42:19 // @param whereParams interface{} // @param sort []string // @return result []EdbDataBusiness // @return err error func (m *EdbDataBusiness) GetAllDataList(whereParams interface{}, sort []string) (result []*EdbDataBusiness, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).Sort(sort...).All(&result) if err != nil { return } for _, v := range result { v.DataTime = v.DataTime.In(time.Local) v.CreateTime = v.CreateTime.In(time.Local) v.ModifyTime = v.ModifyTime.In(time.Local) } return } // GetLimitDataList // @Description: 根据条件获取指定数量数据列表 // @author: Roc // @receiver m // @datetime 2024-05-06 17:08:32 // @param whereParams interface{} // @param size int64 // @param sort []string // @return result []*BaseFromBusinessData // @return err error func (m *EdbDataBusiness) GetLimitDataList(whereParams interface{}, size int64, sort []string) (result []*EdbDataBusiness, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).Sort(sort...).Limit(size).All(&result) if err != nil { return } for _, v := range result { v.DataTime = v.DataTime.In(time.Local) v.CreateTime = v.CreateTime.In(time.Local) v.ModifyTime = v.ModifyTime.In(time.Local) } return } // GetPageDataList // @Description: 根据条件获取分页数据列表 // @author: Roc // @receiver m // @datetime 2024-05-07 10:21:07 // @param whereParams interface{} // @param startSize int64 // @param size int64 // @param sort []string // @return result []*EdbDataBusiness // @return err error func (m *EdbDataBusiness) GetPageDataList(whereParams interface{}, startSize, size int64, sort []string) (result []*EdbDataBusiness, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).Sort(sort...).Skip(startSize).Limit(size).All(&result) if err != nil { return } for _, v := range result { v.DataTime = v.DataTime.In(time.Local) v.CreateTime = v.CreateTime.In(time.Local) v.ModifyTime = v.ModifyTime.In(time.Local) } return } // GetCountDataList // @Description: 根据条件获取数据列表总数 // @author: Roc // @receiver m // @datetime 2024-05-07 10:29:00 // @param whereParams interface{} // @return count int64 // @return err error func (m *EdbDataBusiness) GetCountDataList(whereParams interface{}) (count int64, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } count, err = coll.Find(ctx, whereParams).Count() return } // InsertDataByColl // @Description: 写入单条数据(外部传入集合) // @author: Roc // @receiver m // @datetime 2024-04-26 14:22:18 // @param addData interface{} // @return err error func (m *EdbDataBusiness) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) { ctx := context.TODO() _, err = coll.InsertOne(ctx, addData) if err != nil { fmt.Println("InsertDataByColl:Err:" + err.Error()) return } return } // BatchInsertData // @Description: 批量写入数据 // @author: Roc // @receiver m // @datetime 2024-04-26 14:22:18 // @param bulk int 每次请求保存的数据量 // @param dataList []interface{} // @return err error func (m *EdbDataBusiness) BatchInsertData(bulk int, dataList []interface{}) (err error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) return m.BatchInsertDataByColl(coll, bulk, dataList) } // BatchInsertDataByColl // @Description: 批量写入数据(外部传入集合) // @author: Roc // @receiver m // @datetime 2024-04-26 14:22:18 // @param coll *qmgo.Collection // @param bulk int 每次请求保存的数据量 // @param dataList []interface{} // @return err error func (m *EdbDataBusiness) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) { ctx := context.TODO() dataNum := len(dataList) if dataNum <= 0 { return } // 不设置每次保存切片数量大小,或者实际数据量小于设置的切片数量大小,那么就直接保存吧 if bulk <= 0 || dataNum <= bulk { _, err = coll.InsertMany(ctx, dataList) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return } return } // 分批保存 i := 0 tmpAddDataList := make([]interface{}, 0) for _, v := range dataList { tmpAddDataList = append(tmpAddDataList, v) i++ if i >= bulk { _, err = coll.InsertMany(ctx, tmpAddDataList) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return } i = 0 tmpAddDataList = make([]interface{}, 0) } } if len(tmpAddDataList) > 0 { _, err = coll.InsertMany(ctx, tmpAddDataList) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return } } return } // UpdateData // @Description: 单条数据修改 // @author: Roc // @receiver m // @datetime 2024-04-26 15:01:51 // @param whereParams interface{} // @param updateParams interface{} // @return err error func (m *EdbDataBusiness) UpdateData(whereParams, updateParams interface{}) (err error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) return m.UpdateDataByColl(coll, whereParams, updateParams) } // UpdateDataByColl // @Description: 单条数据修改(外部传入集合) // @author: Roc // @receiver m // @datetime 2024-04-26 15:01:51 // @param whereParams interface{} // @param updateParams interface{} // @return err error func (m *EdbDataBusiness) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) { ctx := context.TODO() err = coll.UpdateOne(ctx, whereParams, updateParams) if err != nil { fmt.Println("UpdateDataByColl:Err:" + err.Error()) return } return } // RemoveMany // @Description: 根据条件删除多条数据 // @author: Roc // @receiver m // @datetime 2024-04-30 13:17:02 // @param whereParams interface{} // @return err error func (m *EdbDataBusiness) RemoveMany(whereParams interface{}) (err error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) return m.RemoveManyByColl(coll, whereParams) } // RemoveManyByColl // @Description: 根据条件删除多条数据(外部传入集合) // @author: Roc // @receiver m // @datetime 2024-04-30 13:18:42 // @param coll *qmgo.Collection // @param whereParams interface{} // @return err error func (m *EdbDataBusiness) RemoveManyByColl(coll *qmgo.Collection, whereParams interface{}) (err error) { ctx := context.TODO() _, err = coll.RemoveAll(ctx, whereParams) if err != nil { fmt.Println("RemoveManyByColl:Err:" + err.Error()) return } return } // HandleData // @Description: 事务处理数据 // @author: Roc // @receiver m // @datetime 2024-04-30 10:39:01 // @param addDataList []AddEdbDataBusiness // @param updateDataList []EdbDataBusiness // @return result interface{} // @return err error func (m *EdbDataBusiness) HandleData(addDataList, updateDataList []EdbDataBusiness) (result interface{}, err error) { ctx := context.TODO() callback := func(sessCtx context.Context) (interface{}, error) { // 重要:确保事务中的每一个操作,都使用传入的sessCtx参数 db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) // 插入数据 if len(addDataList) > 0 { _, err = coll.InsertMany(sessCtx, addDataList) if err != nil { return nil, err } } // 修改 if len(updateDataList) > 0 { for _, v := range updateDataList { err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return nil, err } } } return nil, nil } result, err = utils.MgoDataCli.DoTransaction(ctx, callback) return } // EdbInfoMaxAndMinInfo 指标最新数据记录结构体 type EdbInfoMaxAndMinInfo struct { MinDate time.Time `description:"最小日期" bson:"min_date"` MaxDate time.Time `description:"最大日期" bson:"max_date"` MinValue float64 `description:"最小值" bson:"min_value"` MaxValue float64 `description:"最大值" bson:"max_value"` LatestValue float64 `description:"最新值" bson:"latest_value"` LatestDate time.Time `description:"实际数据最新日期" bson:"latest_date"` EndValue float64 `description:"最新值" bson:"end_value"` } // GetEdbInfoMaxAndMinInfo // @Description: 获取当前指标的最大最小值 // @author: Roc // @receiver m // @datetime 2024-04-30 17:15:39 // @param whereParams interface{} // @return result EdbInfoMaxAndMinInfo // @return err error func (m *EdbDataBusiness) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Aggregate(ctx, whereParams).One(&result) if err != nil { return } result.MinDate = result.MinDate.In(time.Local) result.MaxDate = result.MaxDate.In(time.Local) result.LatestDate = result.LatestDate.In(time.Local) return } // LatestValue 指标最新数据记录结构体 type LatestValue struct { Value float64 `description:"值" bson:"value"` } // GetLatestValue // @Description: 获取当前指标的最新数据记录 // @author: Roc // @receiver m // @datetime 2024-04-30 17:16:15 // @param whereParams interface{} // @param selectParam interface{} // @return latestValue LatestValue // @return err error func (m *EdbDataBusiness) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } //var result interface{} //err = coll.Find(ctx, whereParams).Select(selectParam).One(&result) err = coll.Find(ctx, whereParams).Select(selectParam).One(&latestValue) return }