123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- package es
- import (
- "bytes"
- "context"
- "encoding/json"
- "eta_mini_ht_api/common/component/config"
- logger "eta_mini_ht_api/common/component/log"
- "eta_mini_ht_api/common/contants"
- "github.com/elastic/go-elasticsearch/v7"
- "github.com/elastic/go-elasticsearch/v7/esapi"
- "sync"
- )
- type ESBase interface {
- GetId() string
- }
- var (
- esOnce sync.Once
- esClient *ESClient
- )
- type ESClient struct {
- esOp *elasticsearch.Client
- }
- func GetInstance() *ESClient {
- esOnce.Do(func() {
- // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
- if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
- logger.Info("初始化es")
- // 这里可以添加初始化Redis的逻辑
- esClient = newEs(esConf)
- }
- })
- return esClient
- }
- func (es *ESClient) es() *elasticsearch.Client {
- return es.esOp
- }
- func newEs(config *config.ESConfig) *ESClient {
- elasticsearch.NewDefaultClient()
- client, err := elasticsearch.NewClient(
- elasticsearch.Config{
- Addresses: []string{config.GetUrl()},
- // A list of Elasticsearch nodes to use.
- Username: config.GetUserName(),
- Password: config.GetPassword(), // Password for HTTP Basic Authentication.
- },
- )
- if err != nil {
- logger.Error("连接ES失败:%v", err)
- panic("启动es失败")
- }
- return &ESClient{esOp: client}
- }
- func init() {
- if GetInstance() == nil {
- panic("初始化es失败")
- }
- logger.Info("es初始化成功")
- }
- //
- //func CreateIndex(indexName string) error {
- // resp, err := esClient.es().Indices.
- // Create(indexName).
- // Do(context.Background())
- // if err != nil {
- // logger.Error("创建ES索引失败:%v", err)
- // return err
- // }
- // fmt.Printf("index:%#v\n", resp.Index)
- // return nil
- //}
- // DeleteIndex 删除索引
- //func DeleteIndex(indexName string) error {
- // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
- // Delete(indexName).
- // Do(context.Background())
- // if err != nil {
- // fmt.Printf("delete index failed,err:%v\n", err)
- // return err
- // }
- // fmt.Printf("delete index successed,indexName:%s", indexName)
- // return nil
- //}
- //
- //// CreateDocument 创建文档
- //func CreateDocument(indexName string, id string, doc interface{}) {
- // // 添加文档
- // resp, err := esClient.esOp.Index(indexName).Id(id).Document(doc).Do(context.Background())
- // if err != nil {
- // logger.Error("indexing document failed, err:%v\n", err)
- // return
- // }
- // logger.Info("result:%#v\n", resp.Result)
- // return
- //}
- // BulkInsert 批量创建文档
- func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
- // 创建批量请求
- bulkBody := new(bytes.Buffer)
- for _, doc := range docs {
- enc := json.NewEncoder(bulkBody)
- if err = enc.Encode(map[string]interface{}{
- "index": map[string]interface{}{
- "_index": indexName,
- "_id": doc.GetId(),
- },
- }); err != nil {
- logger.Error("生成es批处理请求参数失败: %s", err)
- }
- if err = enc.Encode(doc); err != nil {
- logger.Error("生成es批处理文档失败: %s", err)
- }
- }
- bulkReq := esapi.BulkRequest{
- Body: bytes.NewReader(bulkBody.Bytes()),
- Refresh: "true",
- }
- res, err := bulkReq.Do(context.Background(), es.esOp)
- if err != nil {
- logger.Error("es批处理创建失败: %s", err)
- }
- defer res.Body.Close()
- if res.IsError() {
- var e map[string]interface{}
- if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
- logger.Error("解析es应答失败: %v", err)
- } else {
- // Print the response status and error information.
- logger.Error("es请求失败: %s: %v\n", res.Status(), err)
- }
- }
- return
- }
- ///*
- //*
- //
- // 搜索
- //
- //indexName 访问索引名
- //query 搜索条件
- //from 开始搜索位置
- //size 搜索条数
- //sort 排序
- //*/
- //func EsSearch(indexName string, query map[string]interface{}, from int, size int, sort []map[string]string) HitsData {
- // searchQuery := map[string]interface{}{
- // "query": query,
- // "from": from,
- // "size": size,
- // "sort": sort,
- // }
- // esClient.esOp.Create()
- // req := httplib.Post(esUrl + indexName + "/_search")
- // req.JSONBody(searchQuery)
- // str, err := req.String()
- // if err != nil {
- // fmt.Println("elasticsearch is error ", err)
- // }
- // fmt.Println(str)
- // var stb ReqSearchData
- // err = json.Unmarshal([]byte(str), &stb)
- // return stb.Hits
- //}
- //
- ///*
- //*
- //添加es
- //indexName 索引名
- //id es的id
- //body es的值
- //*/
- //func EsAdd(indexName string, id string, body map[string]interface{}) bool {
- // req := httplib.Post(esUrl + indexName + "/_doc/" + id)
- // req.JSONBody(body)
- // _, err := req.String()
- // if err != nil {
- // fmt.Println("elasticsearch is error ", err)
- // return false
- // }
- // return true
- //}
- //
- ///*
- //*
- //修改es
- //indexName 索引名
- //id es的id
- //body es的值
- //*/
- //func EsUpdate(indexName string, id string, body map[string]interface{}) bool {
- // bodyData := map[string]interface{}{
- // "doc": body,
- // }
- // req := httplib.Post(esUrl + indexName + "/_doc/" + id + "/_update")
- // req.JSONBody(bodyData)
- // _, err := req.String()
- // if err != nil {
- // fmt.Println("elasticsearch is error ", err)
- // return false
- // }
- // return true
- //}
- //
- ///*
- //*
- //删除
- //indexName 索引名
- //id es的id
- //*/
- //func EsDelete(indexName string, id string) bool {
- // req := httplib.Delete(esUrl + indexName + "/_doc/" + id)
- // _, err := req.String()
- // if err != nil {
- // fmt.Println("elasticsearch is error ", err)
- // return false
- // }
- // return true
- //
- //}
|