Ver Fonte

可重入锁添加记录

kobe6258 há 1 mês atrás
pai
commit
d80d1f8226
2 ficheiros alterados com 50 adições e 12 exclusões
  1. 4 5
      services/llm/chat_service.go
  2. 46 7
      utils/lock/distrubtLock.go

+ 4 - 5
services/llm/chat_service.go

@@ -25,14 +25,13 @@ func AddChatRecord(record *llm.UserChatRecordRedis) error {
 	key := fmt.Sprintf("%s%d", redisChatPrefix, record.ChatId)
 	holder, _ := uuid.NewRandom()
 	holderStr := fmt.Sprintf("user_%s", holder.String())
-	if lock.AcquireLock(key, 10, holderStr) {
+	if err := lock.TryLock(key, 10, holderStr, 10*time.Second); err == nil {
 		defer func() {
-			fmt.Printf("用户释放锁:%s", key)
 			lock.ReleaseLock(key, holderStr)
 		}()
-		data, err := json.Marshal(record)
-		if err != nil {
-			return fmt.Errorf("序列化聊天记录失败: %w", err)
+		data, parseErr := json.Marshal(record)
+		if parseErr != nil {
+			return fmt.Errorf("序列化聊天记录失败: %w", parseErr)
 		}
 		zSet, _ := utils.Rc.ZRangeWithScores(key)
 		if len(zSet) == 0 {

+ 46 - 7
utils/lock/distrubtLock.go

@@ -5,6 +5,7 @@ import (
 	"eta/eta_api/utils"
 	"fmt"
 	"github.com/go-redis/redis/v8"
+	"time"
 )
 
 const (
@@ -37,15 +38,53 @@ func AcquireLock(key string, expiration int, Holder string) bool {
 	return false
 }
 
-//func TryLock(key string, expiration int, Holder string, wait bool, timeout time.Duration)error{
-//
-//}
-func Lock() error {
-	if !AcquireLock("test", 10, "test") {
-		return fmt.Errorf("加锁失败")
+func TryLock(key string, expiration int, Holder string, timeout time.Duration) error {
+	redisCtx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	for {
+		select {
+		case <-redisCtx.Done():
+			return fmt.Errorf("等待超时")
+		default:
+			if AcquireLock(key, expiration, Holder) {
+				// 启动异步续约
+				go renewLock(key, Holder, expiration)
+				return nil
+			}
+			// 等待重试
+			time.Sleep(50 * time.Millisecond)
+		}
 	}
-	return nil
 }
+
+func renewLock(key, holder string, expiration int) {
+	for {
+		time.Sleep(5 * time.Second)
+		script := redis.NewScript(`
+            local key = KEYS[1]
+			local clientId = ARGV[1]
+			local expiration = tonumber(ARGV[2])
+            if redis.call("get", KEYS[1]) == ARGV[1] then
+                redis.call("SET", key, clientId, "EX", expiration)
+				 return 1
+            else
+				 return 0
+            end
+       `)
+		lockey := fmt.Sprintf("%s%s", lockName, key)
+		result, err := script.Run(context.Background(), utils.Rc.RedisClient(), []string{lockey}, holder, expiration).Result()
+		if err != nil {
+			fmt.Println("锁续期失败:", err)
+			return
+		}
+		if result.(int64) == 0 {
+			fmt.Println("持有者变更,停止监听")
+			return
+		}
+		fmt.Printf("锁续期成功")
+	}
+}
+
 func ReleaseLock(key string, holder string) bool {
 	script := redis.NewScript(`
 	   if redis.call("get", KEYS[1]) == ARGV[1] then