|
@@ -9,6 +9,8 @@ import (
|
|
|
"eta_mini_ht_api/common/contants"
|
|
|
"github.com/elastic/go-elasticsearch/v7"
|
|
|
"github.com/elastic/go-elasticsearch/v7/esapi"
|
|
|
+ "io"
|
|
|
+ "strings"
|
|
|
"sync"
|
|
|
)
|
|
|
|
|
@@ -25,6 +27,14 @@ type ESClient struct {
|
|
|
esOp *elasticsearch.Client
|
|
|
}
|
|
|
|
|
|
+type SearchType string
|
|
|
+
|
|
|
+const (
|
|
|
+ MatchAll = "match_all"
|
|
|
+ Match = "match"
|
|
|
+ Range = "range"
|
|
|
+)
|
|
|
+
|
|
|
func GetInstance() *ESClient {
|
|
|
esOnce.Do(func() {
|
|
|
// 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
|
|
@@ -63,44 +73,6 @@ func init() {
|
|
|
logger.Info("es初始化成功")
|
|
|
}
|
|
|
|
|
|
-//
|
|
|
-//func CreateIndex(indexName string) error {
|
|
|
-// resp, err := esClient.es().Indices.
|
|
|
-// Create(indexName).
|
|
|
-// Do(context.Background())
|
|
|
-// if err != nil {
|
|
|
-// logger.Error("创建ES索引失败:%v", err)
|
|
|
-// return err
|
|
|
-// }
|
|
|
-// fmt.Printf("index:%#v\n", resp.Index)
|
|
|
-// return nil
|
|
|
-//}
|
|
|
-
|
|
|
-// DeleteIndex 删除索引
|
|
|
-//func DeleteIndex(indexName string) error {
|
|
|
-// _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
|
|
|
-// Delete(indexName).
|
|
|
-// Do(context.Background())
|
|
|
-// if err != nil {
|
|
|
-// fmt.Printf("delete index failed,err:%v\n", err)
|
|
|
-// return err
|
|
|
-// }
|
|
|
-// fmt.Printf("delete index successed,indexName:%s", indexName)
|
|
|
-// return nil
|
|
|
-//}
|
|
|
-//
|
|
|
-//// CreateDocument 创建文档
|
|
|
-//func CreateDocument(indexName string, id string, doc interface{}) {
|
|
|
-// // 添加文档
|
|
|
-// resp, err := esClient.esOp.Index(indexName).Id(id).Document(doc).Do(context.Background())
|
|
|
-// if err != nil {
|
|
|
-// logger.Error("indexing document failed, err:%v\n", err)
|
|
|
-// return
|
|
|
-// }
|
|
|
-// logger.Info("result:%#v\n", resp.Result)
|
|
|
-// return
|
|
|
-//}
|
|
|
-
|
|
|
// BulkInsert 批量创建文档
|
|
|
func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
|
|
|
// 创建批量请求
|
|
@@ -141,36 +113,187 @@ func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-///*
|
|
|
-//*
|
|
|
+type ESResponse struct {
|
|
|
+ Took int `json:"took"`
|
|
|
+ TimedOut bool `json:"timed_out"`
|
|
|
+ Hits Hits `json:"hits"`
|
|
|
+ _Shards ShardsInfo `json:"_shards"`
|
|
|
+}
|
|
|
+
|
|
|
+type Hits struct {
|
|
|
+ Total TotalHits `json:"total"`
|
|
|
+ MaxScore float64 `json:"max_score"`
|
|
|
+ Hits []Hit `json:"hits"`
|
|
|
+}
|
|
|
+
|
|
|
+type TotalHits struct {
|
|
|
+ Value int `json:"value"`
|
|
|
+ Relation string `json:"relation"`
|
|
|
+}
|
|
|
+
|
|
|
+type Hit struct {
|
|
|
+ Index string `json:"_index"`
|
|
|
+ Type string `json:"_type"`
|
|
|
+ ID string `json:"_id"`
|
|
|
+ Score float64 `json:"_score"`
|
|
|
+ Source json.RawMessage `json:"_source"`
|
|
|
+ Highlight json.RawMessage `json:"highlight"`
|
|
|
+}
|
|
|
+
|
|
|
+type ShardsInfo struct {
|
|
|
+ Total int `json:"total"`
|
|
|
+ Successful int `json:"successful"`
|
|
|
+ Skipped int `json:"skipped"`
|
|
|
+ Failed int `json:"failed"`
|
|
|
+}
|
|
|
+type ESQueryRequest struct {
|
|
|
+ IndexName string
|
|
|
+ From int
|
|
|
+ Size int
|
|
|
+ Key string
|
|
|
+ Column string
|
|
|
+ Sorts []string
|
|
|
+ Type SearchType
|
|
|
+ RangeColumn string
|
|
|
+ Max interface{}
|
|
|
+ Min interface{}
|
|
|
+}
|
|
|
+
|
|
|
+func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
|
|
|
+ return &ESQueryRequest{
|
|
|
+ IndexName: index,
|
|
|
+ Type: searchType,
|
|
|
+ From: from,
|
|
|
+ Size: size,
|
|
|
+ Key: key,
|
|
|
+ Column: column,
|
|
|
+ Sorts: sorts,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
|
|
|
+ req.RangeColumn = column
|
|
|
+ req.Max = to
|
|
|
+ req.Min = from
|
|
|
+ return req
|
|
|
+}
|
|
|
+
|
|
|
+func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
|
|
|
+ switch req.Type {
|
|
|
+ case MatchAll:
|
|
|
+ queryMap = map[string]interface{}{
|
|
|
+ "query": map[string]interface{}{
|
|
|
+ "match_all": map[string]interface{}{},
|
|
|
+ },
|
|
|
+ }
|
|
|
+ return
|
|
|
+ case Match:
|
|
|
+ queryMap = map[string]interface{}{
|
|
|
+ "query": map[string]interface{}{
|
|
|
+ "match": map[string]interface{}{
|
|
|
+ req.Column: req.Key,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ "highlight": map[string]interface{}{
|
|
|
+ "fields": map[string]interface{}{
|
|
|
+ req.Column: map[string]interface{}{},
|
|
|
+ },
|
|
|
+ "pre_tags": []string{"<span style='color:red'>"},
|
|
|
+ "post_tags": []string{"</span>"},
|
|
|
+ },
|
|
|
+ }
|
|
|
+ return
|
|
|
+ case Range:
|
|
|
+ queryMap = map[string]interface{}{
|
|
|
+ "query": map[string]interface{}{
|
|
|
+ "match": map[string]interface{}{
|
|
|
+ req.Column: req.Key,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ "highlight": map[string]interface{}{
|
|
|
+ "fields": map[string]interface{}{
|
|
|
+ req.Column: map[string]interface{}{},
|
|
|
+ },
|
|
|
+ "pre_tags": []string{"<span style='color:red'>"},
|
|
|
+ "post_tags": []string{"</span>"},
|
|
|
+ },
|
|
|
+ "post_filter": map[string]interface{}{
|
|
|
+ "range": map[string]interface{}{
|
|
|
+ req.RangeColumn: map[string]interface{}{
|
|
|
+ "gte": req.Min,
|
|
|
+ "lte": req.Max,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }
|
|
|
+ return
|
|
|
+ default:
|
|
|
+ queryMap = map[string]interface{}{}
|
|
|
+ return
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// /*
|
|
|
+// *
|
|
|
//
|
|
|
// 搜索
|
|
|
//
|
|
|
-//indexName 访问索引名
|
|
|
-//query 搜索条件
|
|
|
-//from 开始搜索位置
|
|
|
-//size 搜索条数
|
|
|
-//sort 排序
|
|
|
-//*/
|
|
|
-//func EsSearch(indexName string, query map[string]interface{}, from int, size int, sort []map[string]string) HitsData {
|
|
|
-// searchQuery := map[string]interface{}{
|
|
|
-// "query": query,
|
|
|
-// "from": from,
|
|
|
-// "size": size,
|
|
|
-// "sort": sort,
|
|
|
-// }
|
|
|
-// esClient.esOp.Create()
|
|
|
-// req := httplib.Post(esUrl + indexName + "/_search")
|
|
|
-// req.JSONBody(searchQuery)
|
|
|
-// str, err := req.String()
|
|
|
-// if err != nil {
|
|
|
-// fmt.Println("elasticsearch is error ", err)
|
|
|
-// }
|
|
|
-// fmt.Println(str)
|
|
|
-// var stb ReqSearchData
|
|
|
-// err = json.Unmarshal([]byte(str), &stb)
|
|
|
-// return stb.Hits
|
|
|
-//}
|
|
|
+// indexName 访问索引名
|
|
|
+// query 搜索条件
|
|
|
+// from 开始搜索位置
|
|
|
+// size 搜索条数
|
|
|
+// sort 排序
|
|
|
+// */
|
|
|
+
|
|
|
+func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
|
|
|
+ queryMap := params.parseJsonQuery()
|
|
|
+ jsonQuery, _ := json.Marshal(queryMap)
|
|
|
+ request := esapi.SearchRequest{
|
|
|
+ Index: []string{params.IndexName},
|
|
|
+ Body: strings.NewReader(string(jsonQuery)),
|
|
|
+ From: ¶ms.From,
|
|
|
+ Size: ¶ms.Size,
|
|
|
+ Sort: params.Sorts,
|
|
|
+ }
|
|
|
+ res, err := request.Do(context.Background(), esClient.esOp)
|
|
|
+
|
|
|
+ defer res.Body.Close()
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("es查询失败: %s", err)
|
|
|
+ }
|
|
|
+ if res.IsError() {
|
|
|
+ var e map[string]interface{}
|
|
|
+ if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
|
|
|
+ logger.Error("解析es应答失败: %v", err)
|
|
|
+ } else {
|
|
|
+ // Print the response status and error information.
|
|
|
+ logger.Error("es请求失败: %s: %v\n", res.Status(), err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ body, err := io.ReadAll(res.Body)
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("获取es应答失败: %v", err)
|
|
|
+ }
|
|
|
+ return parseESResponse(body)
|
|
|
+}
|
|
|
+func parseESResponse(body []byte) (ESResponse, error) {
|
|
|
+ var response ESResponse
|
|
|
+ if err := json.Unmarshal(body, &response); err != nil {
|
|
|
+ return ESResponse{}, err
|
|
|
+ }
|
|
|
+ for _, hit := range response.Hits.Hits {
|
|
|
+ var source map[string]interface{}
|
|
|
+ if err := json.Unmarshal(hit.Source, &source); err != nil {
|
|
|
+ return ESResponse{}, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return response, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (es *ESClient) GetSource(hits Hits) []Hit {
|
|
|
+ return hits.Hits
|
|
|
+}
|
|
|
+
|
|
|
//
|
|
|
///*
|
|
|
//*
|
|
@@ -227,3 +350,40 @@ func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
|
|
|
// return true
|
|
|
//
|
|
|
//}
|
|
|
+//
|
|
|
+//func CreateIndex(indexName string) error {
|
|
|
+// resp, err := esClient.es().Indices.
|
|
|
+// Create(indexName).
|
|
|
+// Do(context.Background())
|
|
|
+// if err != nil {
|
|
|
+// logger.Error("创建ES索引失败:%v", err)
|
|
|
+// return err
|
|
|
+// }
|
|
|
+// fmt.Printf("index:%#v\n", resp.Index)
|
|
|
+// return nil
|
|
|
+//}
|
|
|
+
|
|
|
+// DeleteIndex 删除索引
|
|
|
+//func DeleteIndex(indexName string) error {
|
|
|
+// _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
|
|
|
+// Delete(indexName).
|
|
|
+// Do(context.Background())
|
|
|
+// if err != nil {
|
|
|
+// fmt.Printf("delete index failed,err:%v\n", err)
|
|
|
+// return err
|
|
|
+// }
|
|
|
+// fmt.Printf("delete index successed,indexName:%s", indexName)
|
|
|
+// return nil
|
|
|
+//}
|
|
|
+//
|
|
|
+//// CreateDocument 创建文档
|
|
|
+//func CreateDocument(indexName string, id string, doc interface{}) {
|
|
|
+// // 添加文档
|
|
|
+// resp, err := esClient.esOp.Index(indexName).Id(id).Document(doc).Do(context.Background())
|
|
|
+// if err != nil {
|
|
|
+// logger.Error("indexing document failed, err:%v\n", err)
|
|
|
+// return
|
|
|
+// }
|
|
|
+// logger.Info("result:%#v\n", resp.Result)
|
|
|
+// return
|
|
|
+//}
|