123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- package data
- import (
- "eta/eta_api/models/data_manage"
- "eta/eta_api/services/alarm_msg"
- "eta/eta_api/utils"
- "fmt"
- "sync"
- )
- var (
- EdbRefreshLock sync.Map // 指标刷新锁, 避免同时间重复key的刷新
- EdbRefreshHandling chan EdbQueueRefreshReq // 排队刷新的20+指标的图表/表格
- EdbRefreshWorker chan struct{} // 同时允许N组刷新
- )
- func init() {
- EdbRefreshHandling = make(chan EdbQueueRefreshReq, 50) // N为最大排队等候的goroutine数
- EdbRefreshWorker = make(chan struct{}, 5) // 控制刷新速率修改此N值
- }
- type EdbQueueRefreshReq struct {
- RefreshKey string `description:"刷新请求的缓存key"`
- ItemRefreshKeys []string `description:"图表/表格刷新后要删除的缓存key"`
- BaseEdbInfoArr []*data_manage.EdbInfo
- BasePredictEdbInfoArr []*data_manage.EdbInfo
- CalculateMap map[int]*data_manage.EdbInfo
- PredictCalculateMap map[int]*data_manage.EdbInfo
- CalculateArr []int
- PredictCalculateArr []int
- }
- // PushEdb2Refresh 写入指标刷新队列
- func PushEdb2Refresh(item EdbQueueRefreshReq) {
- refreshKey := item.RefreshKey
- if refreshKey == "" {
- return
- }
- // 检查是否已在队列中
- _, ok := EdbRefreshLock.Load(refreshKey)
- if ok {
- return
- }
- // 追加至刷新队列
- EdbRefreshHandling <- item
- EdbRefreshLock.Store(refreshKey, true)
- }
- // HandleEdbRefreshQueue 排队刷新指标
- func HandleEdbRefreshQueue() {
- defer func() {
- if err := recover(); err != nil {
- tips := fmt.Sprintf("[HandleEdbRefreshQueue] panic: %v", err)
- fmt.Println(tips)
- utils.FileLog.Info(tips)
- }
- }()
- fmt.Println("HandleEdbRefreshQueue start")
- for {
- select {
- case item, ok := <-EdbRefreshHandling:
- if !ok {
- return
- }
- go EdbRefreshTask(item)
- }
- }
- }
- // EdbRefreshTask 指标刷新任务
- func EdbRefreshTask(item EdbQueueRefreshReq) {
- var err error
- defer func() {
- if err != nil {
- tips := fmt.Sprintf("[EdbRefreshTask] ErrMsg: %s", err.Error())
- utils.FileLog.Info(tips)
- go alarm_msg.SendAlarmMsg(tips, 3)
- }
- <-EdbRefreshWorker
- }()
- EdbRefreshWorker <- struct{}{}
- fmt.Printf("EdbRefreshTask开始刷新: %s\n", item.RefreshKey)
- // 刷新指标
- e := edbInfoRefreshAll(false, item.BaseEdbInfoArr, item.BasePredictEdbInfoArr, item.CalculateMap, item.PredictCalculateMap, item.CalculateArr, item.PredictCalculateArr)
- if e != nil {
- err = fmt.Errorf("edbInfoRefreshAll err: %s", e.Error())
- }
- // 清除缓存
- _ = utils.Rc.Delete(item.RefreshKey)
- if len(item.ItemRefreshKeys) > 0 {
- for _, v := range item.ItemRefreshKeys {
- _ = utils.Rc.Delete(v)
- }
- }
- // 解除锁
- EdbRefreshLock.Delete(item.RefreshKey)
- fmt.Printf("EdbRefreshTask结束刷新: %s\n", item.RefreshKey)
- }
|