es.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package eta_bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "eta_gn/eta_task/models"
  6. "eta_gn/eta_task/utils"
  7. "fmt"
  8. "html"
  9. "strconv"
  10. "strings"
  11. "github.com/PuerkitoBio/goquery"
  12. "github.com/olivere/elastic/v7"
  13. )
  14. func NewClient() (client *elastic.Client, err error) {
  15. client, err = elastic.NewClient(
  16. elastic.SetURL(utils.ES_URL),
  17. elastic.SetBasicAuth(utils.ES_USERNAME, utils.ES_PASSWORD),
  18. elastic.SetSniff(false))
  19. return
  20. }
  21. // EsAddOrEditKnowledgeResource 新增/修改es中的知识资源数据
  22. func EsAddOrEditKnowledgeResource(item *models.KnowledgeResource) (err error) {
  23. defer func() {
  24. if err != nil {
  25. fmt.Println("EsAddOrEditData Err:", err.Error())
  26. utils.FileLog.Info("EsAddOrEditKnowledgeResource err:", err)
  27. }
  28. }()
  29. client, err := NewClient()
  30. if err != nil {
  31. return
  32. }
  33. indexName := utils.EsKnowledgeResourceIndexName
  34. if item.IsFile == 0 {
  35. content := ExtractTextFromResourceContent(item.Content)
  36. contentRunes := []rune(content)
  37. if len(contentRunes) > 60 {
  38. item.Content = string(contentRunes[:60])
  39. } else {
  40. item.Content = content
  41. }
  42. }
  43. request := client.Index().Index(indexName).Id(strconv.Itoa(item.KnowledgeResourceId)).BodyJson(item)
  44. response, err := request.Do(context.Background())
  45. if err != nil {
  46. jsonBytes, _ := json.Marshal(item)
  47. utils.FileLog.Info("add json:%s,EsAddOrEditKnowledgeResource err:%s", string(jsonBytes), err.Error())
  48. return
  49. }
  50. if response.Status == 0 {
  51. err = nil
  52. } else {
  53. fmt.Println("EsAddOrEditKnowledgeResource:", response.Status, response.Result)
  54. }
  55. return
  56. }
  57. func ExtractTextFromResourceContent(content string) (text string) {
  58. content = html.UnescapeString(content)
  59. doc, err := goquery.NewDocumentFromReader(strings.NewReader(content))
  60. if err != nil {
  61. return
  62. }
  63. text = doc.Text()
  64. text = strings.ReplaceAll(text, "\n", "")
  65. return
  66. }
  67. // UpdateEsKnowledgeResource updates a document in the Elasticsearch knowledge resource index.
  68. //
  69. // Parameters:
  70. // - docId: The ID of the document to update.
  71. // - docFields: A map containing the fields to update in the document.
  72. //
  73. // Returns:
  74. // - err: An error object if an error occurred, otherwise nil.
  75. func UpdateEsKnowledgeResource(docId int, docFields map[string]interface{}) (err error) {
  76. defer func() {
  77. if err != nil {
  78. fmt.Println("EsAddOrEditData Err:", err.Error())
  79. utils.FileLog.Info("EsAddOrEditKnowledgeResource err:", err)
  80. }
  81. }()
  82. client, err := NewClient()
  83. if err != nil {
  84. return
  85. }
  86. indexName := utils.EsKnowledgeResourceIndexName
  87. res, err := client.Update().
  88. Index(indexName).
  89. Id(strconv.Itoa(docId)).
  90. Doc(docFields).
  91. Do(context.Background())
  92. if err != nil {
  93. utils.FileLog.Info("update docId:%d,UpdateEsKnowledgeResource err:%s", docId, err.Error())
  94. return
  95. }
  96. if res.Result == "updated" {
  97. err = nil
  98. } else {
  99. fmt.Println("UpdateEsKnowledgeResource:", res.Result)
  100. }
  101. return
  102. }
  103. func EsBatchAddOrEditKnowledgeResource(list []*models.KnowledgeResource) (err error) {
  104. if len(list) == 0 {
  105. return
  106. }
  107. defer func() {
  108. if err != nil {
  109. fmt.Println("EsBatchAddOrEditData Err:", err.Error())
  110. utils.FileLog.Info("EsBatchAddOrEditKnowledgeResource err:", err)
  111. }
  112. }()
  113. indexName := utils.EsKnowledgeResourceIndexName
  114. client, err := NewClient()
  115. if err != nil {
  116. return
  117. }
  118. actions := make([]elastic.BulkableRequest, len(list))
  119. for i, item := range list {
  120. request := elastic.NewBulkIndexRequest().Index(indexName).Id(strconv.Itoa(item.KnowledgeResourceId)).Doc(item)
  121. actions[i] = request
  122. }
  123. response, err := client.Bulk().Add(actions...).Do(context.Background())
  124. if err != nil {
  125. jsonBytes, _ := json.Marshal(list)
  126. fmt.Println("add json:", string(jsonBytes))
  127. fmt.Println("EsBatchAddOrEditKnowledgeResource err:", err)
  128. return
  129. }
  130. if response.Errors {
  131. err = fmt.Errorf("add knowledge resource to es failed, response result is %+v", response.Items)
  132. fmt.Println("EsBatchAddOrEditKnowledgeResource:", response.Errors)
  133. } else {
  134. fmt.Printf("EsBatchAddOrEditKnowledgeResource:%+v\n", response)
  135. }
  136. return
  137. }