es.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "eta_mini_ht_api/common/component/config"
  7. logger "eta_mini_ht_api/common/component/log"
  8. "eta_mini_ht_api/common/contants"
  9. "github.com/elastic/go-elasticsearch/v7"
  10. "github.com/elastic/go-elasticsearch/v7/esapi"
  11. "sync"
  12. )
  13. type ESBase interface {
  14. GetId() string
  15. }
  16. var (
  17. esOnce sync.Once
  18. esClient *ESClient
  19. )
  20. type ESClient struct {
  21. esOp *elasticsearch.Client
  22. }
  23. func GetInstance() *ESClient {
  24. esOnce.Do(func() {
  25. // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
  26. if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
  27. logger.Info("初始化es")
  28. // 这里可以添加初始化Redis的逻辑
  29. esClient = newEs(esConf)
  30. }
  31. })
  32. return esClient
  33. }
  34. func (es *ESClient) es() *elasticsearch.Client {
  35. return es.esOp
  36. }
  37. func newEs(config *config.ESConfig) *ESClient {
  38. elasticsearch.NewDefaultClient()
  39. client, err := elasticsearch.NewClient(
  40. elasticsearch.Config{
  41. Addresses: []string{config.GetUrl()},
  42. // A list of Elasticsearch nodes to use.
  43. Username: config.GetUserName(),
  44. Password: config.GetPassword(), // Password for HTTP Basic Authentication.
  45. },
  46. )
  47. if err != nil {
  48. logger.Error("连接ES失败:%v", err)
  49. panic("启动es失败")
  50. }
  51. return &ESClient{esOp: client}
  52. }
  53. func init() {
  54. if GetInstance() == nil {
  55. panic("初始化es失败")
  56. }
  57. logger.Info("es初始化成功")
  58. }
  59. //
  60. //func CreateIndex(indexName string) error {
  61. // resp, err := esClient.es().Indices.
  62. // Create(indexName).
  63. // Do(context.Background())
  64. // if err != nil {
  65. // logger.Error("创建ES索引失败:%v", err)
  66. // return err
  67. // }
  68. // fmt.Printf("index:%#v\n", resp.Index)
  69. // return nil
  70. //}
  71. // DeleteIndex 删除索引
  72. //func DeleteIndex(indexName string) error {
  73. // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
  74. // Delete(indexName).
  75. // Do(context.Background())
  76. // if err != nil {
  77. // fmt.Printf("delete index failed,err:%v\n", err)
  78. // return err
  79. // }
  80. // fmt.Printf("delete index successed,indexName:%s", indexName)
  81. // return nil
  82. //}
  83. //
  84. //// CreateDocument 创建文档
  85. //func CreateDocument(indexName string, id string, doc interface{}) {
  86. // // 添加文档
  87. // resp, err := esClient.esOp.Index(indexName).Id(id).Document(doc).Do(context.Background())
  88. // if err != nil {
  89. // logger.Error("indexing document failed, err:%v\n", err)
  90. // return
  91. // }
  92. // logger.Info("result:%#v\n", resp.Result)
  93. // return
  94. //}
  95. // BulkInsert 批量创建文档
  96. func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
  97. // 创建批量请求
  98. bulkBody := new(bytes.Buffer)
  99. for _, doc := range docs {
  100. enc := json.NewEncoder(bulkBody)
  101. if err = enc.Encode(map[string]interface{}{
  102. "index": map[string]interface{}{
  103. "_index": indexName,
  104. "_id": doc.GetId(),
  105. },
  106. }); err != nil {
  107. logger.Error("生成es批处理请求参数失败: %s", err)
  108. }
  109. if err = enc.Encode(doc); err != nil {
  110. logger.Error("生成es批处理文档失败: %s", err)
  111. }
  112. }
  113. bulkReq := esapi.BulkRequest{
  114. Body: bytes.NewReader(bulkBody.Bytes()),
  115. Refresh: "true",
  116. }
  117. res, err := bulkReq.Do(context.Background(), es.esOp)
  118. if err != nil {
  119. logger.Error("es批处理创建失败: %s", err)
  120. }
  121. defer res.Body.Close()
  122. if res.IsError() {
  123. var e map[string]interface{}
  124. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  125. logger.Error("解析es应答失败: %v", err)
  126. } else {
  127. // Print the response status and error information.
  128. logger.Error("es请求失败: %s: %v\n", res.Status(), err)
  129. }
  130. }
  131. return
  132. }
  133. ///*
  134. //*
  135. //
  136. // 搜索
  137. //
  138. //indexName 访问索引名
  139. //query 搜索条件
  140. //from 开始搜索位置
  141. //size 搜索条数
  142. //sort 排序
  143. //*/
  144. //func EsSearch(indexName string, query map[string]interface{}, from int, size int, sort []map[string]string) HitsData {
  145. // searchQuery := map[string]interface{}{
  146. // "query": query,
  147. // "from": from,
  148. // "size": size,
  149. // "sort": sort,
  150. // }
  151. // esClient.esOp.Create()
  152. // req := httplib.Post(esUrl + indexName + "/_search")
  153. // req.JSONBody(searchQuery)
  154. // str, err := req.String()
  155. // if err != nil {
  156. // fmt.Println("elasticsearch is error ", err)
  157. // }
  158. // fmt.Println(str)
  159. // var stb ReqSearchData
  160. // err = json.Unmarshal([]byte(str), &stb)
  161. // return stb.Hits
  162. //}
  163. //
  164. ///*
  165. //*
  166. //添加es
  167. //indexName 索引名
  168. //id es的id
  169. //body es的值
  170. //*/
  171. //func EsAdd(indexName string, id string, body map[string]interface{}) bool {
  172. // req := httplib.Post(esUrl + indexName + "/_doc/" + id)
  173. // req.JSONBody(body)
  174. // _, err := req.String()
  175. // if err != nil {
  176. // fmt.Println("elasticsearch is error ", err)
  177. // return false
  178. // }
  179. // return true
  180. //}
  181. //
  182. ///*
  183. //*
  184. //修改es
  185. //indexName 索引名
  186. //id es的id
  187. //body es的值
  188. //*/
  189. //func EsUpdate(indexName string, id string, body map[string]interface{}) bool {
  190. // bodyData := map[string]interface{}{
  191. // "doc": body,
  192. // }
  193. // req := httplib.Post(esUrl + indexName + "/_doc/" + id + "/_update")
  194. // req.JSONBody(bodyData)
  195. // _, err := req.String()
  196. // if err != nil {
  197. // fmt.Println("elasticsearch is error ", err)
  198. // return false
  199. // }
  200. // return true
  201. //}
  202. //
  203. ///*
  204. //*
  205. //删除
  206. //indexName 索引名
  207. //id es的id
  208. //*/
  209. //func EsDelete(indexName string, id string) bool {
  210. // req := httplib.Delete(esUrl + indexName + "/_doc/" + id)
  211. // _, err := req.String()
  212. // if err != nil {
  213. // fmt.Println("elasticsearch is error ", err)
  214. // return false
  215. // }
  216. // return true
  217. //
  218. //}