index_queue.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package services
  2. import (
  3. "eta/mysteel_watch/cache"
  4. "fmt"
  5. "time"
  6. )
  7. // the service for log
  8. //func AutoRefresh() {
  9. // sub := global.Redis.Subscribe(context.TODO(), "autoRefresh")
  10. //
  11. // defer func() {
  12. // sub.Close()
  13. // if err := recover(); err != nil {
  14. // fmt.Println("[AutoRefresh]", err)
  15. // }
  16. // }()
  17. // for {
  18. // msg, err := sub.ReceiveMessage(context.TODO())
  19. // if err != nil {
  20. // fmt.Println("sub err:" + err.Error())
  21. // }
  22. // fmt.Println("sub:", msg.Payload)
  23. // IndexHandle(msg.Payload)
  24. //
  25. // //global.Rc.Brpop(utils.REFRESH_INDEX, func(b []byte) {
  26. // // filePath := string(b)
  27. // // fmt.Println("filePath:", filePath)
  28. // // IndexHandle(filePath)
  29. // //})
  30. // }
  31. //}
  32. // AutoRefresh 调用python刷新指标
  33. func AutoRefresh() {
  34. for {
  35. el := cache.RefreshList.Front()
  36. // 如果没取到,那么就睡眠1s
  37. if el == nil {
  38. time.Sleep(1 * time.Second)
  39. continue
  40. }
  41. filePath := el.Value.(string)
  42. IndexHandle(filePath)
  43. // 处理完后就移除该list
  44. cache.RefreshList.Remove(el)
  45. cache.FilePathMutex.Lock()
  46. delete(cache.FilePathMap, filePath)
  47. cache.FilePathMutex.Unlock()
  48. }
  49. }
  50. // IndexHandle
  51. // @Description: 指标处理
  52. // @author: Roc
  53. // @datetime 2023-11-27 09:41:59
  54. // @param filePath string
  55. func IndexHandle(filePath string) {
  56. //filePath = strings.Replace(filePath, `"`, ``, -1)
  57. fmt.Println("开始刷新文件:", filePath)
  58. time.Sleep(1 * time.Second)
  59. MysteelChemicalRefresh(filePath)
  60. }