base_from_business_data.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package mgo
  2. import (
  3. "context"
  4. "errors"
  5. "eta_gn/eta_api/utils"
  6. "fmt"
  7. "github.com/qiniu/qmgo"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.mongodb.org/mongo-driver/bson/primitive"
  10. "time"
  11. )
  12. type BaseFromBusinessData struct {
  13. ID primitive.ObjectID `json:"_id" bson:"_id,omitempty"` // 文档id
  14. BaseFromBusinessIndexId int64 `json:"base_from_business_index_id" bson:"base_from_business_index_id"` // 指标id
  15. IndexCode string `json:"index_code" bson:"index_code"` // 指标编码
  16. DataTime time.Time `json:"data_time" bson:"data_time"` // 数据日期
  17. Value float64 `json:"value" bson:"value"` // 数据值
  18. CreateTime time.Time `json:"create_time" bson:"create_time"` // 创建时间
  19. ModifyTime time.Time `json:"modify_time" bson:"modify_time"` // 修改时间
  20. }
  21. func (m *BaseFromBusinessData) CollectionName() string {
  22. return "base_from_business_data"
  23. }
  24. func (m *BaseFromBusinessData) DataBaseName() string {
  25. return utils.MgoDataDbName
  26. }
  27. func (m *BaseFromBusinessData) GetCollection() *qmgo.Collection {
  28. db := utils.MgoDataCli.Database(m.DataBaseName())
  29. return db.Collection(m.CollectionName())
  30. }
  31. func (m *BaseFromBusinessData) GetAllDataList(whereParams interface{}, sort []string) (result []*BaseFromBusinessData, err error) {
  32. if utils.MgoDataCli == nil {
  33. err = errors.New("mongodb连接失败")
  34. return
  35. }
  36. db := utils.MgoDataCli.Database(m.DataBaseName())
  37. coll := db.Collection(m.CollectionName())
  38. ctx := context.TODO()
  39. if err != nil {
  40. fmt.Println("MgoGetColl Err:", err.Error())
  41. return
  42. }
  43. err = coll.Find(ctx, whereParams).Sort(sort...).All(&result)
  44. if err != nil {
  45. return
  46. }
  47. for _, v := range result {
  48. v.DataTime = v.DataTime.In(time.Local)
  49. v.CreateTime = v.CreateTime.In(time.Local)
  50. v.ModifyTime = v.ModifyTime.In(time.Local)
  51. }
  52. return
  53. }
  54. func (m *BaseFromBusinessData) GetLimitDataList(whereParams interface{}, size int64, sort []string) (result []*BaseFromBusinessData, err error) {
  55. if utils.MgoDataCli == nil {
  56. err = errors.New("mongodb连接失败")
  57. return
  58. }
  59. db := utils.MgoDataCli.Database(m.DataBaseName())
  60. coll := db.Collection(m.CollectionName())
  61. ctx := context.TODO()
  62. if err != nil {
  63. fmt.Println("MgoGetColl Err:", err.Error())
  64. return
  65. }
  66. err = coll.Find(ctx, whereParams).Sort(sort...).Limit(size).All(&result)
  67. if err != nil {
  68. return
  69. }
  70. for _, v := range result {
  71. v.DataTime = v.DataTime.In(time.Local)
  72. v.CreateTime = v.CreateTime.In(time.Local)
  73. v.ModifyTime = v.ModifyTime.In(time.Local)
  74. }
  75. return
  76. }
  77. func (m *BaseFromBusinessData) GetPageDataList(whereParams interface{}, startSize, size int64, sort []string) (result []*BaseFromBusinessData, err error) {
  78. if utils.MgoDataCli == nil {
  79. err = errors.New("mongodb连接失败")
  80. return
  81. }
  82. db := utils.MgoDataCli.Database(m.DataBaseName())
  83. coll := db.Collection(m.CollectionName())
  84. ctx := context.TODO()
  85. if err != nil {
  86. fmt.Println("MgoGetColl Err:", err.Error())
  87. return
  88. }
  89. err = coll.Find(ctx, whereParams).Sort(sort...).Skip(startSize).Limit(size).All(&result)
  90. if err != nil {
  91. return
  92. }
  93. for _, v := range result {
  94. v.DataTime = v.DataTime.In(time.Local)
  95. v.CreateTime = v.CreateTime.In(time.Local)
  96. v.ModifyTime = v.ModifyTime.In(time.Local)
  97. }
  98. return
  99. }
  100. func (m *BaseFromBusinessData) GetCountDataList(whereParams interface{}) (count int64, err error) {
  101. if utils.MgoDataCli == nil {
  102. err = errors.New("mongodb连接失败")
  103. return
  104. }
  105. db := utils.MgoDataCli.Database(m.DataBaseName())
  106. coll := db.Collection(m.CollectionName())
  107. ctx := context.TODO()
  108. if err != nil {
  109. fmt.Println("MgoGetColl Err:", err.Error())
  110. return
  111. }
  112. count, err = coll.Find(ctx, whereParams).Count()
  113. return
  114. }
  115. func (m *BaseFromBusinessData) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) {
  116. ctx := context.TODO()
  117. _, err = coll.InsertOne(ctx, addData)
  118. if err != nil {
  119. fmt.Println("InsertDataByColl:Err:" + err.Error())
  120. return
  121. }
  122. return
  123. }
  124. func (m *BaseFromBusinessData) BatchInsertData(bulk int, dataList []interface{}) (err error) {
  125. db := utils.MgoDataCli.Database(m.DataBaseName())
  126. coll := db.Collection(m.CollectionName())
  127. return m.BatchInsertDataByColl(coll, bulk, dataList)
  128. }
  129. func (m *BaseFromBusinessData) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) {
  130. ctx := context.TODO()
  131. dataNum := len(dataList)
  132. if dataNum <= 0 {
  133. return
  134. }
  135. if bulk <= 0 || dataNum <= bulk {
  136. _, err = coll.InsertMany(ctx, dataList)
  137. if err != nil {
  138. fmt.Println("BatchInsertData:Err:" + err.Error())
  139. return
  140. }
  141. return
  142. }
  143. i := 0
  144. tmpAddDataList := make([]interface{}, 0)
  145. for _, v := range dataList {
  146. tmpAddDataList = append(tmpAddDataList, v)
  147. i++
  148. if i >= bulk {
  149. _, err = coll.InsertMany(ctx, tmpAddDataList)
  150. if err != nil {
  151. fmt.Println("BatchInsertData:Err:" + err.Error())
  152. return
  153. }
  154. i = 0
  155. tmpAddDataList = make([]interface{}, 0)
  156. }
  157. }
  158. if len(tmpAddDataList) > 0 {
  159. _, err = coll.InsertMany(ctx, tmpAddDataList)
  160. if err != nil {
  161. fmt.Println("BatchInsertData:Err:" + err.Error())
  162. return
  163. }
  164. }
  165. return
  166. }
  167. func (m *BaseFromBusinessData) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) {
  168. ctx := context.TODO()
  169. err = coll.UpdateOne(ctx, whereParams, updateParams)
  170. if err != nil {
  171. fmt.Println("UpdateDataByColl:Err:" + err.Error())
  172. return
  173. }
  174. return
  175. }
  176. func (m *BaseFromBusinessData) UpdateData(whereParams, updateParams interface{}) (err error) {
  177. db := utils.MgoDataCli.Database(m.DataBaseName())
  178. coll := db.Collection(m.CollectionName())
  179. ctx := context.TODO()
  180. err = coll.UpdateOne(ctx, whereParams, updateParams)
  181. if err != nil {
  182. fmt.Println("UpdateData:Err:" + err.Error())
  183. return
  184. }
  185. return
  186. }
  187. func (m *BaseFromBusinessData) HandleData(addDataList, updateDataList []BaseFromBusinessData) (result interface{}, err error) {
  188. ctx := context.TODO()
  189. callback := func(sessCtx context.Context) (interface{}, error) {
  190. db := utils.MgoDataCli.Database(m.DataBaseName())
  191. coll := db.Collection(m.CollectionName())
  192. if len(addDataList) > 0 {
  193. _, err = coll.InsertMany(sessCtx, addDataList)
  194. if err != nil {
  195. return nil, err
  196. }
  197. }
  198. if len(updateDataList) > 0 {
  199. for _, v := range updateDataList {
  200. err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
  201. if err != nil {
  202. fmt.Println("BatchInsertData:Err:" + err.Error())
  203. return nil, err
  204. }
  205. }
  206. }
  207. return nil, nil
  208. }
  209. result, err = utils.MgoDataCli.DoTransaction(ctx, callback)
  210. return
  211. }
  212. func (m *BaseFromBusinessData) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) {
  213. if utils.MgoDataCli == nil {
  214. err = errors.New("mongodb连接失败")
  215. return
  216. }
  217. db := utils.MgoDataCli.Database(m.DataBaseName())
  218. coll := db.Collection(m.CollectionName())
  219. ctx := context.TODO()
  220. if err != nil {
  221. fmt.Println("MgoGetColl Err:", err.Error())
  222. return
  223. }
  224. err = coll.Aggregate(ctx, whereParams).One(&result)
  225. if err != nil {
  226. return
  227. }
  228. result.MinDate = result.MinDate.In(time.Local)
  229. result.MaxDate = result.MaxDate.In(time.Local)
  230. result.LatestDate = result.LatestDate.In(time.Local)
  231. return
  232. }
  233. func (m *BaseFromBusinessData) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) {
  234. if utils.MgoDataCli == nil {
  235. err = errors.New("mongodb连接失败")
  236. return
  237. }
  238. db := utils.MgoDataCli.Database(m.DataBaseName())
  239. coll := db.Collection(m.CollectionName())
  240. ctx := context.TODO()
  241. if err != nil {
  242. fmt.Println("MgoGetColl Err:", err.Error())
  243. return
  244. }
  245. err = coll.Find(ctx, whereParams).Select(selectParam).One(&latestValue)
  246. return
  247. }