123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- package ws
- import (
- "encoding/json"
- "errors"
- "eta/eta_api/utils"
- "fmt"
- "github.com/gorilla/websocket"
- "sync"
- "time"
- )
- // Session 会话结构
- type Session struct {
- Id string
- UserId int
- Conn *websocket.Conn
- LastActive time.Time
- Latency *LatencyMeasurer
- History []json.RawMessage
- CloseChan chan struct{}
- MessageChan chan string
- mu sync.RWMutex
- sessionOnce sync.Once
- }
// Message is the JSON payload decoded from an incoming websocket frame.
// The JSON keys are capitalised exactly like the Go field names.
type Message struct {
	// KbName is sent under the "KbName" key.
	KbName string `json:"KbName"`
	// Query is sent under the "Query" key.
	Query string `json:"Query"`
	// ChatId is sent under the "ChatId" key.
	ChatId int `json:"ChatId"`
	//LastTopics []json.RawMessage `json:"LastTopics"`
}
- // readPump 处理读操作
- func (s *Session) readPump() {
- defer func() {
- fmt.Printf("读进程session %s closed", s.Id)
- manager.RemoveSession(s.Id)
- }()
- s.Conn.SetReadLimit(maxMessageSize)
- _ = s.Conn.SetReadDeadline(time.Now().Add(ReadTimeout))
- for {
- _, message, err := s.Conn.ReadMessage()
- if err != nil {
- fmt.Printf("websocket 错误关闭 %s closed", err.Error())
- handleCloseError(err)
- return
- }
- // 更新活跃时间
- s.UpdateActivity()
- // 处理消息
- if err = manager.HandleMessage(s.UserId, s.Id, message); err != nil {
- //写应答
- _ = s.writeWithTimeout("<think></think>")
- _ = s.writeWithTimeout(err.Error())
- _ = s.writeWithTimeout("<EOF/>")
- }
- }
- }
- // UpdateActivity 跟新最近活跃时间
- func (s *Session) UpdateActivity() {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.LastActive = time.Now()
- }
- func (s *Session) Close() {
- s.sessionOnce.Do(func() {
- // 控制关闭顺序
- close(s.CloseChan)
- close(s.MessageChan)
- s.forceClose()
- })
- }
- // 带超时的安全写入
- func (s *Session) writeWithTimeout(msg string) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.Conn == nil {
- utils.FileLog.Error("写入消息失败,connection已关闭")
- return errors.New("connection closed")
- }
- // 设置写超时
- if err := s.Conn.SetWriteDeadline(time.Now().Add(writeWaitTimeout)); err != nil {
- return err
- }
- return s.Conn.WriteMessage(websocket.TextMessage, []byte(msg))
- }
- // writePump 处理写操作
- func (s *Session) writePump() {
- ticker := time.NewTicker(basePingInterval)
- defer func() {
- fmt.Printf("写继进程:session %s closed", s.Id)
- manager.RemoveSession(s.Id)
- ticker.Stop()
- }()
- for {
- select {
- case message, ok := <-s.MessageChan:
- if !ok {
- return
- }
- _ = s.writeWithTimeout(message)
- case <-ticker.C:
- _ = s.Latency.SendPing(s.Conn)
- ticker.Reset(s.Latency.lastLatency)
- case <-s.CloseChan:
- return
- }
- }
- }
- func handleCloseError(err error) {
- utils.FileLog.Error("websocket错误关闭 %s closed", err.Error())
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
- var wsErr *websocket.CloseError
- if !errors.As(err, &wsErr) {
- fmt.Printf("websocket未知错误 %s", err.Error())
- utils.FileLog.Error("未知错误 %s", err.Error())
- } else {
- switch wsErr.Code {
- case websocket.CloseNormalClosure:
- fmt.Println("websocket正常关闭连接")
- utils.FileLog.Info("正常关闭连接")
- default:
- fmt.Printf("websocket关闭代码 %d:%s", wsErr.Code, wsErr.Text)
- utils.FileLog.Error(":%d:%s", wsErr.Code, wsErr.Text)
- }
- }
- }
- }
- // 强制关闭连接
- func (s *Session) forceClose() {
- // 添加互斥锁保护
- s.mu.Lock()
- defer s.mu.Unlock()
- // 发送关闭帧
- _ = s.Conn.WriteControl(websocket.CloseMessage,
- websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "heartbeat failed"),
- time.Now().Add(writeWaitTimeout))
- _ = s.Conn.Close()
- s.Conn = nil // 标记连接已关闭
- utils.FileLog.Info("连接已强制关闭",
- "user", s.UserId,
- "session", s.Id)
- }
- func NewSession(userId int, sessionId string, conn *websocket.Conn) (session *Session) {
- session = &Session{
- UserId: userId,
- Id: sessionId,
- Conn: conn,
- LastActive: time.Now(),
- CloseChan: make(chan struct{}),
- MessageChan: make(chan string, 10),
- }
- session.Latency = SetupLatencyMeasurement(conn)
- go session.readPump()
- go session.writePump()
- return
- }
|