package data import ( "eta/eta_api/models/data_manage" "eta/eta_api/services/alarm_msg" "eta/eta_api/utils" "fmt" "sync" ) var ( EdbRefreshLock sync.Map // 指标刷新锁, 避免同时间重复key的刷新 EdbRefreshHandling chan EdbQueueRefreshReq // 排队刷新的20+指标的图表/表格 EdbRefreshWorker chan struct{} // 同时允许N组刷新 ) func init() { EdbRefreshHandling = make(chan EdbQueueRefreshReq, 50) // N为最大排队等候的goroutine数 EdbRefreshWorker = make(chan struct{}, 5) // 控制刷新速率修改此N值 } type EdbQueueRefreshReq struct { RefreshKey string `description:"刷新请求的缓存key"` ItemRefreshKeys []string `description:"图表/表格刷新后要删除的缓存key"` BaseEdbInfoArr []*data_manage.EdbInfo BasePredictEdbInfoArr []*data_manage.EdbInfo CalculateMap map[int]*data_manage.EdbInfo PredictCalculateMap map[int]*data_manage.EdbInfo CalculateArr []int PredictCalculateArr []int } // PushEdb2Refresh 写入指标刷新队列 func PushEdb2Refresh(item EdbQueueRefreshReq) { refreshKey := item.RefreshKey if refreshKey == "" { return } // 检查是否已在队列中 _, ok := EdbRefreshLock.Load(refreshKey) if ok { return } // 追加至刷新队列 EdbRefreshHandling <- item EdbRefreshLock.Store(refreshKey, true) } // HandleEdbRefreshQueue 排队刷新指标 func HandleEdbRefreshQueue() { defer func() { if err := recover(); err != nil { tips := fmt.Sprintf("[HandleEdbRefreshQueue] panic: %v", err) fmt.Println(tips) utils.FileLog.Info(tips) } }() fmt.Println("HandleEdbRefreshQueue start") for { select { case item, ok := <-EdbRefreshHandling: if !ok { return } go EdbRefreshTask(item) } } } // EdbRefreshTask 指标刷新任务 func EdbRefreshTask(item EdbQueueRefreshReq) { var err error defer func() { if err != nil { tips := fmt.Sprintf("[EdbRefreshTask] ErrMsg: %s", err.Error()) utils.FileLog.Info(tips) go alarm_msg.SendAlarmMsg(tips, 3) } <-EdbRefreshWorker }() EdbRefreshWorker <- struct{}{} fmt.Printf("EdbRefreshTask开始刷新: %s\n", item.RefreshKey) // 刷新指标 e := edbInfoRefreshAll(false, item.BaseEdbInfoArr, item.BasePredictEdbInfoArr, item.CalculateMap, item.PredictCalculateMap, item.CalculateArr, item.PredictCalculateArr) if e != nil { err = fmt.Errorf("edbInfoRefreshAll err: %s", e.Error()) } // 清除缓存 _ = utils.Rc.Delete(item.RefreshKey) if len(item.ItemRefreshKeys) > 0 { for _, v := range item.ItemRefreshKeys { _ = utils.Rc.Delete(v) } } // 解除锁 EdbRefreshLock.Delete(item.RefreshKey) fmt.Printf("EdbRefreshTask结束刷新: %s\n", item.RefreshKey) }