es.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "eta_mini_ht_api/common/component/config"
  7. logger "eta_mini_ht_api/common/component/log"
  8. "eta_mini_ht_api/common/contants"
  9. "github.com/elastic/go-elasticsearch/v7"
  10. "github.com/elastic/go-elasticsearch/v7/esapi"
  11. "io"
  12. "strings"
  13. "sync"
  14. )
  15. type ESBase interface {
  16. GetId() string
  17. }
  18. var (
  19. esOnce sync.Once
  20. esClient *ESClient
  21. )
  22. type ESClient struct {
  23. esOp *elasticsearch.Client
  24. }
  25. type SearchType string
  26. const (
  27. MatchAll = "match_all"
  28. Match = "match"
  29. Range = "range"
  30. )
  31. func GetInstance() *ESClient {
  32. esOnce.Do(func() {
  33. // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
  34. if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
  35. logger.Info("初始化es")
  36. // 这里可以添加初始化Redis的逻辑
  37. esClient = newEs(esConf)
  38. }
  39. })
  40. return esClient
  41. }
  42. func (es *ESClient) es() *elasticsearch.Client {
  43. return es.esOp
  44. }
  45. func newEs(config *config.ESConfig) *ESClient {
  46. elasticsearch.NewDefaultClient()
  47. client, err := elasticsearch.NewClient(
  48. elasticsearch.Config{
  49. Addresses: []string{config.GetUrl()},
  50. // A list of Elasticsearch nodes to use.
  51. Username: config.GetUserName(),
  52. Password: config.GetPassword(), // Password for HTTP Basic Authentication.
  53. },
  54. )
  55. if err != nil {
  56. logger.Error("连接ES失败:%v", err)
  57. panic("启动es失败")
  58. }
  59. return &ESClient{esOp: client}
  60. }
  61. func init() {
  62. if GetInstance() == nil {
  63. panic("初始化es失败")
  64. }
  65. logger.Info("es初始化成功")
  66. }
  67. // BulkInsert 批量创建文档
  68. func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
  69. // 创建批量请求
  70. bulkBody := new(bytes.Buffer)
  71. for _, doc := range docs {
  72. enc := json.NewEncoder(bulkBody)
  73. if err = enc.Encode(map[string]interface{}{
  74. "index": map[string]interface{}{
  75. "_index": indexName,
  76. "_id": doc.GetId(),
  77. },
  78. }); err != nil {
  79. logger.Error("生成es批处理请求参数失败: %s", err)
  80. }
  81. if err = enc.Encode(doc); err != nil {
  82. logger.Error("生成es批处理文档失败: %s", err)
  83. }
  84. }
  85. bulkReq := esapi.BulkRequest{
  86. Body: bytes.NewReader(bulkBody.Bytes()),
  87. Refresh: "true",
  88. }
  89. res, err := bulkReq.Do(context.Background(), es.esOp)
  90. if err != nil {
  91. logger.Error("es批处理创建失败: %s", err)
  92. }
  93. defer res.Body.Close()
  94. if res.IsError() {
  95. var e map[string]interface{}
  96. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  97. logger.Error("解析es应答失败: %v", err)
  98. } else {
  99. // Print the response status and error information.
  100. logger.Error("es请求失败: %s: %v\n", res.Status(), err)
  101. }
  102. }
  103. return
  104. }
  105. type ESResponse struct {
  106. Took int `json:"took"`
  107. TimedOut bool `json:"timed_out"`
  108. Hits Hits `json:"hits"`
  109. _Shards ShardsInfo `json:"_shards"`
  110. }
  111. type Hits struct {
  112. Total TotalHits `json:"total"`
  113. MaxScore float64 `json:"max_score"`
  114. Hits []Hit `json:"hits"`
  115. }
  116. type TotalHits struct {
  117. Value int `json:"value"`
  118. Relation string `json:"relation"`
  119. }
  120. type Hit struct {
  121. Index string `json:"_index"`
  122. Type string `json:"_type"`
  123. ID string `json:"_id"`
  124. Score float64 `json:"_score"`
  125. Source json.RawMessage `json:"_source"`
  126. Highlight json.RawMessage `json:"highlight"`
  127. }
  128. type ShardsInfo struct {
  129. Total int `json:"total"`
  130. Successful int `json:"successful"`
  131. Skipped int `json:"skipped"`
  132. Failed int `json:"failed"`
  133. }
  134. type ESQueryRequest struct {
  135. IndexName string
  136. From int
  137. Size int
  138. Key string
  139. Column string
  140. Sorts []string
  141. Type SearchType
  142. RangeColumn string
  143. Max interface{}
  144. Min interface{}
  145. }
  146. func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
  147. return &ESQueryRequest{
  148. IndexName: index,
  149. Type: searchType,
  150. From: from,
  151. Size: size,
  152. Key: key,
  153. Column: column,
  154. Sorts: sorts,
  155. }
  156. }
  157. func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
  158. req.RangeColumn = column
  159. req.Max = to
  160. req.Min = from
  161. return req
  162. }
  163. func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
  164. switch req.Type {
  165. case MatchAll:
  166. queryMap = map[string]interface{}{
  167. "query": map[string]interface{}{
  168. "match_all": map[string]interface{}{},
  169. },
  170. }
  171. return
  172. case Match:
  173. queryMap = map[string]interface{}{
  174. "query": map[string]interface{}{
  175. "match": map[string]interface{}{
  176. req.Column: req.Key,
  177. },
  178. },
  179. "highlight": map[string]interface{}{
  180. "fields": map[string]interface{}{
  181. req.Column: map[string]interface{}{},
  182. },
  183. "pre_tags": []string{"<span style='color:red'>"},
  184. "post_tags": []string{"</span>"},
  185. },
  186. }
  187. return
  188. case Range:
  189. queryMap = map[string]interface{}{
  190. "query": map[string]interface{}{
  191. "match": map[string]interface{}{
  192. req.Column: req.Key,
  193. },
  194. },
  195. "highlight": map[string]interface{}{
  196. "fields": map[string]interface{}{
  197. req.Column: map[string]interface{}{},
  198. },
  199. "pre_tags": []string{"<span style='color:red'>"},
  200. "post_tags": []string{"</span>"},
  201. },
  202. "post_filter": map[string]interface{}{
  203. "range": map[string]interface{}{
  204. req.RangeColumn: map[string]interface{}{
  205. "gte": req.Min,
  206. "lte": req.Max,
  207. },
  208. },
  209. },
  210. }
  211. return
  212. default:
  213. queryMap = map[string]interface{}{}
  214. return
  215. }
  216. }
  217. // /*
  218. // *
  219. //
  220. // 搜索
  221. //
  222. // indexName 访问索引名
  223. // query 搜索条件
  224. // from 开始搜索位置
  225. // size 搜索条数
  226. // sort 排序
  227. // */
  228. func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
  229. queryMap := params.parseJsonQuery()
  230. jsonQuery, _ := json.Marshal(queryMap)
  231. request := esapi.SearchRequest{
  232. Index: []string{params.IndexName},
  233. Body: strings.NewReader(string(jsonQuery)),
  234. From: &params.From,
  235. Size: &params.Size,
  236. Sort: params.Sorts,
  237. }
  238. res, err := request.Do(context.Background(), esClient.esOp)
  239. defer res.Body.Close()
  240. if err != nil {
  241. logger.Error("es查询失败: %s", err)
  242. }
  243. if res.IsError() {
  244. var e map[string]interface{}
  245. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  246. logger.Error("解析es应答失败: %v", err)
  247. } else {
  248. // Print the response status and error information.
  249. logger.Error("es请求失败: %s: %v\n", res.Status(), err)
  250. }
  251. }
  252. body, err := io.ReadAll(res.Body)
  253. if err != nil {
  254. logger.Error("获取es应答失败: %v", err)
  255. }
  256. return parseESResponse(body)
  257. }
  258. func parseESResponse(body []byte) (ESResponse, error) {
  259. var response ESResponse
  260. if err := json.Unmarshal(body, &response); err != nil {
  261. return ESResponse{}, err
  262. }
  263. for _, hit := range response.Hits.Hits {
  264. var source map[string]interface{}
  265. if err := json.Unmarshal(hit.Source, &source); err != nil {
  266. return ESResponse{}, err
  267. }
  268. }
  269. return response, nil
  270. }
  271. func (es *ESClient) GetSource(hits Hits) []Hit {
  272. return hits.Hits
  273. }
  274. //
  275. ///*
  276. //*
  277. //添加es
  278. //indexName 索引名
  279. //id es的id
  280. //body es的值
  281. //*/
  282. //func EsAdd(indexName string, id string, body map[string]interface{}) bool {
  283. // req := httplib.Post(esUrl + indexName + "/_doc/" + id)
  284. // req.JSONBody(body)
  285. // _, err := req.String()
  286. // if err != nil {
  287. // fmt.Println("elasticsearch is error ", err)
  288. // return false
  289. // }
  290. // return true
  291. //}
  292. //
  293. ///*
  294. //*
  295. //修改es
  296. //indexName 索引名
  297. //id es的id
  298. //body es的值
  299. //*/
  300. //func EsUpdate(indexName string, id string, body map[string]interface{}) bool {
  301. // bodyData := map[string]interface{}{
  302. // "doc": body,
  303. // }
  304. // req := httplib.Post(esUrl + indexName + "/_doc/" + id + "/_update")
  305. // req.JSONBody(bodyData)
  306. // _, err := req.String()
  307. // if err != nil {
  308. // fmt.Println("elasticsearch is error ", err)
  309. // return false
  310. // }
  311. // return true
  312. //}
  313. //
  314. ///*
  315. //*
  316. //删除
  317. //indexName 索引名
  318. //id es的id
  319. //*/
  320. //func EsDelete(indexName string, id string) bool {
  321. // req := httplib.Delete(esUrl + indexName + "/_doc/" + id)
  322. // _, err := req.String()
  323. // if err != nil {
  324. // fmt.Println("elasticsearch is error ", err)
  325. // return false
  326. // }
  327. // return true
  328. //
  329. //}
  330. //
  331. //func CreateIndex(indexName string) error {
  332. // resp, err := esClient.es().Indices.
  333. // Create(indexName).
  334. // Do(context.Background())
  335. // if err != nil {
  336. // logger.Error("创建ES索引失败:%v", err)
  337. // return err
  338. // }
  339. // fmt.Printf("index:%#v\n", resp.Index)
  340. // return nil
  341. //}
  342. // DeleteIndex 删除索引
  343. //func DeleteIndex(indexName string) error {
  344. // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
  345. // Delete(indexName).
  346. // Do(context.Background())
  347. // if err != nil {
  348. // fmt.Printf("delete index failed,err:%v\n", err)
  349. // return err
  350. // }
  351. // fmt.Printf("delete index successed,indexName:%s", indexName)
  352. // return nil
  353. //}
  354. //
  355. //// CreateDocument 创建文档
  356. //func CreateDocument(indexName string, id string, doc interface{}) {
  357. // // 添加文档
  358. // resp, err := esClient.esOp.Index(indexName).Id(id).Document(doc).Do(context.Background())
  359. // if err != nil {
  360. // logger.Error("indexing document failed, err:%v\n", err)
  361. // return
  362. // }
  363. // logger.Info("result:%#v\n", resp.Result)
  364. // return
  365. //}