package mgo import ( "context" "errors" "eta/eta_api/utils" "fmt" "github.com/qiniu/qmgo" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "time" ) // BaseFromThsHfData // @Description: 同花顺高频集合 type BaseFromThsHfData struct { ID primitive.ObjectID `json:"_id" bson:"_id,omitempty"` // 文档id BaseFromThsHfDataId int64 `json:"base_from_ths_hf_data_id" bson:"base_from_ths_hf_data_id"` // 指标数据ID BaseFromThsHfIndexId int64 `json:"base_from_ths_hf_index_id" bson:"base_from_ths_hf_index_id"` // 指标ID IndexCode string `json:"index_code" bson:"index_code"` // 指标编码 DataTime time.Time `json:"data_time" bson:"data_time"` // 数据日期 Value float64 `json:"value" bson:"value"` // 数据值 UniqueCode string `json:"unique_code" bson:"unique_code"` // 唯一编码 CreateTime time.Time `json:"create_time" bson:"create_time"` // 创建时间 ModifyTime time.Time `json:"modify_time" bson:"modify_time"` // 修改时间 DataTimestamp int64 `json:"data_timestamp" bson:"data_timestamp"` // 数据日期时间戳 } // CollectionName // @Description: 获取集合名称 func (m *BaseFromThsHfData) CollectionName() string { return "base_from_ths_hf_data" } // DataBaseName // @Description: 获取数据库名称 func (m *BaseFromThsHfData) DataBaseName() string { return utils.MgoDataDbName } // GetCollection // @Description: 获取mongodb集合的句柄 func (m *BaseFromThsHfData) GetCollection() *qmgo.Collection { db := utils.MgoDataCli.Database(m.DataBaseName()) return db.Collection(m.CollectionName()) } // GetAllDataList // @Description: 根据条件获取所有数据 // @author: Roc // @receiver m // @datetime 2024-04-26 13:42:19 // @param sort []string // @param whereParams interface{} // @return result []BaseFromThsHfData // @return err error func (m *BaseFromThsHfData) GetAllDataList(whereParams interface{}, sort []string) (result []*BaseFromThsHfData, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).Sort(sort...).All(&result) if err != nil { return } for _, v := range result { v.DataTime = v.DataTime.In(time.Local) v.CreateTime = v.CreateTime.In(time.Local) v.ModifyTime = v.ModifyTime.In(time.Local) } return } // GetLimitDataList // @Description: 根据条件获取指定数量数据列表 // @author: Roc // @receiver m // @datetime 2024-05-06 17:08:32 // @param whereParams interface{} // @param size int64 // @return result []*BaseFromThsHfData // @return err error func (m *BaseFromThsHfData) GetLimitDataList(whereParams interface{}, size int64, sort []string) (result []*BaseFromThsHfData, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).Sort(sort...).Limit(size).All(&result) if err != nil { return } for _, v := range result { v.DataTime = v.DataTime.In(time.Local) v.CreateTime = v.CreateTime.In(time.Local) v.ModifyTime = v.ModifyTime.In(time.Local) } return } // GetPageDataList // @Description: 根据条件获取分页数据列表 // @author: Roc // @receiver m // @datetime 2024-05-07 10:21:07 // @param whereParams interface{} // @param startSize int64 // @param size int64 // @param sort []string // @return result []*BaseFromThsHfData // @return err error func (m *BaseFromThsHfData) GetPageDataList(whereParams interface{}, startSize, size int64, sort []string) (result []*BaseFromThsHfData, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Find(ctx, whereParams).Sort(sort...).Skip(startSize).Limit(size).All(&result) if err != nil { return } for _, v := range result { v.DataTime = v.DataTime.In(time.Local) v.CreateTime = v.CreateTime.In(time.Local) v.ModifyTime = v.ModifyTime.In(time.Local) } return } // GetCountDataList // @Description: 根据条件获取数据列表总数 // @author: Roc // @receiver m // @datetime 2024-05-07 10:29:00 // @param whereParams interface{} // @return count int64 // @return err error func (m *BaseFromThsHfData) GetCountDataList(whereParams interface{}) (count int64, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } count, err = coll.Find(ctx, whereParams).Count() return } // InsertDataByColl // @Description: 写入单条数据(外部传入集合) // @author: Roc // @receiver m // @datetime 2024-04-26 14:22:18 // @param addData interface{} // @return err error func (m *BaseFromThsHfData) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) { ctx := context.TODO() _, err = coll.InsertOne(ctx, addData) if err != nil { fmt.Println("InsertDataByColl:Err:" + err.Error()) return } return } // BatchInsertData // @Description: 批量写入数据 // @author: Roc // @receiver m // @datetime 2024-04-26 14:22:18 // @param bulk int 每次请求保存的数据量 // @param dataList []interface{} // @return err error func (m *BaseFromThsHfData) BatchInsertData(bulk int, dataList []interface{}) (err error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) return m.BatchInsertDataByColl(coll, bulk, dataList) } // BatchInsertDataByColl // @Description: 批量写入数据(外部传入集合) // @author: Roc // @receiver m // @datetime 2024-04-26 14:22:18 // @param coll *qmgo.Collection // @param bulk int 每次请求保存的数据量 // @param dataList []interface{} // @return err error func (m *BaseFromThsHfData) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) { ctx := context.TODO() dataNum := len(dataList) if dataNum <= 0 { return } // 不设置每次保存切片数量大小,或者实际数据量小于设置的切片数量大小,那么就直接保存吧 if bulk <= 0 || dataNum <= bulk { _, err = coll.InsertMany(ctx, dataList) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return } return } // 分批保存 i := 0 tmpAddDataList := make([]interface{}, 0) for _, v := range dataList { tmpAddDataList = append(tmpAddDataList, v) i++ if i >= bulk { _, err = coll.InsertMany(ctx, tmpAddDataList) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return } i = 0 tmpAddDataList = make([]interface{}, 0) } } if len(tmpAddDataList) > 0 { _, err = coll.InsertMany(ctx, tmpAddDataList) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return } } return } // UpdateDataByColl // @Description: 单条数据修改 // @author: Roc // @receiver m // @datetime 2024-04-26 15:01:51 // @param whereParams interface{} // @param updateParams interface{} // @return err error func (m *BaseFromThsHfData) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) { ctx := context.TODO() err = coll.UpdateOne(ctx, whereParams, updateParams) if err != nil { fmt.Println("UpdateDataByColl:Err:" + err.Error()) return } return } // UpdateData // @Description: 单条数据修改 // @author: Roc // @receiver m // @datetime 2024-04-26 15:01:51 // @param whereParams interface{} // @param updateParams interface{} // @return err error func (m *BaseFromThsHfData) UpdateData(whereParams, updateParams interface{}) (err error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() err = coll.UpdateOne(ctx, whereParams, updateParams) if err != nil { fmt.Println("UpdateData:Err:" + err.Error()) return } return } // HandleData // @Description: 事务处理数据 // @author: Roc // @receiver m // @datetime 2024-04-30 10:40:20 // @param addDataList []BaseAddFromBusinessData // @param updateDataList []BaseFromThsHfData // @return result interface{} // @return err error func (m *BaseFromThsHfData) HandleData(addDataList, updateDataList []BaseFromThsHfData) (result interface{}, err error) { ctx := context.TODO() callback := func(sessCtx context.Context) (interface{}, error) { // 重要:确保事务中的每一个操作,都使用传入的sessCtx参数 db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) // 插入数据 if len(addDataList) > 0 { _, err = coll.InsertMany(sessCtx, addDataList) if err != nil { return nil, err } } // 修改 if len(updateDataList) > 0 { for _, v := range updateDataList { err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}}) if err != nil { fmt.Println("BatchInsertData:Err:" + err.Error()) return nil, err } } } return nil, nil } result, err = utils.MgoDataCli.DoTransaction(ctx, callback) return } // GetEdbInfoMaxAndMinInfo // @Description: 获取当前指标的最大最小值 // @author: Roc // @receiver m // @datetime 2024-04-30 17:15:39 // @param whereParams interface{} // @return result EdbInfoMaxAndMinInfo // @return err error func (m *BaseFromThsHfData) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } err = coll.Aggregate(ctx, whereParams).One(&result) if err != nil { return } result.MinDate = result.MinDate.In(time.Local) result.MaxDate = result.MaxDate.In(time.Local) result.LatestDate = result.LatestDate.In(time.Local) return } // GetLatestValue // @Description: 获取当前指标的最新数据记录 // @author: Roc // @receiver m // @datetime 2024-04-30 17:16:15 // @param whereParams interface{} // @param selectParam interface{} // @return latestValue LatestValue // @return err error func (m *BaseFromThsHfData) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) { if utils.MgoDataCli == nil { err = errors.New("mongodb连接失败") return } db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) ctx := context.TODO() if err != nil { fmt.Println("MgoGetColl Err:", err.Error()) return } //var result interface{} //err = coll.Find(ctx, whereParams).Select(selectParam).One(&result) err = coll.Find(ctx, whereParams).Select(selectParam).One(&latestValue) return } func (m *BaseFromThsHfData) RemoveMany(whereParams interface{}) (err error) { db := utils.MgoDataCli.Database(m.DataBaseName()) coll := db.Collection(m.CollectionName()) return m.RemoveManyByColl(coll, whereParams) } func (m *BaseFromThsHfData) RemoveManyByColl(coll *qmgo.Collection, whereParams interface{}) (err error) { ctx := context.TODO() _, err = coll.RemoveAll(ctx, whereParams) if err != nil { fmt.Println("RemoveManyByColl:Err:" + err.Error()) return } return }