package llm

import (
	"encoding/json"
	"eta/eta_api/global"
	"eta/eta_api/models/llm"
	"eta/eta_api/utils"
	"eta/eta_api/utils/lock"
	"eta/eta_api/utils/redis"
	"fmt"
	"github.com/google/uuid"
	"strconv"
	"strings"
	"sync"
	"time"
)

const (
	redisChatPrefix = "chat:zet:"
	redisTTL        = 24 * time.Hour // expiry applied to a chat's Redis cache key

	// chatLockTTL is the value handed to lock.AcquireLock.
	// NOTE(review): presumably a lifetime in seconds — confirm against the lock package.
	chatLockTTL = 10

	// flushInterval is the pause between two passes of SaveAllChatRecordsToDB.
	flushInterval = 10 * time.Second

	// maxFlushWorkers caps how many chats are persisted concurrently in one pass.
	maxFlushWorkers = 16
)

// chatKey builds the Redis key under which the records of one chat are cached.
func chatKey(chatId int) string {
	return redisChatPrefix + strconv.Itoa(chatId)
}

// newLockHolder returns a unique lock-holder identifier of the form
// "<role>_<uuid>". An error is returned when no random UUID can be generated,
// so that two callers never end up sharing the zero UUID as their holder.
func newLockHolder(role string) (string, error) {
	id, err := uuid.NewRandom()
	if err != nil {
		return "", fmt.Errorf("生成锁持有者标识失败: %w", err)
	}
	return fmt.Sprintf("%s_%s", role, id.String()), nil
}

// memberBytes extracts the serialized payload of a sorted-set member.
// Both string and []byte members are accepted; ok is false for anything else.
func memberBytes(member interface{}) (raw []byte, ok bool) {
	switch m := member.(type) {
	case string:
		return []byte(m), true
	case []byte:
		return m, true
	default:
		return nil, false
	}
}

// toRedisRecord converts a database record into its cache representation.
func toRedisRecord(chatId int, record *llm.UserChatRecord) *llm.UserChatRecordRedis {
	return &llm.UserChatRecordRedis{
		Id:           record.Id,
		ChatId:       chatId,
		ChatUserType: record.ChatUserType,
		Content:      record.Content,
		SendTime:     record.SendTime.Format(utils.FormatDateTime),
	}
}

// AddChatRecord appends one chat record to the chat's Redis sorted set, using
// the current Unix time as the score.
//
// The write is guarded by the per-chat lock; an error is returned when the
// lock cannot be acquired, when the record cannot be serialized, or when the
// Redis write fails. When the key did not exist before the write, redisTTL is
// applied to it afterwards.
func AddChatRecord(record *llm.UserChatRecordRedis) error {
	if record == nil {
		return fmt.Errorf("聊天记录不能为空")
	}

	key := chatKey(record.ChatId)
	holderStr, err := newLockHolder("user")
	if err != nil {
		return err
	}
	if !lock.AcquireLock(key, chatLockTTL, holderStr) {
		return fmt.Errorf("获取锁失败,请稍后重试")
	}
	defer func() {
		fmt.Printf("用户释放锁:%s", key)
		lock.ReleaseLock(key, holderStr)
	}()

	data, err := json.Marshal(record)
	if err != nil {
		return fmt.Errorf("序列化聊天记录失败: %w", err)
	}

	// A failed read is treated like an empty set: the worst case is that the
	// TTL below gets refreshed once more than strictly necessary.
	existing, _ := utils.Rc.ZRangeWithScores(key)
	isNewKey := len(existing) == 0

	// Only the new member is written; members already in the set stay as they are.
	err = utils.Rc.ZAdd(key, &redis.Zset{
		Member: data,
		Score:  float64(time.Now().Unix()),
	})
	if err != nil {
		return fmt.Errorf("保存聊天记录到 Redis 失败: %w", err)
	}

	if isNewKey {
		// The TTL has to be set after the first write: setting an expiry on a
		// key that does not exist yet has no effect. Best effort — the record
		// itself is already stored at this point.
		_ = utils.Rc.Expire(key, redisTTL)
	}
	return nil
}

// GetChatRecordsFromRedis returns the chat records of chatId.
//
// Records are read from the Redis sorted set. When the set is empty (or cannot
// be read) the records are loaded from the database instead and converted to
// the cache representation; this function does not write them back to Redis.
// An error is returned when the database lookup fails or a cached member
// cannot be decoded.
func GetChatRecordsFromRedis(chatId int) (redisList []*llm.UserChatRecordRedis, err error) {
	key := chatKey(chatId)

	// A read error is deliberately handled like a cache miss.
	zSet, _ := utils.Rc.ZRangeWithScores(key)
	if len(zSet) == 0 {
		records, dbErr := GetChatRecordsFromDB(chatId)
		if dbErr != nil {
			return nil, fmt.Errorf("从数据库获取聊天记录失败: %w", dbErr)
		}
		for _, record := range records {
			redisList = append(redisList, toRedisRecord(chatId, record))
		}
		return redisList, nil
	}

	for _, z := range zSet {
		raw, ok := memberBytes(z.Member)
		if !ok {
			return nil, fmt.Errorf("解析聊天记录失败: 非预期的成员类型 %T", z.Member)
		}
		redisRecord := new(llm.UserChatRecordRedis)
		if err = json.Unmarshal(raw, redisRecord); err != nil {
			return nil, fmt.Errorf("解析聊天记录失败: %w", err)
		}
		redisList = append(redisList, redisRecord)
	}
	return redisList, nil
}

// flushRecordsToRedis rebuilds the Redis cache of chatId from the database.
//
// Nothing happens when the sorted set already has members. Otherwise every
// database record is serialized and added with its send time (Unix seconds) as
// the score, and redisTTL is applied to the key. Records that cannot be
// serialized are logged and skipped.
func flushRecordsToRedis(chatId int) (err error) {
	key := chatKey(chatId)

	// A read error is deliberately handled like an empty cache.
	zSet, _ := utils.Rc.ZRangeWithScores(key)
	if len(zSet) != 0 {
		return nil
	}

	records, dbErr := GetChatRecordsFromDB(chatId)
	if dbErr != nil {
		return fmt.Errorf("从数据库获取聊天记录失败: %w", dbErr)
	}

	zet := make([]*redis.Zset, 0, len(records))
	for _, record := range records {
		data, marshalErr := json.Marshal(toRedisRecord(chatId, record))
		if marshalErr != nil {
			utils.FileLog.Error("解析聊天记录失败: %v", marshalErr)
			continue
		}
		zet = append(zet, &redis.Zset{
			Member: data,
			Score:  float64(record.SendTime.Unix()),
		})
	}
	if len(zet) == 0 {
		// Nothing to cache; avoid issuing a write without members.
		return nil
	}

	if err = utils.Rc.ZAdd(key, zet...); err != nil {
		return fmt.Errorf("保存聊天记录到 Redis 失败: %w", err)
	}
	// Best effort: the cache content is already in place.
	_ = utils.Rc.Expire(key, redisTTL)
	return nil
}

// SaveChatRecordsToDB persists the not-yet-saved chat records of chatId
// (those whose Id is 0) from Redis into the database and then rebuilds the
// cache from the database.
//
// The whole read-insert-rebuild sequence runs under the per-chat lock so that
// records added concurrently are not dropped. When the lock is held by someone
// else the call is a no-op and returns nil. The cache is only deleted after
// the insert succeeded, so a failed insert leaves the unsaved records in
// Redis.
func SaveChatRecordsToDB(chatId int) error {
	key := chatKey(chatId)
	holderStr, err := newLockHolder("sys")
	if err != nil {
		return err
	}
	if !lock.AcquireLock(key, chatLockTTL, holderStr) {
		// Best effort: whoever holds the lock is working on this chat right now.
		return nil
	}
	defer func() {
		lock.ReleaseLock(key, holderStr)
	}()

	list, err := GetChatRecordsFromRedis(chatId)
	if err != nil {
		return err
	}

	var newRecords []*llm.UserChatRecord
	for _, record := range list {
		if record.Id != 0 {
			continue // already stored in the database
		}
		sendTime, parseErr := time.Parse(utils.FormatDateTime, record.SendTime)
		if parseErr != nil {
			// Fall back to "now" rather than dropping the message.
			sendTime = time.Now()
		}
		newRecords = append(newRecords, &llm.UserChatRecord{
			Id:           record.Id,
			ChatId:       record.ChatId,
			ChatUserType: record.ChatUserType,
			Content:      record.Content,
			SendTime:     sendTime,
			CreatedTime:  time.Now(),
		})
	}
	if len(newRecords) == 0 {
		// Nothing to persist, so there is no reason to touch the cache.
		return nil
	}

	if err = llm.BatchInsertRecords(newRecords); err != nil {
		utils.FileLog.Error("批量插入记录失败:", err.Error())
		return fmt.Errorf("批量插入记录失败: %w", err)
	}

	// Drop the cache so it can be rebuilt with the database-assigned ids.
	if err = RemoveChatRecord(chatId); err != nil {
		return err
	}
	// Best effort: a failed rebuild only means the next read goes to the database.
	_ = flushRecordsToRedis(chatId)
	return nil
}

// SaveAllChatRecordsToDB is the periodic task that persists the chat records
// of every cached chat. It loops forever: each pass lists the keys matching
// redisChatPrefix, saves the chats concurrently (at most maxFlushWorkers at a
// time), waits for all of them, and then sleeps for flushInterval. A failure
// to list the keys is logged and retried on the next pass.
func SaveAllChatRecordsToDB() {
	for {
		keys, err := utils.Rc.Keys(redisChatPrefix + "*")
		if err != nil {
			utils.FileLog.Error("获取 Redis 键失败: %v", err)
			time.Sleep(flushInterval)
			continue
		}

		var wg sync.WaitGroup
		sem := make(chan struct{}, maxFlushWorkers)
		for _, key := range keys {
			wg.Add(1)
			sem <- struct{}{}
			go func(key string) {
				defer wg.Done()
				defer func() { <-sem }()
				saveChatByKey(key)
			}(key)
		}
		wg.Wait()

		time.Sleep(flushInterval)
	}
}

// saveChatByKey derives the chat id from a cache key and persists that chat.
// Failures are logged; every error is local to the call so concurrent callers
// do not share state.
func saveChatByKey(key string) {
	chatId, parseErr := strconv.Atoi(strings.TrimPrefix(key, redisChatPrefix))
	if parseErr != nil {
		utils.FileLog.Error("解析聊天ID失败: %v", parseErr)
		return
	}
	if saveErr := SaveChatRecordsToDB(chatId); saveErr != nil {
		utils.FileLog.Error("保存聊天记录失败: %v", saveErr)
	}
}

// RemoveChatRecord deletes the Redis cache key of chatId.
func RemoveChatRecord(chatId int) error {
	if err := utils.Rc.Delete(chatKey(chatId)); err != nil {
		return fmt.Errorf("删除 Redis 缓存失败: %w", err)
	}
	return nil
}

// GetChatRecordsFromDB loads all records whose chat_id equals chatId from the
// database handle registered under utils.DbNameAI. No ordering is requested
// from the database.
func GetChatRecordsFromDB(chatId int) ([]*llm.UserChatRecord, error) {
	o := global.DbMap[utils.DbNameAI]
	var records []*llm.UserChatRecord
	if err := o.Where("chat_id = ?", chatId).Find(&records).Error; err != nil {
		return nil, fmt.Errorf("从数据库获取聊天记录失败: %w", err)
	}
	return records, nil
}