package mgo import ( "context" "errors" "eta/eta_index_lib/utils" "fmt" "github.com/qiniu/qmgo" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "time" ) // BaseAddFromBusinessData // @Description: 外部数据基础 type BaseAddFromBusinessData struct { BaseFromBusinessIndexId int64 `json:"base_from_business_index_id" bson:"base_from_business_index_id"` // 指标id IndexCode string `json:"index_code" bson:"index_code"` // 指标编码 DataTime string `json:"data_time" bson:"data_time"` // 数据日期 Value float64 `json:"value" bson:"value"` // 数据值 CreateTime time.Time `json:"create_time" bson:"create_time"` // 创建时间 ModifyTime time.Time `json:"modify_time" bson:"modify_time"` // 修改时间 //DataTimestamp int64 `json:"data_timestamp"` // 数据日期时间戳 } // BaseFromBusinessData // @Description: 外部数据集合 type BaseFromBusinessData struct { ID primitive.ObjectID `json:"_id" bson:"_id" ` // 文档id //ID string `json:"_id" bson:"_id" ` // 文档id //BaseAddFromBusinessData BaseFromBusinessIndexId int64 `json:"base_from_business_index_id" bson:"base_from_business_index_id"` // 指标id IndexCode string `json:"index_code" bson:"index_code"` // 指标编码 DataTime string `json:"data_time" bson:"data_time"` // 数据日期 Value float64 `json:"value" bson:"value"` // 数据值 CreateTime time.Time `json:"create_time" bson:"create_time"` // 创建时间 ModifyTime time.Time `json:"modify_time" bson:"modify_time"` // 修改时间 //DataTimestamp int64 `json:"data_timestamp"` // 数据日期时间戳 } // CollectionName // @Description: 获取集合名称 // @author: Roc // @receiver m // @datetime 2024-04-26 13:41:36 // @return string func (m *BaseFromBusinessData) CollectionName() string { return "base_from_business_data" } // DataBaseName // @Description: 获取数据库名称 // @author: Roc // @receiver m // @datetime 2024-04-26 13:41:33 // @return string func (m *BaseFromBusinessData) DataBaseName() string { return "hz_data" } // GetCollection // @Description: 获取mongodb集合的句柄 // @author: Roc // @receiver m // @datetime 2024-04-26 13:41:33 // @return string func (m *BaseFromBusinessData) GetCollection() *qmgo.Collection { db := utils.MgoDataCli.Database(m.DataBaseName()) return db.Collection(m.CollectionName()) } // GetAllDataList // @Description: 根据条件获取所有数据 // @author: Roc // @receiver m // @datetime 2024-04-26 13:42:19 // @param whereParams interface{} // @return result []BaseFromBusinessData // @return err error func (m *BaseFromBusinessData) GetAllDataList(whereParams interface{}) (result []BaseFromBusinessData, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).All(&result) return } // BatchInsertData // @Description: 批量写入数据 // @author: Roc // @receiver m // @datetime 2024-04-26 14:22:18 // @param dataList interface{} // @return err error func (m *BaseFromBusinessData) BatchInsertData(dataList interface{}) (err error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() _, err = coll.InsertMany(ctx, dataList) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return } return } // UpdateDataByColl // @Description: 单条数据修改 // @author: Roc // @receiver m // @datetime 2024-04-26 15:01:51 // @param whereParams interface{} // @param updateParams interface{} // @return err error func (m *BaseFromBusinessData) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) { ctx := context.TODO() err = coll.UpdateOne(ctx, whereParams, updateParams) if err != nil { fmt.Println("UpdateDataByColl:Err:" + err.Error()) return } return } // UpdateData // @Description: 单条数据修改 // @author: Roc // @receiver m // @datetime 2024-04-26 15:01:51 // @param whereParams interface{} // @param updateParams interface{} // @return err error func (m *BaseFromBusinessData) UpdateData(whereParams, updateParams interface{}) (err error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() err = coll.UpdateOne(ctx, whereParams, updateParams) if err != nil { fmt.Println("UpdateData:Err:" + err.Error()) return } return } // UpdateData // @Description: 单条数据修改 // @author: Roc // @receiver m // @datetime 2024-04-26 15:01:51 // @param params interface{} // @param whereParams interface{} // @return err error func (m *BaseFromBusinessData) HandleData(addDataList []BaseAddFromBusinessData, updateDataList []BaseFromBusinessData) (result interface{}, err error) { ctx := context.TODO() callback := func(sessCtx context.Context) (interface{}, error) { // 重要:确保事务中的每一个操作,都使用传入的sessCtx参数 db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) // 插入数据 if len(addDataList) > 0 { _, err = coll.InsertMany(sessCtx, addDataList) if err != nil { return nil, err } } // 修改 if len(updateDataList) > 0 { for _, v := range updateDataList { err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return nil, err } } } return nil, nil } result, err = utils.MgoDataCli.DoTransaction(ctx, callback) return }