Browse Source

合并冲突

kobe6258 1 month ago
parent
commit
3bca14f332

+ 28 - 0
cache/wechat_platform.go

@@ -0,0 +1,28 @@
+package cache
+
+import (
+	"eta/eta_api/services/llm"
+	"eta/eta_api/utils"
+	"fmt"
+)
+
+// AddWechatArticleOpToCache
+// @Description: 将公众号文章操作加入缓存
+// @param wechatPlatformId
+// @param source
+// @return bool
+func AddWechatArticleOpToCache(wechatPlatformId int, source string) bool {
+	record := new(llm.WechatArticleOp)
+	record.Source = source
+	record.WechatPlatformId = wechatPlatformId
+	if utils.Re == nil {
+		err := utils.Rc.LPush(utils.CACHE_WECHAT_PLATFORM_ARTICLE, record)
+
+		utils.FileLog.Info(fmt.Sprintf("将公众号文章操作 加入缓存 AddWechatArticleOpToCache LPush: 操作类型:%s,公众号id:%d", source, wechatPlatformId))
+		if err != nil {
+			fmt.Println("AddWechatArticleOpToCache LPush Err:" + err.Error())
+		}
+		return true
+	}
+	return false
+}

+ 4 - 2
controllers/rag/wechat_platform_controller.go

@@ -2,13 +2,13 @@ package rag
 
 import (
 	"encoding/json"
+	"eta/eta_api/cache"
 	"eta/eta_api/controllers"
 	"eta/eta_api/models"
 	"eta/eta_api/models/rag"
 	"eta/eta_api/models/rag/request"
 	"eta/eta_api/models/rag/response"
 	"eta/eta_api/models/system"
-	"eta/eta_api/services/llm"
 	"eta/eta_api/utils"
 	"fmt"
 	"github.com/rdlucklib/rdluck_tools/paging"
@@ -156,7 +156,9 @@ func (c *WechatPlatformController) Add() {
 	}
 
 	// 异步新增公众号
-	go llm.AddWechatPlatform(item)
+	//go llm.AddWechatPlatform(item)
+
+	go cache.AddWechatArticleOpToCache(item.WechatPlatformId, `add`)
 
 	br.Ret = 200
 	br.Success = true

+ 19 - 9
services/llm/wechat_platform.go

@@ -11,6 +11,14 @@ import (
 
 // TODO 改成走队列,避免并发
 
+type WechatArticleOp struct {
+	Source           string
+	WechatPlatformId int
+}
+
+// AddWechatPlatform
+// @Description: 添加新的公众号
+// @param item
 func AddWechatPlatform(item *rag.WechatPlatform) {
 	var err error
 	defer func() {
@@ -75,7 +83,7 @@ func AddWechatPlatform(item *rag.WechatPlatform) {
 	// 把刚搜索的文章加入到指标库
 	AddWechatArticle(item, articleLink, articleDetail, nil)
 
-	BeachAddWechatPlatform(item)
+	BeachAddWechatArticle(item, 10)
 	fmt.Println("公众号入库完成")
 
 	return
@@ -152,13 +160,13 @@ func AddWechatArticle(item *rag.WechatPlatform, articleLink string, articleDetai
 	err = obj.Create()
 }
 
-// BeachAddWechatPlatform
+// BeachAddWechatArticle
 // @Description: 批量添加公众号文章
-// @author: Roc
-// @datetime 2025-03-05 15:05:07
-// @param item *rag.WechatPlatform
-// @return err error
-func BeachAddWechatPlatform(item *rag.WechatPlatform) (err error) {
+// @param item
+// @param num
+// @return err
+func BeachAddWechatArticle(item *rag.WechatPlatform, num int) {
+	var err error
 	defer func() {
 		fmt.Println("公众号文章批量入库完成")
 		if err != nil {
@@ -170,8 +178,6 @@ func BeachAddWechatPlatform(item *rag.WechatPlatform) (err error) {
 		return
 	}
 
-	num := 10
-
 	wechatArticleObj := new(rag.WechatArticle)
 
 	// 获取公众号的文章列表
@@ -186,6 +192,10 @@ func BeachAddWechatPlatform(item *rag.WechatPlatform) (err error) {
 			// 文章已经入库了,不需要重复入库
 			continue
 		}
+		if !utils.IsErrNoRow(err) {
+			return
+		}
+		err = nil
 
 		articleDetail, tmpErr := SearchByWechatArticle(articleMenu.Link)
 		if tmpErr != nil {

+ 39 - 0
services/task.go

@@ -1,7 +1,9 @@
 package services
 
 import (
+	"encoding/json"
 	"eta/eta_api/models"
+	"eta/eta_api/models/rag"
 	"eta/eta_api/services/binlog"
 	"eta/eta_api/services/data"
 	edbmonitor "eta/eta_api/services/edb_monitor"
@@ -63,6 +65,10 @@ func Task() {
 	}
 	go StartSessionManager()
 	go llm.SaveAllChatRecordsToDB()
+
+	// 定时任务进行微信文章操作
+	go HandleWechatArticleOp()
+
 	// TODO:数据修复
 	//FixNewEs()
 	fmt.Println("task end")
@@ -571,3 +577,36 @@ func ModifyEsEnglishReport() {
 //
 //	return rnd.Float64()*11000 - 1000
 //}
+
+// HandleSearchByWechatOp
+// @Description: 处理微信爬虫
+func HandleWechatArticleOp() {
+	defer func() {
+		if err := recover(); err != nil {
+			fmt.Println("[ImportManualDataRefresh]", err)
+		}
+	}()
+	obj := rag.WechatPlatform{}
+	for {
+		utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE, func(b []byte) {
+			wechatArticleOp := new(llm.WechatArticleOp)
+			if err := json.Unmarshal(b, &wechatArticleOp); err != nil {
+				fmt.Println("json unmarshal wrong!")
+				return
+			}
+			item, tmpErr := obj.GetByID(wechatArticleOp.WechatPlatformId)
+			if tmpErr != nil {
+				// 找不到就处理失败
+				return
+			}
+
+			switch wechatArticleOp.Source {
+			case `add`:
+				llm.AddWechatPlatform(item)
+			case `refresh`:
+				llm.BeachAddWechatArticle(item, 2)
+
+			}
+		})
+	}
+}

+ 1 - 0
utils/constants.go

@@ -266,6 +266,7 @@ const (
 
 	CACHE_DATA_SOURCE_ES_HANDLE = "eta:data_source_es:handle" // 数据源es处理队列
 	CACHE_CHART_AUTH            = "chart:auth:"               //图表数据授权
+	CACHE_WECHAT_PLATFORM_ARTICLE = "wechat_platform:article:op" //微信文章处理
 )
 
 // 模板消息推送类型

+ 31 - 0
utils/llm/eta_llm/eta_llm_client.go

@@ -27,6 +27,7 @@ const (
 	DEFALUT_PROMPT_NAME            = "default"
 	CONTENT_TYPE_JSON              = "application/json"
 	KNOWLEDGE_BASE_CHAT_API        = "/chat/kb_chat"
+	DOCUMENT_CHAT_API        = "/chat/file_chat"
 	KNOWLEDGE_BASE_SEARCH_DOCS_API = "/knowledge_base/search_docs"
 )
 
@@ -34,6 +35,7 @@ type ETALLMClient struct {
 	*llm.LLMClient
 	LlmModel string
 }
+
 type LLMConfig struct {
 	LlmAddress string `json:"llm_server"`
 	LlmModel   string `json:"llm_model"`
@@ -62,6 +64,35 @@ func GetInstance() llm.LLMService {
 	return etaLlmClient
 }
 
+func (ds *ETALLMClient) DocumentChat(query string, KnowledgeId string, history []string, stream bool) (llmRes *http.Response, err error) {
+	ChatHistory := make([]eta_llm_http.HistoryContent, 0)
+	for _, historyItemStr := range history {
+		str := strings.Split(historyItemStr, "-")
+		historyItem := eta_llm_http.HistoryContent{
+			Role:    str[0],
+			Content: str[1],
+		}
+		ChatHistory = append(ChatHistory, historyItem)
+	}
+	kbReq := eta_llm_http.DocumentChatRequest{
+		Query:          query,
+		KnowledgeId:    KnowledgeId,
+		History:        ChatHistory,
+		TopK:           3,
+		ScoreThreshold: 0.5,
+		Stream:         stream,
+		ModelName:      ds.LlmModel,
+		Temperature:    0.7,
+		MaxTokens:      0,
+		PromptName:     DEFALUT_PROMPT_NAME,
+	}
+	fmt.Printf("%v", kbReq.History)
+	body, err := json.Marshal(kbReq)
+	if err != nil {
+		return
+	}
+	return ds.DoStreamPost(DOCUMENT_CHAT_API, body)
+}
 func (ds *ETALLMClient) KnowledgeBaseChat(query string, KnowledgeBaseName string, history []string) (llmRes *http.Response, err error) {
 	ChatHistory := make([]eta_llm_http.HistoryContent, 0)
 	for _, historyItemStr := range history {

+ 12 - 1
utils/llm/eta_llm/eta_llm_http/request.go

@@ -14,7 +14,18 @@ type KbChatRequest struct {
 	PromptName     string           `json:"prompt_name"`
 	ReturnDirect   bool             `json:"return_direct"`
 }
-
+type DocumentChatRequest struct {
+	Query          string           `json:"query"`
+	KnowledgeId    string           `json:"knowledge_id"`
+	TopK           int              `json:"top_k"`
+	ScoreThreshold float32          `json:"score_threshold"`
+	History        []HistoryContent `json:"history"`
+	Stream         bool             `json:"stream"`
+	ModelName      string           `json:"model_name"`
+	Temperature    float32          `json:"temperature"`
+	MaxTokens      int              `json:"max_tokens"`
+	PromptName     string           `json:"prompt_name"`
+}
 type HistoryContent struct {
 	Content string `json:"content"`
 	Role    string `json:"role"`

+ 3 - 0
utils/llm/llm_client.go

@@ -21,5 +21,8 @@ func NewLLMClient(baseURL string, timeout time.Duration) *LLMClient {
 
 type LLMService interface {
 	KnowledgeBaseChat(query string, KnowledgeBaseName string, history []string) (llmRes *http.Response, err error)
+	DocumentChat(query string, KnowledgeId string, history []string, stream bool) (llmRes *http.Response, err error)
 	SearchKbDocs(query string, KnowledgeBaseName string) (data interface{}, err error)
+
+
 }

+ 0 - 2
utils/lock/distrubtLock.go

@@ -34,7 +34,6 @@ func AcquireLock(key string, expiration int, Holder string) bool {
 	if result == 1 {
 		return true
 	}
-	fmt.Printf("加锁失败:")
 	return false
 }
 
@@ -55,6 +54,5 @@ func ReleaseLock(key string, holder string) bool {
 	if result == 1 {
 		return true
 	}
-	fmt.Printf("解锁失败:")
 	return false
 }