Browse Source

Merge branch 'feature/deepseek_rag_1.0' into debug

kobe6258 1 day ago
parent
commit
a8ca18ce30

+ 1 - 1
services/llm/chat_service.go

@@ -190,7 +190,7 @@ func SaveAllChatRecordsToDB() {
 			}(key)
 		}
 		wg.Wait()
-		time.Sleep(10 * time.Second)
+		time.Sleep(10 * time.Minute)
 	}
 }
 

+ 4 - 0
utils/lock/distrubtLock.go

@@ -36,6 +36,10 @@ func AcquireLock(key string, expiration int, Holder string) bool {
 	}
 	return false
 }
+
+//func TryLock(key string, expiration int, Holder string, wait bool, timeout time.Duration)error{
+//
+//}
 func Lock() error {
 	if !AcquireLock("test", 10, "test") {
 		return fmt.Errorf("加锁失败")

+ 0 - 28
utils/lock/reentrant_lock.go

@@ -1,28 +0,0 @@
-package lock
-
-import (
-	"context"
-	"github.com/go-redis/redis/v8"
-	"sync"
-)
-
-type RedisReentrantLock struct {
-	key      string
-	client   *redis.UniversalClient
-	ctx      context.Context
-	identity string
-	count    int
-	mu       sync.Mutex
-	stopChan chan struct{}
-}
-
-//func NewRedisReentrantLock(client *redis.Client, ctx context.Context, key string, identity string) *RedisReentrantLock {
-//	return &RedisReentrantLock{
-//		key:      key,
-//		client:   client,
-//		ctx:      ctx,
-//		identity: identity,
-//		count:    0,
-//		stopChan: make(chan struct{}),
-//	}
-//}

+ 2 - 0
utils/ws/latency_measurer.go

@@ -2,6 +2,7 @@ package ws
 
 import (
 	"errors"
+	"fmt"
 	"github.com/gorilla/websocket"
 	"sync"
 	"time"
@@ -86,6 +87,7 @@ func (lm *LatencyMeasurer) GetLatency() time.Duration {
 func SetupLatencyMeasurement(conn *websocket.Conn) *LatencyMeasurer {
 	lm := NewLatencyMeasurer(5) // 使用最近5次测量的滑动窗口
 	conn.SetPongHandler(func(appData string) error {
+		fmt.Println("Pong received")
 		lm.CalculateLatency()
 		return nil
 	})

+ 2 - 2
utils/ws/limiter.go

@@ -34,7 +34,7 @@ const (
 	CONNECT_LIMITER_KEY = "llm_chat_connect_key_user_%d"
 
 	RATE_LIMTER_TIME     = 30 * time.Second
-	CONNECT_LIMITER_TIME = 5 * time.Second
+	CONNECT_LIMITER_TIME = 200 * time.Millisecond
 )
 
 var ()
@@ -109,5 +109,5 @@ func Allow(userId int, limiter string) bool {
 	if handler == nil {
 		return false
 	}
-	return handler.Allow(token,limiter)
+	return handler.Allow(token, limiter)
 }

+ 4 - 4
utils/ws/session.go

@@ -25,9 +25,10 @@ type Session struct {
 }
 
 type Message struct {
-	KbName     string            `json:"KbName"`
-	Query      string            `json:"Query"`
-	LastTopics []json.RawMessage `json:"LastTopics"`
+	KbName string `json:"KbName"`
+	Query  string `json:"Query"`
+	ChatId int    `json:"ChatId"`
+	//LastTopics []json.RawMessage `json:"LastTopics"`
 }
 
 // readPump 处理读操作
@@ -143,7 +144,6 @@ func (s *Session) forceClose() {
 		"user", s.UserId,
 		"session", s.Id)
 }
-
 func NewSession(userId int, sessionId string, conn *websocket.Conn) (session *Session) {
 	session = &Session{
 		UserId:      userId,

+ 24 - 2
utils/ws/session_manager.go

@@ -3,12 +3,15 @@ package ws
 import (
 	"encoding/json"
 	"errors"
+	chatService "eta/eta_api/services/llm"
 	"eta/eta_api/utils"
 	"eta/eta_api/utils/llm"
 	"eta/eta_api/utils/llm/eta_llm"
+	"eta/eta_api/utils/llm/eta_llm/eta_llm_http"
 	"fmt"
 	"github.com/gorilla/websocket"
 	"net/http"
+	"strings"
 	"sync"
 	"time"
 )
@@ -60,6 +63,11 @@ func (manager *ConnectionManager) HandleMessage(userID int, sessionID string, me
 	if !exists {
 		return errors.New("session not found")
 	}
+	if strings.ToLower(string(message)) == "pong" {
+		session.UpdateActivity()
+		fmt.Printf("收到心跳消息,续期长连接:%v", session.LastActive)
+		return nil
+	}
 	var userMessage Message
 	err := json.Unmarshal(message, &userMessage)
 	if err != nil {
@@ -67,8 +75,22 @@ func (manager *ConnectionManager) HandleMessage(userID int, sessionID string, me
 		return errors.New("消息格式错误:" + err.Error())
 	}
 	// 处理业务逻辑
-	session.History = append(session.History, userMessage.LastTopics...)
-
+	//session.History = append(session.History, userMessage.LastTopics...)
+	redisHisChat, err := chatService.GetChatRecordsFromRedis(userMessage.ChatId)
+	if err != nil {
+		utils.FileLog.Error("获取历史对话数据失败,err:", err.Error())
+	} else {
+		for _, chat := range redisHisChat {
+			his := eta_llm_http.HistoryContent{
+				Content: chat.Content,
+				Role:    chat.ChatUserType,
+			}
+			hisMsg, _ := json.Marshal(&his)
+			if len(hisMsg) != 0 {
+				session.History = append(session.History, hisMsg)
+			}
+		}
+	}
 	resp, err := llmService.KnowledgeBaseChat(userMessage.Query, userMessage.KbName, session.History)
 	defer func() {
 		if resp != nil && resp.Body != nil && err == nil {