session_manager.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package ws
  2. import (
  3. "encoding/json"
  4. "errors"
  5. chatService "eta/eta_api/services/llm"
  6. "eta/eta_api/utils"
  7. "eta/eta_api/utils/llm"
  8. "eta/eta_api/utils/llm/eta_llm"
  9. "eta/eta_api/utils/llm/eta_llm/eta_llm_http"
  10. "fmt"
  11. "github.com/gorilla/websocket"
  12. "net/http"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. llmService, _ = llm.GetInstance(llm.ETA_LLM_CLIENT)
  19. )
  20. const (
  21. defaultCheckInterval = 2 * time.Minute // 检测间隔应小于心跳超时时间
  22. connectionTimeout = 10 * time.Minute // 客户端超时时间
  23. TcpTimeout = 20 * time.Minute // TCP超时时间,保底关闭,覆盖会话超时时间
  24. ReadTimeout = 15 * time.Minute // 读取超时时间,保底关闭,覆盖会话超时时间
  25. writeWaitTimeout = 60 * time.Second //写入超时时间
  26. )
  27. type ConnectionManager struct {
  28. Sessions sync.Map
  29. ticker *time.Ticker
  30. stopChan chan struct{}
  31. }
  32. var (
  33. smOnce sync.Once
  34. manager *ConnectionManager
  35. )
  36. func GetInstance() *ConnectionManager {
  37. smOnce.Do(func() {
  38. if manager == nil {
  39. manager = &ConnectionManager{
  40. ticker: time.NewTicker(defaultCheckInterval),
  41. stopChan: make(chan struct{}),
  42. }
  43. }
  44. })
  45. return manager
  46. }
  47. func Manager() *ConnectionManager {
  48. return manager
  49. }
  50. // HandleMessage 消息处理核心逻辑
  51. func (manager *ConnectionManager) HandleMessage(userID int, sessionID string, message []byte) error {
  52. session, exists := manager.GetSession(sessionID)
  53. if !exists {
  54. return errors.New("session not found")
  55. }
  56. if strings.ToLower(string(message)) == "pong" {
  57. session.UpdateActivity()
  58. fmt.Printf("收到心跳消息,续期长连接:%v", session.LastActive)
  59. return nil
  60. }
  61. if !Allow(userID, QA_LIMITER) {
  62. _ = session.Conn.WriteMessage(websocket.TextMessage, []byte("<think></think>"))
  63. _ = session.Conn.WriteMessage(websocket.TextMessage, []byte("您提问的太频繁了,请稍后再试"))
  64. _ = session.Conn.WriteMessage(websocket.TextMessage, []byte("<EOF/>"))
  65. return nil
  66. }
  67. var userMessage Message
  68. err := json.Unmarshal(message, &userMessage)
  69. if err != nil {
  70. utils.FileLog.Error(fmt.Sprintf("消息格式错误:%s", string(message)))
  71. fmt.Printf("消息格式错误:%s", string(message))
  72. return errors.New("消息格式错误:" + err.Error())
  73. }
  74. // 处理业务逻辑
  75. //session.History = append(session.History, userMessage.LastTopics...)
  76. redisHisChat, err := chatService.GetChatRecordsFromRedis(userMessage.ChatId)
  77. if err != nil {
  78. utils.FileLog.Error("获取历史对话数据失败,err:", err.Error())
  79. } else {
  80. for _, chat := range redisHisChat {
  81. his := eta_llm_http.HistoryContent{
  82. Content: chat.Content,
  83. Role: chat.ChatUserType,
  84. }
  85. hisMsg, _ := json.Marshal(&his)
  86. if len(hisMsg) != 0 {
  87. session.History = append(session.History, hisMsg)
  88. }
  89. }
  90. }
  91. resp, err := llmService.KnowledgeBaseChat(userMessage.Query, userMessage.KbName, session.History)
  92. defer func() {
  93. if resp != nil && resp.Body != nil && err == nil {
  94. _ = resp.Body.Close()
  95. }
  96. }()
  97. if resp == nil {
  98. utils.FileLog.Error("知识库问答失败: 无应答")
  99. return errors.New("知识库问答失败: 无应答")
  100. }
  101. if err != nil {
  102. utils.FileLog.Error(fmt.Sprintf("知识库问答失败: httpCode:%d,错误信息:%s", resp.StatusCode, http.StatusText(resp.StatusCode)))
  103. err = errors.New(fmt.Sprintf("知识库问答失败: httpCode:%d,错误信息:%s", resp.StatusCode, http.StatusText(resp.StatusCode)))
  104. return err
  105. }
  106. if resp.StatusCode != http.StatusOK {
  107. utils.FileLog.Error(fmt.Sprintf("知识库问答失败: httpCode:%d,错误信息:%s", resp.StatusCode, http.StatusText(resp.StatusCode)))
  108. err = errors.New(fmt.Sprintf("知识库问答失败: httpCode:%d,错误信息:%s", resp.StatusCode, http.StatusText(resp.StatusCode)))
  109. return err
  110. }
  111. // 解析流式响应
  112. contentChan, errChan, closeChan := eta_llm.ParseStreamResponse(resp)
  113. emptyContent := true
  114. // 处理流式数据并发送到 WebSocket
  115. for {
  116. select {
  117. case content, ok := <-contentChan:
  118. if !ok {
  119. err = errors.New("未知的错误异常")
  120. return err
  121. }
  122. session.UpdateActivity()
  123. if emptyContent {
  124. emptyContent = false
  125. }
  126. // 发送消息到 WebSocket
  127. _ = session.Conn.WriteMessage(websocket.TextMessage, []byte(content))
  128. case chanErr, ok := <-errChan:
  129. if !ok {
  130. err = errors.New("未知的错误异常")
  131. } else {
  132. err = errors.New(chanErr.Error())
  133. }
  134. // 发送错误消息到 WebSocket
  135. return err
  136. case <-closeChan:
  137. if emptyContent {
  138. _ = session.Conn.WriteMessage(websocket.TextMessage, []byte("<think></think>"))
  139. }
  140. _ = session.Conn.WriteMessage(websocket.TextMessage, []byte("<EOF/>"))
  141. return nil
  142. }
  143. }
  144. // 更新最后活跃时间
  145. // 发送响应
  146. //return session.Conn.WriteMessage(websocket.TextMessage, []byte(response))
  147. }
  148. // AddSession Add 添加一个新的会话
  149. func (manager *ConnectionManager) AddSession(session *Session) {
  150. manager.Sessions.Store(session.Id, session)
  151. }
  152. func (manager *ConnectionManager) GetSessionId(userId int, sessionId string) (sessionID string) {
  153. return fmt.Sprintf("%d_%s", userId, sessionId)
  154. }
  155. // RemoveSession Remove 移除一个会话
  156. func (manager *ConnectionManager) RemoveSession(sessionCode string) {
  157. fmt.Printf("移除会话: SessionID=%s, UserID=%s", sessionCode, sessionCode)
  158. manager.Sessions.Delete(sessionCode)
  159. }
  160. // GetSession 获取一个会话
  161. func (manager *ConnectionManager) GetSession(sessionCode string) (session *Session, exists bool) {
  162. if data, ok := manager.Sessions.Load(sessionCode); ok {
  163. session = data.(*Session)
  164. exists = ok
  165. }
  166. return
  167. }
  168. // CheckAll 批量检测所有连接
  169. func (manager *ConnectionManager) CheckAll() {
  170. manager.Sessions.Range(func(key, value interface{}) bool {
  171. session := value.(*Session)
  172. // 判断超时
  173. if time.Since(session.LastActive) > 2*connectionTimeout {
  174. fmt.Printf("连接超时关闭: SessionID=%s, UserID=%s", session.Id, session.UserId)
  175. utils.FileLog.Warn("连接超时关闭: SessionID=%s, UserID=%s", session.Id, session.UserId)
  176. session.Close()
  177. return true
  178. }
  179. // 发送心跳
  180. go func(s *Session) {
  181. err := s.Conn.WriteControl(websocket.PingMessage,
  182. nil, time.Now().Add(writeWaitTimeout))
  183. if err != nil {
  184. fmt.Printf("心跳发送失败: SessionID=%s, Error=%v", s.Id, err)
  185. utils.FileLog.Warn("心跳发送失败: SessionID=%s, Error=%v",
  186. s.Id, err)
  187. fmt.Println("心跳无响应,退出请求")
  188. session.Close()
  189. }
  190. }(session)
  191. return true
  192. })
  193. }
  194. // Start 启动心跳检测
  195. func (manager *ConnectionManager) Start() {
  196. defer manager.ticker.Stop()
  197. for {
  198. select {
  199. case <-manager.ticker.C:
  200. manager.CheckAll()
  201. case <-manager.stopChan:
  202. return
  203. }
  204. }
  205. }
  206. // Stop 停止心跳检测
  207. func (manager *ConnectionManager) Stop() {
  208. close(manager.stopChan)
  209. }