package mgo

import (
	"context"
	"errors"
	"fmt"
	"time"

	"eta_gn/eta_api/utils"

	"github.com/qiniu/qmgo"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

// EdbDataBusiness is one document of the "edb_data_business" collection.
type EdbDataBusiness struct {
	ID            primitive.ObjectID `json:"_id" bson:"_id,omitempty"`             // document id
	EdbInfoId     int                `json:"edb_info_id" bson:"edb_info_id"`       // indicator info id
	EdbCode       string             `json:"edb_code" bson:"edb_code"`             // indicator code
	DataTime      time.Time          `json:"data_time" bson:"data_time"`           // data date
	Value         float64            `json:"value" bson:"value"`                   // data value
	CreateTime    time.Time          `json:"create_time" bson:"create_time"`       // creation time
	ModifyTime    time.Time          `json:"modify_time" bson:"modify_time"`       // last modification time
	DataTimestamp int64              `json:"data_timestamp" bson:"data_timestamp"` // data date as a timestamp
}

// CollectionName returns the name of the MongoDB collection backing this type.
func (m *EdbDataBusiness) CollectionName() string {
	return "edb_data_business"
}

// DataBaseName returns the name of the MongoDB database, taken from
// utils.MgoDataDbName.
func (m *EdbDataBusiness) DataBaseName() string {
	return utils.MgoDataDbName
}

// GetCollection returns a handle on the backing collection.
//
// It does not check utils.MgoDataCli for nil; callers that cannot guarantee
// an initialized client should use the higher-level methods, which do.
func (m *EdbDataBusiness) GetCollection() *qmgo.Collection {
	db := utils.MgoDataCli.Database(m.DataBaseName())
	return db.Collection(m.CollectionName())
}

// collection returns the backing collection, or an error when the shared
// MongoDB client has not been initialized.
func (m *EdbDataBusiness) collection() (*qmgo.Collection, error) {
	if utils.MgoDataCli == nil {
		return nil, errors.New("mongodb连接失败")
	}
	return utils.MgoDataCli.Database(m.DataBaseName()).Collection(m.CollectionName()), nil
}

// localizeTimes converts the time fields of m to the local time zone.
// It is a no-op on a nil receiver.
func (m *EdbDataBusiness) localizeTimes() {
	if m == nil {
		return
	}
	m.DataTime = m.DataTime.In(time.Local)
	m.CreateTime = m.CreateTime.In(time.Local)
	m.ModifyTime = m.ModifyTime.In(time.Local)
}

// GetItem returns a single document matching whereParams, with its time
// fields converted to the local time zone. It fails when the MongoDB client
// is not initialized or when the query returns an error.
func (m *EdbDataBusiness) GetItem(whereParams interface{}) (item *EdbDataBusiness, err error) {
	coll, err := m.collection()
	if err != nil {
		return
	}
	return m.GetItemByColl(coll, whereParams)
}

// GetItemByColl is GetItem against an explicitly supplied collection.
func (m *EdbDataBusiness) GetItemByColl(coll *qmgo.Collection, whereParams interface{}) (item *EdbDataBusiness, err error) {
	err = coll.Find(context.TODO(), whereParams).One(&item)
	if err != nil {
		return
	}
	item.localizeTimes()
	return
}

// GetAllDataList returns every document matching whereParams, ordered by the
// given sort fields, with time fields converted to the local time zone.
func (m *EdbDataBusiness) GetAllDataList(whereParams interface{}, sort []string) (result []*EdbDataBusiness, err error) {
	coll, err := m.collection()
	if err != nil {
		return
	}
	err = coll.Find(context.TODO(), whereParams).Sort(sort...).All(&result)
	if err != nil {
		return
	}
	for _, v := range result {
		v.localizeTimes()
	}
	return
}

// GetLimitDataList returns at most size documents matching whereParams,
// ordered by the given sort fields, with time fields converted to the local
// time zone.
func (m *EdbDataBusiness) GetLimitDataList(whereParams interface{}, size int64, sort []string) (result []*EdbDataBusiness, err error) {
	coll, err := m.collection()
	if err != nil {
		return
	}
	err = coll.Find(context.TODO(), whereParams).Sort(sort...).Limit(size).All(&result)
	if err != nil {
		return
	}
	for _, v := range result {
		v.localizeTimes()
	}
	return
}

// GetPageDataList returns one page of documents matching whereParams: it
// skips startSize documents and returns at most size, ordered by the given
// sort fields, with time fields converted to the local time zone.
func (m *EdbDataBusiness) GetPageDataList(whereParams interface{}, startSize, size int64, sort []string) (result []*EdbDataBusiness, err error) {
	coll, err := m.collection()
	if err != nil {
		return
	}
	err = coll.Find(context.TODO(), whereParams).Sort(sort...).Skip(startSize).Limit(size).All(&result)
	if err != nil {
		return
	}
	for _, v := range result {
		v.localizeTimes()
	}
	return
}

// GetCountDataList returns the number of documents matching whereParams.
func (m *EdbDataBusiness) GetCountDataList(whereParams interface{}) (count int64, err error) {
	coll, err := m.collection()
	if err != nil {
		return
	}
	count, err = coll.Find(context.TODO(), whereParams).Count()
	return
}

// InsertDataByColl inserts a single document into coll. A failure is printed
// to stdout and returned.
func (m *EdbDataBusiness) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) {
	_, err = coll.InsertOne(context.TODO(), addData)
	if err != nil {
		fmt.Println("InsertDataByColl:Err:" + err.Error())
	}
	return
}

// BatchInsertData inserts dataList into the backing collection in batches of
// at most bulk documents. See BatchInsertDataByColl for the batching rules.
func (m *EdbDataBusiness) BatchInsertData(bulk int, dataList []interface{}) (err error) {
	coll, err := m.collection()
	if err != nil {
		return
	}
	return m.BatchInsertDataByColl(coll, bulk, dataList)
}

// BatchInsertDataByColl inserts dataList into coll in consecutive batches of
// at most bulk documents. When bulk is not positive, or dataList fits in one
// batch, a single InsertMany is issued. An empty dataList is a no-op.
// Insertion stops at the first failing batch; the failure is printed to
// stdout and returned. Batches already written are not undone here.
func (m *EdbDataBusiness) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) {
	total := len(dataList)
	if total == 0 {
		return
	}
	if bulk <= 0 || bulk > total {
		bulk = total
	}

	ctx := context.TODO()
	for start := 0; start < total; start += bulk {
		end := start + bulk
		if end > total {
			end = total
		}
		if _, err = coll.InsertMany(ctx, dataList[start:end]); err != nil {
			fmt.Println("BatchInsertData:Err:" + err.Error())
			return
		}
	}
	return
}

// UpdateData applies updateParams to a single document matching whereParams
// in the backing collection.
func (m *EdbDataBusiness) UpdateData(whereParams, updateParams interface{}) (err error) {
	coll, err := m.collection()
	if err != nil {
		return
	}
	return m.UpdateDataByColl(coll, whereParams, updateParams)
}

// UpdateDataByColl applies updateParams to a single document matching
// whereParams in coll. A failure is printed to stdout and returned.
func (m *EdbDataBusiness) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) {
	err = coll.UpdateOne(context.TODO(), whereParams, updateParams)
	if err != nil {
		fmt.Println("UpdateDataByColl:Err:" + err.Error())
	}
	return
}

// RemoveMany deletes every document matching whereParams from the backing
// collection.
func (m *EdbDataBusiness) RemoveMany(whereParams interface{}) (err error) {
	coll, err := m.collection()
	if err != nil {
		return
	}
	return m.RemoveManyByColl(coll, whereParams)
}

// RemoveManyByColl deletes every document matching whereParams from coll.
// A failure is printed to stdout and returned.
func (m *EdbDataBusiness) RemoveManyByColl(coll *qmgo.Collection, whereParams interface{}) (err error) {
	_, err = coll.RemoveAll(context.TODO(), whereParams)
	if err != nil {
		fmt.Println("RemoveManyByColl:Err:" + err.Error())
	}
	return
}

// HandleData inserts addDataList and updates the "value" and "modify_time"
// fields of every document in updateDataList (matched by _id) inside a single
// transaction started with utils.MgoDataCli.DoTransaction. It returns
// whatever DoTransaction returns.
func (m *EdbDataBusiness) HandleData(addDataList, updateDataList []EdbDataBusiness) (result interface{}, err error) {
	if utils.MgoDataCli == nil {
		err = errors.New("mongodb连接失败")
		return
	}

	callback := func(sessCtx context.Context) (interface{}, error) {
		coll := utils.MgoDataCli.Database(m.DataBaseName()).Collection(m.CollectionName())

		if len(addDataList) > 0 {
			if _, insertErr := coll.InsertMany(sessCtx, addDataList); insertErr != nil {
				return nil, insertErr
			}
		}

		for _, v := range updateDataList {
			filter := bson.M{"_id": v.ID}
			update := bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}
			// Every operation must use sessCtx; an operation issued with any
			// other context runs outside the transaction.
			if updateErr := coll.UpdateOne(sessCtx, filter, update); updateErr != nil {
				fmt.Println("BatchInsertData:Err:" + updateErr.Error())
				return nil, updateErr
			}
		}
		return nil, nil
	}

	return utils.MgoDataCli.DoTransaction(context.TODO(), callback)
}

// EdbInfoMaxAndMinInfo is the decoded result of the aggregation run by
// GetEdbInfoMaxAndMinInfo.
type EdbInfoMaxAndMinInfo struct {
	MinDate     time.Time `description:"最小日期" bson:"min_date"`
	MaxDate     time.Time `description:"最大日期" bson:"max_date"`
	MinValue    float64   `description:"最小值" bson:"min_value"`
	MaxValue    float64   `description:"最大值" bson:"max_value"`
	LatestValue float64   `description:"最新值" bson:"latest_value"`
	LatestDate  time.Time `description:"实际数据最新日期" bson:"latest_date"`
	EndValue    float64   `description:"最新值" bson:"end_value"`
}

// GetEdbInfoMaxAndMinInfo runs whereParams as an aggregation pipeline on the
// backing collection and decodes the first result document. The date fields
// of the result are converted to the local time zone.
func (m *EdbDataBusiness) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) {
	coll, err := m.collection()
	if err != nil {
		return
	}
	err = coll.Aggregate(context.TODO(), whereParams).One(&result)
	if err != nil {
		return
	}
	result.MinDate = result.MinDate.In(time.Local)
	result.MaxDate = result.MaxDate.In(time.Local)
	result.LatestDate = result.LatestDate.In(time.Local)
	return
}

// LatestValue holds the "value" field decoded by GetLatestValue.
type LatestValue struct {
	Value float64 `description:"值" bson:"value"`
}

// GetLatestValue returns one document matching whereParams, projected with
// selectParam and decoded into a LatestValue.
func (m *EdbDataBusiness) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) {
	coll, err := m.collection()
	if err != nil {
		return
	}
	err = coll.Find(context.TODO(), whereParams).Select(selectParam).One(&latestValue)
	return
}