package es
import (
"bytes"
"context"
"encoding/json"
"errors"
"eta/eta_mini_ht_api/common/component/config"
logger "eta/eta_mini_ht_api/common/component/log"
"eta/eta_mini_ht_api/common/contants"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"io"
"strconv"
"strings"
"sync"
)
// ESBase is the contract a document must satisfy to be indexed through
// ESClient.BulkInsert.
type ESBase interface {
// GetId returns the value BulkInsert sends as the document's "_id".
GetId() string
}
// Package-level singleton state for the Elasticsearch client.
var (
// esOnce guards the one-time construction performed in GetInstance.
esOnce sync.Once
// esClient is the shared client returned by GetInstance; it stays nil when
// no ES configuration could be loaded.
esClient *ESClient
)
// ESClient wraps the go-elasticsearch client and exposes the document and
// search helpers defined in this file.
type ESClient struct {
// esOp is the underlying go-elasticsearch client every request is sent through.
esOp *elasticsearch.Client
}
// SearchType names one of the query shapes that ESQueryRequest.parseJsonQuery
// knows how to build.
type SearchType string
// Query shapes recognised by parseJsonQuery. They are untyped string
// constants, so they convert implicitly wherever a SearchType is expected.
const (
// MatchAll builds a plain "match" query on Column (it is not the ES match_all query).
MatchAll = "match_all"
// Match builds a "match" query on Column with highlighting.
Match = "match"
// CountWithDocIds matches on Column and filters by the "_id" values in DocIds.
CountWithDocIds = "count_with_doc_ids"
// Range matches on Column and post-filters RangeColumn to [Min, Max].
Range = "range"
// MatchAllByCondition requires both the match on Column and a term on Condition.
MatchAllByCondition = "match_all_by_condition"
// RangeByCondition post-filters by the range and by a term on Condition.
RangeByCondition = "range_by_condition"
// RangeByConditionWithDocIds post-filters by range, term on Condition and DocIds.
RangeByConditionWithDocIds = "range_by_condition_with_doc_ids"
// RangeByConditionWithDocIdsNoLimit post-filters by DocIds only.
RangeByConditionWithDocIdsNoLimit = "range_by_condition_with_doc_ids_no_limit"
// RangeByConditionWithDocIdsNoLimitByScore post-filters by DocIds and applies MinScore.
RangeByConditionWithDocIdsNoLimitByScore = "range_by_condition_with_doc_ids_no_limit_by_score"
// RangeWithDocIds post-filters by the range and by DocIds.
RangeWithDocIds = "range_with_doc_ids"
// LimitByScore builds the same body as RangeByConditionWithDocIdsNoLimitByScore.
LimitByScore = "limit_by_score"
// HomeSearch matches Key against the "title" and "mediaName" fields.
HomeSearch = "home_search"
)
// GetInstance returns the shared ESClient, building it on the first call.
//
// The client is only built when config.GetConfig(contants.ES) yields a
// *config.ESConfig; otherwise nothing is initialised and nil is returned.
func GetInstance() *ESClient {
	esOnce.Do(func() {
		esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig)
		if !ok {
			// No ES configuration available: leave esClient nil.
			return
		}
		logger.Info("初始化es")
		esClient = newEs(esConf)
		logger.Info("es地址:%v", esConf.GetUrl())
	})
	return esClient
}
// es returns the underlying go-elasticsearch client held by this wrapper.
func (es *ESClient) es() *elasticsearch.Client {
return es.esOp
}
// newEs builds an ESClient from the given configuration, using its URL as the
// single node address and its user name / password for HTTP basic auth.
//
// It panics when the client cannot be constructed, because the package cannot
// operate without one (it is invoked during package initialisation).
func newEs(config *config.ESConfig) *ESClient {
	// The previous stray elasticsearch.NewDefaultClient() call was removed: it
	// built a second client whose result was discarded.
	client, err := elasticsearch.NewClient(
		elasticsearch.Config{
			// A list of Elasticsearch nodes to use.
			Addresses: []string{config.GetUrl()},
			Username:  config.GetUserName(),
			Password:  config.GetPassword(), // Password for HTTP Basic Authentication.
		},
	)
	if err != nil {
		logger.Error("连接ES失败:%v", err)
		panic("启动es失败")
	}
	return &ESClient{esOp: client}
}
// init eagerly creates the shared client when the package is loaded and
// panics if GetInstance could not produce one.
func init() {
	if instance := GetInstance(); instance == nil {
		panic("初始化es失败")
	}
	logger.Info("es初始化成功")
}
// BulkInsert indexes every document in docs into indexName with a single bulk
// request, using doc.GetId() as each document's "_id". The request is sent
// with refresh=true.
//
// It returns nil when docs is empty (no request is sent). It returns an error
// when a document cannot be encoded, when the request cannot be sent, or when
// Elasticsearch answers with an error status.
//
// NOTE(review): item-level failures reported inside a successful bulk response
// are not inspected here.
func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
	if len(docs) == 0 {
		return nil
	}

	// The bulk body is newline-delimited JSON: one action line followed by one
	// document line per entry. json.Encoder appends the newline itself.
	var bulkBody bytes.Buffer
	enc := json.NewEncoder(&bulkBody)
	for _, doc := range docs {
		action := map[string]interface{}{
			"index": map[string]interface{}{
				"_index": indexName,
				"_id":    doc.GetId(),
			},
		}
		if err = enc.Encode(action); err != nil {
			logger.Error("生成es批处理请求参数失败: %s", err)
			return err
		}
		if err = enc.Encode(doc); err != nil {
			logger.Error("生成es批处理文档失败: %s", err)
			return err
		}
	}

	bulkReq := esapi.BulkRequest{
		Body:    bytes.NewReader(bulkBody.Bytes()),
		Refresh: "true",
	}
	res, err := bulkReq.Do(context.Background(), es.esOp)
	if err != nil {
		// res is nil here, so return before touching res.Body.
		logger.Error("es批处理创建失败: %s", err)
		return err
	}
	defer res.Body.Close()

	if res.IsError() {
		var e map[string]interface{}
		if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
			logger.Error("解析es应答失败: %v", err)
			return err
		}
		logger.Error("es请求失败: %s: %v\n", res.Status(), e)
		return fmt.Errorf("es bulk request failed: %s", res.Status())
	}
	return nil
}
// ESResponse is the decoded body of a search or count response.
type ESResponse struct {
	// Took is the "took" value of the response.
	Took int `json:"took"`
	// Count is the "count" value of the response.
	Count int `json:"count"`
	// TimedOut mirrors the "timed_out" flag. The previous tag ("timedOut") never
	// matched the key Elasticsearch sends, so the field was always false.
	TimedOut bool `json:"timed_out"`
	// Hits holds the matching documents and the total.
	Hits Hits `json:"hits"`
	// Shards is the "_shards" section of the response.
	Shards ShardsInfo `json:"_shards"`
	// _Shards is kept only so existing in-package references still compile.
	// encoding/json ignores unexported fields, so it was never populated.
	//
	// Deprecated: use Shards instead.
	_Shards ShardsInfo
}
// Hits is the "hits" section of a search response.
type Hits struct {
	// Total describes how many documents matched.
	Total TotalHits `json:"total"`
	// MaxScore mirrors "max_score". The previous tag ("maxScore") never matched
	// the key Elasticsearch sends, so the field was always zero.
	MaxScore float64 `json:"max_score"`
	// Hits lists the returned documents.
	Hits []Hit `json:"hits"`
}
// TotalHits is the "total" object inside the "hits" section of a response.
type TotalHits struct {
// Value is decoded from the "value" key.
Value int `json:"value"`
// Relation is decoded from the "relation" key.
Relation string `json:"relation"`
}
// Hit is a single entry of the "hits" array in a search response.
type Hit struct {
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Score float64 `json:"_score"`
// Source is the raw, still-encoded "_source" document; callers decode it
// into their own type.
Source json.RawMessage `json:"_source"`
// Highlight is the raw, still-encoded "highlight" object.
Highlight json.RawMessage `json:"highlight"`
}
// Doc is the decoded body of a get-document response, as produced by
// ESClient.Get.
type Doc struct {
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Version float64 `json:"_version"`
SeqNo float64 `json:"_seq_no"`
PrimaryTerm float64 `json:"_primary_term"`
// Found is decoded from the "found" key of the response.
Found bool `json:"found"`
// Source is the raw, still-encoded "_source" document.
Source json.RawMessage `json:"_source"`
}
// ShardsInfo holds the per-shard counters decoded from the "total",
// "successful", "skipped" and "failed" keys.
type ShardsInfo struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
}
// ESQueryRequest describes one search or count call. Type selects the query
// shape built by parseJsonQuery; the remaining fields are the values plugged
// into that shape.
type ESQueryRequest struct {
// IndexName is the target index. Search splits it on "," into several
// indices; Count passes it through as a single name.
IndexName string
// From and Size are forwarded to the search request by Search.
From int
Size int
// Key is the text matched against Column.
Key string
// Column is the field used by the "match" clause and for highlighting.
Column string
// Condition and ConditionValue form the "term" clause (field and value).
Condition string
ConditionValue string
// Sorts is forwarded to the search request by Search.
Sorts []string
// Type selects which query shape parseJsonQuery builds.
Type SearchType
// RangeColumn is the field the "range" clause applies to.
RangeColumn string
// DocIds restricts results to these "_id" values.
DocIds []string
// Max and Min are the "lte" / "gte" bounds of the range clause.
Max interface{}
Min interface{}
// MinScore is emitted as "min_score" by the score-limited query shapes.
MinScore float64
}
// CreateESQueryRequest returns a freshly allocated request populated from the
// arguments. The receiver is neither read nor modified, so it may be a zero
// value; every field not covered by a parameter is left at its zero value.
func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
	created := new(ESQueryRequest)
	created.IndexName = index
	created.Column = column
	created.Key = key
	created.From = from
	created.Size = size
	created.Sorts = sorts
	created.Type = searchType
	return created
}
// Limit sets Size to limit and returns the same request for chaining.
func (req *ESQueryRequest) Limit(limit int) *ESQueryRequest {
req.Size = limit
return req
}
// WithScore sets MinScore to score and returns the same request for chaining.
func (req *ESQueryRequest) WithScore(score float64) *ESQueryRequest {
req.MinScore = score
return req
}
// Range sets the range clause to from <= column <= to by storing from in Min,
// to in Max and column in RangeColumn. It returns the same request for chaining.
func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
	req.Min = from
	req.Max = to
	req.RangeColumn = column
	return req
}
// Before sets only the upper bound of the range clause: max is stored in Max
// and column in RangeColumn, while Min is left untouched. It returns the same
// request for chaining.
func (req *ESQueryRequest) Before(max interface{}, column string) *ESQueryRequest {
	req.Max = max
	req.RangeColumn = column
	return req
}
// ByCondition sets the term clause: column is stored in Condition and value in
// ConditionValue. It returns the same request for chaining.
func (req *ESQueryRequest) ByCondition(column string, value string) *ESQueryRequest {
	req.ConditionValue = value
	req.Condition = column
	return req
}
// WithDocs sets DocIds to docIds (the slice is stored, not copied) and returns
// the same request for chaining.
func (req *ESQueryRequest) WithDocs(docIds []string) *ESQueryRequest {
req.DocIds = docIds
return req
}
// parseJsonQuery builds the request body for req.Type as a nested map ready
// for json.Marshal. An unrecognised Type yields an empty map.
//
// Points worth knowing when reading the cases:
//   - MatchAll emits a "match" query on Column, not the ES "match_all" query.
//   - RangeByConditionWithDocIdsNoLimit filters by DocIds only; the range and
//     condition fields are not used.
//   - RangeByConditionWithDocIdsNoLimitByScore and LimitByScore produce the
//     same body.
//   - HomeSearch ignores Column: it matches Key against "title" and
//     "mediaName", bounding the former by "publishedTime" and the latter by
//     RangeColumn.
func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
	type obj = map[string]interface{}

	// Leaf clauses shared by the query shapes below.
	match := func(field string) obj {
		return obj{"match": obj{field: req.Key}}
	}
	term := func(field string, value interface{}) obj {
		return obj{"term": obj{field: value}}
	}
	conditionTerm := func() obj {
		return term(req.Condition, req.ConditionValue)
	}
	closedRange := func() obj {
		return obj{"range": obj{req.RangeColumn: obj{"gte": req.Min, "lte": req.Max}}}
	}
	upTo := func(field string) obj {
		return obj{"range": obj{field: obj{"lte": req.Max}}}
	}
	docIDs := func() obj {
		return obj{"terms": obj{"_id": req.DocIds}}
	}
	allOf := func(clauses ...obj) obj {
		return obj{"bool": obj{"must": clauses}}
	}
	highlight := func(fields ...string) obj {
		perField := obj{}
		for _, f := range fields {
			perField[f] = obj{}
		}
		return obj{
			"fields":    perField,
			"pre_tags":  []string{""},
			"post_tags": []string{""},
		}
	}
	// searchBody is the common "match on Column + highlight" body, optionally
	// narrowed by a post_filter.
	searchBody := func(postFilter obj) obj {
		body := obj{
			"query":     match(req.Column),
			"highlight": highlight(req.Column),
		}
		if postFilter != nil {
			body["post_filter"] = postFilter
		}
		return body
	}

	switch req.Type {
	case MatchAll:
		return obj{"query": match(req.Column)}
	case MatchAllByCondition:
		return obj{"query": allOf(match(req.Column), conditionTerm())}
	case Match:
		return searchBody(nil)
	case CountWithDocIds:
		return obj{
			"query": obj{
				"bool": obj{
					"must":   []obj{match(req.Column)},
					"filter": docIDs(),
				},
			},
		}
	case Range:
		return searchBody(closedRange())
	case RangeWithDocIds:
		return searchBody(allOf(closedRange(), docIDs()))
	case RangeByCondition:
		return searchBody(allOf(closedRange(), conditionTerm()))
	case RangeByConditionWithDocIds:
		return searchBody(allOf(closedRange(), conditionTerm(), docIDs()))
	case RangeByConditionWithDocIdsNoLimit:
		return searchBody(allOf(docIDs()))
	case RangeByConditionWithDocIdsNoLimitByScore, LimitByScore:
		body := searchBody(allOf(docIDs()))
		body["min_score"] = req.MinScore
		return body
	case HomeSearch:
		return obj{
			"query": obj{
				"bool": obj{
					"should": []obj{
						allOf(match("title"), term("status", "PUBLISH"), upTo("publishedTime")),
						allOf(match("mediaName"), upTo(req.RangeColumn)),
					},
				},
			},
			"highlight": highlight("title", "mediaName"),
		}
	default:
		return obj{}
	}
}
// Count sends the query built from params to the count API of
// params.IndexName and returns the decoded response.
//
// An error is returned when the query cannot be serialised, the request cannot
// be sent, the response body cannot be read or decoded, or Elasticsearch
// answers with an error status. The returned ESResponse is the zero value in
// all of those cases.
func (es *ESClient) Count(params *ESQueryRequest) (response ESResponse, err error) {
	jsonQuery, err := json.Marshal(params.parseJsonQuery())
	if err != nil {
		return ESResponse{}, fmt.Errorf("marshal es count query: %w", err)
	}
	logger.Info("查询语句: %s", string(jsonQuery))

	request := esapi.CountRequest{
		Index: []string{params.IndexName},
		Body:  bytes.NewReader(jsonQuery),
	}
	res, err := request.Do(context.Background(), es.es())
	if err != nil {
		// res is nil on a transport error, so bail out before touching res.Body.
		logger.Error("es查询失败: %s", err)
		return ESResponse{}, err
	}
	defer res.Body.Close()

	// Read the body once; it serves both the error log and the success path.
	body, err := io.ReadAll(res.Body)
	if err != nil {
		logger.Error("获取es应答失败: %v", err)
		return ESResponse{}, err
	}
	if res.IsError() {
		logger.Error("es请求失败: %s: %v\n", res.Status(), string(body))
		return ESResponse{}, fmt.Errorf("es count request failed: %s", res.Status())
	}
	return parseESResponse(body)
}
func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
queryMap := params.parseJsonQuery()
jsonQuery, _ := json.Marshal(queryMap)
logger.Info("查询语句: %s", string(jsonQuery))
indexes := strings.Split(params.IndexName, ",")
request := esapi.SearchRequest{
Index: indexes,
Body: strings.NewReader(string(jsonQuery)),
From: ¶ms.From,
Size: ¶ms.Size,
Sort: params.Sorts,
}
res, err := request.Do(context.Background(), esClient.esOp)
defer res.Body.Close()
if err != nil {
logger.Error("es查询失败: %s", err)
}
if res.IsError() {
var e map[string]interface{}
if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
logger.Error("解析es应答失败: %v", err)
} else {
logger.Error("es请求失败: %s: %v\n", res.Status(), e)
}
}
body, err := io.ReadAll(res.Body)
if err != nil {
logger.Error("获取es应答失败: %v", err)
}
return parseESResponse(body)
}
// parseESResponse decodes a raw response body into an ESResponse.
//
// Every hit's "_source" must itself decode as a JSON object (or null); the
// decoded value is only used as a validity check and then discarded. On any
// decoding failure the zero ESResponse and the error are returned.
func parseESResponse(body []byte) (ESResponse, error) {
	var parsed ESResponse
	if err := json.Unmarshal(body, &parsed); err != nil {
		return ESResponse{}, err
	}
	hits := parsed.Hits.Hits
	for i := range hits {
		var fields map[string]interface{}
		if err := json.Unmarshal(hits[i].Source, &fields); err != nil {
			return ESResponse{}, err
		}
	}
	return parsed, nil
}
// GetSource returns the individual hit entries contained in hits.
func (es *ESClient) GetSource(hits Hits) []Hit {
return hits.Hits
}
// GetCount returns the individual hit entries contained in hits.
// NOTE(review): this is identical to GetSource and does not return a count,
// despite its name — confirm the intended behavior with callers.
func (es *ESClient) GetCount(hits Hits) []Hit {
return hits.Hits
}
// Update applies a partial update to an existing document: doc is sent under
// the "doc" key of an update request with refresh=true.
//
//	indexName  index holding the document
//	id         document id (sent in decimal form)
//	doc        fields to merge into the stored document
//
// It returns true only when Elasticsearch accepted the update; serialisation
// failures, transport failures and error statuses all yield false.
func (es *ESClient) Update(indexName string, id int, doc interface{}) bool {
	jsonDoc, err := json.Marshal(map[string]interface{}{"doc": doc})
	if err != nil {
		logger.Error("序列化es请求失败: %v", err)
		return false
	}
	logger.Info("查询语句: %s", string(jsonDoc))

	req := esapi.UpdateRequest{
		Index:      indexName,
		DocumentID: strconv.Itoa(id),
		Body:       bytes.NewReader(jsonDoc),
		Refresh:    "true",
	}
	res, err := req.Do(context.Background(), es.es())
	if err != nil {
		// res is nil on a transport error, so bail out before touching res.Body.
		logger.Error("es查询失败: %s", err)
		return false
	}
	defer res.Body.Close()

	// The body is only used for diagnostics, so a read failure is logged but
	// does not by itself change the outcome.
	body, readErr := io.ReadAll(res.Body)
	if readErr != nil {
		logger.Error("获取es应答失败: %v", readErr)
	}
	if res.IsError() {
		logger.Error("es请求失败: %s: %v\n", res.Status(), string(body))
		return false
	}
	fmt.Printf("%s\n", string(body))
	return true
}
// InsertOrUpdate updates the document when Exist reports it as present without
// error, and creates it in every other case (including when the existence
// check itself fails). It returns the result of whichever call was made.
func (es *ESClient) InsertOrUpdate(indexName string, id int, doc interface{}) (success bool) {
	exist, existErr := es.Exist(indexName, id)
	if existErr != nil || !exist {
		return es.CreateDocument(indexName, id, doc)
	}
	return es.Update(indexName, id, doc)
}
// Delete removes every document in indexName that matches query, using a
// delete-by-query request with refresh enabled.
//
//	indexName  index to delete from
//	query      request body; it is JSON-encoded as is
//
// It returns true only when Elasticsearch accepted the request; serialisation
// failures, transport failures and error statuses all yield false.
func (es *ESClient) Delete(indexName string, query interface{}) bool {
	jsonQuery, err := json.Marshal(query)
	if err != nil {
		logger.Error("序列化es请求失败: %v", err)
		return false
	}
	logger.Info("查询语句: %s", string(jsonQuery))

	refresh := true
	req := esapi.DeleteByQueryRequest{
		Index:   []string{indexName},
		Body:    bytes.NewReader(jsonQuery),
		Refresh: &refresh,
	}
	res, err := req.Do(context.Background(), es.es())
	if err != nil {
		// res is nil on a transport error, so bail out before touching res.Body.
		logger.Error("es查询失败: %s", err)
		return false
	}
	defer res.Body.Close()

	// The body is only used for diagnostics, so a read failure is logged but
	// does not by itself change the outcome.
	body, readErr := io.ReadAll(res.Body)
	if readErr != nil {
		logger.Error("获取es应答失败: %v", readErr)
	}
	if res.IsError() {
		logger.Error("es请求失败: %s: %v\n", res.Status(), string(body))
		return false
	}
	fmt.Printf("%s\n", string(body))
	return true
}
// Exist reports whether the document docId is present in indexName by issuing
// a get request for it.
//
// It returns (true, nil) when the document is found. In every other case exist
// is false and err is non-nil: a 404 yields the "document does not exist"
// error, any other error status yields a generic retrieval error, and
// transport or decoding failures are returned as is.
func (es *ESClient) Exist(indexName string, docId int) (exist bool, err error) {
	getRequest := esapi.GetRequest{
		Index:      indexName,
		DocumentID: strconv.Itoa(docId),
	}
	res, err := getRequest.Do(context.Background(), es.es())
	if err != nil {
		logger.Error("es获取文档是否存在失败: %v", err)
		return false, err
	}
	defer res.Body.Close()

	if !res.IsError() {
		logger.Info("doc存在")
		return true, nil
	}
	// A missing document is normally reported as 404 Not Found.
	if res.StatusCode == 404 {
		logger.Info("文档不存在.")
		return false, errors.New("ES文档不存在")
	}

	var e map[string]interface{}
	if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
		logger.Error("解析es应答失败: %v", err)
		return false, err
	}
	// "error" is not guaranteed to be an object, so use a checked assertion;
	// indexing the resulting nil map is safe and simply logs <nil>.
	detail, _ := e["error"].(map[string]interface{})
	logger.Error("[%s] %v: %v\n", res.Status(), detail["type"], detail["reason"])
	return false, errors.New("获取ES记录失败")
}
// Get fetches the document docId from indexName and decodes the response
// into a Doc.
//
// err is non-nil when the request cannot be sent, the document does not exist
// (404), Elasticsearch answers with another error status, or the response body
// cannot be read or decoded.
func (es *ESClient) Get(indexName string, docId int) (doc Doc, err error) {
	getRequest := esapi.GetRequest{
		Index:      indexName,
		DocumentID: strconv.Itoa(docId),
	}
	res, err := getRequest.Do(context.Background(), es.es())
	if err != nil {
		logger.Error("es获取文档是否存在失败: %v", err)
		return doc, err
	}
	defer res.Body.Close()

	if res.IsError() {
		// A missing document is normally reported as 404 Not Found.
		if res.StatusCode == 404 {
			logger.Info("文档不存在.")
			return doc, errors.New("ES文档不存在")
		}
		var e map[string]interface{}
		if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
			logger.Error("解析es应答失败: %v", err)
			return doc, err
		}
		// "error" is not guaranteed to be an object, so use a checked
		// assertion; indexing the resulting nil map is safe.
		detail, _ := e["error"].(map[string]interface{})
		logger.Error("[%s] %v: %v\n", res.Status(), detail["type"], detail["reason"])
		return doc, errors.New("获取ES记录失败")
	}

	body, readErr := io.ReadAll(res.Body)
	if readErr != nil {
		// Log the read error itself; err is nil at this point.
		logger.Error("获取es应答失败: %v", readErr)
		return doc, readErr
	}
	if err = json.Unmarshal(body, &doc); err != nil {
		logger.Error("反序列化es应答失败: %v", err)
		return doc, err
	}
	return doc, nil
}
//
//func CreateIndex(indexName string) error {
// resp, err := esClient.es().Indices.
// Create(indexName).
// Do(context.Background())
// if err != nil {
// logger.Error("创建ES索引失败:%v", err)
// return err
// }
// fmt.Printf("index:%#v\n", resp.Index)
// return nil
//}
// DeleteIndex 删除索引
//
// func DeleteIndex(indexName string) error {
// _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
// Delete(indexName).
// Do(context.Background())
// if err != nil {
// fmt.Printf("delete index failed,err:%v\n", err)
// return err
// }
// fmt.Printf("delete index successed,indexName:%s", indexName)
// return nil
// }
//
// CreateDocument indexes doc into indexName under the given id (sent in
// decimal form) with refresh=true.
//
// It returns true when Elasticsearch accepted the document and false when doc
// cannot be serialised, the request cannot be sent, or Elasticsearch answers
// with an error status.
func (es *ESClient) CreateDocument(indexName string, id int, doc interface{}) (success bool) {
	jsonDoc, err := json.Marshal(doc)
	if err != nil {
		logger.Error("序列化es请求失败: %v", err)
		return false
	}
	logger.Info("查询语句: %s", string(jsonDoc))

	indexRequest := esapi.IndexRequest{
		Index:      indexName,
		DocumentID: strconv.Itoa(id),
		Body:       bytes.NewReader(jsonDoc),
		Refresh:    "true",
	}
	res, err := indexRequest.Do(context.Background(), es.es())
	if err != nil {
		logger.Error("ES创建文档失败: %s", err)
		return false
	}
	defer res.Body.Close()

	if !res.IsError() {
		logger.Info("创建文档成功")
		return true
	}

	var e map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
		logger.Error("解析ES应答失败: %s", err)
		return false
	}
	// The error payload uses the keys "error", "type" and "reason". The checked
	// assertion avoids a panic when "error" is absent or not an object.
	detail, _ := e["error"].(map[string]interface{})
	logger.Error("[%s] %v: %v\n", res.Status(), detail["type"], detail["reason"])
	return false
}