package eta_bridge

import (
	"context"
	"encoding/json"
	"fmt"
	"html"
	"strconv"
	"strings"

	"eta_gn/eta_task/models"
	"eta_gn/eta_task/utils"

	"github.com/PuerkitoBio/goquery"
	"github.com/olivere/elastic/v7"
)

// NewClient creates an Elasticsearch client for utils.ES_URL using basic
// authentication (utils.ES_USERNAME / utils.ES_PASSWORD) with node sniffing
// disabled.
//
// Health checking is left at the library default, so a caller that creates a
// client for a single operation should call Stop on it when finished.
func NewClient() (client *elastic.Client, err error) {
	client, err = elastic.NewClient(
		elastic.SetURL(utils.ES_URL),
		elastic.SetBasicAuth(utils.ES_USERNAME, utils.ES_PASSWORD),
		elastic.SetSniff(false),
	)
	return
}

// EsAddOrEditKnowledgeResource adds or updates a knowledge resource document
// in Elasticsearch.
//
// The document is written to the index utils.EsKnowledgeResourceIndexName
// under the ID item.KnowledgeResourceId. When item.IsFile is 0, item.Content
// is replaced in place by its extracted plain text, cut to at most 60 runes,
// before indexing; the caller's item is therefore modified.
//
// Any returned error is also printed to stdout and written to utils.FileLog.
func EsAddOrEditKnowledgeResource(item *models.KnowledgeResource) (err error) {
	// Upper bound, in runes, on the plain-text content stored for non-file resources.
	const maxContentRunes = 60

	defer func() {
		if err != nil {
			fmt.Println("EsAddOrEditData Err:", err.Error())
			utils.FileLog.Info("EsAddOrEditKnowledgeResource err:", err)
		}
	}()

	client, err := NewClient()
	if err != nil {
		return
	}
	// A new client is built on every call; stop it so its background
	// processes do not outlive this function.
	defer client.Stop()

	indexName := utils.EsKnowledgeResourceIndexName

	if item.IsFile == 0 {
		content := ExtractTextFromResourceContent(item.Content)
		// Truncate by runes, not bytes, so multi-byte characters are never split.
		if contentRunes := []rune(content); len(contentRunes) > maxContentRunes {
			item.Content = string(contentRunes[:maxContentRunes])
		} else {
			item.Content = content
		}
	}

	request := client.Index().
		Index(indexName).
		Id(strconv.Itoa(item.KnowledgeResourceId)).
		BodyJson(item)
	response, err := request.Do(context.Background())
	if err != nil {
		// Best-effort dump of the payload for diagnostics; a marshal failure
		// only results in an empty payload in the log line.
		jsonBytes, _ := json.Marshal(item)
		utils.FileLog.Info("add json:%s,EsAddOrEditKnowledgeResource err:%s", string(jsonBytes), err.Error())
		return
	}

	// A non-zero status is reported on stdout but is not treated as an error.
	if response.Status != 0 {
		fmt.Println("EsAddOrEditKnowledgeResource:", response.Status, response.Result)
	}
	return
}

// ExtractTextFromResourceContent returns the text of an HTML fragment.
//
// HTML entities in content are unescaped first, the result is parsed as an
// HTML document, and the document's text is returned with every "\n"
// removed. An empty string is returned when the document cannot be parsed.
func ExtractTextFromResourceContent(content string) (text string) {
	content = html.UnescapeString(content)
	doc, err := goquery.NewDocumentFromReader(strings.NewReader(content))
	if err != nil {
		return
	}
	text = doc.Text()
	text = strings.ReplaceAll(text, "\n", "")
	return
}

// UpdateEsKnowledgeResource updates a document in the Elasticsearch knowledge resource index.
//
// Parameters:
//   - docId: The ID of the document to update.
//   - docFields: A map containing the fields to update in the document.
// // Returns: // - err: An error object if an error occurred, otherwise nil. func UpdateEsKnowledgeResource(docId int, docFields map[string]interface{}) (err error) { defer func() { if err != nil { fmt.Println("EsAddOrEditData Err:", err.Error()) utils.FileLog.Info("EsAddOrEditKnowledgeResource err:", err) } }() client, err := NewClient() if err != nil { return } indexName := utils.EsKnowledgeResourceIndexName res, err := client.Update(). Index(indexName). Id(strconv.Itoa(docId)). Doc(docFields). Do(context.Background()) if err != nil { utils.FileLog.Info("update docId:%d,UpdateEsKnowledgeResource err:%s", docId, err.Error()) return } if res.Result == "updated" { err = nil } else { fmt.Println("UpdateEsKnowledgeResource:", res.Result) } return } func EsBatchAddOrEditKnowledgeResource(list []*models.KnowledgeResource) (err error) { if len(list) == 0 { return } defer func() { if err != nil { fmt.Println("EsBatchAddOrEditData Err:", err.Error()) utils.FileLog.Info("EsBatchAddOrEditKnowledgeResource err:", err) } }() indexName := utils.EsKnowledgeResourceIndexName client, err := NewClient() if err != nil { return } actions := make([]elastic.BulkableRequest, len(list)) for i, item := range list { request := elastic.NewBulkIndexRequest().Index(indexName).Id(strconv.Itoa(item.KnowledgeResourceId)).Doc(item) actions[i] = request } response, err := client.Bulk().Add(actions...).Do(context.Background()) if err != nil { jsonBytes, _ := json.Marshal(list) fmt.Println("add json:", string(jsonBytes)) fmt.Println("EsBatchAddOrEditKnowledgeResource err:", err) return } if response.Errors { err = fmt.Errorf("add knowledge resource to es failed, response result is %+v", response.Items) fmt.Println("EsBatchAddOrEditKnowledgeResource:", response.Errors) } else { fmt.Printf("EsBatchAddOrEditKnowledgeResource:%+v\n", response) } return }