base_from_business_data.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package mgo
  2. import (
  3. "context"
  4. "errors"
  5. "eta/eta_index_lib/utils"
  6. "fmt"
  7. "github.com/qiniu/qmgo"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "time"
  10. )
  // BaseAddFromBusinessData
  // @Description: Base record for external (business) data. It has the same
  // fields as BaseFromBusinessData except that it declares no _id field.
  type BaseAddFromBusinessData struct {
  	// BaseFromBusinessIndexId is the indicator ID.
  	BaseFromBusinessIndexId int64 `json:"base_from_business_index_id" bson:"base_from_business_index_id"`

  	// IndexCode is the indicator code.
  	IndexCode string `json:"index_code" bson:"index_code"`

  	// DataTime is the data date, kept as a string.
  	DataTime string `json:"data_time" bson:"data_time"`

  	// Value is the data value.
  	Value float64 `json:"value" bson:"value"`

  	// CreateTime is the creation time of the record.
  	CreateTime time.Time `json:"create_time" bson:"create_time"`

  	// ModifyTime is the last modification time of the record.
  	ModifyTime time.Time `json:"modify_time" bson:"modify_time"`

  	// Disabled field (timestamp of the data date):
  	//DataTimestamp int64 `json:"data_timestamp"`
  }
  // BaseFromBusinessData
  // @Description: Document of the external (business) data collection. It is
  // also the receiver type for the collection helper methods in this file.
  type BaseFromBusinessData struct {
  	// ID is the document id, mapped to the "_id" key.
  	// Disabled alternative declaration:
  	//ID primitive.ObjectID `json:"_id" bson:"_id" `
  	ID string `json:"_id" bson:"_id" `

  	// Disabled embedding of the base record; its fields are spelled out below.
  	//BaseAddFromBusinessData

  	// BaseFromBusinessIndexId is the indicator ID.
  	BaseFromBusinessIndexId int64 `json:"base_from_business_index_id" bson:"base_from_business_index_id"`

  	// IndexCode is the indicator code.
  	IndexCode string `json:"index_code" bson:"index_code"`

  	// DataTime is the data date, kept as a string.
  	DataTime string `json:"data_time" bson:"data_time"`

  	// Value is the data value.
  	Value float64 `json:"value" bson:"value"`

  	// CreateTime is the creation time of the record.
  	CreateTime time.Time `json:"create_time" bson:"create_time"`

  	// ModifyTime is the last modification time of the record.
  	ModifyTime time.Time `json:"modify_time" bson:"modify_time"`

  	// Disabled field (timestamp of the data date):
  	//DataTimestamp int64 `json:"data_timestamp"`
  }
  36. // CollectionName
  37. // @Description: 获取集合名称
  38. // @author: Roc
  39. // @receiver m
  40. // @datetime 2024-04-26 13:41:36
  41. // @return string
  42. func (m *BaseFromBusinessData) CollectionName() string {
  43. return "base_from_business_data"
  44. }
  45. // DataBaseName
  46. // @Description: 获取数据库名称
  47. // @author: Roc
  48. // @receiver m
  49. // @datetime 2024-04-26 13:41:33
  50. // @return string
  51. func (m *BaseFromBusinessData) DataBaseName() string {
  52. return "hz_data"
  53. }
  54. // GetCollection
  55. // @Description: 获取mongodb集合的句柄
  56. // @author: Roc
  57. // @receiver m
  58. // @datetime 2024-04-26 13:41:33
  59. // @return string
  60. func (m *BaseFromBusinessData) GetCollection() *qmgo.Collection {
  61. db := utils.MgoDataCli.Database(m.DataBaseName())
  62. return db.Collection(m.CollectionName())
  63. }
  64. // GetAllDataList
  65. // @Description: 根据条件获取所有数据
  66. // @author: Roc
  67. // @receiver m
  68. // @datetime 2024-04-26 13:42:19
  69. // @param whereParams interface{}
  70. // @return result []BaseFromBusinessData
  71. // @return err error
  72. func (m *BaseFromBusinessData) GetAllDataList(whereParams interface{}) (result []BaseFromBusinessData, err error) {
  73. if utils.MgoDataCli == nil {
  74. err = errors.New("mongodb连接失败")
  75. return
  76. }
  77. db := utils.MgoDataCli.Database(m.DataBaseName())
  78. coll := db.Collection(m.CollectionName())
  79. ctx := context.TODO()
  80. if err != nil {
  81. fmt.Println("MgoGetColl Err:", err.Error())
  82. return
  83. }
  84. err = coll.Find(ctx, whereParams).All(&result)
  85. return
  86. }
  87. // BatchInsertData
  88. // @Description: 批量写入数据
  89. // @author: Roc
  90. // @receiver m
  91. // @datetime 2024-04-26 14:22:18
  92. // @param dataList interface{}
  93. // @return err error
  94. func (m *BaseFromBusinessData) BatchInsertData(dataList interface{}) (err error) {
  95. db := utils.MgoDataCli.Database(m.DataBaseName())
  96. coll := db.Collection(m.CollectionName())
  97. ctx := context.TODO()
  98. _, err = coll.InsertMany(ctx, dataList)
  99. if err != nil {
  100. fmt.Println("BatchInsertData:Err:" + err.Error())
  101. return
  102. }
  103. return
  104. }
  105. // UpdateDataByColl
  106. // @Description: 单条数据修改
  107. // @author: Roc
  108. // @receiver m
  109. // @datetime 2024-04-26 15:01:51
  110. // @param params interface{}
  111. // @param whereParams interface{}
  112. // @return err error
  113. func (m *BaseFromBusinessData) UpdateDataByColl(coll *qmgo.Collection, params, whereParams interface{}) (err error) {
  114. ctx := context.TODO()
  115. err = coll.UpdateOne(ctx, whereParams, params)
  116. if err != nil {
  117. fmt.Println("BatchInsertData:Err:" + err.Error())
  118. return
  119. }
  120. return
  121. }
  122. // UpdateData
  123. // @Description: 单条数据修改
  124. // @author: Roc
  125. // @receiver m
  126. // @datetime 2024-04-26 15:01:51
  127. // @param params interface{}
  128. // @param whereParams interface{}
  129. // @return err error
  130. func (m *BaseFromBusinessData) UpdateData(params, whereParams interface{}) (err error) {
  131. db := utils.MgoDataCli.Database(m.DataBaseName())
  132. coll := db.Collection(m.CollectionName())
  133. ctx := context.TODO()
  134. err = coll.UpdateOne(ctx, whereParams, params)
  135. if err != nil {
  136. fmt.Println("BatchInsertData:Err:" + err.Error())
  137. return
  138. }
  139. return
  140. }
  141. // UpdateData
  142. // @Description: 单条数据修改
  143. // @author: Roc
  144. // @receiver m
  145. // @datetime 2024-04-26 15:01:51
  146. // @param params interface{}
  147. // @param whereParams interface{}
  148. // @return err error
  149. func (m *BaseFromBusinessData) HandleData(addDataList []BaseAddFromBusinessData, updateDataList []BaseFromBusinessData) (result interface{}, err error) {
  150. ctx := context.TODO()
  151. callback := func(sessCtx context.Context) (interface{}, error) {
  152. // 重要:确保事务中的每一个操作,都使用传入的sessCtx参数
  153. db := utils.MgoDataCli.Database(m.DataBaseName())
  154. coll := db.Collection(m.CollectionName())
  155. // 插入数据
  156. if len(addDataList) > 0 {
  157. _, err = coll.InsertMany(sessCtx, addDataList)
  158. if err != nil {
  159. return nil, err
  160. }
  161. }
  162. // 修改
  163. if len(updateDataList) > 0 {
  164. for _, v := range updateDataList {
  165. err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
  166. if err != nil {
  167. fmt.Println("BatchInsertData:Err:" + err.Error())
  168. return nil, err
  169. }
  170. }
  171. }
  172. return nil, nil
  173. }
  174. result, err = utils.MgoDataCli.DoTransaction(ctx, callback)
  175. return
  176. }