edb_data_business.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package mgo
  2. import (
  3. "context"
  4. "errors"
  5. "eta/eta_index_lib/utils"
  6. "fmt"
  7. "github.com/qiniu/qmgo"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.mongodb.org/mongo-driver/bson/primitive"
  10. "time"
  11. )
  12. // EdbDataBusiness
  13. // @Description: 外部数据集合(指标库)
  14. type EdbDataBusiness struct {
  15. ID primitive.ObjectID `json:"_id" bson:"_id,omitempty" ` // 文档id
  16. EdbInfoId int `json:"edb_info_id" bson:"edb_info_id"` // 指标编码
  17. EdbCode string `json:"edb_code" bson:"edb_code"` // 指标编码
  18. DataTime time.Time `json:"data_time" bson:"data_time"` // 数据日期
  19. Value float64 `json:"value" bson:"value"` // 数据值
  20. CreateTime time.Time `json:"create_time" bson:"create_time"` // 创建时间
  21. ModifyTime time.Time `json:"modify_time" bson:"modify_time"` // 修改时间
  22. DataTimestamp int64 `json:"data_timestamp" bson:"data_timestamp"` // 数据日期时间戳
  23. }
  24. // CollectionName
  25. // @Description: 获取集合名称
  26. // @author: Roc
  27. // @receiver m
  28. // @datetime 2024-04-26 13:41:36
  29. // @return string
  30. func (m *EdbDataBusiness) CollectionName() string {
  31. return "edb_data_business"
  32. }
  33. // DataBaseName
  34. // @Description: 获取数据库名称
  35. // @author: Roc
  36. // @receiver m
  37. // @datetime 2024-04-26 13:41:33
  38. // @return string
  39. func (m *EdbDataBusiness) DataBaseName() string {
  40. return utils.MgoDataDbName
  41. }
  42. // GetCollection
  43. // @Description: 获取mongodb集合的句柄
  44. // @author: Roc
  45. // @receiver m
  46. // @datetime 2024-04-26 13:41:33
  47. // @return string
  48. func (m *EdbDataBusiness) GetCollection() *qmgo.Collection {
  49. db := utils.MgoDataCli.Database(m.DataBaseName())
  50. return db.Collection(m.CollectionName())
  51. }
  52. // GetAllDataList
  53. // @Description: 根据条件获取所有数据
  54. // @author: Roc
  55. // @receiver m
  56. // @datetime 2024-04-26 13:42:19
  57. // @param whereParams interface{}
  58. // @return result []EdbDataBusiness
  59. // @return err error
  60. func (m *EdbDataBusiness) GetAllDataList(whereParams interface{}) (result []*EdbDataBusiness, err error) {
  61. if utils.MgoDataCli == nil {
  62. err = errors.New("mongodb连接失败")
  63. return
  64. }
  65. db := utils.MgoDataCli.Database(m.DataBaseName())
  66. coll := db.Collection(m.CollectionName())
  67. ctx := context.TODO()
  68. if err != nil {
  69. fmt.Println("MgoGetColl Err:", err.Error())
  70. return
  71. }
  72. err = coll.Find(ctx, whereParams).All(&result)
  73. if err != nil {
  74. return
  75. }
  76. for _, v := range result {
  77. v.DataTime = v.DataTime.In(time.Local)
  78. v.CreateTime = v.CreateTime.In(time.Local)
  79. v.ModifyTime = v.ModifyTime.In(time.Local)
  80. }
  81. return
  82. }
  83. // BatchInsertData
  84. // @Description: 批量写入数据
  85. // @author: Roc
  86. // @receiver m
  87. // @datetime 2024-04-26 14:22:18
  88. // @param dataList interface{}
  89. // @return err error
  90. func (m *EdbDataBusiness) BatchInsertData(dataList []EdbDataBusiness) (err error) {
  91. db := utils.MgoDataCli.Database(m.DataBaseName())
  92. coll := db.Collection(m.CollectionName())
  93. return m.BatchInsertDataByColl(coll, dataList)
  94. }
  95. // BatchInsertDataByColl
  96. // @Description: 批量写入数据(外部传入集合)
  97. // @author: Roc
  98. // @receiver m
  99. // @datetime 2024-04-26 14:22:18
  100. // @param dataList interface{}
  101. // @return err error
  102. func (m *EdbDataBusiness) BatchInsertDataByColl(coll *qmgo.Collection, dataList []EdbDataBusiness) (err error) {
  103. ctx := context.TODO()
  104. _, err = coll.InsertMany(ctx, dataList)
  105. if err != nil {
  106. fmt.Println("BatchInsertData:Err:" + err.Error())
  107. return
  108. }
  109. return
  110. }
  111. // UpdateData
  112. // @Description: 单条数据修改
  113. // @author: Roc
  114. // @receiver m
  115. // @datetime 2024-04-26 15:01:51
  116. // @param whereParams interface{}
  117. // @param updateParams interface{}
  118. // @return err error
  119. func (m *EdbDataBusiness) UpdateData(whereParams, updateParams interface{}) (err error) {
  120. db := utils.MgoDataCli.Database(m.DataBaseName())
  121. coll := db.Collection(m.CollectionName())
  122. return m.UpdateDataByColl(coll, whereParams, updateParams)
  123. }
  124. // UpdateDataByColl
  125. // @Description: 单条数据修改(外部传入集合)
  126. // @author: Roc
  127. // @receiver m
  128. // @datetime 2024-04-26 15:01:51
  129. // @param whereParams interface{}
  130. // @param updateParams interface{}
  131. // @return err error
  132. func (m *EdbDataBusiness) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) {
  133. ctx := context.TODO()
  134. err = coll.UpdateOne(ctx, whereParams, updateParams)
  135. if err != nil {
  136. fmt.Println("UpdateDataByColl:Err:" + err.Error())
  137. return
  138. }
  139. return
  140. }
  141. // RemoveMany
  142. // @Description: 根据条件删除多条数据
  143. // @author: Roc
  144. // @receiver m
  145. // @datetime 2024-04-30 13:17:02
  146. // @param whereParams interface{}
  147. // @return err error
  148. func (m *EdbDataBusiness) RemoveMany(whereParams interface{}) (err error) {
  149. db := utils.MgoDataCli.Database(m.DataBaseName())
  150. coll := db.Collection(m.CollectionName())
  151. return m.RemoveManyByColl(coll, whereParams)
  152. }
  153. // RemoveManyByColl
  154. // @Description: 根据条件删除多条数据(外部传入集合)
  155. // @author: Roc
  156. // @receiver m
  157. // @datetime 2024-04-30 13:18:42
  158. // @param coll *qmgo.Collection
  159. // @param whereParams interface{}
  160. // @return err error
  161. func (m *EdbDataBusiness) RemoveManyByColl(coll *qmgo.Collection, whereParams interface{}) (err error) {
  162. ctx := context.TODO()
  163. _, err = coll.RemoveAll(ctx, whereParams)
  164. if err != nil {
  165. fmt.Println("RemoveManyByColl:Err:" + err.Error())
  166. return
  167. }
  168. return
  169. }
  170. // HandleData
  171. // @Description: 事务处理数据
  172. // @author: Roc
  173. // @receiver m
  174. // @datetime 2024-04-30 10:39:01
  175. // @param addDataList []AddEdbDataBusiness
  176. // @param updateDataList []EdbDataBusiness
  177. // @return result interface{}
  178. // @return err error
  179. func (m *EdbDataBusiness) HandleData(addDataList, updateDataList []EdbDataBusiness) (result interface{}, err error) {
  180. ctx := context.TODO()
  181. callback := func(sessCtx context.Context) (interface{}, error) {
  182. // 重要:确保事务中的每一个操作,都使用传入的sessCtx参数
  183. db := utils.MgoDataCli.Database(m.DataBaseName())
  184. coll := db.Collection(m.CollectionName())
  185. // 插入数据
  186. if len(addDataList) > 0 {
  187. _, err = coll.InsertMany(sessCtx, addDataList)
  188. if err != nil {
  189. return nil, err
  190. }
  191. }
  192. // 修改
  193. if len(updateDataList) > 0 {
  194. for _, v := range updateDataList {
  195. err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
  196. if err != nil {
  197. fmt.Println("BatchInsertData:Err:" + err.Error())
  198. return nil, err
  199. }
  200. }
  201. }
  202. return nil, nil
  203. }
  204. result, err = utils.MgoDataCli.DoTransaction(ctx, callback)
  205. return
  206. }
  // EdbInfoMaxAndMinInfo holds the aggregated extremes and latest record of an
  // indicator, decoded from the BSON keys named in the field tags.
  type EdbInfoMaxAndMinInfo struct {
  	// MinDate is the earliest data date.
  	MinDate time.Time `description:"最小日期" bson:"min_date"`
  	// MaxDate is the latest data date.
  	MaxDate time.Time `description:"最大日期" bson:"max_date"`
  	// MinValue is the smallest value.
  	MinValue float64 `description:"最小值" bson:"min_value"`
  	// MaxValue is the largest value.
  	MaxValue float64 `description:"最大值" bson:"max_value"`
  	// LatestValue is the latest value.
  	LatestValue float64 `description:"最新值" bson:"latest_value"`
  	// LatestDate is the latest date that has actual data.
  	LatestDate time.Time `description:"实际数据最新日期" bson:"latest_date"`
  	// EndValue is also described as the latest value; how it differs from
  	// LatestValue is not visible in this file — TODO confirm.
  	EndValue float64 `description:"最新值" bson:"end_value"`
  }
  217. // GetEdbInfoMaxAndMinInfo
  218. // @Description: 获取当前指标的最大最小值
  219. // @author: Roc
  220. // @receiver m
  221. // @datetime 2024-04-30 17:15:39
  222. // @param whereParams interface{}
  223. // @return result EdbInfoMaxAndMinInfo
  224. // @return err error
  225. func (m *EdbDataBusiness) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) {
  226. if utils.MgoDataCli == nil {
  227. err = errors.New("mongodb连接失败")
  228. return
  229. }
  230. db := utils.MgoDataCli.Database(m.DataBaseName())
  231. coll := db.Collection(m.CollectionName())
  232. ctx := context.TODO()
  233. if err != nil {
  234. fmt.Println("MgoGetColl Err:", err.Error())
  235. return
  236. }
  237. err = coll.Aggregate(ctx, whereParams).One(&result)
  238. if err != nil {
  239. return
  240. }
  241. result.MinDate = result.MinDate.In(time.Local)
  242. result.MaxDate = result.MaxDate.In(time.Local)
  243. result.LatestDate = result.LatestDate.In(time.Local)
  244. return
  245. }
  // LatestValue carries the value of an indicator's latest data record,
  // decoded from the BSON key "value".
  type LatestValue struct {
  	// Value is the data value.
  	Value float64 `description:"值" bson:"value"`
  }
  250. // GetLatestValue
  251. // @Description: 获取当前指标的最新数据记录
  252. // @author: Roc
  253. // @receiver m
  254. // @datetime 2024-04-30 17:16:15
  255. // @param whereParams interface{}
  256. // @param selectParam interface{}
  257. // @return latestValue LatestValue
  258. // @return err error
  259. func (m *EdbDataBusiness) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) {
  260. if utils.MgoDataCli == nil {
  261. err = errors.New("mongodb连接失败")
  262. return
  263. }
  264. db := utils.MgoDataCli.Database(m.DataBaseName())
  265. coll := db.Collection(m.CollectionName())
  266. ctx := context.TODO()
  267. if err != nil {
  268. fmt.Println("MgoGetColl Err:", err.Error())
  269. return
  270. }
  271. //var result interface{}
  272. //err = coll.Find(ctx, whereParams).Select(selectParam).One(&result)
  273. err = coll.Find(ctx, whereParams).Select(selectParam).One(&latestValue)
  274. return
  275. }