package es import ( "bytes" "context" "encoding/json" "eta/eta_mini_ht_api/common/component/config" logger "eta/eta_mini_ht_api/common/component/log" "eta/eta_mini_ht_api/common/contants" "fmt" "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "io" "strconv" "strings" "sync" ) type ESBase interface { GetId() string } var ( esOnce sync.Once esClient *ESClient ) type ESClient struct { esOp *elasticsearch.Client } type SearchType string const ( MatchAll = "match_all" Match = "match" Range = "range" MatchAllByCondition = "match_all_by_condition" RangeByCondition = "range_by_condition" RangeByConditionWithDocIds = "range_by_condition_with_doc_ids" RangeWithDocIds = "range_with_doc_ids" ) func GetInstance() *ESClient { esOnce.Do(func() { // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化 if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok { logger.Info("初始化es") // 这里可以添加初始化Redis的逻辑 esClient = newEs(esConf) } }) return esClient } func (es *ESClient) es() *elasticsearch.Client { return es.esOp } func newEs(config *config.ESConfig) *ESClient { elasticsearch.NewDefaultClient() client, err := elasticsearch.NewClient( elasticsearch.Config{ Addresses: []string{config.GetUrl()}, // A list of Elasticsearch nodes to use. Username: config.GetUserName(), Password: config.GetPassword(), // Password for HTTP Basic Authentication. }, ) if err != nil { logger.Error("连接ES失败:%v", err) panic("启动es失败") } return &ESClient{esOp: client} } func init() { if GetInstance() == nil { panic("初始化es失败") } logger.Info("es初始化成功") } // BulkInsert 批量创建文档 func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) { // 创建批量请求 bulkBody := new(bytes.Buffer) for _, doc := range docs { enc := json.NewEncoder(bulkBody) if err = enc.Encode(map[string]interface{}{ "index": map[string]interface{}{ "_index": indexName, "_id": doc.GetId(), }, }); err != nil { logger.Error("生成es批处理请求参数失败: %s", err) } if err = enc.Encode(doc); err != nil { logger.Error("生成es批处理文档失败: %s", err) } } bulkReq := esapi.BulkRequest{ Body: bytes.NewReader(bulkBody.Bytes()), Refresh: "true", } res, err := bulkReq.Do(context.Background(), es.esOp) if err != nil { logger.Error("es批处理创建失败: %s", err) } defer res.Body.Close() if res.IsError() { var e map[string]interface{} if err = json.NewDecoder(res.Body).Decode(&e); err != nil { logger.Error("解析es应答失败: %v", err) } else { logger.Error("es请求失败: %s: %v\n", res.Status(), err) } } return } type ESResponse struct { Took int `json:"took"` Count int `json:"count"` TimedOut bool `json:"timedOut"` Hits Hits `json:"hits"` _Shards ShardsInfo `json:"_shards"` } type Hits struct { Total TotalHits `json:"total"` MaxScore float64 `json:"maxScore"` Hits []Hit `json:"hits"` } type TotalHits struct { Value int `json:"value"` Relation string `json:"relation"` } type Hit struct { Index string `json:"_index"` Type string `json:"_type"` ID string `json:"_id"` Score float64 `json:"_score"` Source json.RawMessage `json:"_source"` Highlight json.RawMessage `json:"highlight"` } type ShardsInfo struct { Total int `json:"total"` Successful int `json:"successful"` Skipped int `json:"skipped"` Failed int `json:"failed"` } type ESQueryRequest struct { IndexName string From int Size int Key string Column string Condition string ConditionValue string Sorts []string Type SearchType RangeColumn string DocIds []string Max interface{} Min interface{} } func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest { return &ESQueryRequest{ IndexName: index, Type: searchType, From: from, Size: size, Key: key, Column: column, Sorts: sorts, } } func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest { req.RangeColumn = column req.Max = to req.Min = from return req } func (req *ESQueryRequest) ByCondition(column string, value string) *ESQueryRequest { req.Condition = column req.ConditionValue = value return req } func (req *ESQueryRequest) WithDocs(docIds []string) *ESQueryRequest { req.DocIds = docIds return req } func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) { switch req.Type { case MatchAll: queryMap = map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ req.Column: req.Key, }, }, } return case MatchAllByCondition: queryMap = map[string]interface{}{ "query": map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ { "match": map[string]interface{}{ req.Column: req.Key, }, }, { "term": map[string]interface{}{ req.Condition: req.ConditionValue, }, }, }, }, }, } return case Match: queryMap = map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ req.Column: req.Key, }, }, "highlight": map[string]interface{}{ "fields": map[string]interface{}{ req.Column: map[string]interface{}{}, }, "pre_tags": []string{""}, "post_tags": []string{""}, }, } return case Range: queryMap = map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ req.Column: req.Key, }, }, "highlight": map[string]interface{}{ "fields": map[string]interface{}{ req.Column: map[string]interface{}{}, }, "pre_tags": []string{""}, "post_tags": []string{""}, }, "post_filter": map[string]interface{}{ "range": map[string]interface{}{ req.RangeColumn: map[string]interface{}{ "gte": req.Min, "lte": req.Max, }, }, }, } return case RangeWithDocIds: queryMap = map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ req.Column: req.Key, }, }, "highlight": map[string]interface{}{ "fields": map[string]interface{}{ req.Column: map[string]interface{}{}, }, "pre_tags": []string{""}, "post_tags": []string{""}, }, "post_filter": map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ { "range": map[string]interface{}{ req.RangeColumn: map[string]interface{}{ "gte": req.Min, "lte": req.Max, }, }, }, { "terms": map[string]interface{}{ "_id": req.DocIds, }, }, }, }, }, } return case RangeByCondition: queryMap = map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ req.Column: req.Key, }, }, "highlight": map[string]interface{}{ "fields": map[string]interface{}{ req.Column: map[string]interface{}{}, }, "pre_tags": []string{""}, "post_tags": []string{""}, }, "post_filter": map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ { "range": map[string]interface{}{ req.RangeColumn: map[string]interface{}{ "gte": req.Min, "lte": req.Max, }, }, }, { "term": map[string]interface{}{ req.Condition: req.ConditionValue, }, }, }, }, }, } return case RangeByConditionWithDocIds: queryMap = map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ req.Column: req.Key, }, }, "highlight": map[string]interface{}{ "fields": map[string]interface{}{ req.Column: map[string]interface{}{}, }, "pre_tags": []string{""}, "post_tags": []string{""}, }, "post_filter": map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ { "range": map[string]interface{}{ req.RangeColumn: map[string]interface{}{ "gte": req.Min, "lte": req.Max, }, }, }, { "term": map[string]interface{}{ req.Condition: req.ConditionValue, }, }, { "terms": map[string]interface{}{ "_id": req.DocIds, }, }, }, }, }, } return default: queryMap = map[string]interface{}{} return } } // /* // * // // 搜索 // // indexName 访问索引名 // query 搜索条件 // from 开始搜索位置 // size 搜索条数 // sort 排序 // */ func (es *ESClient) Count(params *ESQueryRequest) (response ESResponse, err error) { queryMap := params.parseJsonQuery() jsonQuery, _ := json.Marshal(queryMap) logger.Info("查询语句: %s", string(jsonQuery)) request := esapi.CountRequest{ Index: []string{params.IndexName}, Body: strings.NewReader(string(jsonQuery)), } res, err := request.Do(context.Background(), esClient.esOp) defer res.Body.Close() if err != nil { logger.Error("es查询失败: %s", err) } if res.IsError() { var e map[string]interface{} if err = json.NewDecoder(res.Body).Decode(&e); err != nil { logger.Error("解析es应答失败: %v", err) } else { logger.Error("es请求失败: %s: %v\n", res.Status(), e) } } body, err := io.ReadAll(res.Body) if err != nil { logger.Error("获取es应答失败: %v", err) } return parseESResponse(body) } func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) { queryMap := params.parseJsonQuery() jsonQuery, _ := json.Marshal(queryMap) logger.Info("查询语句: %s", string(jsonQuery)) request := esapi.SearchRequest{ Index: []string{params.IndexName}, Body: strings.NewReader(string(jsonQuery)), From: ¶ms.From, Size: ¶ms.Size, Sort: params.Sorts, } res, err := request.Do(context.Background(), esClient.esOp) defer res.Body.Close() if err != nil { logger.Error("es查询失败: %s", err) } if res.IsError() { var e map[string]interface{} if err = json.NewDecoder(res.Body).Decode(&e); err != nil { logger.Error("解析es应答失败: %v", err) } else { logger.Error("es请求失败: %s: %v\n", res.Status(), e) } } body, err := io.ReadAll(res.Body) if err != nil { logger.Error("获取es应答失败: %v", err) } return parseESResponse(body) } func parseESResponse(body []byte) (ESResponse, error) { var response ESResponse if err := json.Unmarshal(body, &response); err != nil { return ESResponse{}, err } for _, hit := range response.Hits.Hits { var source map[string]interface{} if err := json.Unmarshal(hit.Source, &source); err != nil { return ESResponse{}, err } } return response, nil } func (es *ESClient) GetSource(hits Hits) []Hit { return hits.Hits } func (es *ESClient) GetCount(hits Hits) []Hit { return hits.Hits } // /* // * // 添加es // indexName 索引名 // id es的id // body es的值 // */ func (es *ESClient) Update(indexName string, id int, doc interface{}) bool { jsonUpdate := map[string]interface{}{ "doc": doc, } jsonDoc, _ := json.Marshal(jsonUpdate) logger.Info("查询语句: %s", string(jsonDoc)) req := esapi.UpdateRequest{ Index: indexName, DocumentID: strconv.Itoa(id), Body: strings.NewReader(string(jsonDoc)), Refresh: "true", } res, err := req.Do(context.Background(), es.es()) defer res.Body.Close() if err != nil { logger.Error("es查询失败: %s", err) } if res.IsError() { var e map[string]interface{} if err = json.NewDecoder(res.Body).Decode(&e); err != nil { logger.Error("解析es应答失败: %v", err) } else { logger.Error("es请求失败: %s: %v\n", res.Status(), e) } } body, err := io.ReadAll(res.Body) if err != nil { logger.Error("获取es应答失败: %v", err) } fmt.Printf("%s\n", string(body)) return true } // Delete * // 删除 // indexName 索引名 // id es的id // */ func (es *ESClient) Delete(indexName string, id int) bool { req := esapi.DeleteRequest{ Index: indexName, DocumentID: strconv.Itoa(id), Refresh: "true", } res, err := req.Do(context.Background(), es.es()) defer res.Body.Close() if err != nil { logger.Error("es查询失败: %s", err) } if res.IsError() { var e map[string]interface{} if err = json.NewDecoder(res.Body).Decode(&e); err != nil { logger.Error("解析es应答失败: %v", err) } else { logger.Error("es请求失败: %s: %v\n", res.Status(), e) } } body, err := io.ReadAll(res.Body) if err != nil { logger.Error("获取es应答失败: %v", err) } fmt.Printf("%s\n", string(body)) return true } func (es *ESClient) Exist(indexName string, docId int) (exist bool, err error) { getRequest := esapi.GetRequest{ Index: indexName, DocumentID: strconv.Itoa(docId), } // 执行请求 res, err := getRequest.Do(context.Background(), es.es()) if err != nil { logger.Error("es获取文档是否存在失败: %v", err) } defer res.Body.Close() // 检查文档是否存在 if res.IsError() { // 如果文档不存在,通常返回 404 Not Found if res.StatusCode == 404 { logger.Info("文档不存在.") return false, nil } else { // 其他错误 var e map[string]interface{} if err = json.NewDecoder(res.Body).Decode(&e); err != nil { logger.Error("解析es应答失败: %v", err) return false, err } else { // Print the response status and error information. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["原因"]) return false, nil } } } else { // 如果文档存在 logger.Info("doc存在") return true, nil } } // //func CreateIndex(indexName string) error { // resp, err := esClient.es().Indices. // Create(indexName). // Do(context.Background()) // if err != nil { // logger.Error("创建ES索引失败:%v", err) // return err // } // fmt.Printf("index:%#v\n", resp.Index) // return nil //} // DeleteIndex 删除索引 // // func DeleteIndex(indexName string) error { // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档 // Delete(indexName). // Do(context.Background()) // if err != nil { // fmt.Printf("delete index failed,err:%v\n", err) // return err // } // fmt.Printf("delete index successed,indexName:%s", indexName) // return nil // } // // CreateDocument 创建文档 func (es *ESClient) CreateDocument(indexName string, id int, doc interface{}) (success bool) { jsonDoc, _ := json.Marshal(doc) logger.Info("查询语句: %s", string(jsonDoc)) // 添加文档 indexRequest := esapi.IndexRequest{ Index: indexName, DocumentID: strconv.Itoa(id), Body: strings.NewReader(string(jsonDoc)), Refresh: "true", } // 执行请求 res, err := indexRequest.Do(context.Background(), es.es()) if err != nil { logger.Error("ES创建文档失败: %s", err) return false } defer res.Body.Close() // 检查文档是否成功创建 if res.IsError() { var e map[string]interface{} if err := json.NewDecoder(res.Body).Decode(&e); err != nil { logger.Error("解析ES应答失败: %s", err) } else { // Print the response status and error information. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["类型"], e["错误"].(map[string]interface{})["原因"]) } return false } else { // 如果文档成功创建 logger.Info("创建文档成功") return true } }