kobe6258 1 semana atrás
pai
commit
6eb7f88a87

+ 1 - 1
models/llm/user_chat_record.go

@@ -11,7 +11,7 @@ import (
 // UserChatRecord 定义用户聊天记录结构体
 type UserChatRecord struct {
 	Id           int       `gorm:"primaryKey;autoIncrement;comment:主键"`
-	ChatId       int       `gorm:"comment:会话id"`
+	ChatId       int       `gorm:"chat_id;comment:会话id"`
 	ChatUserType string    `gorm:"type:enum('user','assistant');comment:用户方"`
 	Content      string    `gorm:"content:内容"`
 	SendTime     time.Time `gorm:"comment:发送时间"`

+ 79 - 28
services/llm/chat_service.go

@@ -11,23 +11,25 @@ import (
 	"github.com/google/uuid"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 )
 
 const (
 	redisChatPrefix = "chat:zet:"
-	RecordLock      = "lock:chat:record:"
 	redisTTL        = 24 * time.Hour // Redis 缓存过期时间
 )
 
 // AddChatRecord 添加聊天记录到 Redis
 func AddChatRecord(record *llm.UserChatRecordRedis) error {
 	key := fmt.Sprintf("%s%d", redisChatPrefix, record.ChatId)
-	lockKey := fmt.Sprintf("%s%s", RecordLock, key)
-	fmt.Printf("2%s", lockKey)
-	holder, _ := uuid.NewUUID()
-	if lock.AcquireLock(lockKey, 10, holder.String()) {
-		defer lock.ReleaseLock(key, holder.String())
+	holder, _ := uuid.NewRandom()
+	holderStr := fmt.Sprintf("user_%s", holder.String())
+	if lock.AcquireLock(key, 10, holderStr) {
+		defer func() {
+			fmt.Printf("用户释放锁:%s", key)
+			lock.ReleaseLock(key, holderStr)
+		}()
 		data, err := json.Marshal(record)
 		if err != nil {
 			return fmt.Errorf("序列化聊天记录失败: %w", err)
@@ -41,7 +43,7 @@ func AddChatRecord(record *llm.UserChatRecordRedis) error {
 			Member: data,
 			Score:  float64(time.Now().Unix()),
 		})
-		fmt.Println(strconv.Itoa(len(zSet)))
+
 		err = utils.Rc.ZAdd(key, zSet...)
 		if err != nil {
 			return fmt.Errorf("保存聊天记录到 Redis 失败: %w", err)
@@ -85,6 +87,41 @@ func GetChatRecordsFromRedis(chatId int) (redisList []*llm.UserChatRecordRedis,
 	return
 }
 
+func flushRecordsToRedis(chatId int) (err error) {
+	key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
+	zSet, _ := utils.Rc.ZRangeWithScores(key)
+	var redisList []*llm.UserChatRecordRedis
+	if len(zSet) == 0 {
+		// 缓存不存在,从数据库拉取数据
+		records, dbErr := GetChatRecordsFromDB(chatId)
+		if dbErr != nil {
+			err = fmt.Errorf("从数据库获取聊天记录失败: %w", dbErr)
+			return
+		}
+		var zet []*redis.Zset
+		// 将数据保存到 Redis
+		for _, record := range records {
+			redisRecord := &llm.UserChatRecordRedis{
+				Id:           record.Id,
+				ChatId:       chatId,
+				ChatUserType: record.ChatUserType,
+				Content:      record.Content,
+				SendTime:     record.SendTime.Format(utils.FormatDateTime),
+			}
+			data, parseErr := json.Marshal(&record)
+			if parseErr != nil {
+				utils.FileLog.Error("解析聊天记录失败: %w", err)
+			}
+			zet = append(zet, &redis.Zset{
+				Member: data,
+				Score:  float64(record.SendTime.Unix()),
+			})
+			redisList = append(redisList, redisRecord)
+		}
+	}
+	return
+}
+
 // SaveChatRecordsToDB 将 Redis 中的聊天记录保存到数据库
 func SaveChatRecordsToDB(chatId int) error {
 	list, err := GetChatRecordsFromRedis(chatId)
@@ -107,14 +144,25 @@ func SaveChatRecordsToDB(chatId int) error {
 		})
 		//}
 	}
-	//先删除redis中的缓存
-	//_ = RemoveChatRecord(chatId)
-	err = llm.BatchInsertRecords(newRecords)
-	if err != nil {
-		utils.FileLog.Error("批量插入记录失败:", err.Error())
-		return fmt.Errorf("批量插入记录失败: %w", err)
+	key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
+	holder, _ := uuid.NewRandom()
+	holderStr := fmt.Sprintf("sys_%s", holder.String())
+	defer func() {
+		fmt.Printf("系统释放锁:%s", key)
+		lock.ReleaseLock(key, holderStr)
+	}()
+	if lock.AcquireLock(key, 10, holderStr) {
+		//先删除redis中的缓存
+		_ = RemoveChatRecord(chatId)
+		err = llm.BatchInsertRecords(newRecords)
+		if err != nil {
+			utils.FileLog.Error("批量插入记录失败:", err.Error())
+			return fmt.Errorf("批量插入记录失败: %w", err)
+		}
+		_ = RemoveChatRecord(chatId)
+		//重新加载数据
+		_ = flushRecordsToRedis(chatId)
 	}
-	//_ = RemoveChatRecord(chatId)
 	return nil
 }
 
@@ -127,23 +175,26 @@ func SaveAllChatRecordsToDB() {
 			utils.FileLog.Error("获取 Redis 键失败: %v", err)
 			return
 		}
+		var wg sync.WaitGroup
+		wg.Add(len(keys))
 		for _, key := range keys {
-			lockKey := fmt.Sprintf("%s%s", RecordLock, key)
-			fmt.Printf("1%s", lockKey)
-			chatIdStr := strings.TrimPrefix(key, redisChatPrefix)
-			chatId, parseErr := strconv.Atoi(chatIdStr)
-			if parseErr != nil {
-				utils.FileLog.Error("解析聊天ID失败: %v", err)
-				continue
-			}
-			if lock.AcquireLock(lockKey, 10, "system_task") {
+			go func(key string) {
+				defer wg.Done()
+				chatIdStr := strings.TrimPrefix(key, redisChatPrefix)
+				chatId, parseErr := strconv.Atoi(chatIdStr)
+				if parseErr != nil {
+					utils.FileLog.Error("解析聊天ID失败: %v", err)
+					return
+				}
 				if err = SaveChatRecordsToDB(chatId); err != nil {
 					utils.FileLog.Error("解析聊天ID失败: %v", err)
 				}
-				lock.ReleaseLock(key, "system_task")
-			}
+				fmt.Println("保存聊天记录到数据库完成")
+			}(key)
 		}
-		time.Sleep(10 * time.Second)
+		wg.Wait()
+		fmt.Printf("计划任务完成")
+		time.Sleep(60 * time.Second)
 	}
 }
 
@@ -157,10 +208,10 @@ func RemoveChatRecord(chatId int) error {
 	return nil
 }
 
-func GetChatRecordsFromDB(chatID int) ([]*llm.UserChatRecord, error) {
+func GetChatRecordsFromDB(chatId int) ([]*llm.UserChatRecord, error) {
 	o := global.DbMap[utils.DbNameAI]
 	var records []*llm.UserChatRecord
-	if err := o.Where("chat_id = ?", chatID).Find(&records).Error; err != nil {
+	if err := o.Where("chat_id = ?", chatId).Find(&records).Error; err != nil {
 		return nil, fmt.Errorf("从数据库获取聊天记录失败: %w", err)
 	}
 	return records, nil

+ 6 - 0
utils/lock/distrubtLock.go

@@ -25,14 +25,17 @@ func AcquireLock(key string, expiration int, Holder string) bool {
 			else
 				return 0
 			end`)
+	fmt.Println("lockName:"+lockName+key, "Holder:"+Holder, "expiration:"+fmt.Sprintf("%d", expiration))
 	lockey := fmt.Sprintf("%s%s", lockName, key)
 	result, err := script.Run(ctx, utils.Rc.RedisClient(), []string{lockey}, Holder, expiration).Int()
 	if err != nil {
+		fmt.Printf("加锁失败:err: %v", err)
 		return false
 	}
 	if result == 1 {
 		return true
 	}
+	fmt.Printf("加锁失败:")
 	return false
 }
 
@@ -44,13 +47,16 @@ func ReleaseLock(key string, holder string) bool {
 	       return 0
 	   end
 	`)
+	fmt.Println("lockName:"+lockName+key, "Holder:"+holder)
 	lockey := fmt.Sprintf("%s%s", lockName, key)
 	result, err := script.Run(ctx, utils.Rc.RedisClient(), []string{lockey}, holder).Int()
 	if err != nil {
+		fmt.Printf("解锁失败:err: %v", err)
 		return false
 	}
 	if result == 1 {
 		return true
 	}
+	fmt.Printf("解锁失败:")
 	return false
 }