// Package es wraps the official Elasticsearch v7 client with a process-wide
// singleton and a handful of helpers for indexing, querying and deleting
// documents.
package es

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"

	"eta/eta_mini_ht_api/common/component/config"
	logger "eta/eta_mini_ht_api/common/component/log"
	"eta/eta_mini_ht_api/common/contants"

	"github.com/elastic/go-elasticsearch/v7"
	"github.com/elastic/go-elasticsearch/v7/esapi"
)

// ESBase is implemented by documents that can be bulk-indexed; GetId supplies
// the value used as the document's "_id".
type ESBase interface {
	GetId() string
}

var (
	esOnce   sync.Once
	esClient *ESClient
)

// ESClient holds the underlying go-elasticsearch client.
type ESClient struct {
	esOp *elasticsearch.Client
}

// SearchType selects which query body parseJsonQuery builds.
type SearchType string

// Supported search types. They are untyped string constants, so they can be
// used both as SearchType values and as plain strings.
const (
	MatchAll                                 = "match_all"
	Match                                    = "match"
	CountWithDocIds                          = "count_with_doc_ids"
	Range                                    = "range"
	MatchAllByCondition                      = "match_all_by_condition"
	RangeByCondition                         = "range_by_condition"
	RangeByConditionWithDocIds               = "range_by_condition_with_doc_ids"
	RangeByConditionWithDocIdsNoLimit        = "range_by_condition_with_doc_ids_no_limit"
	RangeByConditionWithDocIdsNoLimitByScore = "range_by_condition_with_doc_ids_no_limit_by_score"
	RangeWithDocIds                          = "range_with_doc_ids"
	LimitByScore                             = "limit_by_score"
	HomeSearch                               = "home_search"
)

// GetInstance returns the shared ESClient, creating it on first use. It
// returns nil when no ES configuration is registered.
func GetInstance() *ESClient {
	esOnce.Do(func() {
		// Only initialise when an ESConfig is available; otherwise esClient stays nil.
		if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
			logger.Info("初始化es")
			esClient = newEs(esConf)
			logger.Info("es地址:%v", esConf.GetUrl())
		}
	})
	return esClient
}

// es returns the underlying go-elasticsearch client.
func (es *ESClient) es() *elasticsearch.Client {
	return es.esOp
}

// newEs builds an ESClient from the given configuration (single node address
// plus HTTP basic-auth credentials). It panics when the client cannot be
// constructed.
func newEs(conf *config.ESConfig) *ESClient {
	client, err := elasticsearch.NewClient(
		elasticsearch.Config{
			Addresses: []string{conf.GetUrl()}, // Elasticsearch nodes to use.
			Username:  conf.GetUserName(),
			Password:  conf.GetPassword(), // Password for HTTP basic authentication.
		},
	)
	if err != nil {
		logger.Error("连接ES失败:%v", err)
		panic("启动es失败")
	}
	return &ESClient{esOp: client}
}

// init eagerly creates the singleton and panics when that is not possible, so
// importing this package requires a valid ES configuration.
func init() {
	if GetInstance() == nil {
		panic("初始化es失败")
	}
	logger.Info("es初始化成功")
}

// readESBody returns the raw payload of a successful response. For an error
// status it logs the decoded error payload and returns an error instead, so
// callers never try to parse an already-consumed body.
func readESBody(res *esapi.Response) ([]byte, error) {
	if res.IsError() {
		var e map[string]interface{}
		if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
			logger.Error("解析es应答失败: %v", err)
			return nil, fmt.Errorf("decode es error response (%s): %w", res.Status(), err)
		}
		logger.Error("es请求失败: %s: %v\n", res.Status(), e)
		return nil, fmt.Errorf("es request failed: %s", res.Status())
	}
	body, err := io.ReadAll(res.Body)
	if err != nil {
		logger.Error("获取es应答失败: %v", err)
		return nil, err
	}
	return body, nil
}

// esErrorDetail extracts the "type" and "reason" of a decoded error payload
// without panicking when "error" is missing or is not an object (in which
// case the raw "error" value is returned as the reason).
func esErrorDetail(e map[string]interface{}) (errType, reason interface{}) {
	detail, ok := e["error"].(map[string]interface{})
	if !ok {
		return nil, e["error"]
	}
	return detail["type"], detail["reason"]
}

// BulkInsert indexes all docs into indexName with a single bulk request and
// refreshes the index afterwards. It returns an error when the request cannot
// be built or sent, when the cluster answers with an error status, or when
// the bulk response reports per-item failures. An empty docs slice is a no-op.
func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
	if len(docs) == 0 {
		return nil
	}

	// The bulk body is newline-delimited JSON: one action line followed by
	// one document line per entry. Encoder.Encode appends the newline.
	bulkBody := new(bytes.Buffer)
	enc := json.NewEncoder(bulkBody)
	for _, doc := range docs {
		action := map[string]interface{}{
			"index": map[string]interface{}{
				"_index": indexName,
				"_id":    doc.GetId(),
			},
		}
		if err = enc.Encode(action); err != nil {
			logger.Error("生成es批处理请求参数失败: %s", err)
			return err
		}
		if err = enc.Encode(doc); err != nil {
			logger.Error("生成es批处理文档失败: %s", err)
			return err
		}
	}

	bulkReq := esapi.BulkRequest{
		Body:    bytes.NewReader(bulkBody.Bytes()),
		Refresh: "true",
	}
	res, err := bulkReq.Do(context.Background(), es.esOp)
	if err != nil {
		logger.Error("es批处理创建失败: %s", err)
		return err
	}
	defer res.Body.Close()

	body, err := readESBody(res)
	if err != nil {
		return err
	}
	// A bulk request can succeed at HTTP level while individual items fail;
	// the top-level "errors" flag signals that.
	var result struct {
		Errors bool `json:"errors"`
	}
	if err = json.Unmarshal(body, &result); err != nil {
		logger.Error("解析es应答失败: %v", err)
		return err
	}
	if result.Errors {
		logger.Error("es批处理创建失败: %s", string(body))
		return errors.New("es bulk request reported item failures")
	}
	return nil
}

// ESResponse is the subset of a search/count response this package decodes.
//
// NOTE(review): Elasticsearch spells these keys "timed_out" and "max_score";
// with the current tags TimedOut and Hits.MaxScore are presumably never
// populated. The tags are left unchanged because they also determine the
// output when these structs are marshalled again — confirm before changing.
type ESResponse struct {
	Took     int        `json:"took"`
	Count    int        `json:"count"`
	TimedOut bool       `json:"timedOut"`
	Hits     Hits       `json:"hits"`
	Shards   ShardsInfo `json:"_shards"`
}

// Hits is the "hits" section of a search response.
type Hits struct {
	Total    TotalHits `json:"total"`
	MaxScore float64   `json:"maxScore"`
	Hits     []Hit     `json:"hits"`
}

// TotalHits is the "hits.total" object of a search response.
type TotalHits struct {
	Value    int    `json:"value"`
	Relation string `json:"relation"`
}

// Hit is a single search hit; Source and Highlight are kept as raw JSON so
// callers can decode them into their own types.
type Hit struct {
	Index     string          `json:"_index"`
	Type      string          `json:"_type"`
	ID        string          `json:"_id"`
	Score     float64         `json:"_score"`
	Source    json.RawMessage `json:"_source"`
	Highlight json.RawMessage `json:"highlight"`
}

// Doc is the response of a get-document request.
type Doc struct {
	Index       string          `json:"_index"`
	Type        string          `json:"_type"`
	ID          string          `json:"_id"`
	Version     float64         `json:"_version"`
	SeqNo       float64         `json:"_seq_no"`
	PrimaryTerm float64         `json:"_primary_term"`
	Found       bool            `json:"found"`
	Source      json.RawMessage `json:"_source"`
}

// ShardsInfo is the "_shards" section of a response.
type ShardsInfo struct {
	Total      int `json:"total"`
	Successful int `json:"successful"`
	Skipped    int `json:"skipped"`
	Failed     int `json:"failed"`
}

// ESQueryRequest collects everything needed to build and run a query.
type ESQueryRequest struct {
	IndexName      string
	From           int
	Size           int
	Key            string
	Column         string
	Condition      string
	ConditionValue string
	Sorts          []string
	Type           SearchType
	RangeColumn    string
	DocIds         []string
	Max            interface{}
	Min            interface{}
	MinScore       float64
}

// CreateESQueryRequest returns a new request populated from the arguments.
// The receiver is not read or modified.
func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
	return &ESQueryRequest{
		IndexName: index,
		Type:      searchType,
		From:      from,
		Size:      size,
		Key:       key,
		Column:    column,
		Sorts:     sorts,
	}
}

// Limit sets the page size and returns req for chaining.
func (req *ESQueryRequest) Limit(limit int) *ESQueryRequest {
	req.Size = limit
	return req
}

// WithScore sets the minimum score and returns req for chaining.
func (req *ESQueryRequest) WithScore(score float64) *ESQueryRequest {
	req.MinScore = score
	return req
}

// Range sets the range column with inclusive bounds [from, to] and returns
// req for chaining.
func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
	req.RangeColumn = column
	req.Max = to
	req.Min = from
	return req
}

// Before sets the range column and its upper bound only, and returns req for
// chaining.
func (req *ESQueryRequest) Before(max interface{}, column string) *ESQueryRequest {
	req.RangeColumn = column
	req.Max = max
	return req
}

// ByCondition sets the term condition (column == value) and returns req for
// chaining.
func (req *ESQueryRequest) ByCondition(column string, value string) *ESQueryRequest {
	req.Condition = column
	req.ConditionValue = value
	return req
}

// WithDocs restricts the request to the given document ids and returns req
// for chaining.
func (req *ESQueryRequest) WithDocs(docIds []string) *ESQueryRequest {
	req.DocIds = docIds
	return req
}

// esMatch builds {"match": {column: key}}.
func esMatch(column string, key interface{}) map[string]interface{} {
	return map[string]interface{}{
		"match": map[string]interface{}{column: key},
	}
}

// esTerm builds {"term": {column: value}}.
func esTerm(column string, value interface{}) map[string]interface{} {
	return map[string]interface{}{
		"term": map[string]interface{}{column: value},
	}
}

// esDocIDs builds {"terms": {"_id": docIds}}.
func esDocIDs(docIds []string) map[string]interface{} {
	return map[string]interface{}{
		"terms": map[string]interface{}{"_id": docIds},
	}
}

// esRange builds {"range": {column: bounds}}.
func esRange(column string, bounds map[string]interface{}) map[string]interface{} {
	return map[string]interface{}{
		"range": map[string]interface{}{column: bounds},
	}
}

// esBoolMust builds {"bool": {"must": [clauses...]}}.
func esBoolMust(clauses ...map[string]interface{}) map[string]interface{} {
	return map[string]interface{}{
		"bool": map[string]interface{}{"must": clauses},
	}
}

// esHighlight builds the highlight section for the given columns, using
// empty pre/post tags.
func esHighlight(columns ...string) map[string]interface{} {
	fields := make(map[string]interface{}, len(columns))
	for _, column := range columns {
		fields[column] = map[string]interface{}{}
	}
	return map[string]interface{}{
		"fields":    fields,
		"pre_tags":  []string{""},
		"post_tags": []string{""},
	}
}

// parseJsonQuery builds the request body for req.Type. Unknown types yield an
// empty map.
func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
	// highlighted is the shape shared by most search types: a match query on
	// req.Column with highlighting, plus an optional post_filter.
	highlighted := func(postFilter map[string]interface{}) map[string]interface{} {
		body := map[string]interface{}{
			"query":     esMatch(req.Column, req.Key),
			"highlight": esHighlight(req.Column),
		}
		if postFilter != nil {
			body["post_filter"] = postFilter
		}
		return body
	}
	// bounded is the inclusive [Min, Max] range clause on req.RangeColumn.
	bounded := func() map[string]interface{} {
		return esRange(req.RangeColumn, map[string]interface{}{
			"gte": req.Min,
			"lte": req.Max,
		})
	}

	switch req.Type {
	case MatchAll:
		return map[string]interface{}{
			"query": esMatch(req.Column, req.Key),
		}
	case MatchAllByCondition:
		return map[string]interface{}{
			"query": esBoolMust(
				esMatch(req.Column, req.Key),
				esTerm(req.Condition, req.ConditionValue),
			),
		}
	case Match:
		return highlighted(nil)
	case CountWithDocIds:
		return map[string]interface{}{
			"query": map[string]interface{}{
				"bool": map[string]interface{}{
					"must": []map[string]interface{}{
						esMatch(req.Column, req.Key),
					},
					"filter": esDocIDs(req.DocIds),
				},
			},
		}
	case Range:
		return highlighted(bounded())
	case RangeWithDocIds:
		return highlighted(esBoolMust(bounded(), esDocIDs(req.DocIds)))
	case RangeByCondition:
		return highlighted(esBoolMust(
			bounded(),
			esTerm(req.Condition, req.ConditionValue),
		))
	case RangeByConditionWithDocIds:
		return highlighted(esBoolMust(
			bounded(),
			esTerm(req.Condition, req.ConditionValue),
			esDocIDs(req.DocIds),
		))
	case RangeByConditionWithDocIdsNoLimit:
		return highlighted(esBoolMust(esDocIDs(req.DocIds)))
	case RangeByConditionWithDocIdsNoLimitByScore, LimitByScore:
		// Both types produce the same body: restrict to the doc ids and
		// drop hits scoring below MinScore.
		queryMap = highlighted(esBoolMust(esDocIDs(req.DocIds)))
		queryMap["min_score"] = req.MinScore
		return queryMap
	case HomeSearch:
		// Either a published item whose title matches, or an item whose
		// mediaName matches; both bounded above by req.Max. The first branch
		// uses the fixed "publishedTime" column, the second req.RangeColumn.
		return map[string]interface{}{
			"query": map[string]interface{}{
				"bool": map[string]interface{}{
					"should": []map[string]interface{}{
						esBoolMust(
							esMatch("title", req.Key),
							esTerm("status", "PUBLISH"),
							esRange("publishedTime", map[string]interface{}{"lte": req.Max}),
						),
						esBoolMust(
							esMatch("mediaName", req.Key),
							esRange(req.RangeColumn, map[string]interface{}{"lte": req.Max}),
						),
					},
				},
			},
			"highlight": esHighlight("title", "mediaName"),
		}
	default:
		return map[string]interface{}{}
	}
}

// Count runs a count request against params.IndexName using the body built
// from params and returns the decoded response. It returns an error when the
// request cannot be built or sent, or when the cluster answers with an error
// status.
func (es *ESClient) Count(params *ESQueryRequest) (response ESResponse, err error) {
	if params == nil {
		return ESResponse{}, errors.New("es count: nil request")
	}
	jsonQuery, err := json.Marshal(params.parseJsonQuery())
	if err != nil {
		logger.Error("序列化es请求失败: %v", err)
		return ESResponse{}, err
	}
	logger.Info("查询语句: %s", string(jsonQuery))

	request := esapi.CountRequest{
		Index: []string{params.IndexName},
		Body:  bytes.NewReader(jsonQuery),
	}
	res, err := request.Do(context.Background(), es.es())
	if err != nil {
		logger.Error("es查询失败: %s", err)
		return ESResponse{}, err
	}
	// Only defer the close once res is known to be non-nil.
	defer res.Body.Close()

	body, err := readESBody(res)
	if err != nil {
		return ESResponse{}, err
	}
	return parseESResponse(body)
}

// Search runs a search request and returns the decoded response.
// params.IndexName may hold several comma-separated index names; From, Size
// and Sorts are passed through as request parameters. It returns an error
// when the request cannot be built or sent, or when the cluster answers with
// an error status.
func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
	if params == nil {
		return ESResponse{}, errors.New("es search: nil request")
	}
	jsonQuery, err := json.Marshal(params.parseJsonQuery())
	if err != nil {
		logger.Error("序列化es请求失败: %v", err)
		return ESResponse{}, err
	}
	logger.Info("查询语句: %s", string(jsonQuery))

	request := esapi.SearchRequest{
		Index: strings.Split(params.IndexName, ","),
		Body:  bytes.NewReader(jsonQuery),
		From:  &params.From,
		Size:  &params.Size,
		Sort:  params.Sorts,
	}
	res, err := request.Do(context.Background(), es.es())
	if err != nil {
		logger.Error("es查询失败: %s", err)
		return ESResponse{}, err
	}
	// Only defer the close once res is known to be non-nil.
	defer res.Body.Close()

	body, err := readESBody(res)
	if err != nil {
		return ESResponse{}, err
	}
	return parseESResponse(body)
}

// parseESResponse decodes a search/count response body. It additionally
// verifies that every hit's _source decodes as a JSON object and fails
// otherwise.
func parseESResponse(body []byte) (ESResponse, error) {
	var response ESResponse
	if err := json.Unmarshal(body, &response); err != nil {
		return ESResponse{}, err
	}
	for _, hit := range response.Hits.Hits {
		// Validation only: the decoded value is discarded.
		var source map[string]interface{}
		if err := json.Unmarshal(hit.Source, &source); err != nil {
			return ESResponse{}, err
		}
	}
	return response, nil
}

// GetSource returns the individual hits of a search response.
func (es *ESClient) GetSource(hits Hits) []Hit {
	return hits.Hits
}

// GetCount returns the individual hits of a search response.
//
// NOTE(review): this is identical to GetSource and does not return a count —
// confirm what callers expect.
func (es *ESClient) GetCount(hits Hits) []Hit {
	return hits.Hits
}

// Update partially updates document id in indexName with the fields of doc
// and refreshes the index. It reports whether the update succeeded.
func (es *ESClient) Update(indexName string, id int, doc interface{}) bool {
	jsonDoc, err := json.Marshal(map[string]interface{}{"doc": doc})
	if err != nil {
		logger.Error("序列化es请求失败: %v", err)
		return false
	}
	logger.Info("查询语句: %s", string(jsonDoc))

	req := esapi.UpdateRequest{
		Index:      indexName,
		DocumentID: strconv.Itoa(id),
		Body:       bytes.NewReader(jsonDoc),
		Refresh:    "true",
	}
	res, err := req.Do(context.Background(), es.es())
	if err != nil {
		logger.Error("es查询失败: %s", err)
		return false
	}
	defer res.Body.Close()

	body, err := readESBody(res)
	if err != nil {
		return false
	}
	logger.Info("es应答: %s", string(body))
	return true
}

// InsertOrUpdate updates document id when it already exists and creates it
// otherwise (including when the existence check itself fails). It reports
// whether the chosen operation succeeded.
func (es *ESClient) InsertOrUpdate(indexName string, id int, doc interface{}) (success bool) {
	if exist, existErr := es.Exist(indexName, id); existErr == nil && exist {
		return es.Update(indexName, id, doc)
	}
	return es.CreateDocument(indexName, id, doc)
}

// Delete removes every document in indexName matched by query (a
// delete-by-query body) and refreshes the index. It reports whether the
// request succeeded.
func (es *ESClient) Delete(indexName string, query interface{}) bool {
	jsonQuery, err := json.Marshal(query)
	if err != nil {
		logger.Error("序列化es请求失败: %v", err)
		return false
	}
	logger.Info("查询语句: %s", string(jsonQuery))

	refresh := true
	req := esapi.DeleteByQueryRequest{
		Index:   []string{indexName},
		Body:    bytes.NewReader(jsonQuery),
		Refresh: &refresh,
	}
	res, err := req.Do(context.Background(), es.es())
	if err != nil {
		logger.Error("es查询失败: %s", err)
		return false
	}
	defer res.Body.Close()

	body, err := readESBody(res)
	if err != nil {
		return false
	}
	logger.Info("es应答: %s", string(body))
	return true
}

// checkGetResponse maps the status of a get-document response to the errors
// Exist and Get report: nil on success, "document does not exist" on 404, and
// a generic failure (after logging the error type and reason) otherwise.
func checkGetResponse(res *esapi.Response) error {
	if !res.IsError() {
		return nil
	}
	if res.StatusCode == 404 {
		logger.Info("文档不存在.")
		return errors.New("ES文档不存在")
	}
	var e map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
		logger.Error("解析es应答失败: %v", err)
		return err
	}
	errType, reason := esErrorDetail(e)
	logger.Error("[%s] %v: %v\n", res.Status(), errType, reason)
	return errors.New("获取ES记录失败")
}

// Exist reports whether document docId exists in indexName. A missing
// document yields (false, non-nil error), as does any request failure.
func (es *ESClient) Exist(indexName string, docId int) (exist bool, err error) {
	getRequest := esapi.GetRequest{
		Index:      indexName,
		DocumentID: strconv.Itoa(docId),
	}
	res, err := getRequest.Do(context.Background(), es.es())
	if err != nil {
		logger.Error("es获取文档是否存在失败: %v", err)
		return false, err
	}
	defer res.Body.Close()

	if err = checkGetResponse(res); err != nil {
		return false, err
	}
	logger.Info("doc存在")
	return true, nil
}

// Get fetches document docId from indexName. A missing document or any
// request/decoding failure yields a non-nil error.
func (es *ESClient) Get(indexName string, docId int) (doc Doc, err error) {
	getRequest := esapi.GetRequest{
		Index:      indexName,
		DocumentID: strconv.Itoa(docId),
	}
	res, err := getRequest.Do(context.Background(), es.es())
	if err != nil {
		logger.Error("es获取文档是否存在失败: %v", err)
		return doc, err
	}
	defer res.Body.Close()

	if err = checkGetResponse(res); err != nil {
		return doc, err
	}
	body, readErr := io.ReadAll(res.Body)
	if readErr != nil {
		logger.Error("获取es应答失败: %v", readErr)
		return doc, readErr
	}
	if err = json.Unmarshal(body, &doc); err != nil {
		logger.Error("反序列化es应答失败: %v", err)
		return doc, err
	}
	return doc, nil
}

// Disabled index-management helpers, kept for reference.
//
// func CreateIndex(indexName string) error {
// 	resp, err := esClient.es().Indices.
// 		Create(indexName).
// 		Do(context.Background())
// 	if err != nil {
// 		logger.Error("创建ES索引失败:%v", err)
// 		return err
// 	}
// 	fmt.Printf("index:%#v\n", resp.Index)
// 	return nil
// }
//
// // DeleteIndex deletes an index.
// func DeleteIndex(indexName string) error {
// 	_, err := esClient.es().Indices. // Indices operates on the index itself; Index operates on documents inside an index.
// 		Delete(indexName).
// 		Do(context.Background())
// 	if err != nil {
// 		fmt.Printf("delete index failed,err:%v\n", err)
// 		return err
// 	}
// 	fmt.Printf("delete index successed,indexName:%s", indexName)
// 	return nil
// }

// CreateDocument indexes doc under id in indexName (replacing any existing
// document with that id) and refreshes the index. It reports whether the
// request succeeded.
func (es *ESClient) CreateDocument(indexName string, id int, doc interface{}) (success bool) {
	jsonDoc, err := json.Marshal(doc)
	if err != nil {
		logger.Error("序列化es请求失败: %v", err)
		return false
	}
	logger.Info("查询语句: %s", string(jsonDoc))

	indexRequest := esapi.IndexRequest{
		Index:      indexName,
		DocumentID: strconv.Itoa(id),
		Body:       bytes.NewReader(jsonDoc),
		Refresh:    "true",
	}
	res, err := indexRequest.Do(context.Background(), es.es())
	if err != nil {
		logger.Error("ES创建文档失败: %s", err)
		return false
	}
	defer res.Body.Close()

	if res.IsError() {
		var e map[string]interface{}
		if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
			logger.Error("解析ES应答失败: %s", err)
		} else {
			// Log the response status together with the error type and reason.
			errType, reason := esErrorDetail(e)
			logger.Error("[%s] %v: %v\n", res.Status(), errType, reason)
		}
		return false
	}
	logger.Info("创建文档成功")
	return true
}