kobe6258 1 mēnesi atpakaļ
vecāks
revīzija
9355499eff

+ 29 - 39
controllers/rag/chat_controller.go

@@ -2,6 +2,7 @@ package rag
 
 import (
 	"eta/eta_api/controllers"
+	"eta/eta_api/models"
 	"eta/eta_api/models/system"
 	"eta/eta_api/services/llm/facade"
 	"eta/eta_api/utils"
@@ -25,44 +26,33 @@ func (cc *ChatController) Prepare() {
 	}
 }
 
-//// ChatTest @Title 测试知识库问答
-//// @Description 测试知识库问答
-//// @Success 101 {object} response.ListResp
-//// @router /chat/chat_test [post]
-//func (kbctrl *KbController) ChatTest() {
-//	br := new(models.BaseResponse).Init()
-//	defer func() {
-//		if br.ErrMsg == "" {
-//			br.IsSendEmail = false
-//		}
-//		kbctrl.Data["json"] = br
-//		kbctrl.ServeJSON()
-//	}()
-//	sysUser := kbctrl.SysUser
-//	if sysUser == nil {
-//		br.Msg = "请登录"
-//		br.ErrMsg = "请登录,SysUser Is Empty"
-//		br.Ret = 408
-//		return
-//	}
-//	var req facade.LLMKnowledgeChat
-//	err := json.Unmarshal(kbctrl.Ctx.Input.RequestBody, &req)
-//	if err != nil {
-//		br.Msg = "参数解析异常!"
-//		br.ErrMsg = "参数解析失败,Err:" + err.Error()
-//		return
-//	}
-//	searchResp, err := facade.LLMKnowledgeBaseChat(req)
-//	if err != nil {
-//		br.Msg = "知识库问答"
-//		br.ErrMsg = "知识库问答:" + err.Error()
-//		return
-//	}
-//	br.Data = searchResp
-//	br.Ret = 200
-//	br.Success = true
-//	br.Msg = "知识库问答成功"
-//}
+// NewChat @Title 新建对话框
+// @Description 测试知识库问答
+// @Success 101 {object} response.ListResp
+// @router /chat/new_chat [post]
+func (kbctrl *KbController) NewChat() {
+	br := new(models.BaseResponse).Init()
+	defer func() {
+		kbctrl.Data["json"] = br
+		kbctrl.ServeJSON()
+	}()
+	sysUser := kbctrl.SysUser
+	if sysUser == nil {
+		br.Msg = "请登录"
+		br.ErrMsg = "请登录,SysUser Is Empty"
+		br.Ret = 408
+		return
+	}
+	//searchResp, err := facade.LLMKnowledgeBaseChat(req)
+	//if err != nil {
+	//	br.Msg = "知识库问答"
+	//	br.ErrMsg = "知识库问答:" + err.Error()
+	//	return
+	//}
+	br.Ret = 200
+	br.Success = true
+	br.Msg = "创建成功"
+}
 
 // ChatConnect @Title 知识库问答创建对话连接
 // @Description 知识库问答创建对话连接
@@ -102,7 +92,7 @@ func webSocketHandler(w http.ResponseWriter, r *http.Request) (conn *websocket.C
 	// 获取底层 TCP 连接并设置保活
 	if tcpConn, ok := conn.NetConn().(*net.TCPConn); ok {
 		_ = tcpConn.SetKeepAlive(true)
-		_ = tcpConn.SetKeepAlivePeriod(90 * time.Second)
+		_ = tcpConn.SetKeepAlivePeriod(ws.TcpTimeout)
 		utils.FileLog.Info("TCP KeepAlive 已启用")
 	}
 	_ = conn.SetReadDeadline(time.Now().Add(ws.ReadTimeout))

+ 4 - 0
models/llm/user_llm_chat.go

@@ -0,0 +1,4 @@
+package llm
+
+type UserLlmChat struct {
+}

+ 9 - 0
routers/commentsRouter.go

@@ -8530,6 +8530,15 @@ func init() {
             Filters: nil,
             Params: nil})
 
+    beego.GlobalControllerRouter["eta/eta_api/controllers/rag:KbController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/rag:KbController"],
+        beego.ControllerComments{
+            Method: "NewChat",
+            Router: `/chat/new_chat`,
+            AllowHTTPMethods: []string{"post"},
+            MethodParams: param.Make(),
+            Filters: nil,
+            Params: nil})
+
     beego.GlobalControllerRouter["eta/eta_api/controllers/rag:KbController"] = append(beego.GlobalControllerRouter["eta/eta_api/controllers/rag:KbController"],
         beego.ControllerComments{
             Method: "SearchDocs",

+ 2 - 0
services/llm/facade/llm_service.go

@@ -18,12 +18,14 @@ func generateSessionCode() (code string) {
 	return fmt.Sprintf("%s%s", "llm_session_", uuid.NewUUID().Hex32())
 }
 
+// AddSession 创建会话session
 func AddSession(userId int, conn *websocket.Conn) {
 	sessionId := generateSessionCode()
 	session := ws.NewSession(userId, sessionId, conn)
 	ws.Manager().AddSession(session)
 }
 
+// LLMKnowledgeBaseSearchDocs 搜索知识库
 func LLMKnowledgeBaseSearchDocs(search LLMKnowledgeSearch) (resp bus_response.SearchDocsEtaResponse, err error) {
 	docs, err := llmService.SearchKbDocs(search.Query, search.KnowledgeBaseName)
 	if err != nil {

+ 2 - 2
utils/ws/latency_measurer.go

@@ -8,7 +8,7 @@ import (
 )
 
 const (
-	maxMessageSize   = 1024 * 1024 // 1MB
+	maxMessageSize   = 1024 * 1024 * 10 // 1MB
 	basePingInterval = 5 * time.Second
 	maxPingInterval  = 120 * time.Second
 	minPingInterval  = 15 * time.Second
@@ -39,7 +39,7 @@ func (lm *LatencyMeasurer) SendPing(conn *websocket.Conn) error {
 		return errors.New("connection closed")
 	}
 	// 发送Ping消息
-	err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second))
+	err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWaitTimeout))
 	if err != nil {
 		return err
 	}

+ 4 - 2
utils/ws/limiter.go

@@ -21,6 +21,8 @@ const (
 	QA_LIMITER          = "qaLimiter"
 	LIMITER_KEY         = "llm_chat_key_user_%d"
 	CONNECT_LIMITER_KEY = "llm_chat_connect_key_user_%d"
+
+	RATE_LIMTER_TIME	=60*time.Second
 )
 
 type RateLimiter struct {
@@ -46,7 +48,7 @@ func (qalm *LimiterManger) GetLimiter(token string) *RateLimiter {
 
 	// 创建一个新的限流器,例如每10秒1个请求
 	limiter := &RateLimiter{
-		Limiter: rate.NewLimiter(rate.Every(10*time.Second), 1),
+		Limiter: rate.NewLimiter(rate.Every(RATE_LIMTER_TIME), 1),
 	}
 	qalm.limiterMap[token] = limiter
 	return limiter
@@ -57,7 +59,7 @@ func (qalm *LimiterManger) Allow(token string) bool {
 		limiter.LastRequest = time.Now()
 		return limiter.Allow()
 	}
-	if time.Now().Sub(limiter.LastRequest) < 10*time.Second {
+	if time.Now().Sub(limiter.LastRequest) < RATE_LIMTER_TIME {
 		return false
 	}
 	limiter.LastRequest = time.Now()

+ 13 - 3
utils/ws/session.go

@@ -3,6 +3,7 @@ package ws
 import (
 	"errors"
 	"eta/eta_api/utils"
+	"fmt"
 	"github.com/gorilla/websocket"
 	"sync"
 	"time"
@@ -29,12 +30,16 @@ type Message struct {
 
 // readPump 处理读操作
 func (s *Session) readPump() {
-	defer manager.RemoveSession(s.Id)
+	defer func() {
+		fmt.Printf("读进程session %s closed", s.Id)
+		manager.RemoveSession(s.Id)
+	}()
 	s.Conn.SetReadLimit(maxMessageSize)
-	_ = s.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
+	_ = s.Conn.SetReadDeadline(time.Now().Add(ReadTimeout))
 	for {
 		_, message, err := s.Conn.ReadMessage()
 		if err != nil {
+			fmt.Printf("websocket 错误关闭 %s closed", err.Error())
 			handleCloseError(err)
 			return
 		}
@@ -43,6 +48,7 @@ func (s *Session) readPump() {
 		// 处理消息
 		if err = manager.HandleMessage(s.UserId, s.Id, message); err != nil {
 			//写应答
+
 			_ = s.writeWithTimeout(err.Error())
 
 		}
@@ -83,6 +89,7 @@ func (s *Session) writeWithTimeout(msg string) error {
 func (s *Session) writePump() {
 	ticker := time.NewTicker(basePingInterval)
 	defer func() {
+		fmt.Printf("写继进程:session %s closed", s.Id)
 		manager.RemoveSession(s.Id)
 		ticker.Stop()
 	}()
@@ -105,12 +112,15 @@ func handleCloseError(err error) {
 	if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
 		var wsErr *websocket.CloseError
 		if !errors.As(err, &wsErr) {
+			fmt.Printf("websocket未知错误 %s", err.Error())
 			utils.FileLog.Error("未知错误 %s", err.Error())
 		} else {
 			switch wsErr.Code {
 			case websocket.CloseNormalClosure:
+				fmt.Println("websocket正常关闭连接")
 				utils.FileLog.Info("正常关闭连接")
 			default:
+				fmt.Printf("websocket关闭代码 %d:%s", wsErr.Code, wsErr.Text)
 				utils.FileLog.Error("关闭代码:%d:%s", wsErr.Code, wsErr.Text)
 			}
 		}
@@ -125,7 +135,7 @@ func (s *Session) forceClose() {
 	// 发送关闭帧
 	_ = s.Conn.WriteControl(websocket.CloseMessage,
 		websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "heartbeat failed"),
-		time.Now().Add(5*time.Second))
+		time.Now().Add(writeWaitTimeout))
 	_ = s.Conn.Close()
 	s.Conn = nil // 标记连接已关闭
 	utils.FileLog.Info("连接已强制关闭",

+ 5 - 2
utils/ws/session_manager.go

@@ -20,7 +20,8 @@ var (
 const (
 	defaultCheckInterval = 2 * time.Minute  // 检测间隔应小于心跳超时时间
 	connectionTimeout    = 10 * time.Minute // 客户端超时时间
-	ReadTimeout          = 60 * time.Second // 读取超时时间
+	TcpTimeout           = 20 * time.Minute // TCP超时时间,保底关闭,覆盖会话超时时间
+	ReadTimeout          = 15 * time.Minute // 读取超时时间,保底关闭,覆盖会话超时时间
 	writeWaitTimeout     = 60 * time.Second //写入超时时间
 )
 
@@ -100,6 +101,7 @@ func (manager *ConnectionManager) HandleMessage(userID int, sessionID string, me
 			// 发送错误消息到 WebSocket
 			return err
 		case <-closeChan:
+			_ = session.Conn.WriteMessage(websocket.TextMessage, []byte("<EOF>"))
 			return nil
 		}
 	}
@@ -118,6 +120,7 @@ func (manager *ConnectionManager) GetSessionId(userId int, sessionId string) (se
 
 // RemoveSession Remove 移除一个会话
 func (manager *ConnectionManager) RemoveSession(sessionCode string) {
+	fmt.Printf("移除会话: SessionID=%s, UserID=%s", sessionCode, sessionCode)
 	manager.Sessions.Delete(sessionCode)
 }
 
@@ -144,7 +147,7 @@ func (manager *ConnectionManager) CheckAll() {
 		// 发送心跳
 		go func(s *Session) {
 			err := s.Conn.WriteControl(websocket.PingMessage,
-				nil, time.Now().Add(5*time.Second))
+				nil, time.Now().Add(writeWaitTimeout))
 			if err != nil {
 				fmt.Printf("心跳发送失败: SessionID=%s, Error=%v", s.Id, err)
 				utils.FileLog.Warn("心跳发送失败: SessionID=%s, Error=%v",