|
@@ -0,0 +1,229 @@
|
|
|
+package es
|
|
|
+
|
|
|
+import (
|
|
|
+ "bytes"
|
|
|
+ "context"
|
|
|
+ "encoding/json"
|
|
|
+ "eta_mini_ht_api/common/component/config"
|
|
|
+ logger "eta_mini_ht_api/common/component/log"
|
|
|
+ "eta_mini_ht_api/common/contants"
|
|
|
+ "github.com/elastic/go-elasticsearch/v7"
|
|
|
+ "github.com/elastic/go-elasticsearch/v7/esapi"
|
|
|
+ "sync"
|
|
|
+)
|
|
|
+
|
|
|
+type ESBase interface {
|
|
|
+ GetId() string
|
|
|
+}
|
|
|
+
|
|
|
+var (
|
|
|
+ esOnce sync.Once
|
|
|
+ esClient *ESClient
|
|
|
+)
|
|
|
+
|
|
|
+type ESClient struct {
|
|
|
+ esOp *elasticsearch.Client
|
|
|
+}
|
|
|
+
|
|
|
+func GetInstance() *ESClient {
|
|
|
+ esOnce.Do(func() {
|
|
|
+ // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
|
|
|
+ if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
|
|
|
+ logger.Info("初始化es")
|
|
|
+ // 这里可以添加初始化Redis的逻辑
|
|
|
+ esClient = newEs(esConf)
|
|
|
+ }
|
|
|
+ })
|
|
|
+ return esClient
|
|
|
+}
|
|
|
+func (es *ESClient) es() *elasticsearch.Client {
|
|
|
+ return es.esOp
|
|
|
+}
|
|
|
+func newEs(config *config.ESConfig) *ESClient {
|
|
|
+ elasticsearch.NewDefaultClient()
|
|
|
+ client, err := elasticsearch.NewClient(
|
|
|
+ elasticsearch.Config{
|
|
|
+ Addresses: []string{config.GetUrl()},
|
|
|
+ // A list of Elasticsearch nodes to use.
|
|
|
+ Username: config.GetUserName(),
|
|
|
+ Password: config.GetPassword(), // Password for HTTP Basic Authentication.
|
|
|
+ },
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("连接ES失败:%v", err)
|
|
|
+ panic("启动es失败")
|
|
|
+ }
|
|
|
+ return &ESClient{esOp: client}
|
|
|
+}
|
|
|
+
|
|
|
+func init() {
|
|
|
+ if GetInstance() == nil {
|
|
|
+ panic("初始化es失败")
|
|
|
+ }
|
|
|
+ logger.Info("es初始化成功")
|
|
|
+}
|
|
|
+
|
|
|
+//
|
|
|
+//func CreateIndex(indexName string) error {
|
|
|
+// resp, err := esClient.es().Indices.
|
|
|
+// Create(indexName).
|
|
|
+// Do(context.Background())
|
|
|
+// if err != nil {
|
|
|
+// logger.Error("创建ES索引失败:%v", err)
|
|
|
+// return err
|
|
|
+// }
|
|
|
+// fmt.Printf("index:%#v\n", resp.Index)
|
|
|
+// return nil
|
|
|
+//}
|
|
|
+
|
|
|
+// DeleteIndex 删除索引
|
|
|
+//func DeleteIndex(indexName string) error {
|
|
|
+// _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
|
|
|
+// Delete(indexName).
|
|
|
+// Do(context.Background())
|
|
|
+// if err != nil {
|
|
|
+// fmt.Printf("delete index failed,err:%v\n", err)
|
|
|
+// return err
|
|
|
+// }
|
|
|
+// fmt.Printf("delete index successed,indexName:%s", indexName)
|
|
|
+// return nil
|
|
|
+//}
|
|
|
+//
|
|
|
+//// CreateDocument 创建文档
|
|
|
+//func CreateDocument(indexName string, id string, doc interface{}) {
|
|
|
+// // 添加文档
|
|
|
+// resp, err := esClient.esOp.Index(indexName).Id(id).Document(doc).Do(context.Background())
|
|
|
+// if err != nil {
|
|
|
+// logger.Error("indexing document failed, err:%v\n", err)
|
|
|
+// return
|
|
|
+// }
|
|
|
+// logger.Info("result:%#v\n", resp.Result)
|
|
|
+// return
|
|
|
+//}
|
|
|
+
|
|
|
+// BulkInsert 批量创建文档
|
|
|
+func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
|
|
|
+ // 创建批量请求
|
|
|
+ bulkBody := new(bytes.Buffer)
|
|
|
+ for _, doc := range docs {
|
|
|
+ enc := json.NewEncoder(bulkBody)
|
|
|
+ if err = enc.Encode(map[string]interface{}{
|
|
|
+ "index": map[string]interface{}{
|
|
|
+ "_index": indexName,
|
|
|
+ "_id": doc.GetId(),
|
|
|
+ },
|
|
|
+ }); err != nil {
|
|
|
+ logger.Error("生成es批处理请求参数失败: %s", err)
|
|
|
+ }
|
|
|
+ if err = enc.Encode(doc); err != nil {
|
|
|
+ logger.Error("生成es批处理文档失败: %s", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ bulkReq := esapi.BulkRequest{
|
|
|
+ Body: bytes.NewReader(bulkBody.Bytes()),
|
|
|
+ Refresh: "true",
|
|
|
+ }
|
|
|
+
|
|
|
+ res, err := bulkReq.Do(context.Background(), es.esOp)
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("es批处理创建失败: %s", err)
|
|
|
+ }
|
|
|
+ defer res.Body.Close()
|
|
|
+ if res.IsError() {
|
|
|
+ var e map[string]interface{}
|
|
|
+ if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
|
|
|
+ logger.Error("解析es应答失败: %v", err)
|
|
|
+ } else {
|
|
|
+ // Print the response status and error information.
|
|
|
+ logger.Error("es请求失败: %s: %v\n", res.Status(), err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+///*
|
|
|
+//*
|
|
|
+//
|
|
|
+// 搜索
|
|
|
+//
|
|
|
+//indexName 访问索引名
|
|
|
+//query 搜索条件
|
|
|
+//from 开始搜索位置
|
|
|
+//size 搜索条数
|
|
|
+//sort 排序
|
|
|
+//*/
|
|
|
+//func EsSearch(indexName string, query map[string]interface{}, from int, size int, sort []map[string]string) HitsData {
|
|
|
+// searchQuery := map[string]interface{}{
|
|
|
+// "query": query,
|
|
|
+// "from": from,
|
|
|
+// "size": size,
|
|
|
+// "sort": sort,
|
|
|
+// }
|
|
|
+// esClient.esOp.Create()
|
|
|
+// req := httplib.Post(esUrl + indexName + "/_search")
|
|
|
+// req.JSONBody(searchQuery)
|
|
|
+// str, err := req.String()
|
|
|
+// if err != nil {
|
|
|
+// fmt.Println("elasticsearch is error ", err)
|
|
|
+// }
|
|
|
+// fmt.Println(str)
|
|
|
+// var stb ReqSearchData
|
|
|
+// err = json.Unmarshal([]byte(str), &stb)
|
|
|
+// return stb.Hits
|
|
|
+//}
|
|
|
+//
|
|
|
+///*
|
|
|
+//*
|
|
|
+//添加es
|
|
|
+//indexName 索引名
|
|
|
+//id es的id
|
|
|
+//body es的值
|
|
|
+//*/
|
|
|
+//func EsAdd(indexName string, id string, body map[string]interface{}) bool {
|
|
|
+// req := httplib.Post(esUrl + indexName + "/_doc/" + id)
|
|
|
+// req.JSONBody(body)
|
|
|
+// _, err := req.String()
|
|
|
+// if err != nil {
|
|
|
+// fmt.Println("elasticsearch is error ", err)
|
|
|
+// return false
|
|
|
+// }
|
|
|
+// return true
|
|
|
+//}
|
|
|
+//
|
|
|
+///*
|
|
|
+//*
|
|
|
+//修改es
|
|
|
+//indexName 索引名
|
|
|
+//id es的id
|
|
|
+//body es的值
|
|
|
+//*/
|
|
|
+//func EsUpdate(indexName string, id string, body map[string]interface{}) bool {
|
|
|
+// bodyData := map[string]interface{}{
|
|
|
+// "doc": body,
|
|
|
+// }
|
|
|
+// req := httplib.Post(esUrl + indexName + "/_doc/" + id + "/_update")
|
|
|
+// req.JSONBody(bodyData)
|
|
|
+// _, err := req.String()
|
|
|
+// if err != nil {
|
|
|
+// fmt.Println("elasticsearch is error ", err)
|
|
|
+// return false
|
|
|
+// }
|
|
|
+// return true
|
|
|
+//}
|
|
|
+//
|
|
|
+///*
|
|
|
+//*
|
|
|
+//删除
|
|
|
+//indexName 索引名
|
|
|
+//id es的id
|
|
|
+//*/
|
|
|
+//func EsDelete(indexName string, id string) bool {
|
|
|
+// req := httplib.Delete(esUrl + indexName + "/_doc/" + id)
|
|
|
+// _, err := req.String()
|
|
|
+// if err != nil {
|
|
|
+// fmt.Println("elasticsearch is error ", err)
|
|
|
+// return false
|
|
|
+// }
|
|
|
+// return true
|
|
|
+//
|
|
|
+//}
|