es.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "eta/eta_mini_ht_api/common/component/config"
  7. logger "eta/eta_mini_ht_api/common/component/log"
  8. "eta/eta_mini_ht_api/common/contants"
  9. "fmt"
  10. "github.com/elastic/go-elasticsearch/v7"
  11. "github.com/elastic/go-elasticsearch/v7/esapi"
  12. "io"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. )
  17. type ESBase interface {
  18. GetId() string
  19. }
  20. var (
  21. esOnce sync.Once
  22. esClient *ESClient
  23. )
  24. type ESClient struct {
  25. esOp *elasticsearch.Client
  26. }
  27. type SearchType string
  28. const (
  29. MatchAll = "match_all"
  30. Match = "match"
  31. Range = "range"
  32. MatchAllByCondition = "match_all_by_condition"
  33. RangeByCondition = "range_by_condition"
  34. RangeByConditionWithDocIds = "range_by_condition_with_doc_ids"
  35. )
  36. func GetInstance() *ESClient {
  37. esOnce.Do(func() {
  38. // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
  39. if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
  40. logger.Info("初始化es")
  41. // 这里可以添加初始化Redis的逻辑
  42. esClient = newEs(esConf)
  43. }
  44. })
  45. return esClient
  46. }
  47. func (es *ESClient) es() *elasticsearch.Client {
  48. return es.esOp
  49. }
  50. func newEs(config *config.ESConfig) *ESClient {
  51. elasticsearch.NewDefaultClient()
  52. client, err := elasticsearch.NewClient(
  53. elasticsearch.Config{
  54. Addresses: []string{config.GetUrl()},
  55. // A list of Elasticsearch nodes to use.
  56. Username: config.GetUserName(),
  57. Password: config.GetPassword(), // Password for HTTP Basic Authentication.
  58. },
  59. )
  60. if err != nil {
  61. logger.Error("连接ES失败:%v", err)
  62. panic("启动es失败")
  63. }
  64. return &ESClient{esOp: client}
  65. }
  66. func init() {
  67. if GetInstance() == nil {
  68. panic("初始化es失败")
  69. }
  70. logger.Info("es初始化成功")
  71. }
  72. // BulkInsert 批量创建文档
  73. func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
  74. // 创建批量请求
  75. bulkBody := new(bytes.Buffer)
  76. for _, doc := range docs {
  77. enc := json.NewEncoder(bulkBody)
  78. if err = enc.Encode(map[string]interface{}{
  79. "index": map[string]interface{}{
  80. "_index": indexName,
  81. "_id": doc.GetId(),
  82. },
  83. }); err != nil {
  84. logger.Error("生成es批处理请求参数失败: %s", err)
  85. }
  86. if err = enc.Encode(doc); err != nil {
  87. logger.Error("生成es批处理文档失败: %s", err)
  88. }
  89. }
  90. bulkReq := esapi.BulkRequest{
  91. Body: bytes.NewReader(bulkBody.Bytes()),
  92. Refresh: "true",
  93. }
  94. res, err := bulkReq.Do(context.Background(), es.esOp)
  95. if err != nil {
  96. logger.Error("es批处理创建失败: %s", err)
  97. }
  98. defer res.Body.Close()
  99. if res.IsError() {
  100. var e map[string]interface{}
  101. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  102. logger.Error("解析es应答失败: %v", err)
  103. } else {
  104. logger.Error("es请求失败: %s: %v\n", res.Status(), err)
  105. }
  106. }
  107. return
  108. }
  109. type ESResponse struct {
  110. Took int `json:"took"`
  111. Count int `json:"count"`
  112. TimedOut bool `json:"timedOut"`
  113. Hits Hits `json:"hits"`
  114. _Shards ShardsInfo `json:"_shards"`
  115. }
  116. type Hits struct {
  117. Total TotalHits `json:"total"`
  118. MaxScore float64 `json:"maxScore"`
  119. Hits []Hit `json:"hits"`
  120. }
  121. type TotalHits struct {
  122. Value int `json:"value"`
  123. Relation string `json:"relation"`
  124. }
  125. type Hit struct {
  126. Index string `json:"_index"`
  127. Type string `json:"_type"`
  128. ID string `json:"_id"`
  129. Score float64 `json:"_score"`
  130. Source json.RawMessage `json:"_source"`
  131. Highlight json.RawMessage `json:"highlight"`
  132. }
  133. type ShardsInfo struct {
  134. Total int `json:"total"`
  135. Successful int `json:"successful"`
  136. Skipped int `json:"skipped"`
  137. Failed int `json:"failed"`
  138. }
  139. type ESQueryRequest struct {
  140. IndexName string
  141. From int
  142. Size int
  143. Key string
  144. Column string
  145. Condition string
  146. ConditionValue string
  147. Sorts []string
  148. Type SearchType
  149. RangeColumn string
  150. DocIds []string
  151. Max interface{}
  152. Min interface{}
  153. }
  154. func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
  155. return &ESQueryRequest{
  156. IndexName: index,
  157. Type: searchType,
  158. From: from,
  159. Size: size,
  160. Key: key,
  161. Column: column,
  162. Sorts: sorts,
  163. }
  164. }
  165. func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
  166. req.RangeColumn = column
  167. req.Max = to
  168. req.Min = from
  169. return req
  170. }
  171. func (req *ESQueryRequest) ByCondition(column string, value string) *ESQueryRequest {
  172. req.Condition = column
  173. req.ConditionValue = value
  174. return req
  175. }
  176. func (req *ESQueryRequest) WithDocs(docIds []string) *ESQueryRequest {
  177. req.DocIds = docIds
  178. return req
  179. }
  180. func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
  181. switch req.Type {
  182. case MatchAll:
  183. queryMap = map[string]interface{}{
  184. "query": map[string]interface{}{
  185. "match": map[string]interface{}{
  186. req.Column: req.Key,
  187. },
  188. },
  189. }
  190. return
  191. case MatchAllByCondition:
  192. queryMap = map[string]interface{}{
  193. "query": map[string]interface{}{
  194. "bool": map[string]interface{}{
  195. "must": []map[string]interface{}{
  196. {
  197. "match": map[string]interface{}{
  198. req.Column: req.Key,
  199. },
  200. },
  201. {
  202. "term": map[string]interface{}{
  203. req.Condition: req.ConditionValue,
  204. },
  205. },
  206. },
  207. },
  208. },
  209. }
  210. return
  211. case Match:
  212. queryMap = map[string]interface{}{
  213. "query": map[string]interface{}{
  214. "match": map[string]interface{}{
  215. req.Column: req.Key,
  216. },
  217. },
  218. "highlight": map[string]interface{}{
  219. "fields": map[string]interface{}{
  220. req.Column: map[string]interface{}{},
  221. },
  222. "pre_tags": []string{"<span style='color:#0078E8'>"},
  223. "post_tags": []string{"</span>"},
  224. },
  225. }
  226. return
  227. case Range:
  228. queryMap = map[string]interface{}{
  229. "query": map[string]interface{}{
  230. "match": map[string]interface{}{
  231. req.Column: req.Key,
  232. },
  233. },
  234. "highlight": map[string]interface{}{
  235. "fields": map[string]interface{}{
  236. req.Column: map[string]interface{}{},
  237. },
  238. "pre_tags": []string{"<span style='color:#0078E8'>"},
  239. "post_tags": []string{"</span>"},
  240. },
  241. "post_filter": map[string]interface{}{
  242. "range": map[string]interface{}{
  243. req.RangeColumn: map[string]interface{}{
  244. "gte": req.Min,
  245. "lte": req.Max,
  246. },
  247. },
  248. },
  249. }
  250. return
  251. case RangeByCondition:
  252. queryMap = map[string]interface{}{
  253. "query": map[string]interface{}{
  254. "match": map[string]interface{}{
  255. req.Column: req.Key,
  256. },
  257. },
  258. "highlight": map[string]interface{}{
  259. "fields": map[string]interface{}{
  260. req.Column: map[string]interface{}{},
  261. },
  262. "pre_tags": []string{"<span style='color:#0078E8'>"},
  263. "post_tags": []string{"</span>"},
  264. },
  265. "post_filter": map[string]interface{}{
  266. "bool": map[string]interface{}{
  267. "must": []map[string]interface{}{
  268. {
  269. "range": map[string]interface{}{
  270. req.RangeColumn: map[string]interface{}{
  271. "gte": req.Min,
  272. "lte": req.Max,
  273. },
  274. },
  275. },
  276. {
  277. "term": map[string]interface{}{
  278. req.Condition: req.ConditionValue,
  279. },
  280. },
  281. },
  282. },
  283. },
  284. }
  285. return
  286. case RangeByConditionWithDocIds:
  287. queryMap = map[string]interface{}{
  288. "query": map[string]interface{}{
  289. "match": map[string]interface{}{
  290. req.Column: req.Key,
  291. },
  292. },
  293. "highlight": map[string]interface{}{
  294. "fields": map[string]interface{}{
  295. req.Column: map[string]interface{}{},
  296. },
  297. "pre_tags": []string{"<span style='color:#0078E8'>"},
  298. "post_tags": []string{"</span>"},
  299. },
  300. "post_filter": map[string]interface{}{
  301. "bool": map[string]interface{}{
  302. "must": []map[string]interface{}{
  303. {
  304. "range": map[string]interface{}{
  305. req.RangeColumn: map[string]interface{}{
  306. "gte": req.Min,
  307. "lte": req.Max,
  308. },
  309. },
  310. },
  311. {
  312. "term": map[string]interface{}{
  313. req.Condition: req.ConditionValue,
  314. },
  315. },
  316. {
  317. "terms": map[string]interface{}{
  318. "_id": req.DocIds,
  319. },
  320. },
  321. },
  322. },
  323. },
  324. }
  325. return
  326. default:
  327. queryMap = map[string]interface{}{}
  328. return
  329. }
  330. }
  331. // /*
  332. // *
  333. //
  334. // 搜索
  335. //
  336. // indexName 访问索引名
  337. // query 搜索条件
  338. // from 开始搜索位置
  339. // size 搜索条数
  340. // sort 排序
  341. // */
  342. func (es *ESClient) Count(params *ESQueryRequest) (response ESResponse, err error) {
  343. queryMap := params.parseJsonQuery()
  344. jsonQuery, _ := json.Marshal(queryMap)
  345. logger.Info("查询语句: %s", string(jsonQuery))
  346. request := esapi.CountRequest{
  347. Index: []string{params.IndexName},
  348. Body: strings.NewReader(string(jsonQuery)),
  349. }
  350. res, err := request.Do(context.Background(), esClient.esOp)
  351. defer res.Body.Close()
  352. if err != nil {
  353. logger.Error("es查询失败: %s", err)
  354. }
  355. if res.IsError() {
  356. var e map[string]interface{}
  357. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  358. logger.Error("解析es应答失败: %v", err)
  359. } else {
  360. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  361. }
  362. }
  363. body, err := io.ReadAll(res.Body)
  364. if err != nil {
  365. logger.Error("获取es应答失败: %v", err)
  366. }
  367. return parseESResponse(body)
  368. }
  369. func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
  370. queryMap := params.parseJsonQuery()
  371. jsonQuery, _ := json.Marshal(queryMap)
  372. logger.Info("查询语句: %s", string(jsonQuery))
  373. request := esapi.SearchRequest{
  374. Index: []string{params.IndexName},
  375. Body: strings.NewReader(string(jsonQuery)),
  376. From: &params.From,
  377. Size: &params.Size,
  378. Sort: params.Sorts,
  379. }
  380. res, err := request.Do(context.Background(), esClient.esOp)
  381. defer res.Body.Close()
  382. if err != nil {
  383. logger.Error("es查询失败: %s", err)
  384. }
  385. if res.IsError() {
  386. var e map[string]interface{}
  387. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  388. logger.Error("解析es应答失败: %v", err)
  389. } else {
  390. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  391. }
  392. }
  393. body, err := io.ReadAll(res.Body)
  394. if err != nil {
  395. logger.Error("获取es应答失败: %v", err)
  396. }
  397. return parseESResponse(body)
  398. }
  399. func parseESResponse(body []byte) (ESResponse, error) {
  400. var response ESResponse
  401. if err := json.Unmarshal(body, &response); err != nil {
  402. return ESResponse{}, err
  403. }
  404. for _, hit := range response.Hits.Hits {
  405. var source map[string]interface{}
  406. if err := json.Unmarshal(hit.Source, &source); err != nil {
  407. return ESResponse{}, err
  408. }
  409. }
  410. return response, nil
  411. }
  412. func (es *ESClient) GetSource(hits Hits) []Hit {
  413. return hits.Hits
  414. }
  415. func (es *ESClient) GetCount(hits Hits) []Hit {
  416. return hits.Hits
  417. }
  418. // /*
  419. // *
  420. // 添加es
  421. // indexName 索引名
  422. // id es的id
  423. // body es的值
  424. // */
  425. func (es *ESClient) Update(indexName string, id int, doc interface{}) bool {
  426. jsonUpdate := map[string]interface{}{
  427. "doc": doc,
  428. }
  429. jsonDoc, _ := json.Marshal(jsonUpdate)
  430. logger.Info("查询语句: %s", string(jsonDoc))
  431. req := esapi.UpdateRequest{
  432. Index: indexName,
  433. DocumentID: strconv.Itoa(id),
  434. Body: strings.NewReader(string(jsonDoc)),
  435. Refresh: "true",
  436. }
  437. res, err := req.Do(context.Background(), es.es())
  438. defer res.Body.Close()
  439. if err != nil {
  440. logger.Error("es查询失败: %s", err)
  441. }
  442. if res.IsError() {
  443. var e map[string]interface{}
  444. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  445. logger.Error("解析es应答失败: %v", err)
  446. } else {
  447. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  448. }
  449. }
  450. body, err := io.ReadAll(res.Body)
  451. if err != nil {
  452. logger.Error("获取es应答失败: %v", err)
  453. }
  454. fmt.Printf("%s\n", string(body))
  455. return true
  456. }
  457. // Delete *
  458. // 删除
  459. // indexName 索引名
  460. // id es的id
  461. // */
  462. func (es *ESClient) Delete(indexName string, id int) bool {
  463. req := esapi.DeleteRequest{
  464. Index: indexName,
  465. DocumentID: strconv.Itoa(id),
  466. Refresh: "true",
  467. }
  468. res, err := req.Do(context.Background(), es.es())
  469. defer res.Body.Close()
  470. if err != nil {
  471. logger.Error("es查询失败: %s", err)
  472. }
  473. if res.IsError() {
  474. var e map[string]interface{}
  475. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  476. logger.Error("解析es应答失败: %v", err)
  477. } else {
  478. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  479. }
  480. }
  481. body, err := io.ReadAll(res.Body)
  482. if err != nil {
  483. logger.Error("获取es应答失败: %v", err)
  484. }
  485. fmt.Printf("%s\n", string(body))
  486. return true
  487. }
  488. func (es *ESClient) Exist(indexName string, docId int) (exist bool, err error) {
  489. getRequest := esapi.GetRequest{
  490. Index: indexName,
  491. DocumentID: strconv.Itoa(docId),
  492. }
  493. // 执行请求
  494. res, err := getRequest.Do(context.Background(), es.es())
  495. if err != nil {
  496. logger.Error("es获取文档是否存在失败: %v", err)
  497. }
  498. defer res.Body.Close()
  499. // 检查文档是否存在
  500. if res.IsError() {
  501. // 如果文档不存在,通常返回 404 Not Found
  502. if res.StatusCode == 404 {
  503. logger.Info("文档不存在.")
  504. return false, nil
  505. } else {
  506. // 其他错误
  507. var e map[string]interface{}
  508. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  509. logger.Error("解析es应答失败: %v", err)
  510. return false, err
  511. } else {
  512. // Print the response status and error information.
  513. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["原因"])
  514. return false, nil
  515. }
  516. }
  517. } else {
  518. // 如果文档存在
  519. logger.Info("doc存在")
  520. return true, nil
  521. }
  522. }
  523. //
  524. //func CreateIndex(indexName string) error {
  525. // resp, err := esClient.es().Indices.
  526. // Create(indexName).
  527. // Do(context.Background())
  528. // if err != nil {
  529. // logger.Error("创建ES索引失败:%v", err)
  530. // return err
  531. // }
  532. // fmt.Printf("index:%#v\n", resp.Index)
  533. // return nil
  534. //}
  535. // DeleteIndex 删除索引
  536. //
  537. // func DeleteIndex(indexName string) error {
  538. // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
  539. // Delete(indexName).
  540. // Do(context.Background())
  541. // if err != nil {
  542. // fmt.Printf("delete index failed,err:%v\n", err)
  543. // return err
  544. // }
  545. // fmt.Printf("delete index successed,indexName:%s", indexName)
  546. // return nil
  547. // }
  548. //
  549. // CreateDocument 创建文档
  550. func (es *ESClient) CreateDocument(indexName string, id int, doc interface{}) (success bool) {
  551. jsonDoc, _ := json.Marshal(doc)
  552. logger.Info("查询语句: %s", string(jsonDoc))
  553. // 添加文档
  554. indexRequest := esapi.IndexRequest{
  555. Index: indexName,
  556. DocumentID: strconv.Itoa(id),
  557. Body: strings.NewReader(string(jsonDoc)),
  558. Refresh: "true",
  559. }
  560. // 执行请求
  561. res, err := indexRequest.Do(context.Background(), es.es())
  562. if err != nil {
  563. logger.Error("ES创建文档失败: %s", err)
  564. return false
  565. }
  566. defer res.Body.Close()
  567. // 检查文档是否成功创建
  568. if res.IsError() {
  569. var e map[string]interface{}
  570. if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
  571. logger.Error("解析ES应答失败: %s", err)
  572. } else {
  573. // Print the response status and error information.
  574. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["类型"], e["错误"].(map[string]interface{})["原因"])
  575. }
  576. return false
  577. } else {
  578. // 如果文档成功创建
  579. logger.Info("创建文档成功")
  580. return true
  581. }
  582. }