فهرست منبع

新增指标队列

tuoling805 1 سال پیش
والد
کامیت
66f8ecd1ee
6فایلهای تغییر یافته به همراه59 افزوده شده و 42 حذف شده
  1. 25 12
      cache/index_cache.go
  2. 10 6
      controller/index/index.go
  3. 1 1
      core/run_server.go
  4. 1 7
      init_serve/task.go
  5. 2 1
      services/index.go
  6. 20 15
      services/index_queue.go

+ 25 - 12
cache/index_cache.go

@@ -1,24 +1,37 @@
 package cache
 
 import (
+	"context"
 	"fmt"
 	"hongze/mysteel_watch/global"
-	"hongze/mysteel_watch/utils"
 )
 
-//record more information
+// record more information
+//func IndexAutoRefresh(filePath string) bool {
+//	if global.Re == nil {
+//		if global.Rc != nil {
+//			if global.Rc.SetNX(filePath, filePath, utils.GetTodayLastSecond()) {
+//				err := global.Rc.LPush(utils.REFRESH_INDEX, filePath)
+//				if err != nil {
+//					fmt.Println("RecordNewLogs LPush Err:" + err.Error())
+//				}
+//				return true
+//			}
+//		}
+//		return false
+//	}
+//	return false
+//}
+
 func IndexAutoRefresh(filePath string) bool {
-	if global.Re == nil {
-		if global.Rc != nil {
-			if global.Rc.SetNX(filePath,filePath,utils.GetTodayLastSecond()){
-				err := global.Rc.LPush(utils.REFRESH_INDEX, filePath)
-				if err != nil {
-					fmt.Println("RecordNewLogs LPush Err:" + err.Error())
-				}
-				return true
-			}
+	channel := `autoRefresh`
+	if global.Redis != nil {
+		err := global.Redis.Publish(context.TODO(), channel, filePath).Err()
+		if err != nil {
+			fmt.Println("Redis.Publish Err:" + err.Error())
+			return false
 		}
 		return true
 	}
 	return false
-}
+}

+ 10 - 6
controller/index/index.go

@@ -41,11 +41,7 @@ func (s *IndexController) Create(c *gin.Context) {
 	}
 	//fileName := req.IndexName + "_" + req.IndexCode + ".xlsx"
 	var fileName string
-	if req.UpdateWeek != "" {
-		fileName = req.IndexCode + "_" + req.UpdateWeek + "_" + req.RunMode + ".xlsx" //保存的文件名称
-	} else {
-		fileName = req.IndexCode + "_" + req.RunMode + ".xlsx" //保存的文件名称
-	}
+	fileName = req.IndexCode + "_" + req.RunMode + ".xlsx" //保存的文件名称
 	filePath := global.CONFIG.Serve.IndexSaveDir + fileName
 
 	templatePath := global.CONFIG.Serve.IndexSaveDir + "index_template.xlsx"
@@ -58,6 +54,13 @@ func (s *IndexController) Create(c *gin.Context) {
 		templateFile.Close()
 	}()
 
+	sheetList := templateFile.GetSheetList()
+	for k, v := range sheetList {
+		if k > 0 {
+			templateFile.DeleteSheet(v)
+		}
+	}
+
 	//timeTag := time.Now().UnixNano() / 1e6
 	//timeTagStr := fmt.Sprintf("%d", timeTag)
 
@@ -86,6 +89,7 @@ func (s *IndexController) Create(c *gin.Context) {
 	fmt.Println(string(commentJson))
 	templateFile.DeleteComment("Sheet1", "A1")
 	templateFile.AddComment("Sheet1", "A1", string(commentJson))
+
 	if err := templateFile.SaveAs(filePath); err != nil {
 		fmt.Println(err)
 		resp.FailData("保存失败", "保存失败,Err:"+err.Error(), c)
@@ -228,4 +232,4 @@ func (s *IndexController) Refresh(c *gin.Context) {
 //		return
 //	}
 //	return
-//}
+//}

+ 1 - 1
core/run_server.go

@@ -16,7 +16,7 @@ func RunServe() {
 	fmt.Println(global.CONFIG.Serve.UseRedis)
 	if global.CONFIG.Serve.UseRedis {
 		//初始化redis
-		init_serve.RedisTool()
+		init_serve.Redis()
 	}
 	//初始化验证器
 	//if err := global.InitTrans("zh"); err != nil {

+ 1 - 7
init_serve/task.go

@@ -82,7 +82,7 @@ func InitTask() {
 	go watch.ListenFolderNewMerge()
 
 	//redis 队列刷新指标
-	//go services.AutoRefresh()
+	go services.AutoRefresh()
 
 	//CheckIndexCreate()
 	fmt.Println("start services.Merge")
@@ -108,9 +108,3 @@ func CheckIndexCreate() {
 	}()
 	err = services.IndexCreateCheck()
 }
-
-func InitTask123() {
-	fmt.Println("InitTask start")
-	services.MergeMonthSeasonYearV2()
-	fmt.Println("InitTask end")
-}

+ 2 - 1
services/index.go

@@ -453,7 +453,8 @@ var pushLock sync.RWMutex
 // 刷新周度指标数据
 func AddIndexRefreshToLpush(filePath string) {
 	pushLock.Lock()
-	cache.IndexAutoRefresh(filePath)
+	result := cache.IndexAutoRefresh(filePath)
+	fmt.Println("IndexAutoRefresh result:", result)
 	pushLock.Unlock()
 	return
 }

+ 20 - 15
services/index_queue.go

@@ -1,25 +1,38 @@
 package services
 
 import (
+	"context"
 	"fmt"
 	"hongze/mysteel_watch/global"
-	"hongze/mysteel_watch/utils"
 	"strings"
 	"time"
 )
 
 // the service for log
 func AutoRefresh() {
+	sub := global.Redis.Subscribe(context.TODO(), "autoRefresh")
+
 	defer func() {
+		sub.Close()
 		if err := recover(); err != nil {
 			fmt.Println("[AutoRefresh]", err)
 		}
 	}()
+	i := 0
 	for {
-		global.Rc.Brpop(utils.REFRESH_INDEX, func(b []byte) {
-			filePath := string(b)
-			IndexHandle(filePath)
-		})
+		i++
+		fmt.Println(i)
+		msg, err := sub.ReceiveMessage(context.TODO())
+		if err != nil {
+			fmt.Println("sub err:" + err.Error())
+		}
+		IndexHandle(msg.Payload)
+
+		//global.Rc.Brpop(utils.REFRESH_INDEX, func(b []byte) {
+		//	filePath := string(b)
+		//	fmt.Println("filePath:", filePath)
+		//	IndexHandle(filePath)
+		//})
 	}
 }
 
@@ -31,15 +44,7 @@ func IndexHandle(filePath string) {
 	filePath = strings.Replace(filePath, `"`, ``, -1)
 	fmt.Println("开始刷新文件:", filePath)
 	time.Sleep(1 * time.Second)
-	if global.CONFIG.Serve.SystemType == "custom" {
-		MysteelChemicalRefresh(filePath)
-	} else {
-		if global.CONFIG.Serve.Frequency != "周度" && !strings.Contains(filePath, "week") {
-			MysteelChemicalRefresh(filePath)
-		} else {
-			MysteelChemicalRefresh(filePath)
-		}
-	}
+	MysteelChemicalRefresh(filePath)
 	//刷新完成后,清除缓存
-	//global.Rc.Delete(filePath)
+	global.Rc.Delete(filePath)
 }