edb_refresh.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package data
  2. import (
  3. "eta_gn/eta_api/models/data_manage"
  4. "eta_gn/eta_api/services/alarm_msg"
  5. "eta_gn/eta_api/utils"
  6. "fmt"
  7. "sync"
  8. )
  9. var (
  10. EdbRefreshLock sync.Map // 指标刷新锁, 避免同时间重复key的刷新
  11. EdbRefreshHandling chan EdbQueueRefreshReq // 排队刷新的20+指标的图表/表格
  12. EdbRefreshWorker chan struct{} // 同时允许N组刷新
  13. )
  14. func init() {
  15. EdbRefreshHandling = make(chan EdbQueueRefreshReq, 50) // N为最大排队等候的goroutine数
  16. EdbRefreshWorker = make(chan struct{}, 5) // 控制刷新速率修改此N值
  17. }
  18. type EdbQueueRefreshReq struct {
  19. RefreshKey string `description:"刷新请求的缓存key"`
  20. ItemRefreshKeys []string `description:"图表/表格刷新后要删除的缓存key"`
  21. BaseEdbInfoArr []*data_manage.EdbInfo
  22. BasePredictEdbInfoArr []*data_manage.EdbInfo
  23. CalculateMap map[int]*data_manage.EdbInfo
  24. PredictCalculateMap map[int]*data_manage.EdbInfo
  25. CalculateArr []int
  26. PredictCalculateArr []int
  27. }
  28. func PushEdb2Refresh(item EdbQueueRefreshReq) {
  29. refreshKey := item.RefreshKey
  30. if refreshKey == "" {
  31. return
  32. }
  33. _, ok := EdbRefreshLock.Load(refreshKey)
  34. if ok {
  35. return
  36. }
  37. EdbRefreshHandling <- item
  38. EdbRefreshLock.Store(refreshKey, true)
  39. }
  40. func HandleEdbRefreshQueue() {
  41. defer func() {
  42. if err := recover(); err != nil {
  43. tips := fmt.Sprintf("[HandleEdbRefreshQueue] panic: %v", err)
  44. fmt.Println(tips)
  45. utils.FileLog.Info(tips)
  46. }
  47. }()
  48. fmt.Println("HandleEdbRefreshQueue start")
  49. for {
  50. select {
  51. case item, ok := <-EdbRefreshHandling:
  52. if !ok {
  53. return
  54. }
  55. go EdbRefreshTask(item)
  56. }
  57. }
  58. }
  59. func EdbRefreshTask(item EdbQueueRefreshReq) {
  60. var err error
  61. defer func() {
  62. if err != nil {
  63. tips := fmt.Sprintf("[EdbRefreshTask] ErrMsg: %s", err.Error())
  64. utils.FileLog.Info(tips)
  65. go alarm_msg.SendAlarmMsg(tips, 3)
  66. }
  67. <-EdbRefreshWorker
  68. }()
  69. EdbRefreshWorker <- struct{}{}
  70. fmt.Printf("EdbRefreshTask开始刷新: %s\n", item.RefreshKey)
  71. e := edbInfoRefreshAll(false, item.BaseEdbInfoArr, item.BasePredictEdbInfoArr, item.CalculateMap, item.PredictCalculateMap, item.CalculateArr, item.PredictCalculateArr)
  72. if e != nil {
  73. err = fmt.Errorf("edbInfoRefreshAll err: %s", e.Error())
  74. }
  75. _ = utils.Rc.Delete(item.RefreshKey)
  76. if len(item.ItemRefreshKeys) > 0 {
  77. for _, v := range item.ItemRefreshKeys {
  78. _ = utils.Rc.Delete(v)
  79. }
  80. }
  81. EdbRefreshLock.Delete(item.RefreshKey)
  82. fmt.Printf("EdbRefreshTask结束刷新: %s\n", item.RefreshKey)
  83. }