|
- package es
- import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "eta/eta_mini_ht_api/common/component/config"
- logger "eta/eta_mini_ht_api/common/component/log"
- "eta/eta_mini_ht_api/common/contants"
- "fmt"
- "github.com/elastic/go-elasticsearch/v7"
- "github.com/elastic/go-elasticsearch/v7/esapi"
- "io"
- "strconv"
- "strings"
- "sync"
- )
// ESBase is implemented by every document that can be passed to BulkInsert.
type ESBase interface {
	// GetId returns the value BulkInsert sends as the document's "_id".
	GetId() string
}
- var (
- esOnce sync.Once
- esClient *ESClient
- )
- type ESClient struct {
- esOp *elasticsearch.Client
- }
// SearchType selects which query template parseJsonQuery builds.
type SearchType string

// Query template identifiers accepted in ESQueryRequest.Type. They are
// untyped string constants, so they can be used both as SearchType and as
// plain strings.
const (
	// Plain match queries.
	MatchAll = "match_all" // match on Column/Key, no highlighting
	Match    = "match"     // match on Column/Key with highlighting

	// Match restricted to a set of document ids via a bool filter.
	CountWithDocIds = "count_with_doc_ids"

	// Highlighted match followed by a range post_filter.
	Range = "range"

	// Match combined with a term condition in a bool/must query.
	MatchAllByCondition = "match_all_by_condition"

	// Highlighted match followed by a bool/must post_filter.
	RangeByCondition                  = "range_by_condition"                       // range + term
	RangeByConditionWithDocIds        = "range_by_condition_with_doc_ids"          // range + term + ids
	RangeByConditionWithDocIdsNoLimit = "range_by_condition_with_doc_ids_no_limit" // term + ids
	RangeWithDocIds                   = "range_with_doc_ids"                       // range + ids

	// Highlighted match filtered by ids, with a min_score cut-off.
	LimitByScore = "limit_by_score"

	// Combined title / mediaName search.
	HomeSearch = "home_search"
)
- func GetInstance() *ESClient {
- esOnce.Do(func() {
- // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
- if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
- logger.Info("初始化es")
- // 这里可以添加初始化Redis的逻辑
- esClient = newEs(esConf)
- logger.Info("es地址:%v", esConf.GetUrl())
- }
- })
- return esClient
- }
- func (es *ESClient) es() *elasticsearch.Client {
- return es.esOp
- }
- func newEs(config *config.ESConfig) *ESClient {
- elasticsearch.NewDefaultClient()
- client, err := elasticsearch.NewClient(
- elasticsearch.Config{
- Addresses: []string{config.GetUrl()},
- // A list of Elasticsearch nodes to use.
- Username: config.GetUserName(),
- Password: config.GetPassword(), // Password for HTTP Basic Authentication.
- },
- )
- if err != nil {
- logger.Error("连接ES失败:%v", err)
- panic("启动es失败")
- }
- return &ESClient{esOp: client}
- }
- func init() {
- if GetInstance() == nil {
- panic("初始化es失败")
- }
- logger.Info("es初始化成功")
- }
- // BulkInsert 批量创建文档
- func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
- // 创建批量请求
- bulkBody := new(bytes.Buffer)
- for _, doc := range docs {
- enc := json.NewEncoder(bulkBody)
- if err = enc.Encode(map[string]interface{}{
- "index": map[string]interface{}{
- "_index": indexName,
- "_id": doc.GetId(),
- },
- }); err != nil {
- logger.Error("生成es批处理请求参数失败: %s", err)
- }
- if err = enc.Encode(doc); err != nil {
- logger.Error("生成es批处理文档失败: %s", err)
- }
- }
- bulkReq := esapi.BulkRequest{
- Body: bytes.NewReader(bulkBody.Bytes()),
- Refresh: "true",
- }
- res, err := bulkReq.Do(context.Background(), es.esOp)
- if err != nil {
- logger.Error("es批处理创建失败: %s", err)
- }
- defer res.Body.Close()
- if res.IsError() {
- var e map[string]interface{}
- if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
- logger.Error("解析es应答失败: %v", err)
- } else {
- logger.Error("es请求失败: %s: %v\n", res.Status(), err)
- }
- }
- return
- }
// ESResponse is the decoded body of a search or count response.
//
// The JSON names follow the Elasticsearch REST API ("timed_out", "_shards").
// The previous tags ("timedOut", and "_shards" on an unexported field) never
// matched anything, so those fields were always left at their zero value.
type ESResponse struct {
	Took     int  `json:"took"`
	Count    int  `json:"count"`
	TimedOut bool `json:"timed_out"`
	Hits     Hits `json:"hits"`
	// Shards carries the "_shards" section of the response.
	Shards ShardsInfo `json:"_shards"`
	// Deprecated: _Shards is unexported and therefore never populated by
	// encoding/json; it is kept only so existing in-package references still
	// compile. Use Shards instead.
	_Shards ShardsInfo
}

// Hits is the "hits" section of a search response.
type Hits struct {
	Total TotalHits `json:"total"`
	// MaxScore is decoded from "max_score", the name Elasticsearch uses.
	MaxScore float64 `json:"max_score"`
	Hits     []Hit   `json:"hits"`
}

// TotalHits is the "hits.total" object of a search response.
type TotalHits struct {
	Value    int    `json:"value"`
	Relation string `json:"relation"`
}

// Hit is a single search hit. Source and Highlight are kept as raw JSON so
// callers can decode them into their own types.
type Hit struct {
	Index     string          `json:"_index"`
	Type      string          `json:"_type"`
	ID        string          `json:"_id"`
	Score     float64         `json:"_score"`
	Source    json.RawMessage `json:"_source"`
	Highlight json.RawMessage `json:"highlight"`
}

// Doc is the decoded body of a get-document response. Source is kept as raw
// JSON so callers can decode it into their own type.
type Doc struct {
	Index       string          `json:"_index"`
	Type        string          `json:"_type"`
	ID          string          `json:"_id"`
	Version     float64         `json:"_version"`
	SeqNo       float64         `json:"_seq_no"`
	PrimaryTerm float64         `json:"_primary_term"`
	Found       bool            `json:"found"`
	Source      json.RawMessage `json:"_source"`
}

// ShardsInfo is the "_shards" section of a response.
type ShardsInfo struct {
	Total      int `json:"total"`
	Successful int `json:"successful"`
	Skipped    int `json:"skipped"`
	Failed     int `json:"failed"`
}
- type ESQueryRequest struct {
- IndexName string
- From int
- Size int
- Key string
- Column string
- Condition string
- ConditionValue string
- Sorts []string
- Type SearchType
- RangeColumn string
- DocIds []string
- Max interface{}
- Min interface{}
- MinScore float64
- }
- func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
- return &ESQueryRequest{
- IndexName: index,
- Type: searchType,
- From: from,
- Size: size,
- Key: key,
- Column: column,
- Sorts: sorts,
- }
- }
- func (req *ESQueryRequest) Limit(limit int) *ESQueryRequest {
- req.Size = limit
- return req
- }
- func (req *ESQueryRequest) WithScore(score float64) *ESQueryRequest {
- req.MinScore = score
- return req
- }
- func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
- req.RangeColumn = column
- req.Max = to
- req.Min = from
- return req
- }
- func (req *ESQueryRequest) Before(max interface{}, column string) *ESQueryRequest {
- req.RangeColumn = column
- req.Max = max
- return req
- }
- func (req *ESQueryRequest) ByCondition(column string, value string) *ESQueryRequest {
- req.Condition = column
- req.ConditionValue = value
- return req
- }
- func (req *ESQueryRequest) WithDocs(docIds []string) *ESQueryRequest {
- req.DocIds = docIds
- return req
- }
- func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
- switch req.Type {
- case MatchAll:
- queryMap = map[string]interface{}{
- "query": map[string]interface{}{
- "match": map[string]interface{}{
- req.Column: req.Key,
- },
- },
- }
- return
- case MatchAllByCondition:
- queryMap = map[string]interface{}{
- "query": map[string]interface{}{
- "bool": map[string]interface{}{
- "must": []map[string]interface{}{
- {
- "match": map[string]interface{}{
- req.Column: req.Key,
- },
- },
- {
- "term": map[string]interface{}{
- req.Condition: req.ConditionValue,
- },
- },
- },
- },
- },
- }
- return
- case Match:
- queryMap = map[string]interface{}{
- "query": map[string]interface{}{
- "match": map[string]interface{}{
- req.Column: req.Key,
- },
- },
- "highlight": map[string]interface{}{
- "fields": map[string]interface{}{
- req.Column: map[string]interface{}{},
- },
- "pre_tags": []string{"<span style='color:#0078E8'>"},
- "post_tags": []string{"</span>"},
- },
- }
- return
- case CountWithDocIds:
- queryMap = map[string]interface{}{
- "query": map[string]interface{}{
- "bool": map[string]interface{}{
- "must": []map[string]interface{}{
- {
- "match": map[string]interface{}{
- req.Column: req.Key,
- },
- },
- },
- "filter": map[string]interface{}{
- "terms": map[string]interface{}{
- "_id": req.DocIds,
- },
- },
- },
- },
- }
- return
- case Range:
- queryMap = map[string]interface{}{
- "query": map[string]interface{}{
- "match": map[string]interface{}{
- req.Column: req.Key,
- },
- },
- "highlight": map[string]interface{}{
- "fields": map[string]interface{}{
- req.Column: map[string]interface{}{},
- },
- "pre_tags": []string{"<span style='color:#0078E8'>"},
- "post_tags": []string{"</span>"},
- },
- "post_filter": map[string]interface{}{
- "range": map[string]interface{}{
- req.RangeColumn: map[string]interface{}{
- "gte": req.Min,
- "lte": req.Max,
- },
- },
- },
- }
- return
- case RangeWithDocIds:
- queryMap = map[string]interface{}{
- "query": map[string]interface{}{
- "match": map[string]interface{}{
- req.Column: req.Key,
- },
- },
- "highlight": map[string]interface{}{
- "fields": map[string]interface{}{
- req.Column: map[string]interface{}{},
- },
- "pre_tags": []string{"<span style='color:#0078E8'>"},
- "post_tags": []string{"</span>"},
- },
- "post_filter": map[string]interface{}{
- "bool": map[string]interface{}{
- "must": []map[string]interface{}{
- {
- "range": map[string]interface{}{
- req.RangeColumn: map[string]interface{}{
- "gte": req.Min,
- "lte": req.Max,
- },
- },
- },
- {
- "terms": map[string]interface{}{
- "_id": req.DocIds,
- },
- },
- },
- },
- },
- }
- return
- case RangeByCondition:
- queryMap = map[string]interface{}{
- "query": map[string]interface{}{
- "match": map[string]interface{}{
- req.Column: req.Key,
- },
- },
- "highlight": map[string]interface{}{
- "fields": map[string]interface{}{
- req.Column: map[string]interface{}{},
- },
- "pre_tags": []string{"<span style='color:#0078E8'>"},
- "post_tags": []string{"</span>"},
- },
- "post_filter": map[string]interface{}{
- "bool": map[string]interface{}{
- "must": []map[string]interface{}{
- {
- "range": map[string]interface{}{
- req.RangeColumn: map[string]interface{}{
- "gte": req.Min,
- "lte": req.Max,
- },
- },
- },
- {
- "term": map[string]interface{}{
- req.Condition: req.ConditionValue,
- },
- },
- },
- },
- },
- }
- return
- case RangeByConditionWithDocIds:
- queryMap = map[string]interface{}{
- "query": map[string]interface{}{
- "match": map[string]interface{}{
- req.Column: req.Key,
- },
- },
- "highlight": map[string]interface{}{
- "fields": map[string]interface{}{
- req.Column: map[string]interface{}{},
- },
- "pre_tags": []string{"<span style='color:#0078E8'>"},
- "post_tags": []string{"</span>"},
- },
- "post_filter": map[string]interface{}{
- "bool": map[string]interface{}{
- "must": []map[string]interface{}{
- {
- "range": map[string]interface{}{
- req.RangeColumn: map[string]interface{}{
- "gte": req.Min,
- "lte": req.Max,
- },
- },
- },
- {
- "term": map[string]interface{}{
- req.Condition: req.ConditionValue,
- },
- },
- {
- "terms": map[string]interface{}{
- "_id": req.DocIds,
- },
- },
- },
- },
- },
- }
- return
- case RangeByConditionWithDocIdsNoLimit:
- queryMap = map[string]interface{}{
- "query": map[string]interface{}{
- "match": map[string]interface{}{
- req.Column: req.Key,
- },
- },
- "highlight": map[string]interface{}{
- "fields": map[string]interface{}{
- req.Column: map[string]interface{}{},
- },
- "pre_tags": []string{"<span style='color:#0078E8'>"},
- "post_tags": []string{"</span>"},
- },
- "post_filter": map[string]interface{}{
- "bool": map[string]interface{}{
- "must": []map[string]interface{}{
- {
- "term": map[string]interface{}{
- req.Condition: req.ConditionValue,
- },
- },
- {
- "terms": map[string]interface{}{
- "_id": req.DocIds,
- },
- },
- },
- },
- },
- }
- return
- case LimitByScore:
- queryMap = map[string]interface{}{
- "query": map[string]interface{}{
- "match": map[string]interface{}{
- req.Column: req.Key,
- },
- },
- "highlight": map[string]interface{}{
- "fields": map[string]interface{}{
- req.Column: map[string]interface{}{},
- },
- "pre_tags": []string{"<span style='color:#0078E8'>"},
- "post_tags": []string{"</span>"},
- },
- "post_filter": map[string]interface{}{
- "bool": map[string]interface{}{
- "must": []map[string]interface{}{
- {
- "terms": map[string]interface{}{
- "_id": req.DocIds,
- },
- },
- },
- },
- },
- "min_score": req.MinScore,
- }
- return
- case HomeSearch:
- queryMap = map[string]interface{}{
- "query": map[string]interface{}{
- "bool": map[string]interface{}{
- "should": []map[string]interface{}{
- {
- "bool": map[string]interface{}{
- "must": []map[string]interface{}{
- {
- "match": map[string]interface{}{
- "title": req.Key,
- },
- },
- {
- "term": map[string]interface{}{
- "status": "PUBLISH",
- },
- },
- {
- "range": map[string]interface{}{
- "publishedTime": map[string]interface{}{
- "lte": req.Max,
- },
- },
- },
- },
- },
- },
- {
- "bool": map[string]interface{}{
- "must": []map[string]interface{}{
- {
- "match": map[string]interface{}{
- "mediaName": req.Key,
- },
- },
- {
- "range": map[string]interface{}{
- req.RangeColumn: map[string]interface{}{
- "lte": req.Max,
- },
- },
- },
- },
- },
- },
- },
- },
- },
- "highlight": map[string]interface{}{
- "fields": map[string]interface{}{
- "title": map[string]interface{}{},
- "mediaName": map[string]interface{}{},
- },
- "pre_tags": []string{"<span style='color:#0078E8'>"},
- "post_tags": []string{"</span>"},
- },
- }
- return
- default:
- queryMap = map[string]interface{}{}
- return
- }
- }
- // /*
- // *
- //
- // 搜索
- //
- // indexName 访问索引名
- // query 搜索条件
- // from 开始搜索位置
- // size 搜索条数
- // sort 排序
- // */
- func (es *ESClient) Count(params *ESQueryRequest) (response ESResponse, err error) {
- queryMap := params.parseJsonQuery()
- jsonQuery, _ := json.Marshal(queryMap)
- logger.Info("查询语句: %s", string(jsonQuery))
- request := esapi.CountRequest{
- Index: []string{params.IndexName},
- Body: strings.NewReader(string(jsonQuery)),
- }
- res, err := request.Do(context.Background(), esClient.esOp)
- defer res.Body.Close()
- if err != nil {
- logger.Error("es查询失败: %s", err)
- }
- if res.IsError() {
- var e map[string]interface{}
- if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
- logger.Error("解析es应答失败: %v", err)
- } else {
- logger.Error("es请求失败: %s: %v\n", res.Status(), e)
- }
- }
- body, err := io.ReadAll(res.Body)
- if err != nil {
- logger.Error("获取es应答失败: %v", err)
- }
- return parseESResponse(body)
- }
- func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
- queryMap := params.parseJsonQuery()
- jsonQuery, _ := json.Marshal(queryMap)
- logger.Info("查询语句: %s", string(jsonQuery))
- indexes := strings.Split(params.IndexName, ",")
- request := esapi.SearchRequest{
- Index: indexes,
- Body: strings.NewReader(string(jsonQuery)),
- From: ¶ms.From,
- Size: ¶ms.Size,
- Sort: params.Sorts,
- }
- res, err := request.Do(context.Background(), esClient.esOp)
- defer res.Body.Close()
- if err != nil {
- logger.Error("es查询失败: %s", err)
- }
- if res.IsError() {
- var e map[string]interface{}
- if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
- logger.Error("解析es应答失败: %v", err)
- } else {
- logger.Error("es请求失败: %s: %v\n", res.Status(), e)
- }
- }
- body, err := io.ReadAll(res.Body)
- if err != nil {
- logger.Error("获取es应答失败: %v", err)
- }
- return parseESResponse(body)
- }
- func parseESResponse(body []byte) (ESResponse, error) {
- var response ESResponse
- if err := json.Unmarshal(body, &response); err != nil {
- return ESResponse{}, err
- }
- for _, hit := range response.Hits.Hits {
- var source map[string]interface{}
- if err := json.Unmarshal(hit.Source, &source); err != nil {
- return ESResponse{}, err
- }
- }
- return response, nil
- }
- func (es *ESClient) GetSource(hits Hits) []Hit {
- return hits.Hits
- }
- func (es *ESClient) GetCount(hits Hits) []Hit {
- return hits.Hits
- }
- // /*
- // *
- // 添加es
- // indexName 索引名
- // id es的id
- // body es的值
- // */
- func (es *ESClient) Update(indexName string, id int, doc interface{}) bool {
- jsonUpdate := map[string]interface{}{
- "doc": doc,
- }
- jsonDoc, _ := json.Marshal(jsonUpdate)
- logger.Info("查询语句: %s", string(jsonDoc))
- req := esapi.UpdateRequest{
- Index: indexName,
- DocumentID: strconv.Itoa(id),
- Body: strings.NewReader(string(jsonDoc)),
- Refresh: "true",
- }
- res, err := req.Do(context.Background(), es.es())
- defer res.Body.Close()
- if err != nil {
- logger.Error("es查询失败: %s", err)
- }
- if res.IsError() {
- var e map[string]interface{}
- if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
- logger.Error("解析es应答失败: %v", err)
- } else {
- logger.Error("es请求失败: %s: %v\n", res.Status(), e)
- }
- }
- body, err := io.ReadAll(res.Body)
- if err != nil {
- logger.Error("获取es应答失败: %v", err)
- }
- fmt.Printf("%s\n", string(body))
- return true
- }
- func (es *ESClient) InsertOrUpdate(indexName string, id int, doc interface{}) (success bool) {
- if exist, existErr := es.Exist(indexName, id); existErr == nil && exist {
- return es.Update(indexName, id, doc)
- } else {
- return es.CreateDocument(indexName, id, doc)
- }
- }
- // Delete *
- // 删除
- // indexName 索引名
- // id es的id
- // */
- func (es *ESClient) Delete(indexName string, query interface{}) bool {
- jsonQuery, _ := json.Marshal(query)
- refresh := new(bool)
- *refresh = true
- logger.Info("查询语句: %s", string(jsonQuery))
- req := esapi.DeleteByQueryRequest{
- Index: []string{indexName},
- Body: strings.NewReader(string(jsonQuery)),
- Refresh: refresh,
- }
- res, err := req.Do(context.Background(), es.es())
- defer res.Body.Close()
- if err != nil {
- logger.Error("es查询失败: %s", err)
- }
- if res.IsError() {
- var e map[string]interface{}
- if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
- logger.Error("解析es应答失败: %v", err)
- } else {
- logger.Error("es请求失败: %s: %v\n", res.Status(), e)
- }
- }
- body, err := io.ReadAll(res.Body)
- if err != nil {
- logger.Error("获取es应答失败: %v", err)
- }
- fmt.Printf("%s\n", string(body))
- return true
- }
- func (es *ESClient) Exist(indexName string, docId int) (exist bool, err error) {
- getRequest := esapi.GetRequest{
- Index: indexName,
- DocumentID: strconv.Itoa(docId),
- }
- // 执行请求
- res, err := getRequest.Do(context.Background(), es.es())
- if err != nil {
- logger.Error("es获取文档是否存在失败: %v", err)
- return
- }
- defer res.Body.Close()
- // 检查文档是否存在
- if res.IsError() {
- // 如果文档不存在,通常返回 404 Not Found
- if res.StatusCode == 404 {
- logger.Info("文档不存在.")
- err = errors.New("ES文档不存在")
- return
- } else {
- // 其他错误
- var e map[string]interface{}
- if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
- logger.Error("解析es应答失败: %v", err)
- return
- } else {
- // Print the response status and error information.
- logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["原因"])
- err = errors.New("获取ES记录失败")
- return
- }
- }
- } else {
- // 如果文档存在
- logger.Info("doc存在")
- return true, nil
- }
- }
- func (es *ESClient) Get(indexName string, docId int) (doc Doc, err error) {
- getRequest := esapi.GetRequest{
- Index: indexName,
- DocumentID: strconv.Itoa(docId),
- }
- // 执行请求
- res, err := getRequest.Do(context.Background(), es.es())
- if err != nil {
- logger.Error("es获取文档是否存在失败: %v", err)
- return
- }
- defer res.Body.Close()
- // 检查文档是否存在
- if res.IsError() {
- // 如果文档不存在,通常返回 404 Not Found
- if res.StatusCode == 404 {
- logger.Info("文档不存在.")
- err = errors.New("ES文档不存在")
- return
- } else {
- // 其他错误
- var e map[string]interface{}
- if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
- logger.Error("解析es应答失败: %v", err)
- return
- } else {
- // Print the response status and error information.
- logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["原因"])
- err = errors.New("获取ES记录失败")
- return
- }
- }
- } else {
- // 如果文档存在
- body, readErr := io.ReadAll(res.Body)
- if readErr != nil {
- logger.Error("获取es应答失败: %v", err)
- err = readErr
- return
- }
- err = json.Unmarshal(body, &doc)
- if err != nil {
- logger.Error("反序列化es应答失败: %v", err)
- return
- }
- return
- }
- }
- //
- //func CreateIndex(indexName string) error {
- // resp, err := esClient.es().Indices.
- // Create(indexName).
- // Do(context.Background())
- // if err != nil {
- // logger.Error("创建ES索引失败:%v", err)
- // return err
- // }
- // fmt.Printf("index:%#v\n", resp.Index)
- // return nil
- //}
- // DeleteIndex 删除索引
- //
- // func DeleteIndex(indexName string) error {
- // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
- // Delete(indexName).
- // Do(context.Background())
- // if err != nil {
- // fmt.Printf("delete index failed,err:%v\n", err)
- // return err
- // }
- // fmt.Printf("delete index successed,indexName:%s", indexName)
- // return nil
- // }
- //
- // CreateDocument 创建文档
- func (es *ESClient) CreateDocument(indexName string, id int, doc interface{}) (success bool) {
- jsonDoc, _ := json.Marshal(doc)
- logger.Info("查询语句: %s", string(jsonDoc))
- // 添加文档
- indexRequest := esapi.IndexRequest{
- Index: indexName,
- DocumentID: strconv.Itoa(id),
- Body: strings.NewReader(string(jsonDoc)),
- Refresh: "true",
- }
- // 执行请求
- res, err := indexRequest.Do(context.Background(), es.es())
- if err != nil {
- logger.Error("ES创建文档失败: %s", err)
- return false
- }
- defer res.Body.Close()
- // 检查文档是否成功创建
- if res.IsError() {
- var e map[string]interface{}
- if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
- logger.Error("解析ES应答失败: %s", err)
- } else {
- // Print the response status and error information.
- logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["类型"], e["错误"].(map[string]interface{})["原因"])
- }
- return false
- } else {
- // 如果文档成功创建
- logger.Info("创建文档成功")
- return true
- }
- }
|