Browse Source

feat(wechat): 实现微信文章加缓存异步处理

- 新增微信文章操作缓存队列和处理逻辑
- 重构批量添加微信文章函数,支持指定数量
- 在控制器中添加微信文章操作到缓存
- 新增 HandleSearchByWechatOp函数处理缓存中的微信文章操作
Roc 1 month ago
parent
commit
bea587b077

+ 28 - 0
cache/wechat_platform.go

@@ -0,0 +1,28 @@
+package cache
+
+import (
+	"eta/eta_api/services/llm"
+	"eta/eta_api/utils"
+	"fmt"
+)
+
+// AddWechatArticleOpToCache
+// @Description: 将公众号文章操作加入缓存
+// @param wechatPlatformId
+// @param source
+// @return bool
+func AddWechatArticleOpToCache(wechatPlatformId int, source string) bool {
+	record := new(llm.WechatArticleOp)
+	record.Source = source
+	record.WechatPlatformId = wechatPlatformId
+	if utils.Re == nil {
+		err := utils.Rc.LPush(utils.CACHE_WECHAT_PLATFORM_ARTICLE, record)
+
+		utils.FileLog.Info(fmt.Sprintf("将公众号文章操作 加入缓存 AddWechatArticleOpToCache LPush: 操作类型:%s,公众号id:%d", source, wechatPlatformId))
+		if err != nil {
+			fmt.Println("AddWechatArticleOpToCache LPush Err:" + err.Error())
+		}
+		return true
+	}
+	return false
+}

+ 4 - 2
controllers/rag/wechat_platform_controller.go

@@ -2,13 +2,13 @@ package rag
 
 import (
 	"encoding/json"
+	"eta/eta_api/cache"
 	"eta/eta_api/controllers"
 	"eta/eta_api/models"
 	"eta/eta_api/models/rag"
 	"eta/eta_api/models/rag/request"
 	"eta/eta_api/models/rag/response"
 	"eta/eta_api/models/system"
-	"eta/eta_api/services/llm"
 	"eta/eta_api/utils"
 	"fmt"
 	"github.com/rdlucklib/rdluck_tools/paging"
@@ -156,7 +156,9 @@ func (c *WechatPlatformController) Add() {
 	}
 
 	// 异步新增公众号
-	go llm.AddWechatPlatform(item)
+	//go llm.AddWechatPlatform(item)
+
+	go cache.AddWechatArticleOpToCache(item.WechatPlatformId, `add`)
 
 	br.Ret = 200
 	br.Success = true

+ 19 - 9
services/llm/wechat_platform.go

@@ -11,6 +11,14 @@ import (
 
 // TODO 改成走队列,避免并发
 
+type WechatArticleOp struct {
+	Source           string
+	WechatPlatformId int
+}
+
+// AddWechatPlatform
+// @Description: 添加新的公众号
+// @param item
 func AddWechatPlatform(item *rag.WechatPlatform) {
 	var err error
 	defer func() {
@@ -75,7 +83,7 @@ func AddWechatPlatform(item *rag.WechatPlatform) {
 	// 把刚搜索的文章加入到指标库
 	AddWechatArticle(item, articleLink, articleDetail, nil)
 
-	BeachAddWechatPlatform(item)
+	BeachAddWechatArticle(item, 10)
 	fmt.Println("公众号入库完成")
 
 	return
@@ -152,13 +160,13 @@ func AddWechatArticle(item *rag.WechatPlatform, articleLink string, articleDetai
 	err = obj.Create()
 }
 
-// BeachAddWechatPlatform
+// BeachAddWechatArticle
 // @Description: 批量添加公众号文章
-// @author: Roc
-// @datetime 2025-03-05 15:05:07
-// @param item *rag.WechatPlatform
-// @return err error
-func BeachAddWechatPlatform(item *rag.WechatPlatform) (err error) {
+// @param item
+// @param num
+// @return err
+func BeachAddWechatArticle(item *rag.WechatPlatform, num int) {
+	var err error
 	defer func() {
 		fmt.Println("公众号文章批量入库完成")
 		if err != nil {
@@ -170,8 +178,6 @@ func BeachAddWechatPlatform(item *rag.WechatPlatform) (err error) {
 		return
 	}
 
-	num := 10
-
 	wechatArticleObj := new(rag.WechatArticle)
 
 	// 获取公众号的文章列表
@@ -186,6 +192,10 @@ func BeachAddWechatPlatform(item *rag.WechatPlatform) (err error) {
 			// 文章已经入库了,不需要重复入库
 			continue
 		}
+		if !utils.IsErrNoRow(err) {
+			return
+		}
+		err = nil
 
 		articleDetail, tmpErr := SearchByWechatArticle(articleMenu.Link)
 		if tmpErr != nil {

+ 35 - 0
services/task.go

@@ -1,7 +1,9 @@
 package services
 
 import (
+	"encoding/json"
 	"eta/eta_api/models"
+	"eta/eta_api/models/rag"
 	"eta/eta_api/services/binlog"
 	"eta/eta_api/services/data"
 	edbmonitor "eta/eta_api/services/edb_monitor"
@@ -571,3 +573,36 @@ func ModifyEsEnglishReport() {
 //
 //	return rnd.Float64()*11000 - 1000
 //}
+
+// HandleSearchByWechatOp
+// @Description: 处理微信爬虫
+func HandleWechatArticleOp() {
+	defer func() {
+		if err := recover(); err != nil {
+			fmt.Println("[ImportManualDataRefresh]", err)
+		}
+	}()
+	obj := rag.WechatPlatform{}
+	for {
+		utils.Rc.Brpop(utils.CACHE_WECHAT_PLATFORM_ARTICLE, func(b []byte) {
+			wechatArticleOp := new(llm.WechatArticleOp)
+			if err := json.Unmarshal(b, &wechatArticleOp); err != nil {
+				fmt.Println("json unmarshal wrong!")
+				return
+			}
+			item, tmpErr := obj.GetByID(wechatArticleOp.WechatPlatformId)
+			if tmpErr != nil {
+				// 找不到就处理失败
+				return
+			}
+
+			switch wechatArticleOp.Source {
+			case `add`:
+				llm.AddWechatPlatform(item)
+			case `refresh`:
+				llm.BeachAddWechatArticle(item, 2)
+
+			}
+		})
+	}
+}

+ 2 - 1
utils/constants.go

@@ -260,7 +260,8 @@ const (
 
 	CACHE_DATA_SOURCE_ES_HANDLE = "eta:data_source_es:handle" // 数据源es处理队列
 
-	CACHE_EXCEL_REFRESH = "CACHE_EXCEL_REFRESH" // 表格刷新
+	CACHE_EXCEL_REFRESH           = "CACHE_EXCEL_REFRESH"        // 表格刷新
+	CACHE_WECHAT_PLATFORM_ARTICLE = "wechat_platform:article:op" //微信文章处理
 )
 
 // 模板消息推送类型