|
@@ -0,0 +1,167 @@
|
|
|
+package llm
|
|
|
+
|
|
|
+import (
|
|
|
+ "encoding/json"
|
|
|
+ "eta/eta_api/global"
|
|
|
+ "eta/eta_api/models/llm"
|
|
|
+ "eta/eta_api/utils"
|
|
|
+ "eta/eta_api/utils/lock"
|
|
|
+ "eta/eta_api/utils/redis"
|
|
|
+ "fmt"
|
|
|
+ "github.com/google/uuid"
|
|
|
+ "strconv"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+const (
|
|
|
+ redisChatPrefix = "chat:zet:"
|
|
|
+ RecordLock = "lock:chat:record:"
|
|
|
+ redisTTL = 24 * time.Hour // Redis 缓存过期时间
|
|
|
+)
|
|
|
+
|
|
|
+// AddChatRecord 添加聊天记录到 Redis
|
|
|
+func AddChatRecord(record *llm.UserChatRecordRedis) error {
|
|
|
+ key := fmt.Sprintf("%s%d", redisChatPrefix, record.ChatId)
|
|
|
+ lockKey := fmt.Sprintf("%s%s", RecordLock, key)
|
|
|
+ fmt.Printf("2%s", lockKey)
|
|
|
+ holder, _ := uuid.NewUUID()
|
|
|
+ if lock.AcquireLock(lockKey, 10, holder.String()) {
|
|
|
+ defer lock.ReleaseLock(key, holder.String())
|
|
|
+ data, err := json.Marshal(record)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("序列化聊天记录失败: %w", err)
|
|
|
+ }
|
|
|
+ zSet, _ := utils.Rc.ZRangeWithScores(key)
|
|
|
+ if len(zSet) == 0 {
|
|
|
+ // 设置过期时间
|
|
|
+ _ = utils.Rc.Expire(key, 24*time.Hour)
|
|
|
+ }
|
|
|
+ zSet = append(zSet, &redis.Zset{
|
|
|
+ Member: data,
|
|
|
+ Score: float64(time.Now().Unix()),
|
|
|
+ })
|
|
|
+ fmt.Println(strconv.Itoa(len(zSet)))
|
|
|
+ err = utils.Rc.ZAdd(key, zSet...)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("保存聊天记录到 Redis 失败: %w", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return fmt.Errorf("获取锁失败,请稍后重试")
|
|
|
+}
|
|
|
+
|
|
|
+// GetChatRecordsFromRedis 从 Redis 获取聊天记录
|
|
|
+func GetChatRecordsFromRedis(chatId int) (redisList []*llm.UserChatRecordRedis, err error) {
|
|
|
+ key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
|
|
|
+ zSet, _ := utils.Rc.ZRangeWithScores(key)
|
|
|
+ if len(zSet) == 0 {
|
|
|
+ // 缓存不存在,从数据库拉取数据
|
|
|
+ records, dbErr := GetChatRecordsFromDB(chatId)
|
|
|
+ if dbErr != nil {
|
|
|
+ err = fmt.Errorf("从数据库获取聊天记录失败: %w", dbErr)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // 将数据保存到 Redis
|
|
|
+ for _, record := range records {
|
|
|
+ redisRecord := &llm.UserChatRecordRedis{
|
|
|
+ Id: record.Id,
|
|
|
+ ChatId: chatId,
|
|
|
+ ChatUserType: record.ChatUserType,
|
|
|
+ Content: record.Content,
|
|
|
+ SendTime: record.SendTime.Format(utils.FormatDateTime),
|
|
|
+ }
|
|
|
+ redisList = append(redisList, redisRecord)
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, z := range zSet {
|
|
|
+ var redisRecord llm.UserChatRecordRedis
|
|
|
+ if err = json.Unmarshal([]byte(z.Member.(string)), &redisRecord); err != nil {
|
|
|
+ return nil, fmt.Errorf("解析聊天记录失败: %w", err)
|
|
|
+ }
|
|
|
+ redisList = append(redisList, &redisRecord)
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+// SaveChatRecordsToDB 将 Redis 中的聊天记录保存到数据库
|
|
|
+func SaveChatRecordsToDB(chatId int) error {
|
|
|
+ list, err := GetChatRecordsFromRedis(chatId)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ var newRecords []*llm.UserChatRecord
|
|
|
+ for _, record := range list {
|
|
|
+ //if record.Id == 0 {
|
|
|
+ sendTime, parseErr := time.Parse(utils.FormatDateTime, record.SendTime)
|
|
|
+ if parseErr != nil {
|
|
|
+ sendTime = time.Now()
|
|
|
+ }
|
|
|
+ newRecords = append(newRecords, &llm.UserChatRecord{
|
|
|
+ ChatId: record.ChatId,
|
|
|
+ ChatUserType: record.ChatUserType,
|
|
|
+ Content: record.Content,
|
|
|
+ SendTime: sendTime,
|
|
|
+ CreatedTime: time.Now(),
|
|
|
+ })
|
|
|
+ //}
|
|
|
+ }
|
|
|
+ //先删除redis中的缓存
|
|
|
+ //_ = RemoveChatRecord(chatId)
|
|
|
+ err = llm.BatchInsertRecords(newRecords)
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("批量插入记录失败:", err.Error())
|
|
|
+ return fmt.Errorf("批量插入记录失败: %w", err)
|
|
|
+ }
|
|
|
+ //_ = RemoveChatRecord(chatId)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// SaveAllChatRecordsToDB 定时任务保存所有 Redis 中的聊天记录到数据库
|
|
|
+func SaveAllChatRecordsToDB() {
|
|
|
+ for {
|
|
|
+ fmt.Println("开始保存聊天记录到数据库...")
|
|
|
+ keys, err := utils.Rc.Keys(redisChatPrefix + "*")
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("获取 Redis 键失败: %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, key := range keys {
|
|
|
+ lockKey := fmt.Sprintf("%s%s", RecordLock, key)
|
|
|
+ fmt.Printf("1%s", lockKey)
|
|
|
+ chatIdStr := strings.TrimPrefix(key, redisChatPrefix)
|
|
|
+ chatId, parseErr := strconv.Atoi(chatIdStr)
|
|
|
+ if parseErr != nil {
|
|
|
+ utils.FileLog.Error("解析聊天ID失败: %v", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if lock.AcquireLock(lockKey, 10, "system_task") {
|
|
|
+ if err = SaveChatRecordsToDB(chatId); err != nil {
|
|
|
+ utils.FileLog.Error("解析聊天ID失败: %v", err)
|
|
|
+ }
|
|
|
+ lock.ReleaseLock(key, "system_task")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ time.Sleep(10 * time.Second)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// RemoveChatRecord 从 Redis 删除聊天记录
|
|
|
+func RemoveChatRecord(chatId int) error {
|
|
|
+ key := fmt.Sprintf("%s%d", redisChatPrefix, chatId)
|
|
|
+ err := utils.Rc.Delete(key)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("删除 Redis 缓存失败: %w", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func GetChatRecordsFromDB(chatID int) ([]*llm.UserChatRecord, error) {
|
|
|
+ o := global.DbMap[utils.DbNameAI]
|
|
|
+ var records []*llm.UserChatRecord
|
|
|
+ if err := o.Where("chat_id = ?", chatID).Find(&records).Error; err != nil {
|
|
|
+ return nil, fmt.Errorf("从数据库获取聊天记录失败: %w", err)
|
|
|
+ }
|
|
|
+ return records, nil
|
|
|
+}
|