|
@@ -0,0 +1,218 @@
|
|
|
+package llm
|
|
|
+
|
|
|
+import (
|
|
|
+ "encoding/json"
|
|
|
+ "eta/eta_api/global"
|
|
|
+ "eta/eta_api/models/llm"
|
|
|
+ "eta/eta_api/utils"
|
|
|
+ "eta/eta_api/utils/lock"
|
|
|
+ "eta/eta_api/utils/redis"
|
|
|
+ "fmt"
|
|
|
+ "github.com/google/uuid"
|
|
|
+ "strconv"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+const (
|
|
|
+ redisChatPrefix = "chat:zet:"
|
|
|
+ redisTTL = 24 * time.Hour // Redis 缓存过期时间
|
|
|
+)
|
|
|
+
|
|
|
+// AddChatRecord 添加聊天记录到 Redis
|
|
|
+func AddChatRecord(record *llm.UserChatRecordRedis) error {
|
|
|
+ key := fmt.Sprintf("%s%d", redisChatPrefix, record.ChatId)
|
|
|
+ holder, _ := uuid.NewRandom()
|
|
|
+ holderStr := fmt.Sprintf("user_%s", holder.String())
|
|
|
+ if lock.AcquireLock(key, 10, holderStr) {
|
|
|
+ defer func() {
|
|
|
+ fmt.Printf("用户释放锁:%s", key)
|
|
|
+ lock.ReleaseLock(key, holderStr)
|
|
|
+ }()
|
|
|
+ data, err := json.Marshal(record)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("序列化聊天记录失败: %w", err)
|
|
|
+ }
|
|
|
+ zSet, _ := utils.Rc.ZRangeWithScores(key)
|
|
|
+ if len(zSet) == 0 {
|
|
|
+ // 设置过期时间
|
|
|
+ _ = utils.Rc.Expire(key, 24*time.Hour)
|
|
|
+ }
|
|
|
+ zSet = append(zSet, &redis.Zset{
|
|
|
+ Member: data,
|
|
|
+ Score: float64(time.Now().Unix()),
|
|
|
+ })
|
|
|
+
|
|
|
+ err = utils.Rc.ZAdd(key, zSet...)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("保存聊天记录到 Redis 失败: %w", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return fmt.Errorf("获取锁失败,请稍后重试")
|
|
|
+}
|
|
|
+
|
|
|
+// GetChatRecordsFromRedis 从 Redis 获取聊天记录
|
|
|
+func GetChatRecordsFromRedis(chatId int) (redisList []*llm.UserChatRecordRedis, err error) {
|
|
|
+ key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
|
|
|
+ zSet, _ := utils.Rc.ZRangeWithScores(key)
|
|
|
+ if len(zSet) == 0 {
|
|
|
+ // 缓存不存在,从数据库拉取数据
|
|
|
+ records, dbErr := GetChatRecordsFromDB(chatId)
|
|
|
+ if dbErr != nil {
|
|
|
+ err = fmt.Errorf("从数据库获取聊天记录失败: %w", dbErr)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // 将数据保存到 Redis
|
|
|
+ for _, record := range records {
|
|
|
+ redisRecord := &llm.UserChatRecordRedis{
|
|
|
+ Id: record.Id,
|
|
|
+ ChatId: chatId,
|
|
|
+ ChatUserType: record.ChatUserType,
|
|
|
+ Content: record.Content,
|
|
|
+ SendTime: record.SendTime.Format(utils.FormatDateTime),
|
|
|
+ }
|
|
|
+ redisList = append(redisList, redisRecord)
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, z := range zSet {
|
|
|
+ var redisRecord llm.UserChatRecordRedis
|
|
|
+ if err = json.Unmarshal([]byte(z.Member.(string)), &redisRecord); err != nil {
|
|
|
+ return nil, fmt.Errorf("解析聊天记录失败: %w", err)
|
|
|
+ }
|
|
|
+ redisList = append(redisList, &redisRecord)
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func flushRecordsToRedis(chatId int) (err error) {
|
|
|
+ key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
|
|
|
+ zSet, _ := utils.Rc.ZRangeWithScores(key)
|
|
|
+ if len(zSet) == 0 {
|
|
|
+ // 缓存不存在,从数据库拉取数据
|
|
|
+ records, dbErr := GetChatRecordsFromDB(chatId)
|
|
|
+ if dbErr != nil {
|
|
|
+ err = fmt.Errorf("从数据库获取聊天记录失败: %w", dbErr)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ var zet []*redis.Zset
|
|
|
+ // 将数据保存到 Redis
|
|
|
+ for _, record := range records {
|
|
|
+ redisRecord := &llm.UserChatRecordRedis{
|
|
|
+ Id: record.Id,
|
|
|
+ ChatId: chatId,
|
|
|
+ ChatUserType: record.ChatUserType,
|
|
|
+ Content: record.Content,
|
|
|
+ SendTime: record.SendTime.Format(utils.FormatDateTime),
|
|
|
+ }
|
|
|
+ data, parseErr := json.Marshal(&redisRecord)
|
|
|
+ if parseErr != nil {
|
|
|
+ utils.FileLog.Error("解析聊天记录失败: %w", err)
|
|
|
+ }
|
|
|
+ zet = append(zet, &redis.Zset{
|
|
|
+ Member: data,
|
|
|
+ Score: float64(record.SendTime.Unix()),
|
|
|
+ })
|
|
|
+ }
|
|
|
+ _ = utils.Rc.ZAdd(key, zet...)
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// SaveChatRecordsToDB 将 Redis 中的聊天记录保存到数据库
|
|
|
+func SaveChatRecordsToDB(chatId int) error {
|
|
|
+ list, err := GetChatRecordsFromRedis(chatId)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ var newRecords []*llm.UserChatRecord
|
|
|
+ for _, record := range list {
|
|
|
+ if record.Id == 0 {
|
|
|
+ sendTime, parseErr := time.Parse(utils.FormatDateTime, record.SendTime)
|
|
|
+ if parseErr != nil {
|
|
|
+ sendTime = time.Now()
|
|
|
+ }
|
|
|
+ newRecords = append(newRecords, &llm.UserChatRecord{
|
|
|
+ Id: record.Id,
|
|
|
+ ChatId: record.ChatId,
|
|
|
+ ChatUserType: record.ChatUserType,
|
|
|
+ Content: record.Content,
|
|
|
+ SendTime: sendTime,
|
|
|
+ CreatedTime: time.Now(),
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
+ key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
|
|
|
+ holder, _ := uuid.NewRandom()
|
|
|
+ holderStr := fmt.Sprintf("sys_%s", holder.String())
|
|
|
+ defer func() {
|
|
|
+ fmt.Printf("系统释放锁:%s", key)
|
|
|
+ lock.ReleaseLock(key, holderStr)
|
|
|
+ }()
|
|
|
+ if lock.AcquireLock(key, 10, holderStr) {
|
|
|
+ //先删除redis中的缓存
|
|
|
+ _ = RemoveChatRecord(chatId)
|
|
|
+ err = llm.BatchInsertRecords(newRecords)
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("批量插入记录失败:", err.Error())
|
|
|
+ return fmt.Errorf("批量插入记录失败: %w", err)
|
|
|
+ }
|
|
|
+ _ = RemoveChatRecord(chatId)
|
|
|
+ //重新加载数据
|
|
|
+ _ = flushRecordsToRedis(chatId)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// SaveAllChatRecordsToDB 定时任务保存所有 Redis 中的聊天记录到数据库
|
|
|
+func SaveAllChatRecordsToDB() {
|
|
|
+ for {
|
|
|
+ fmt.Println("开始保存聊天记录到数据库...")
|
|
|
+ keys, err := utils.Rc.Keys(redisChatPrefix + "*")
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("获取 Redis 键失败: %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ wg.Add(len(keys))
|
|
|
+ for _, key := range keys {
|
|
|
+ go func(key string) {
|
|
|
+ defer wg.Done()
|
|
|
+ chatIdStr := strings.TrimPrefix(key, redisChatPrefix)
|
|
|
+ chatId, parseErr := strconv.Atoi(chatIdStr)
|
|
|
+ if parseErr != nil {
|
|
|
+ utils.FileLog.Error("解析聊天ID失败: %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if err = SaveChatRecordsToDB(chatId); err != nil {
|
|
|
+ utils.FileLog.Error("解析聊天ID失败: %v", err)
|
|
|
+ }
|
|
|
+ fmt.Println("保存聊天记录到数据库完成")
|
|
|
+ }(key)
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ fmt.Printf("计划任务完成")
|
|
|
+ time.Sleep(10 * time.Second)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// RemoveChatRecord 从 Redis 删除聊天记录
|
|
|
+func RemoveChatRecord(chatId int) error {
|
|
|
+ key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
|
|
|
+ err := utils.Rc.Delete(key)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("删除 Redis 缓存失败: %w", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func GetChatRecordsFromDB(chatId int) ([]*llm.UserChatRecord, error) {
|
|
|
+ o := global.DbMap[utils.DbNameAI]
|
|
|
+ var records []*llm.UserChatRecord
|
|
|
+ if err := o.Where("chat_id = ?", chatId).Find(&records).Error; err != nil {
|
|
|
+ return nil, fmt.Errorf("从数据库获取聊天记录失败: %w", err)
|
|
|
+ }
|
|
|
+ return records, nil
|
|
|
+}
|