package mgo import ( "context" "errors" "eta_gn/eta_api/utils" "fmt" "github.com/qiniu/qmgo" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "time" ) type BaseFromThsHfData struct { ID primitive.ObjectID `json:"_id" bson:"_id,omitempty"` // 文档id BaseFromThsHfDataId int64 `json:"base_from_ths_hf_data_id" bson:"base_from_ths_hf_data_id"` // 指标数据ID BaseFromThsHfIndexId int64 `json:"base_from_ths_hf_index_id" bson:"base_from_ths_hf_index_id"` // 指标ID IndexCode string `json:"index_code" bson:"index_code"` // 指标编码 DataTime time.Time `json:"data_time" bson:"data_time"` // 数据日期 Value float64 `json:"value" bson:"value"` // 数据值 UniqueCode string `json:"unique_code" bson:"unique_code"` // 唯一编码 CreateTime time.Time `json:"create_time" bson:"create_time"` // 创建时间 ModifyTime time.Time `json:"modify_time" bson:"modify_time"` // 修改时间 DataTimestamp int64 `json:"data_timestamp" bson:"data_timestamp"` // 数据日期时间戳 } func (m *BaseFromThsHfData) CollectionName() string { return "base_from_ths_hf_data" } func (m *BaseFromThsHfData) DataBaseName() string { return utils.MgoDataDbName } func (m *BaseFromThsHfData) GetCollection() *qmgo.Collection { db := utils.MgoDataCli.Database(m.DataBaseName()) return db.Collection(m.CollectionName()) } func (m *BaseFromThsHfData) GetAllDataList(whereParams interface{}, sort []string) (result []*BaseFromThsHfData, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).Sort(sort...).All(&result) if err != nil { return } for _, v := range result { v.DataTime = v.DataTime.In(time.Local) v.CreateTime = v.CreateTime.In(time.Local) v.ModifyTime = v.ModifyTime.In(time.Local) } return } func (m *BaseFromThsHfData) GetLimitDataList(whereParams interface{}, size int64, sort []string) (result []*BaseFromThsHfData, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).Sort(sort...).Limit(size).All(&result) if err != nil { return } for _, v := range result { v.DataTime = v.DataTime.In(time.Local) v.CreateTime = v.CreateTime.In(time.Local) v.ModifyTime = v.ModifyTime.In(time.Local) } return } func (m *BaseFromThsHfData) GetPageDataList(whereParams interface{}, startSize, size int64, sort []string) (result []*BaseFromThsHfData, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).Sort(sort...).Skip(startSize).Limit(size).All(&result) if err != nil { return } for _, v := range result { v.DataTime = v.DataTime.In(time.Local) v.CreateTime = v.CreateTime.In(time.Local) v.ModifyTime = v.ModifyTime.In(time.Local) } return } func (m *BaseFromThsHfData) GetCountDataList(whereParams interface{}) (count int64, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } count, err = coll.Find(ctx, whereParams).Count() return } func (m *BaseFromThsHfData) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) { ctx := context.TODO() _, err = coll.InsertOne(ctx, addData) if err != nil { fmt.Println("InsertDataByColl:Err:" + err.Error()) return } return } func (m *BaseFromThsHfData) BatchInsertData(bulk int, dataList []interface{}) (err error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) return m.BatchInsertDataByColl(coll, bulk, dataList) } func (m *BaseFromThsHfData) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) { ctx := context.TODO() dataNum := len(dataList) if dataNum <= 0 { return } if bulk <= 0 || dataNum <= bulk { _, err = coll.InsertMany(ctx, dataList) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return } return } i := 0 tmpAddDataList := make([]interface{}, 0) for _, v := range dataList { tmpAddDataList = append(tmpAddDataList, v) i++ if i >= bulk { _, err = coll.InsertMany(ctx, tmpAddDataList) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return } i = 0 tmpAddDataList = make([]interface{}, 0) } } if len(tmpAddDataList) > 0 { _, err = coll.InsertMany(ctx, tmpAddDataList) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return } } return } func (m *BaseFromThsHfData) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) { ctx := context.TODO() err = coll.UpdateOne(ctx, whereParams, updateParams) if err != nil { fmt.Println("UpdateDataByColl:Err:" + err.Error()) return } return } func (m *BaseFromThsHfData) UpdateData(whereParams, updateParams interface{}) (err error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() err = coll.UpdateOne(ctx, whereParams, updateParams) if err != nil { fmt.Println("UpdateData:Err:" + err.Error()) return } return } func (m *BaseFromThsHfData) HandleData(addDataList, updateDataList []BaseFromThsHfData) (result interface{}, err error) { ctx := context.TODO() callback := func(sessCtx context.Context) (interface{}, error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) if len(addDataList) > 0 { _, err = coll.InsertMany(sessCtx, addDataList) if err != nil { return nil, err } } if len(updateDataList) > 0 { for _, v := range updateDataList { err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return nil, err } } } return nil, nil } result, err = utils.MgoDataCli.DoTransaction(ctx, callback) return } func (m *BaseFromThsHfData) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Aggregate(ctx, whereParams).One(&result) if err != nil { return } result.MinDate = result.MinDate.In(time.Local) result.MaxDate = result.MaxDate.In(time.Local) result.LatestDate = result.LatestDate.In(time.Local) return } func (m *BaseFromThsHfData) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).Select(selectParam).One(&latestValue) return } func (m *BaseFromThsHfData) RemoveMany(whereParams interface{}) (err error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) return m.RemoveManyByColl(coll, whereParams) } func (m *BaseFromThsHfData) RemoveManyByColl(coll *qmgo.Collection, whereParams interface{}) (err error) { ctx := context.TODO() _, err = coll.RemoveAll(ctx, whereParams) if err != nil { fmt.Println("RemoveManyByColl:Err:" + err.Error()) return } return }