Przeglądaj źródła

fix:刷新逻辑调整,不走redis了

Roc 1 rok temu
rodzic
commit
8d4d34bcf9
3 zmienionych plików z 82 dodań i 39 usunięć
  1. 37 13
      cache/index_cache.go
  2. 4 3
      services/index.go
  3. 41 23
      services/index_queue.go

+ 37 - 13
cache/index_cache.go

@@ -1,9 +1,8 @@
 package cache
 
 import (
-	"context"
-	"fmt"
-	"hongze/mysteel_watch/global"
+	"container/list"
+	"sync"
 )
 
 // record more information
@@ -23,15 +22,40 @@ import (
 //	return false
 //}
 
-func IndexAutoRefresh(filePath string) bool {
-	channel := `autoRefresh`
-	if global.Redis != nil {
-		err := global.Redis.Publish(context.TODO(), channel, filePath).Err()
-		if err != nil {
-			fmt.Println("Redis.Publish Err:" + err.Error())
-			return false
-		}
-		return true
+//func IndexAutoRefresh(filePath string) bool {
+//	channel := `autoRefresh`
+//	if global.Redis != nil {
+//		err := global.Redis.Publish(context.TODO(), channel, filePath).Err()
+//		if err != nil {
+//			fmt.Println("Redis.Publish Err:" + err.Error())
+//			return false
+//		}
+//		return true
+//	}
+//	return false
+//}
+
+// RefreshList 刷新的列表
+var RefreshList *list.List
+
+func init() {
+	RefreshList = list.New()
+}
+
+var FilePathMap = make(map[string]int)
+
+// FilePathMutex 创建一个互斥锁
+var FilePathMutex sync.Mutex
+
+func IndexAutoRefresh(filePath string) {
+	FilePathMutex.Lock()
+	defer FilePathMutex.Unlock()
+	// 如果存在该路径,那么就不记录入list
+	if _, ok := FilePathMap[filePath]; ok {
+		return
 	}
-	return false
+	RefreshList.PushBack(filePath)
+	FilePathMap[filePath] = 1
+
+	return
 }

+ 4 - 3
services/index.go

@@ -446,11 +446,12 @@ func GetComment(filePath string) string {
 
 var pushLock sync.RWMutex
 
-// 刷新周度指标数据
+// AddIndexRefreshToLpush 添加到指标刷新
 func AddIndexRefreshToLpush(filePath string) {
 	pushLock.Lock()
-	result := cache.IndexAutoRefresh(filePath)
-	fmt.Println("IndexAutoRefresh result:", result)
+	//result := cache.IndexAutoRefresh(filePath)
+	cache.IndexAutoRefresh(filePath)
+	//fmt.Println("IndexAutoRefresh result:", result)
 	pushLock.Unlock()
 	return
 }

+ 41 - 23
services/index_queue.go

@@ -1,36 +1,54 @@
 package services
 
 import (
-	"context"
 	"fmt"
-	"hongze/mysteel_watch/global"
-	"strings"
+	"hongze/mysteel_watch/cache"
 	"time"
 )
 
 // the service for log
-func AutoRefresh() {
-	sub := global.Redis.Subscribe(context.TODO(), "autoRefresh")
+//func AutoRefresh() {
+//	sub := global.Redis.Subscribe(context.TODO(), "autoRefresh")
+//
+//	defer func() {
+//		sub.Close()
+//		if err := recover(); err != nil {
+//			fmt.Println("[AutoRefresh]", err)
+//		}
+//	}()
+//	for {
+//		msg, err := sub.ReceiveMessage(context.TODO())
+//		if err != nil {
+//			fmt.Println("sub err:" + err.Error())
+//		}
+//		fmt.Println("sub:", msg.Payload)
+//		IndexHandle(msg.Payload)
+//
+//		//global.Rc.Brpop(utils.REFRESH_INDEX, func(b []byte) {
+//		//	filePath := string(b)
+//		//	fmt.Println("filePath:", filePath)
+//		//	IndexHandle(filePath)
+//		//})
+//	}
+//}
 
-	defer func() {
-		sub.Close()
-		if err := recover(); err != nil {
-			fmt.Println("[AutoRefresh]", err)
-		}
-	}()
+// AutoRefresh 调用python刷新指标
+func AutoRefresh() {
 	for {
-		msg, err := sub.ReceiveMessage(context.TODO())
-		if err != nil {
-			fmt.Println("sub err:" + err.Error())
+		el := cache.RefreshList.Front()
+		// 如果没取到,那么就睡眠1s
+		if el == nil {
+			time.Sleep(1 * time.Second)
+			continue
 		}
-		fmt.Println("sub:", msg.Payload)
-		IndexHandle(msg.Payload)
+		filePath := el.Value.(string)
+		IndexHandle(filePath)
+		// 处理完后就移除该list
+		cache.RefreshList.Remove(el)
 
-		//global.Rc.Brpop(utils.REFRESH_INDEX, func(b []byte) {
-		//	filePath := string(b)
-		//	fmt.Println("filePath:", filePath)
-		//	IndexHandle(filePath)
-		//})
+		cache.FilePathMutex.Lock()
+		delete(cache.FilePathMap, filePath)
+		cache.FilePathMutex.Unlock()
 	}
 }
 
@@ -39,10 +57,10 @@ func IndexHandle(filePath string) {
 	//if err != nil {
 	//	go alarm_msg.SendAlarmMsg(utils.APPNAME+" 指标数据未生成检测失败:"+err.Error()+";file:"+filePath, 3)
 	//}
-	filePath = strings.Replace(filePath, `"`, ``, -1)
+	//filePath = strings.Replace(filePath, `"`, ``, -1)
 	fmt.Println("开始刷新文件:", filePath)
 	time.Sleep(1 * time.Second)
 	MysteelChemicalRefresh(filePath)
 	//刷新完成后,清除缓存
-	global.Rc.Delete(filePath)
+	//global.Rc.Delete(filePath)
 }