daf_service.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package llm
  2. import (
  3. "eta/eta_api/models/llm/dfa"
  4. "eta/eta_api/models/rag"
  5. "eta/eta_api/utils"
  6. "eta/eta_api/utils/llm"
  7. "fmt"
  8. "time"
  9. )
  // fetchSize is the size argument handed to rag.FetchArticleDafTagMappingList
  // on every fetch made by wechatDeal.
  const fetchSize = 500

  // ETA_ARTICLE_TABLE holds the identifier "rag_eta_report".
  // NOTE(review): judging by the name this is the table backing ETA articles;
  // it is not referenced in the visible part of the file — confirm.
  const ETA_ARTICLE_TABLE = "rag_eta_report"

  // WECHAT_ARTICLE_TABLE holds the identifier "wechat_article".
  // NOTE(review): judging by the name this is the table backing WeChat
  // articles; it is not referenced in the visible part of the file — confirm.
  const WECHAT_ARTICLE_TABLE = "wechat_article"
  15. var (
  16. DAFHandler = llm.GetDAFHandlerInstance()
  17. )
  18. func DealHistoryArticleDafTags() {
  19. utils.FileLog.Info("自动处理历史文章算法标签任务开始")
  20. //获取还未处理的标签的文章
  21. ArticleParts, err := getDealDafTagsArticleList()
  22. if err != nil {
  23. utils.FileLog.Error("自动处理历史文章算法标签任务错误退出:err", err.Error())
  24. return
  25. }
  26. go func() {
  27. etaErr := DAFTagDeal(dfa.ArticleSourceEta, ArticleParts[dfa.ArticleSourceEta])
  28. if etaErr != nil {
  29. }
  30. }()
  31. go func() {
  32. wechatErr := DAFTagDeal(dfa.ArticleSourceWechat, ArticleParts[dfa.ArticleSourceWechat])
  33. if wechatErr != nil {
  34. }
  35. }()
  36. utils.FileLog.Info("自动处理历史文章算法标签任务结束")
  37. }
  38. func getDealDafTagsArticleList() (articleIdsMap map[dfa.ArticleSource][]int, err error) {
  39. list, err := dfa.GetArticleDafTagMappingList()
  40. if err != nil {
  41. utils.FileLog.Error("获取DAF算法标签文章列表失败,err:", err)
  42. err = fmt.Errorf("获取DAF算法标签文章列表失败,err:%v", err)
  43. return
  44. }
  45. articleIdsMap = make(map[dfa.ArticleSource][]int)
  46. for _, item := range list {
  47. if _, ok := articleIdsMap[item.Source]; !ok {
  48. articleIdsMap[item.Source] = make([]int, 0)
  49. }
  50. articleIdsMap[item.Source] = append(articleIdsMap[item.Source], item.ArticleID)
  51. }
  52. return
  53. }
  54. func DAFTagDeal(source dfa.ArticleSource, excludeArticleIds []int) (err error) {
  55. switch source {
  56. case dfa.ArticleSourceWechat:
  57. return wechatDeal(excludeArticleIds)
  58. case dfa.ArticleSourceEta:
  59. return
  60. default:
  61. utils.FileLog.Warn("位置的文章来源,无法进行DAF标签处理,自动退出")
  62. return
  63. }
  64. }
  65. func wechatDeal(excludeArticleIds []int) (err error) {
  66. var id = 0
  67. for {
  68. var articleList []*rag.DafWechatArticleItem
  69. var dafTagList []*dfa.ArticleDfaTagMapping
  70. articleList, id, err = rag.FetchArticleDafTagMappingList(id, excludeArticleIds, fetchSize)
  71. if err != nil {
  72. utils.FileLog.Error("获取微信文章列表失败,err:", err)
  73. return
  74. }
  75. if len(articleList) == 0 {
  76. break
  77. }
  78. for _, article := range articleList {
  79. dafTagList = append(dafTagList, &dfa.ArticleDfaTagMapping{
  80. ArticleID: article.WechatArticleId,
  81. Source: dfa.ArticleSourceWechat,
  82. ArticleTitle: article.Title,
  83. ArticleContent: article.TextContent,
  84. TagName: "微信文章",
  85. CreatedTime: time.Now(),
  86. })
  87. }
  88. DAFHandler.BatchSubmitTasks(dafTagList)
  89. }
  90. return
  91. }