edb_data_ths_hf.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package mgo
  2. import (
  3. "context"
  4. "errors"
  5. "eta_gn/eta_api/utils"
  6. "fmt"
  7. "github.com/qiniu/qmgo"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "go.mongodb.org/mongo-driver/bson/primitive"
  10. "time"
  11. )
  12. type EdbDataThsHf struct {
  13. ID primitive.ObjectID `json:"_id" bson:"_id,omitempty" ` // 文档id
  14. EdbInfoId int `json:"edb_info_id" bson:"edb_info_id"` // 指标ID
  15. EdbCode string `json:"edb_code" bson:"edb_code"` // 指标编码
  16. DataTime time.Time `json:"data_time" bson:"data_time"` // 数据日期
  17. Value float64 `json:"value" bson:"value"` // 数据值
  18. CreateTime time.Time `json:"create_time" bson:"create_time"` // 创建时间
  19. ModifyTime time.Time `json:"modify_time" bson:"modify_time"` // 修改时间
  20. DataTimestamp int64 `json:"data_timestamp" bson:"data_timestamp"` // 数据日期时间戳
  21. }
  22. func (m *EdbDataThsHf) CollectionName() string {
  23. return "edb_data_ths_hf"
  24. }
  25. func (m *EdbDataThsHf) DataBaseName() string {
  26. return utils.MgoDataDbName
  27. }
  28. func (m *EdbDataThsHf) GetCollection() *qmgo.Collection {
  29. db := utils.MgoDataCli.Database(m.DataBaseName())
  30. return db.Collection(m.CollectionName())
  31. }
  32. func (m *EdbDataThsHf) GetItem(whereParams interface{}) (item *EdbDataThsHf, err error) {
  33. if utils.MgoDataCli == nil {
  34. err = errors.New("mongodb连接失败")
  35. return
  36. }
  37. db := utils.MgoDataCli.Database(m.DataBaseName())
  38. coll := db.Collection(m.CollectionName())
  39. return m.GetItemByColl(coll, whereParams)
  40. }
  41. func (m *EdbDataThsHf) GetItemByColl(coll *qmgo.Collection, whereParams interface{}) (item *EdbDataThsHf, err error) {
  42. ctx := context.TODO()
  43. if err != nil {
  44. fmt.Println("MgoGetColl Err:", err.Error())
  45. return
  46. }
  47. err = coll.Find(ctx, whereParams).One(&item)
  48. if err != nil {
  49. return
  50. }
  51. item.DataTime = item.DataTime.In(time.Local)
  52. item.CreateTime = item.CreateTime.In(time.Local)
  53. item.ModifyTime = item.ModifyTime.In(time.Local)
  54. return
  55. }
  56. func (m *EdbDataThsHf) GetAllDataList(whereParams interface{}, sort []string) (result []*EdbDataThsHf, err error) {
  57. if utils.MgoDataCli == nil {
  58. err = errors.New("mongodb连接失败")
  59. return
  60. }
  61. db := utils.MgoDataCli.Database(m.DataBaseName())
  62. coll := db.Collection(m.CollectionName())
  63. ctx := context.TODO()
  64. if err != nil {
  65. fmt.Println("MgoGetColl Err:", err.Error())
  66. return
  67. }
  68. err = coll.Find(ctx, whereParams).Sort(sort...).All(&result)
  69. if err != nil {
  70. return
  71. }
  72. for _, v := range result {
  73. v.DataTime = v.DataTime.In(time.Local)
  74. v.CreateTime = v.CreateTime.In(time.Local)
  75. v.ModifyTime = v.ModifyTime.In(time.Local)
  76. }
  77. return
  78. }
  79. func (m *EdbDataThsHf) GetLimitDataList(whereParams interface{}, size int64, sort []string) (result []*EdbDataThsHf, err error) {
  80. if utils.MgoDataCli == nil {
  81. err = errors.New("mongodb连接失败")
  82. return
  83. }
  84. db := utils.MgoDataCli.Database(m.DataBaseName())
  85. coll := db.Collection(m.CollectionName())
  86. ctx := context.TODO()
  87. if err != nil {
  88. fmt.Println("MgoGetColl Err:", err.Error())
  89. return
  90. }
  91. err = coll.Find(ctx, whereParams).Sort(sort...).Limit(size).All(&result)
  92. if err != nil {
  93. return
  94. }
  95. for _, v := range result {
  96. v.DataTime = v.DataTime.In(time.Local)
  97. v.CreateTime = v.CreateTime.In(time.Local)
  98. v.ModifyTime = v.ModifyTime.In(time.Local)
  99. }
  100. return
  101. }
  102. func (m *EdbDataThsHf) GetPageDataList(whereParams interface{}, startSize, size int64, sort []string) (result []*EdbDataThsHf, err error) {
  103. if utils.MgoDataCli == nil {
  104. err = errors.New("mongodb连接失败")
  105. return
  106. }
  107. db := utils.MgoDataCli.Database(m.DataBaseName())
  108. coll := db.Collection(m.CollectionName())
  109. ctx := context.TODO()
  110. if err != nil {
  111. fmt.Println("MgoGetColl Err:", err.Error())
  112. return
  113. }
  114. err = coll.Find(ctx, whereParams).Sort(sort...).Skip(startSize).Limit(size).All(&result)
  115. if err != nil {
  116. return
  117. }
  118. for _, v := range result {
  119. v.DataTime = v.DataTime.In(time.Local)
  120. v.CreateTime = v.CreateTime.In(time.Local)
  121. v.ModifyTime = v.ModifyTime.In(time.Local)
  122. }
  123. return
  124. }
  125. func (m *EdbDataThsHf) GetCountDataList(whereParams interface{}) (count int64, err error) {
  126. if utils.MgoDataCli == nil {
  127. err = errors.New("mongodb连接失败")
  128. return
  129. }
  130. db := utils.MgoDataCli.Database(m.DataBaseName())
  131. coll := db.Collection(m.CollectionName())
  132. ctx := context.TODO()
  133. if err != nil {
  134. fmt.Println("MgoGetColl Err:", err.Error())
  135. return
  136. }
  137. count, err = coll.Find(ctx, whereParams).Count()
  138. return
  139. }
  140. func (m *EdbDataThsHf) InsertDataByColl(coll *qmgo.Collection, addData interface{}) (err error) {
  141. ctx := context.TODO()
  142. _, err = coll.InsertOne(ctx, addData)
  143. if err != nil {
  144. fmt.Println("InsertDataByColl:Err:" + err.Error())
  145. return
  146. }
  147. return
  148. }
  149. func (m *EdbDataThsHf) BatchInsertData(bulk int, dataList []interface{}) (err error) {
  150. db := utils.MgoDataCli.Database(m.DataBaseName())
  151. coll := db.Collection(m.CollectionName())
  152. return m.BatchInsertDataByColl(coll, bulk, dataList)
  153. }
  154. func (m *EdbDataThsHf) BatchInsertDataByColl(coll *qmgo.Collection, bulk int, dataList []interface{}) (err error) {
  155. ctx := context.TODO()
  156. dataNum := len(dataList)
  157. if dataNum <= 0 {
  158. return
  159. }
  160. if bulk <= 0 || dataNum <= bulk {
  161. _, err = coll.InsertMany(ctx, dataList)
  162. if err != nil {
  163. fmt.Println("BatchInsertData:Err:" + err.Error())
  164. return
  165. }
  166. return
  167. }
  168. i := 0
  169. tmpAddDataList := make([]interface{}, 0)
  170. for _, v := range dataList {
  171. tmpAddDataList = append(tmpAddDataList, v)
  172. i++
  173. if i >= bulk {
  174. _, err = coll.InsertMany(ctx, tmpAddDataList)
  175. if err != nil {
  176. fmt.Println("BatchInsertData:Err:" + err.Error())
  177. return
  178. }
  179. i = 0
  180. tmpAddDataList = make([]interface{}, 0)
  181. }
  182. }
  183. if len(tmpAddDataList) > 0 {
  184. _, err = coll.InsertMany(ctx, tmpAddDataList)
  185. if err != nil {
  186. fmt.Println("BatchInsertData:Err:" + err.Error())
  187. return
  188. }
  189. }
  190. return
  191. }
  192. func (m *EdbDataThsHf) UpdateData(whereParams, updateParams interface{}) (err error) {
  193. db := utils.MgoDataCli.Database(m.DataBaseName())
  194. coll := db.Collection(m.CollectionName())
  195. return m.UpdateDataByColl(coll, whereParams, updateParams)
  196. }
  197. func (m *EdbDataThsHf) UpdateDataByColl(coll *qmgo.Collection, whereParams, updateParams interface{}) (err error) {
  198. ctx := context.TODO()
  199. err = coll.UpdateOne(ctx, whereParams, updateParams)
  200. if err != nil {
  201. fmt.Println("UpdateDataByColl:Err:" + err.Error())
  202. return
  203. }
  204. return
  205. }
  206. func (m *EdbDataThsHf) RemoveMany(whereParams interface{}) (err error) {
  207. db := utils.MgoDataCli.Database(m.DataBaseName())
  208. coll := db.Collection(m.CollectionName())
  209. return m.RemoveManyByColl(coll, whereParams)
  210. }
  211. func (m *EdbDataThsHf) RemoveManyByColl(coll *qmgo.Collection, whereParams interface{}) (err error) {
  212. ctx := context.TODO()
  213. _, err = coll.RemoveAll(ctx, whereParams)
  214. if err != nil {
  215. fmt.Println("RemoveManyByColl:Err:" + err.Error())
  216. return
  217. }
  218. return
  219. }
  220. func (m *EdbDataThsHf) HandleData(addDataList, updateDataList []EdbDataThsHf) (result interface{}, err error) {
  221. ctx := context.TODO()
  222. callback := func(sessCtx context.Context) (interface{}, error) {
  223. db := utils.MgoDataCli.Database(m.DataBaseName())
  224. coll := db.Collection(m.CollectionName())
  225. if len(addDataList) > 0 {
  226. _, err = coll.InsertMany(sessCtx, addDataList)
  227. if err != nil {
  228. return nil, err
  229. }
  230. }
  231. if len(updateDataList) > 0 {
  232. for _, v := range updateDataList {
  233. err = coll.UpdateOne(ctx, bson.M{"_id": v.ID}, bson.M{"$set": bson.M{"value": v.Value, "modify_time": v.ModifyTime}})
  234. if err != nil {
  235. fmt.Println("BatchInsertData:Err:" + err.Error())
  236. return nil, err
  237. }
  238. }
  239. }
  240. return nil, nil
  241. }
  242. result, err = utils.MgoDataCli.DoTransaction(ctx, callback)
  243. return
  244. }
  245. func (m *EdbDataThsHf) GetEdbInfoMaxAndMinInfo(whereParams interface{}) (result EdbInfoMaxAndMinInfo, err error) {
  246. if utils.MgoDataCli == nil {
  247. err = errors.New("mongodb连接失败")
  248. return
  249. }
  250. db := utils.MgoDataCli.Database(m.DataBaseName())
  251. coll := db.Collection(m.CollectionName())
  252. ctx := context.TODO()
  253. if err != nil {
  254. fmt.Println("MgoGetColl Err:", err.Error())
  255. return
  256. }
  257. err = coll.Aggregate(ctx, whereParams).One(&result)
  258. if err != nil {
  259. return
  260. }
  261. result.MinDate = result.MinDate.In(time.Local)
  262. result.MaxDate = result.MaxDate.In(time.Local)
  263. result.LatestDate = result.LatestDate.In(time.Local)
  264. return
  265. }
  266. func (m *EdbDataThsHf) GetLatestValue(whereParams, selectParam interface{}) (latestValue LatestValue, err error) {
  267. if utils.MgoDataCli == nil {
  268. err = errors.New("mongodb连接失败")
  269. return
  270. }
  271. db := utils.MgoDataCli.Database(m.DataBaseName())
  272. coll := db.Collection(m.CollectionName())
  273. ctx := context.TODO()
  274. if err != nil {
  275. fmt.Println("MgoGetColl Err:", err.Error())
  276. return
  277. }
  278. err = coll.Find(ctx, whereParams).Select(selectParam).One(&latestValue)
  279. return
  280. }