es.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "eta/eta_mini_ht_api/common/component/config"
  7. logger "eta/eta_mini_ht_api/common/component/log"
  8. "eta/eta_mini_ht_api/common/contants"
  9. "fmt"
  10. "github.com/elastic/go-elasticsearch/v7"
  11. "github.com/elastic/go-elasticsearch/v7/esapi"
  12. "io"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. )
  // ESBase is the contract a document must satisfy to be written through
  // BulkInsert.
  type ESBase interface {
  	// GetId returns the identifier that BulkInsert sends as the document's
  	// "_id" in the bulk action line.
  	GetId() string
  }
  20. var (
  21. esOnce sync.Once
  22. esClient *ESClient
  23. )
  24. type ESClient struct {
  25. esOp *elasticsearch.Client
  26. }
  // SearchType selects which query body parseJsonQuery builds for a request.
  type SearchType string

  // Supported search types. The constants are intentionally left untyped so
  // they can be used both as SearchType values and as plain strings.
  const (
  	// MatchAll builds a plain match query on Column/Key without highlighting.
  	MatchAll = "match_all"
  	// Match builds a match query on Column/Key with highlighting.
  	Match = "match"
  	// Range is Match plus a range post-filter on RangeColumn.
  	Range = "range"
  	// MatchAllByCondition requires both the match and a term on Condition.
  	MatchAllByCondition = "match_all_by_condition"
  	// RangeByCondition is Range plus a term post-filter on Condition.
  	RangeByCondition = "range_by_condition"
  	// RangeByConditionWithDocIds is RangeByCondition restricted to DocIds.
  	RangeByConditionWithDocIds = "range_by_condition_with_doc_ids"
  	// RangeWithDocIds is Range restricted to DocIds.
  	RangeWithDocIds = "range_with_doc_ids"
  )
  37. func GetInstance() *ESClient {
  38. esOnce.Do(func() {
  39. // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
  40. if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
  41. logger.Info("初始化es")
  42. // 这里可以添加初始化Redis的逻辑
  43. esClient = newEs(esConf)
  44. }
  45. })
  46. return esClient
  47. }
  48. func (es *ESClient) es() *elasticsearch.Client {
  49. return es.esOp
  50. }
  51. func newEs(config *config.ESConfig) *ESClient {
  52. elasticsearch.NewDefaultClient()
  53. client, err := elasticsearch.NewClient(
  54. elasticsearch.Config{
  55. Addresses: []string{config.GetUrl()},
  56. // A list of Elasticsearch nodes to use.
  57. Username: config.GetUserName(),
  58. Password: config.GetPassword(), // Password for HTTP Basic Authentication.
  59. },
  60. )
  61. if err != nil {
  62. logger.Error("连接ES失败:%v", err)
  63. panic("启动es失败")
  64. }
  65. return &ESClient{esOp: client}
  66. }
  67. func init() {
  68. if GetInstance() == nil {
  69. panic("初始化es失败")
  70. }
  71. logger.Info("es初始化成功")
  72. }
  73. // BulkInsert 批量创建文档
  74. func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
  75. // 创建批量请求
  76. bulkBody := new(bytes.Buffer)
  77. for _, doc := range docs {
  78. enc := json.NewEncoder(bulkBody)
  79. if err = enc.Encode(map[string]interface{}{
  80. "index": map[string]interface{}{
  81. "_index": indexName,
  82. "_id": doc.GetId(),
  83. },
  84. }); err != nil {
  85. logger.Error("生成es批处理请求参数失败: %s", err)
  86. }
  87. if err = enc.Encode(doc); err != nil {
  88. logger.Error("生成es批处理文档失败: %s", err)
  89. }
  90. }
  91. bulkReq := esapi.BulkRequest{
  92. Body: bytes.NewReader(bulkBody.Bytes()),
  93. Refresh: "true",
  94. }
  95. res, err := bulkReq.Do(context.Background(), es.esOp)
  96. if err != nil {
  97. logger.Error("es批处理创建失败: %s", err)
  98. }
  99. defer res.Body.Close()
  100. if res.IsError() {
  101. var e map[string]interface{}
  102. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  103. logger.Error("解析es应答失败: %v", err)
  104. } else {
  105. logger.Error("es请求失败: %s: %v\n", res.Status(), err)
  106. }
  107. }
  108. return
  109. }
  // ESResponse is the subset of an Elasticsearch search or count response that
  // this package decodes.
  type ESResponse struct {
  	Took  int `json:"took"`
  	Count int `json:"count"`
  	// TimedOut maps Elasticsearch's "timed_out" flag.
  	TimedOut bool `json:"timed_out"`
  	Hits     Hits `json:"hits"`
  	// Shards must be exported: encoding/json ignores unexported fields, so
  	// the shard summary would otherwise never be decoded.
  	Shards ShardsInfo `json:"_shards"`
  }

  // Hits is the "hits" section of a search response.
  type Hits struct {
  	Total TotalHits `json:"total"`
  	// MaxScore maps Elasticsearch's "max_score"; it stays 0 when the
  	// response carries null.
  	MaxScore float64 `json:"max_score"`
  	Hits     []Hit   `json:"hits"`
  }

  // TotalHits is the "hits.total" object: the hit count and whether that count
  // is exact ("eq") or a lower bound ("gte").
  type TotalHits struct {
  	Value    int    `json:"value"`
  	Relation string `json:"relation"`
  }

  // Hit is a single matched document. Source and Highlight are kept as raw
  // JSON so callers can decode them into their own types.
  type Hit struct {
  	Index     string          `json:"_index"`
  	Type      string          `json:"_type"`
  	ID        string          `json:"_id"`
  	Score     float64         `json:"_score"`
  	Source    json.RawMessage `json:"_source"`
  	Highlight json.RawMessage `json:"highlight"`
  }

  // ShardsInfo is the "_shards" summary of a response.
  type ShardsInfo struct {
  	Total      int `json:"total"`
  	Successful int `json:"successful"`
  	Skipped    int `json:"skipped"`
  	Failed     int `json:"failed"`
  }
  140. type ESQueryRequest struct {
  141. IndexName string
  142. From int
  143. Size int
  144. Key string
  145. Column string
  146. Condition string
  147. ConditionValue string
  148. Sorts []string
  149. Type SearchType
  150. RangeColumn string
  151. DocIds []string
  152. Max interface{}
  153. Min interface{}
  154. }
  155. func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
  156. return &ESQueryRequest{
  157. IndexName: index,
  158. Type: searchType,
  159. From: from,
  160. Size: size,
  161. Key: key,
  162. Column: column,
  163. Sorts: sorts,
  164. }
  165. }
  166. func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
  167. req.RangeColumn = column
  168. req.Max = to
  169. req.Min = from
  170. return req
  171. }
  172. func (req *ESQueryRequest) ByCondition(column string, value string) *ESQueryRequest {
  173. req.Condition = column
  174. req.ConditionValue = value
  175. return req
  176. }
  177. func (req *ESQueryRequest) WithDocs(docIds []string) *ESQueryRequest {
  178. req.DocIds = docIds
  179. return req
  180. }
  181. func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
  182. switch req.Type {
  183. case MatchAll:
  184. queryMap = map[string]interface{}{
  185. "query": map[string]interface{}{
  186. "match": map[string]interface{}{
  187. req.Column: req.Key,
  188. },
  189. },
  190. }
  191. return
  192. case MatchAllByCondition:
  193. queryMap = map[string]interface{}{
  194. "query": map[string]interface{}{
  195. "bool": map[string]interface{}{
  196. "must": []map[string]interface{}{
  197. {
  198. "match": map[string]interface{}{
  199. req.Column: req.Key,
  200. },
  201. },
  202. {
  203. "term": map[string]interface{}{
  204. req.Condition: req.ConditionValue,
  205. },
  206. },
  207. },
  208. },
  209. },
  210. }
  211. return
  212. case Match:
  213. queryMap = map[string]interface{}{
  214. "query": map[string]interface{}{
  215. "match": map[string]interface{}{
  216. req.Column: req.Key,
  217. },
  218. },
  219. "highlight": map[string]interface{}{
  220. "fields": map[string]interface{}{
  221. req.Column: map[string]interface{}{},
  222. },
  223. "pre_tags": []string{"<span style='color:#0078E8'>"},
  224. "post_tags": []string{"</span>"},
  225. },
  226. }
  227. return
  228. case Range:
  229. queryMap = map[string]interface{}{
  230. "query": map[string]interface{}{
  231. "match": map[string]interface{}{
  232. req.Column: req.Key,
  233. },
  234. },
  235. "highlight": map[string]interface{}{
  236. "fields": map[string]interface{}{
  237. req.Column: map[string]interface{}{},
  238. },
  239. "pre_tags": []string{"<span style='color:#0078E8'>"},
  240. "post_tags": []string{"</span>"},
  241. },
  242. "post_filter": map[string]interface{}{
  243. "range": map[string]interface{}{
  244. req.RangeColumn: map[string]interface{}{
  245. "gte": req.Min,
  246. "lte": req.Max,
  247. },
  248. },
  249. },
  250. }
  251. return
  252. case RangeWithDocIds:
  253. queryMap = map[string]interface{}{
  254. "query": map[string]interface{}{
  255. "match": map[string]interface{}{
  256. req.Column: req.Key,
  257. },
  258. },
  259. "highlight": map[string]interface{}{
  260. "fields": map[string]interface{}{
  261. req.Column: map[string]interface{}{},
  262. },
  263. "pre_tags": []string{"<span style='color:#0078E8'>"},
  264. "post_tags": []string{"</span>"},
  265. },
  266. "post_filter": map[string]interface{}{
  267. "bool": map[string]interface{}{
  268. "must": []map[string]interface{}{
  269. {
  270. "range": map[string]interface{}{
  271. req.RangeColumn: map[string]interface{}{
  272. "gte": req.Min,
  273. "lte": req.Max,
  274. },
  275. },
  276. },
  277. {
  278. "terms": map[string]interface{}{
  279. "_id": req.DocIds,
  280. },
  281. },
  282. },
  283. },
  284. },
  285. }
  286. return
  287. case RangeByCondition:
  288. queryMap = map[string]interface{}{
  289. "query": map[string]interface{}{
  290. "match": map[string]interface{}{
  291. req.Column: req.Key,
  292. },
  293. },
  294. "highlight": map[string]interface{}{
  295. "fields": map[string]interface{}{
  296. req.Column: map[string]interface{}{},
  297. },
  298. "pre_tags": []string{"<span style='color:#0078E8'>"},
  299. "post_tags": []string{"</span>"},
  300. },
  301. "post_filter": map[string]interface{}{
  302. "bool": map[string]interface{}{
  303. "must": []map[string]interface{}{
  304. {
  305. "range": map[string]interface{}{
  306. req.RangeColumn: map[string]interface{}{
  307. "gte": req.Min,
  308. "lte": req.Max,
  309. },
  310. },
  311. },
  312. {
  313. "term": map[string]interface{}{
  314. req.Condition: req.ConditionValue,
  315. },
  316. },
  317. },
  318. },
  319. },
  320. }
  321. return
  322. case RangeByConditionWithDocIds:
  323. queryMap = map[string]interface{}{
  324. "query": map[string]interface{}{
  325. "match": map[string]interface{}{
  326. req.Column: req.Key,
  327. },
  328. },
  329. "highlight": map[string]interface{}{
  330. "fields": map[string]interface{}{
  331. req.Column: map[string]interface{}{},
  332. },
  333. "pre_tags": []string{"<span style='color:#0078E8'>"},
  334. "post_tags": []string{"</span>"},
  335. },
  336. "post_filter": map[string]interface{}{
  337. "bool": map[string]interface{}{
  338. "must": []map[string]interface{}{
  339. {
  340. "range": map[string]interface{}{
  341. req.RangeColumn: map[string]interface{}{
  342. "gte": req.Min,
  343. "lte": req.Max,
  344. },
  345. },
  346. },
  347. {
  348. "term": map[string]interface{}{
  349. req.Condition: req.ConditionValue,
  350. },
  351. },
  352. {
  353. "terms": map[string]interface{}{
  354. "_id": req.DocIds,
  355. },
  356. },
  357. },
  358. },
  359. },
  360. }
  361. return
  362. default:
  363. queryMap = map[string]interface{}{}
  364. return
  365. }
  366. }
  367. // /*
  368. // *
  369. //
  370. // 搜索
  371. //
  372. // indexName 访问索引名
  373. // query 搜索条件
  374. // from 开始搜索位置
  375. // size 搜索条数
  376. // sort 排序
  377. // */
  378. func (es *ESClient) Count(params *ESQueryRequest) (response ESResponse, err error) {
  379. queryMap := params.parseJsonQuery()
  380. jsonQuery, _ := json.Marshal(queryMap)
  381. logger.Info("查询语句: %s", string(jsonQuery))
  382. request := esapi.CountRequest{
  383. Index: []string{params.IndexName},
  384. Body: strings.NewReader(string(jsonQuery)),
  385. }
  386. res, err := request.Do(context.Background(), esClient.esOp)
  387. defer res.Body.Close()
  388. if err != nil {
  389. logger.Error("es查询失败: %s", err)
  390. }
  391. if res.IsError() {
  392. var e map[string]interface{}
  393. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  394. logger.Error("解析es应答失败: %v", err)
  395. } else {
  396. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  397. }
  398. }
  399. body, err := io.ReadAll(res.Body)
  400. if err != nil {
  401. logger.Error("获取es应答失败: %v", err)
  402. }
  403. return parseESResponse(body)
  404. }
  405. func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
  406. queryMap := params.parseJsonQuery()
  407. jsonQuery, _ := json.Marshal(queryMap)
  408. logger.Info("查询语句: %s", string(jsonQuery))
  409. request := esapi.SearchRequest{
  410. Index: []string{params.IndexName},
  411. Body: strings.NewReader(string(jsonQuery)),
  412. From: &params.From,
  413. Size: &params.Size,
  414. Sort: params.Sorts,
  415. }
  416. res, err := request.Do(context.Background(), esClient.esOp)
  417. defer res.Body.Close()
  418. if err != nil {
  419. logger.Error("es查询失败: %s", err)
  420. }
  421. if res.IsError() {
  422. var e map[string]interface{}
  423. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  424. logger.Error("解析es应答失败: %v", err)
  425. } else {
  426. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  427. }
  428. }
  429. body, err := io.ReadAll(res.Body)
  430. if err != nil {
  431. logger.Error("获取es应答失败: %v", err)
  432. }
  433. return parseESResponse(body)
  434. }
  435. func parseESResponse(body []byte) (ESResponse, error) {
  436. var response ESResponse
  437. if err := json.Unmarshal(body, &response); err != nil {
  438. return ESResponse{}, err
  439. }
  440. for _, hit := range response.Hits.Hits {
  441. var source map[string]interface{}
  442. if err := json.Unmarshal(hit.Source, &source); err != nil {
  443. return ESResponse{}, err
  444. }
  445. }
  446. return response, nil
  447. }
  448. func (es *ESClient) GetSource(hits Hits) []Hit {
  449. return hits.Hits
  450. }
  451. func (es *ESClient) GetCount(hits Hits) []Hit {
  452. return hits.Hits
  453. }
  454. // /*
  455. // *
  456. // 添加es
  457. // indexName 索引名
  458. // id es的id
  459. // body es的值
  460. // */
  461. func (es *ESClient) Update(indexName string, id int, doc interface{}) bool {
  462. jsonUpdate := map[string]interface{}{
  463. "doc": doc,
  464. }
  465. jsonDoc, _ := json.Marshal(jsonUpdate)
  466. logger.Info("查询语句: %s", string(jsonDoc))
  467. req := esapi.UpdateRequest{
  468. Index: indexName,
  469. DocumentID: strconv.Itoa(id),
  470. Body: strings.NewReader(string(jsonDoc)),
  471. Refresh: "true",
  472. }
  473. res, err := req.Do(context.Background(), es.es())
  474. defer res.Body.Close()
  475. if err != nil {
  476. logger.Error("es查询失败: %s", err)
  477. }
  478. if res.IsError() {
  479. var e map[string]interface{}
  480. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  481. logger.Error("解析es应答失败: %v", err)
  482. } else {
  483. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  484. }
  485. }
  486. body, err := io.ReadAll(res.Body)
  487. if err != nil {
  488. logger.Error("获取es应答失败: %v", err)
  489. }
  490. fmt.Printf("%s\n", string(body))
  491. return true
  492. }
  493. // Delete *
  494. // 删除
  495. // indexName 索引名
  496. // id es的id
  497. // */
  498. func (es *ESClient) Delete(indexName string, id int) bool {
  499. req := esapi.DeleteRequest{
  500. Index: indexName,
  501. DocumentID: strconv.Itoa(id),
  502. Refresh: "true",
  503. }
  504. res, err := req.Do(context.Background(), es.es())
  505. defer res.Body.Close()
  506. if err != nil {
  507. logger.Error("es查询失败: %s", err)
  508. }
  509. if res.IsError() {
  510. var e map[string]interface{}
  511. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  512. logger.Error("解析es应答失败: %v", err)
  513. } else {
  514. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  515. }
  516. }
  517. body, err := io.ReadAll(res.Body)
  518. if err != nil {
  519. logger.Error("获取es应答失败: %v", err)
  520. }
  521. fmt.Printf("%s\n", string(body))
  522. return true
  523. }
  524. func (es *ESClient) Exist(indexName string, docId int) (exist bool, err error) {
  525. getRequest := esapi.GetRequest{
  526. Index: indexName,
  527. DocumentID: strconv.Itoa(docId),
  528. }
  529. // 执行请求
  530. res, err := getRequest.Do(context.Background(), es.es())
  531. if err != nil {
  532. logger.Error("es获取文档是否存在失败: %v", err)
  533. }
  534. defer res.Body.Close()
  535. // 检查文档是否存在
  536. if res.IsError() {
  537. // 如果文档不存在,通常返回 404 Not Found
  538. if res.StatusCode == 404 {
  539. logger.Info("文档不存在.")
  540. return false, nil
  541. } else {
  542. // 其他错误
  543. var e map[string]interface{}
  544. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  545. logger.Error("解析es应答失败: %v", err)
  546. return false, err
  547. } else {
  548. // Print the response status and error information.
  549. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["原因"])
  550. return false, nil
  551. }
  552. }
  553. } else {
  554. // 如果文档存在
  555. logger.Info("doc存在")
  556. return true, nil
  557. }
  558. }
  559. //
  560. //func CreateIndex(indexName string) error {
  561. // resp, err := esClient.es().Indices.
  562. // Create(indexName).
  563. // Do(context.Background())
  564. // if err != nil {
  565. // logger.Error("创建ES索引失败:%v", err)
  566. // return err
  567. // }
  568. // fmt.Printf("index:%#v\n", resp.Index)
  569. // return nil
  570. //}
  571. // DeleteIndex 删除索引
  572. //
  573. // func DeleteIndex(indexName string) error {
  574. // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
  575. // Delete(indexName).
  576. // Do(context.Background())
  577. // if err != nil {
  578. // fmt.Printf("delete index failed,err:%v\n", err)
  579. // return err
  580. // }
  581. // fmt.Printf("delete index successed,indexName:%s", indexName)
  582. // return nil
  583. // }
  584. //
  585. // CreateDocument 创建文档
  586. func (es *ESClient) CreateDocument(indexName string, id int, doc interface{}) (success bool) {
  587. jsonDoc, _ := json.Marshal(doc)
  588. logger.Info("查询语句: %s", string(jsonDoc))
  589. // 添加文档
  590. indexRequest := esapi.IndexRequest{
  591. Index: indexName,
  592. DocumentID: strconv.Itoa(id),
  593. Body: strings.NewReader(string(jsonDoc)),
  594. Refresh: "true",
  595. }
  596. // 执行请求
  597. res, err := indexRequest.Do(context.Background(), es.es())
  598. if err != nil {
  599. logger.Error("ES创建文档失败: %s", err)
  600. return false
  601. }
  602. defer res.Body.Close()
  603. // 检查文档是否成功创建
  604. if res.IsError() {
  605. var e map[string]interface{}
  606. if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
  607. logger.Error("解析ES应答失败: %s", err)
  608. } else {
  609. // Print the response status and error information.
  610. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["类型"], e["错误"].(map[string]interface{})["原因"])
  611. }
  612. return false
  613. } else {
  614. // 如果文档成功创建
  615. logger.Info("创建文档成功")
  616. return true
  617. }
  618. }