123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- package llm
- import (
- "encoding/json"
- "eta/eta_api/global"
- "eta/eta_api/models/llm"
- "eta/eta_api/utils"
- "eta/eta_api/utils/lock"
- "eta/eta_api/utils/redis"
- "fmt"
- "github.com/google/uuid"
- "strconv"
- "strings"
- "sync"
- "time"
- )
// Settings for the Redis-backed chat record cache.
- const (
- redisChatPrefix = "chat:zet:" // key prefix; the numeric chat ID is appended to form each chat's Redis key
- redisTTL = 24 * time.Hour // Redis cache expiration time
- )
- // AddChatRecord 添加聊天记录到 Redis
- func AddChatRecord(record *llm.UserChatRecordRedis) error {
- key := fmt.Sprintf("%s%d", redisChatPrefix, record.ChatId)
- holder, _ := uuid.NewRandom()
- holderStr := fmt.Sprintf("user_%s", holder.String())
- if lock.AcquireLock(key, 10, holderStr) {
- defer func() {
- fmt.Printf("用户释放锁:%s", key)
- lock.ReleaseLock(key, holderStr)
- }()
- data, err := json.Marshal(record)
- if err != nil {
- return fmt.Errorf("序列化聊天记录失败: %w", err)
- }
- zSet, _ := utils.Rc.ZRangeWithScores(key)
- if len(zSet) == 0 {
- // 设置过期时间
- _ = utils.Rc.Expire(key, 24*time.Hour)
- }
- zSet = append(zSet, &redis.Zset{
- Member: data,
- Score: float64(time.Now().Unix()),
- })
- err = utils.Rc.ZAdd(key, zSet...)
- if err != nil {
- return fmt.Errorf("保存聊天记录到 Redis 失败: %w", err)
- }
- return nil
- }
- return fmt.Errorf("获取锁失败,请稍后重试")
- }
- // GetChatRecordsFromRedis 从 Redis 获取聊天记录
- func GetChatRecordsFromRedis(chatId int) (redisList []*llm.UserChatRecordRedis, err error) {
- key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
- zSet, _ := utils.Rc.ZRangeWithScores(key)
- if len(zSet) == 0 {
- // 缓存不存在,从数据库拉取数据
- records, dbErr := GetChatRecordsFromDB(chatId)
- if dbErr != nil {
- err = fmt.Errorf("从数据库获取聊天记录失败: %w", dbErr)
- return
- }
- // 将数据保存到 Redis
- for _, record := range records {
- redisRecord := &llm.UserChatRecordRedis{
- Id: record.Id,
- ChatId: chatId,
- ChatUserType: record.ChatUserType,
- Content: record.Content,
- SendTime: record.SendTime.Format(utils.FormatDateTime),
- }
- redisList = append(redisList, redisRecord)
- }
- return
- }
- for _, z := range zSet {
- var redisRecord llm.UserChatRecordRedis
- if err = json.Unmarshal([]byte(z.Member.(string)), &redisRecord); err != nil {
- return nil, fmt.Errorf("解析聊天记录失败: %w", err)
- }
- redisList = append(redisList, &redisRecord)
- }
- return
- }
- func flushRecordsToRedis(chatId int) (err error) {
- key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
- zSet, _ := utils.Rc.ZRangeWithScores(key)
- if len(zSet) == 0 {
- // 缓存不存在,从数据库拉取数据
- records, dbErr := GetChatRecordsFromDB(chatId)
- if dbErr != nil {
- err = fmt.Errorf("从数据库获取聊天记录失败: %w", dbErr)
- return
- }
- var zet []*redis.Zset
- // 将数据保存到 Redis
- for _, record := range records {
- redisRecord := &llm.UserChatRecordRedis{
- Id: record.Id,
- ChatId: chatId,
- ChatUserType: record.ChatUserType,
- Content: record.Content,
- SendTime: record.SendTime.Format(utils.FormatDateTime),
- }
- data, parseErr := json.Marshal(&redisRecord)
- if parseErr != nil {
- utils.FileLog.Error("解析聊天记录失败: %w", err)
- }
- zet = append(zet, &redis.Zset{
- Member: data,
- Score: float64(record.SendTime.Unix()),
- })
- }
- _ = utils.Rc.ZAdd(key, zet...)
- }
- return
- }
- // SaveChatRecordsToDB 将 Redis 中的聊天记录保存到数据库
- func SaveChatRecordsToDB(chatId int) error {
- list, err := GetChatRecordsFromRedis(chatId)
- if err != nil {
- return err
- }
- var newRecords []*llm.UserChatRecord
- for _, record := range list {
- if record.Id == 0 {
- sendTime, parseErr := time.Parse(utils.FormatDateTime, record.SendTime)
- if parseErr != nil {
- sendTime = time.Now()
- }
- newRecords = append(newRecords, &llm.UserChatRecord{
- Id: record.Id,
- ChatId: record.ChatId,
- ChatUserType: record.ChatUserType,
- Content: record.Content,
- SendTime: sendTime,
- CreatedTime: time.Now(),
- })
- }
- }
- key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
- holder, _ := uuid.NewRandom()
- holderStr := fmt.Sprintf("sys_%s", holder.String())
- defer func() {
- lock.ReleaseLock(key, holderStr)
- }()
- if lock.AcquireLock(key, 10, holderStr) {
- //先删除redis中的缓存
- _ = RemoveChatRecord(chatId)
- err = llm.BatchInsertRecords(newRecords)
- if err != nil {
- utils.FileLog.Error("批量插入记录失败:", err.Error())
- return fmt.Errorf("批量插入记录失败: %w", err)
- }
- _ = RemoveChatRecord(chatId)
- //重新加载数据
- _ = flushRecordsToRedis(chatId)
- }
- return nil
- }
- // SaveAllChatRecordsToDB 定时任务保存所有 Redis 中的聊天记录到数据库
- func SaveAllChatRecordsToDB() {
- for {
- keys, err := utils.Rc.Keys(redisChatPrefix + "*")
- if err != nil {
- utils.FileLog.Error("获取 Redis 键失败: %v", err)
- return
- }
- var wg sync.WaitGroup
- wg.Add(len(keys))
- for _, key := range keys {
- go func(key string) {
- defer wg.Done()
- chatIdStr := strings.TrimPrefix(key, redisChatPrefix)
- chatId, parseErr := strconv.Atoi(chatIdStr)
- if parseErr != nil {
- utils.FileLog.Error("解析聊天ID失败: %v", err)
- return
- }
- if err = SaveChatRecordsToDB(chatId); err != nil {
- utils.FileLog.Error("解析聊天ID失败: %v", err)
- }
- }(key)
- }
- wg.Wait()
- time.Sleep(10 * time.Second)
- }
- }
- // RemoveChatRecord 从 Redis 删除聊天记录
- func RemoveChatRecord(chatId int) error {
- key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
- err := utils.Rc.Delete(key)
- if err != nil {
- return fmt.Errorf("删除 Redis 缓存失败: %w", err)
- }
- return nil
- }
- func GetChatRecordsFromDB(chatId int) ([]*llm.UserChatRecord, error) {
- o := global.DbMap[utils.DbNameAI]
- var records []*llm.UserChatRecord
- if err := o.Where("chat_id = ?", chatId).Find(&records).Error; err != nil {
- return nil, fmt.Errorf("从数据库获取聊天记录失败: %w", err)
- }
- return records, nil
- }
|