package es import ( "bytes" "context" "encoding/json" "eta_mini_ht_api/common/component/config" logger "eta_mini_ht_api/common/component/log" "eta_mini_ht_api/common/contants" "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" "sync" ) type ESBase interface { GetId() string } var ( esOnce sync.Once esClient *ESClient ) type ESClient struct { esOp *elasticsearch.Client } func GetInstance() *ESClient { esOnce.Do(func() { // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化 if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok { logger.Info("初始化es") // 这里可以添加初始化Redis的逻辑 esClient = newEs(esConf) } }) return esClient } func (es *ESClient) es() *elasticsearch.Client { return es.esOp } func newEs(config *config.ESConfig) *ESClient { elasticsearch.NewDefaultClient() client, err := elasticsearch.NewClient( elasticsearch.Config{ Addresses: []string{config.GetUrl()}, // A list of Elasticsearch nodes to use. Username: config.GetUserName(), Password: config.GetPassword(), // Password for HTTP Basic Authentication. }, ) if err != nil { logger.Error("连接ES失败:%v", err) panic("启动es失败") } return &ESClient{esOp: client} } func init() { if GetInstance() == nil { panic("初始化es失败") } logger.Info("es初始化成功") } // //func CreateIndex(indexName string) error { // resp, err := esClient.es().Indices. // Create(indexName). // Do(context.Background()) // if err != nil { // logger.Error("创建ES索引失败:%v", err) // return err // } // fmt.Printf("index:%#v\n", resp.Index) // return nil //} // DeleteIndex 删除索引 //func DeleteIndex(indexName string) error { // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档 // Delete(indexName). // Do(context.Background()) // if err != nil { // fmt.Printf("delete index failed,err:%v\n", err) // return err // } // fmt.Printf("delete index successed,indexName:%s", indexName) // return nil //} // //// CreateDocument 创建文档 //func CreateDocument(indexName string, id string, doc interface{}) { // // 添加文档 // resp, err := esClient.esOp.Index(indexName).Id(id).Document(doc).Do(context.Background()) // if err != nil { // logger.Error("indexing document failed, err:%v\n", err) // return // } // logger.Info("result:%#v\n", resp.Result) // return //} // BulkInsert 批量创建文档 func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) { // 创建批量请求 bulkBody := new(bytes.Buffer) for _, doc := range docs { enc := json.NewEncoder(bulkBody) if err = enc.Encode(map[string]interface{}{ "index": map[string]interface{}{ "_index": indexName, "_id": doc.GetId(), }, }); err != nil { logger.Error("生成es批处理请求参数失败: %s", err) } if err = enc.Encode(doc); err != nil { logger.Error("生成es批处理文档失败: %s", err) } } bulkReq := esapi.BulkRequest{ Body: bytes.NewReader(bulkBody.Bytes()), Refresh: "true", } res, err := bulkReq.Do(context.Background(), es.esOp) if err != nil { logger.Error("es批处理创建失败: %s", err) } defer res.Body.Close() if res.IsError() { var e map[string]interface{} if err = json.NewDecoder(res.Body).Decode(&e); err != nil { logger.Error("解析es应答失败: %v", err) } else { // Print the response status and error information. logger.Error("es请求失败: %s: %v\n", res.Status(), err) } } return } ///* //* // // 搜索 // //indexName 访问索引名 //query 搜索条件 //from 开始搜索位置 //size 搜索条数 //sort 排序 //*/ //func EsSearch(indexName string, query map[string]interface{}, from int, size int, sort []map[string]string) HitsData { // searchQuery := map[string]interface{}{ // "query": query, // "from": from, // "size": size, // "sort": sort, // } // esClient.esOp.Create() // req := httplib.Post(esUrl + indexName + "/_search") // req.JSONBody(searchQuery) // str, err := req.String() // if err != nil { // fmt.Println("elasticsearch is error ", err) // } // fmt.Println(str) // var stb ReqSearchData // err = json.Unmarshal([]byte(str), &stb) // return stb.Hits //} // ///* //* //添加es //indexName 索引名 //id es的id //body es的值 //*/ //func EsAdd(indexName string, id string, body map[string]interface{}) bool { // req := httplib.Post(esUrl + indexName + "/_doc/" + id) // req.JSONBody(body) // _, err := req.String() // if err != nil { // fmt.Println("elasticsearch is error ", err) // return false // } // return true //} // ///* //* //修改es //indexName 索引名 //id es的id //body es的值 //*/ //func EsUpdate(indexName string, id string, body map[string]interface{}) bool { // bodyData := map[string]interface{}{ // "doc": body, // } // req := httplib.Post(esUrl + indexName + "/_doc/" + id + "/_update") // req.JSONBody(bodyData) // _, err := req.String() // if err != nil { // fmt.Println("elasticsearch is error ", err) // return false // } // return true //} // ///* //* //删除 //indexName 索引名 //id es的id //*/ //func EsDelete(indexName string, id string) bool { // req := httplib.Delete(esUrl + indexName + "/_doc/" + id) // _, err := req.String() // if err != nil { // fmt.Println("elasticsearch is error ", err) // return false // } // return true // //}