edb_refresh.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package data
  2. import (
  3. "eta/eta_api/models/data_manage"
  4. "eta/eta_api/services/alarm_msg"
  5. "eta/eta_api/utils"
  6. "fmt"
  7. "sync"
  8. )
  9. var (
  10. EdbRefreshLock sync.Map // 指标刷新锁, 避免同时间重复key的刷新
  11. EdbRefreshHandling chan EdbQueueRefreshReq // 排队刷新的20+指标的图表/表格
  12. EdbRefreshWorker chan struct{} // 同时允许N组刷新
  13. )
  14. func init() {
  15. EdbRefreshHandling = make(chan EdbQueueRefreshReq, 50) // N为最大排队等候的goroutine数
  16. EdbRefreshWorker = make(chan struct{}, 5) // 控制刷新速率修改此N值
  17. }
  18. type EdbQueueRefreshReq struct {
  19. RefreshKey string `description:"刷新请求的缓存key"`
  20. ItemRefreshKeys []string `description:"图表/表格刷新后要删除的缓存key"`
  21. BaseEdbInfoArr []*data_manage.EdbInfo
  22. BasePredictEdbInfoArr []*data_manage.EdbInfo
  23. CalculateMap map[int]*data_manage.EdbInfo
  24. PredictCalculateMap map[int]*data_manage.EdbInfo
  25. CalculateArr []int
  26. PredictCalculateArr []int
  27. }
  28. // PushEdb2Refresh 写入指标刷新队列
  29. func PushEdb2Refresh(item EdbQueueRefreshReq) {
  30. refreshKey := item.RefreshKey
  31. if refreshKey == "" {
  32. return
  33. }
  34. // 检查是否已在队列中
  35. _, ok := EdbRefreshLock.Load(refreshKey)
  36. if ok {
  37. return
  38. }
  39. // 追加至刷新队列
  40. EdbRefreshHandling <- item
  41. EdbRefreshLock.Store(refreshKey, true)
  42. }
  43. // HandleEdbRefreshQueue 排队刷新指标
  44. func HandleEdbRefreshQueue() {
  45. defer func() {
  46. if err := recover(); err != nil {
  47. tips := fmt.Sprintf("[HandleEdbRefreshQueue] panic: %v", err)
  48. fmt.Println(tips)
  49. utils.FileLog.Info(tips)
  50. }
  51. }()
  52. fmt.Println("HandleEdbRefreshQueue start")
  53. for {
  54. select {
  55. case item, ok := <-EdbRefreshHandling:
  56. if !ok {
  57. return
  58. }
  59. go EdbRefreshTask(item)
  60. }
  61. }
  62. }
  63. // EdbRefreshTask 指标刷新任务
  64. func EdbRefreshTask(item EdbQueueRefreshReq) {
  65. var err error
  66. defer func() {
  67. if err != nil {
  68. tips := fmt.Sprintf("[EdbRefreshTask] ErrMsg: %s", err.Error())
  69. utils.FileLog.Info(tips)
  70. go alarm_msg.SendAlarmMsg(tips, 3)
  71. }
  72. <-EdbRefreshWorker
  73. }()
  74. EdbRefreshWorker <- struct{}{}
  75. fmt.Printf("EdbRefreshTask开始刷新: %s\n", item.RefreshKey)
  76. // 刷新指标
  77. e := edbInfoRefreshAll(false, item.BaseEdbInfoArr, item.BasePredictEdbInfoArr, item.CalculateMap, item.PredictCalculateMap, item.CalculateArr, item.PredictCalculateArr)
  78. if e != nil {
  79. err = fmt.Errorf("edbInfoRefreshAll err: %s", e.Error())
  80. }
  81. // 清除缓存
  82. _ = utils.Rc.Delete(item.RefreshKey)
  83. if len(item.ItemRefreshKeys) > 0 {
  84. for _, v := range item.ItemRefreshKeys {
  85. _ = utils.Rc.Delete(v)
  86. }
  87. }
  88. // 解除锁
  89. EdbRefreshLock.Delete(item.RefreshKey)
  90. fmt.Printf("EdbRefreshTask结束刷新: %s\n", item.RefreshKey)
  91. }