es.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "eta/eta_mini_ht_api/common/component/config"
  8. logger "eta/eta_mini_ht_api/common/component/log"
  9. "eta/eta_mini_ht_api/common/contants"
  10. "fmt"
  11. "github.com/elastic/go-elasticsearch/v7"
  12. "github.com/elastic/go-elasticsearch/v7/esapi"
  13. "io"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. )
  18. type ESBase interface {
  19. GetId() string
  20. }
  21. var (
  22. esOnce sync.Once
  23. esClient *ESClient
  24. )
  25. type ESClient struct {
  26. esOp *elasticsearch.Client
  27. }
  28. type SearchType string
  29. const (
  30. MatchAll = "match_all"
  31. Match = "match"
  32. CountWithDocIds = "count_with_doc_ids"
  33. Range = "range"
  34. MatchAllByCondition = "match_all_by_condition"
  35. RangeByCondition = "range_by_condition"
  36. RangeByConditionWithDocIds = "range_by_condition_with_doc_ids"
  37. RangeByConditionWithDocIdsNoLimit = "range_by_condition_with_doc_ids_no_limit"
  38. RangeWithDocIds = "range_with_doc_ids"
  39. LimitByScore = "limit_by_score"
  40. HomeSearch = "home_search"
  41. )
  42. func GetInstance() *ESClient {
  43. esOnce.Do(func() {
  44. // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
  45. if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
  46. logger.Info("初始化es")
  47. // 这里可以添加初始化Redis的逻辑
  48. esClient = newEs(esConf)
  49. logger.Info("es地址:%v", esConf.GetUrl())
  50. }
  51. })
  52. return esClient
  53. }
  54. func (es *ESClient) es() *elasticsearch.Client {
  55. return es.esOp
  56. }
  57. func newEs(config *config.ESConfig) *ESClient {
  58. elasticsearch.NewDefaultClient()
  59. client, err := elasticsearch.NewClient(
  60. elasticsearch.Config{
  61. Addresses: []string{config.GetUrl()},
  62. // A list of Elasticsearch nodes to use.
  63. Username: config.GetUserName(),
  64. Password: config.GetPassword(), // Password for HTTP Basic Authentication.
  65. },
  66. )
  67. if err != nil {
  68. logger.Error("连接ES失败:%v", err)
  69. panic("启动es失败")
  70. }
  71. return &ESClient{esOp: client}
  72. }
  73. func init() {
  74. if GetInstance() == nil {
  75. panic("初始化es失败")
  76. }
  77. logger.Info("es初始化成功")
  78. }
  79. // BulkInsert 批量创建文档
  80. func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
  81. // 创建批量请求
  82. bulkBody := new(bytes.Buffer)
  83. for _, doc := range docs {
  84. enc := json.NewEncoder(bulkBody)
  85. if err = enc.Encode(map[string]interface{}{
  86. "index": map[string]interface{}{
  87. "_index": indexName,
  88. "_id": doc.GetId(),
  89. },
  90. }); err != nil {
  91. logger.Error("生成es批处理请求参数失败: %s", err)
  92. }
  93. if err = enc.Encode(doc); err != nil {
  94. logger.Error("生成es批处理文档失败: %s", err)
  95. }
  96. }
  97. bulkReq := esapi.BulkRequest{
  98. Body: bytes.NewReader(bulkBody.Bytes()),
  99. Refresh: "true",
  100. }
  101. res, err := bulkReq.Do(context.Background(), es.esOp)
  102. if err != nil {
  103. logger.Error("es批处理创建失败: %s", err)
  104. }
  105. defer res.Body.Close()
  106. if res.IsError() {
  107. var e map[string]interface{}
  108. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  109. logger.Error("解析es应答失败: %v", err)
  110. } else {
  111. logger.Error("es请求失败: %s: %v\n", res.Status(), err)
  112. }
  113. }
  114. return
  115. }
  116. type ESResponse struct {
  117. Took int `json:"took"`
  118. Count int `json:"count"`
  119. TimedOut bool `json:"timedOut"`
  120. Hits Hits `json:"hits"`
  121. _Shards ShardsInfo `json:"_shards"`
  122. }
  123. type Hits struct {
  124. Total TotalHits `json:"total"`
  125. MaxScore float64 `json:"maxScore"`
  126. Hits []Hit `json:"hits"`
  127. }
  128. type TotalHits struct {
  129. Value int `json:"value"`
  130. Relation string `json:"relation"`
  131. }
  132. type Hit struct {
  133. Index string `json:"_index"`
  134. Type string `json:"_type"`
  135. ID string `json:"_id"`
  136. Score float64 `json:"_score"`
  137. Source json.RawMessage `json:"_source"`
  138. Highlight json.RawMessage `json:"highlight"`
  139. }
  140. type Doc struct {
  141. Index string `json:"_index"`
  142. Type string `json:"_type"`
  143. ID string `json:"_id"`
  144. Version float64 `json:"_version"`
  145. SeqNo float64 `json:"_seq_no"`
  146. PrimaryTerm float64 `json:"_primary_term"`
  147. Found bool `json:"found"`
  148. Source json.RawMessage `json:"_source"`
  149. }
  150. type ShardsInfo struct {
  151. Total int `json:"total"`
  152. Successful int `json:"successful"`
  153. Skipped int `json:"skipped"`
  154. Failed int `json:"failed"`
  155. }
  156. type ESQueryRequest struct {
  157. IndexName string
  158. From int
  159. Size int
  160. Key string
  161. Column string
  162. Condition string
  163. ConditionValue string
  164. Sorts []string
  165. Type SearchType
  166. RangeColumn string
  167. DocIds []string
  168. Max interface{}
  169. Min interface{}
  170. MinScore float64
  171. }
  172. func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
  173. return &ESQueryRequest{
  174. IndexName: index,
  175. Type: searchType,
  176. From: from,
  177. Size: size,
  178. Key: key,
  179. Column: column,
  180. Sorts: sorts,
  181. }
  182. }
  183. func (req *ESQueryRequest) Limit(limit int) *ESQueryRequest {
  184. req.Size = limit
  185. return req
  186. }
  187. func (req *ESQueryRequest) WithScore(score float64) *ESQueryRequest {
  188. req.MinScore = score
  189. return req
  190. }
  191. func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
  192. req.RangeColumn = column
  193. req.Max = to
  194. req.Min = from
  195. return req
  196. }
  197. func (req *ESQueryRequest) Before(max interface{}, column string) *ESQueryRequest {
  198. req.RangeColumn = column
  199. req.Max = max
  200. return req
  201. }
  202. func (req *ESQueryRequest) ByCondition(column string, value string) *ESQueryRequest {
  203. req.Condition = column
  204. req.ConditionValue = value
  205. return req
  206. }
  207. func (req *ESQueryRequest) WithDocs(docIds []string) *ESQueryRequest {
  208. req.DocIds = docIds
  209. return req
  210. }
  211. func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
  212. switch req.Type {
  213. case MatchAll:
  214. queryMap = map[string]interface{}{
  215. "query": map[string]interface{}{
  216. "match": map[string]interface{}{
  217. req.Column: req.Key,
  218. },
  219. },
  220. }
  221. return
  222. case MatchAllByCondition:
  223. queryMap = map[string]interface{}{
  224. "query": map[string]interface{}{
  225. "bool": map[string]interface{}{
  226. "must": []map[string]interface{}{
  227. {
  228. "match": map[string]interface{}{
  229. req.Column: req.Key,
  230. },
  231. },
  232. {
  233. "term": map[string]interface{}{
  234. req.Condition: req.ConditionValue,
  235. },
  236. },
  237. },
  238. },
  239. },
  240. }
  241. return
  242. case Match:
  243. queryMap = map[string]interface{}{
  244. "query": map[string]interface{}{
  245. "match": map[string]interface{}{
  246. req.Column: req.Key,
  247. },
  248. },
  249. "highlight": map[string]interface{}{
  250. "fields": map[string]interface{}{
  251. req.Column: map[string]interface{}{},
  252. },
  253. "pre_tags": []string{"<span style='color:#0078E8'>"},
  254. "post_tags": []string{"</span>"},
  255. },
  256. }
  257. return
  258. case CountWithDocIds:
  259. queryMap = map[string]interface{}{
  260. "query": map[string]interface{}{
  261. "bool": map[string]interface{}{
  262. "must": []map[string]interface{}{
  263. {
  264. "match": map[string]interface{}{
  265. req.Column: req.Key,
  266. },
  267. },
  268. },
  269. "filter": map[string]interface{}{
  270. "terms": map[string]interface{}{
  271. "_id": req.DocIds,
  272. },
  273. },
  274. },
  275. },
  276. }
  277. return
  278. case Range:
  279. queryMap = map[string]interface{}{
  280. "query": map[string]interface{}{
  281. "match": map[string]interface{}{
  282. req.Column: req.Key,
  283. },
  284. },
  285. "highlight": map[string]interface{}{
  286. "fields": map[string]interface{}{
  287. req.Column: map[string]interface{}{},
  288. },
  289. "pre_tags": []string{"<span style='color:#0078E8'>"},
  290. "post_tags": []string{"</span>"},
  291. },
  292. "post_filter": map[string]interface{}{
  293. "range": map[string]interface{}{
  294. req.RangeColumn: map[string]interface{}{
  295. "gte": req.Min,
  296. "lte": req.Max,
  297. },
  298. },
  299. },
  300. }
  301. return
  302. case RangeWithDocIds:
  303. queryMap = map[string]interface{}{
  304. "query": map[string]interface{}{
  305. "match": map[string]interface{}{
  306. req.Column: req.Key,
  307. },
  308. },
  309. "highlight": map[string]interface{}{
  310. "fields": map[string]interface{}{
  311. req.Column: map[string]interface{}{},
  312. },
  313. "pre_tags": []string{"<span style='color:#0078E8'>"},
  314. "post_tags": []string{"</span>"},
  315. },
  316. "post_filter": map[string]interface{}{
  317. "bool": map[string]interface{}{
  318. "must": []map[string]interface{}{
  319. {
  320. "range": map[string]interface{}{
  321. req.RangeColumn: map[string]interface{}{
  322. "gte": req.Min,
  323. "lte": req.Max,
  324. },
  325. },
  326. },
  327. {
  328. "terms": map[string]interface{}{
  329. "_id": req.DocIds,
  330. },
  331. },
  332. },
  333. },
  334. },
  335. }
  336. return
  337. case RangeByCondition:
  338. queryMap = map[string]interface{}{
  339. "query": map[string]interface{}{
  340. "match": map[string]interface{}{
  341. req.Column: req.Key,
  342. },
  343. },
  344. "highlight": map[string]interface{}{
  345. "fields": map[string]interface{}{
  346. req.Column: map[string]interface{}{},
  347. },
  348. "pre_tags": []string{"<span style='color:#0078E8'>"},
  349. "post_tags": []string{"</span>"},
  350. },
  351. "post_filter": map[string]interface{}{
  352. "bool": map[string]interface{}{
  353. "must": []map[string]interface{}{
  354. {
  355. "range": map[string]interface{}{
  356. req.RangeColumn: map[string]interface{}{
  357. "gte": req.Min,
  358. "lte": req.Max,
  359. },
  360. },
  361. },
  362. {
  363. "term": map[string]interface{}{
  364. req.Condition: req.ConditionValue,
  365. },
  366. },
  367. },
  368. },
  369. },
  370. }
  371. return
  372. case RangeByConditionWithDocIds:
  373. queryMap = map[string]interface{}{
  374. "query": map[string]interface{}{
  375. "match": map[string]interface{}{
  376. req.Column: req.Key,
  377. },
  378. },
  379. "highlight": map[string]interface{}{
  380. "fields": map[string]interface{}{
  381. req.Column: map[string]interface{}{},
  382. },
  383. "pre_tags": []string{"<span style='color:#0078E8'>"},
  384. "post_tags": []string{"</span>"},
  385. },
  386. "post_filter": map[string]interface{}{
  387. "bool": map[string]interface{}{
  388. "must": []map[string]interface{}{
  389. {
  390. "range": map[string]interface{}{
  391. req.RangeColumn: map[string]interface{}{
  392. "gte": req.Min,
  393. "lte": req.Max,
  394. },
  395. },
  396. },
  397. {
  398. "term": map[string]interface{}{
  399. req.Condition: req.ConditionValue,
  400. },
  401. },
  402. {
  403. "terms": map[string]interface{}{
  404. "_id": req.DocIds,
  405. },
  406. },
  407. },
  408. },
  409. },
  410. }
  411. return
  412. case RangeByConditionWithDocIdsNoLimit:
  413. queryMap = map[string]interface{}{
  414. "query": map[string]interface{}{
  415. "match": map[string]interface{}{
  416. req.Column: req.Key,
  417. },
  418. },
  419. "highlight": map[string]interface{}{
  420. "fields": map[string]interface{}{
  421. req.Column: map[string]interface{}{},
  422. },
  423. "pre_tags": []string{"<span style='color:#0078E8'>"},
  424. "post_tags": []string{"</span>"},
  425. },
  426. "post_filter": map[string]interface{}{
  427. "bool": map[string]interface{}{
  428. "must": []map[string]interface{}{
  429. {
  430. "term": map[string]interface{}{
  431. req.Condition: req.ConditionValue,
  432. },
  433. },
  434. {
  435. "terms": map[string]interface{}{
  436. "_id": req.DocIds,
  437. },
  438. },
  439. },
  440. },
  441. },
  442. }
  443. return
  444. case LimitByScore:
  445. queryMap = map[string]interface{}{
  446. "query": map[string]interface{}{
  447. "match": map[string]interface{}{
  448. req.Column: req.Key,
  449. },
  450. },
  451. "highlight": map[string]interface{}{
  452. "fields": map[string]interface{}{
  453. req.Column: map[string]interface{}{},
  454. },
  455. "pre_tags": []string{"<span style='color:#0078E8'>"},
  456. "post_tags": []string{"</span>"},
  457. },
  458. "post_filter": map[string]interface{}{
  459. "bool": map[string]interface{}{
  460. "must": []map[string]interface{}{
  461. {
  462. "terms": map[string]interface{}{
  463. "_id": req.DocIds,
  464. },
  465. },
  466. },
  467. },
  468. },
  469. "min_score": req.MinScore,
  470. }
  471. return
  472. case HomeSearch:
  473. queryMap = map[string]interface{}{
  474. "query": map[string]interface{}{
  475. "bool": map[string]interface{}{
  476. "should": []map[string]interface{}{
  477. {
  478. "bool": map[string]interface{}{
  479. "must": []map[string]interface{}{
  480. {
  481. "match": map[string]interface{}{
  482. "title": req.Key,
  483. },
  484. },
  485. {
  486. "term": map[string]interface{}{
  487. "status": "PUBLISH",
  488. },
  489. },
  490. {
  491. "range": map[string]interface{}{
  492. "publishedTime": map[string]interface{}{
  493. "lte": req.Max,
  494. },
  495. },
  496. },
  497. },
  498. },
  499. },
  500. {
  501. "bool": map[string]interface{}{
  502. "must": []map[string]interface{}{
  503. {
  504. "match": map[string]interface{}{
  505. "mediaName": req.Key,
  506. },
  507. },
  508. {
  509. "range": map[string]interface{}{
  510. req.RangeColumn: map[string]interface{}{
  511. "lte": req.Max,
  512. },
  513. },
  514. },
  515. },
  516. },
  517. },
  518. },
  519. },
  520. },
  521. "highlight": map[string]interface{}{
  522. "fields": map[string]interface{}{
  523. "title": map[string]interface{}{},
  524. "mediaName": map[string]interface{}{},
  525. },
  526. "pre_tags": []string{"<span style='color:#0078E8'>"},
  527. "post_tags": []string{"</span>"},
  528. },
  529. }
  530. return
  531. default:
  532. queryMap = map[string]interface{}{}
  533. return
  534. }
  535. }
  536. // /*
  537. // *
  538. //
  539. // 搜索
  540. //
  541. // indexName 访问索引名
  542. // query 搜索条件
  543. // from 开始搜索位置
  544. // size 搜索条数
  545. // sort 排序
  546. // */
  547. func (es *ESClient) Count(params *ESQueryRequest) (response ESResponse, err error) {
  548. queryMap := params.parseJsonQuery()
  549. jsonQuery, _ := json.Marshal(queryMap)
  550. logger.Info("查询语句: %s", string(jsonQuery))
  551. request := esapi.CountRequest{
  552. Index: []string{params.IndexName},
  553. Body: strings.NewReader(string(jsonQuery)),
  554. }
  555. res, err := request.Do(context.Background(), esClient.esOp)
  556. defer res.Body.Close()
  557. if err != nil {
  558. logger.Error("es查询失败: %s", err)
  559. }
  560. if res.IsError() {
  561. var e map[string]interface{}
  562. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  563. logger.Error("解析es应答失败: %v", err)
  564. } else {
  565. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  566. }
  567. }
  568. body, err := io.ReadAll(res.Body)
  569. if err != nil {
  570. logger.Error("获取es应答失败: %v", err)
  571. }
  572. return parseESResponse(body)
  573. }
  574. func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
  575. queryMap := params.parseJsonQuery()
  576. jsonQuery, _ := json.Marshal(queryMap)
  577. logger.Info("查询语句: %s", string(jsonQuery))
  578. indexes := strings.Split(params.IndexName, ",")
  579. request := esapi.SearchRequest{
  580. Index: indexes,
  581. Body: strings.NewReader(string(jsonQuery)),
  582. From: &params.From,
  583. Size: &params.Size,
  584. Sort: params.Sorts,
  585. }
  586. res, err := request.Do(context.Background(), esClient.esOp)
  587. defer res.Body.Close()
  588. if err != nil {
  589. logger.Error("es查询失败: %s", err)
  590. }
  591. if res.IsError() {
  592. var e map[string]interface{}
  593. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  594. logger.Error("解析es应答失败: %v", err)
  595. } else {
  596. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  597. }
  598. }
  599. body, err := io.ReadAll(res.Body)
  600. if err != nil {
  601. logger.Error("获取es应答失败: %v", err)
  602. }
  603. return parseESResponse(body)
  604. }
  605. func parseESResponse(body []byte) (ESResponse, error) {
  606. var response ESResponse
  607. if err := json.Unmarshal(body, &response); err != nil {
  608. return ESResponse{}, err
  609. }
  610. for _, hit := range response.Hits.Hits {
  611. var source map[string]interface{}
  612. if err := json.Unmarshal(hit.Source, &source); err != nil {
  613. return ESResponse{}, err
  614. }
  615. }
  616. return response, nil
  617. }
  618. func (es *ESClient) GetSource(hits Hits) []Hit {
  619. return hits.Hits
  620. }
  621. func (es *ESClient) GetCount(hits Hits) []Hit {
  622. return hits.Hits
  623. }
  624. // /*
  625. // *
  626. // 添加es
  627. // indexName 索引名
  628. // id es的id
  629. // body es的值
  630. // */
  631. func (es *ESClient) Update(indexName string, id int, doc interface{}) bool {
  632. jsonUpdate := map[string]interface{}{
  633. "doc": doc,
  634. }
  635. jsonDoc, _ := json.Marshal(jsonUpdate)
  636. logger.Info("查询语句: %s", string(jsonDoc))
  637. req := esapi.UpdateRequest{
  638. Index: indexName,
  639. DocumentID: strconv.Itoa(id),
  640. Body: strings.NewReader(string(jsonDoc)),
  641. Refresh: "true",
  642. }
  643. res, err := req.Do(context.Background(), es.es())
  644. defer res.Body.Close()
  645. if err != nil {
  646. logger.Error("es查询失败: %s", err)
  647. }
  648. if res.IsError() {
  649. var e map[string]interface{}
  650. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  651. logger.Error("解析es应答失败: %v", err)
  652. } else {
  653. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  654. }
  655. }
  656. body, err := io.ReadAll(res.Body)
  657. if err != nil {
  658. logger.Error("获取es应答失败: %v", err)
  659. }
  660. fmt.Printf("%s\n", string(body))
  661. return true
  662. }
  663. func (es *ESClient) InsertOrUpdate(indexName string, id int, doc interface{}) (success bool) {
  664. if exist, existErr := es.Exist(indexName, id); existErr == nil && exist {
  665. return es.Update(indexName, id, doc)
  666. } else {
  667. return es.CreateDocument(indexName, id, doc)
  668. }
  669. }
  670. // Delete *
  671. // 删除
  672. // indexName 索引名
  673. // id es的id
  674. // */
  675. func (es *ESClient) Delete(indexName string, query interface{}) bool {
  676. jsonQuery, _ := json.Marshal(query)
  677. refresh := new(bool)
  678. *refresh = true
  679. logger.Info("查询语句: %s", string(jsonQuery))
  680. req := esapi.DeleteByQueryRequest{
  681. Index: []string{indexName},
  682. Body: strings.NewReader(string(jsonQuery)),
  683. Refresh: refresh,
  684. }
  685. res, err := req.Do(context.Background(), es.es())
  686. defer res.Body.Close()
  687. if err != nil {
  688. logger.Error("es查询失败: %s", err)
  689. }
  690. if res.IsError() {
  691. var e map[string]interface{}
  692. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  693. logger.Error("解析es应答失败: %v", err)
  694. } else {
  695. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  696. }
  697. }
  698. body, err := io.ReadAll(res.Body)
  699. if err != nil {
  700. logger.Error("获取es应答失败: %v", err)
  701. }
  702. fmt.Printf("%s\n", string(body))
  703. return true
  704. }
  705. func (es *ESClient) Exist(indexName string, docId int) (exist bool, err error) {
  706. getRequest := esapi.GetRequest{
  707. Index: indexName,
  708. DocumentID: strconv.Itoa(docId),
  709. }
  710. // 执行请求
  711. res, err := getRequest.Do(context.Background(), es.es())
  712. if err != nil {
  713. logger.Error("es获取文档是否存在失败: %v", err)
  714. return
  715. }
  716. defer res.Body.Close()
  717. // 检查文档是否存在
  718. if res.IsError() {
  719. // 如果文档不存在,通常返回 404 Not Found
  720. if res.StatusCode == 404 {
  721. logger.Info("文档不存在.")
  722. err = errors.New("ES文档不存在")
  723. return
  724. } else {
  725. // 其他错误
  726. var e map[string]interface{}
  727. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  728. logger.Error("解析es应答失败: %v", err)
  729. return
  730. } else {
  731. // Print the response status and error information.
  732. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["原因"])
  733. err = errors.New("获取ES记录失败")
  734. return
  735. }
  736. }
  737. } else {
  738. // 如果文档存在
  739. logger.Info("doc存在")
  740. return true, nil
  741. }
  742. }
  743. func (es *ESClient) Get(indexName string, docId int) (doc Doc, err error) {
  744. getRequest := esapi.GetRequest{
  745. Index: indexName,
  746. DocumentID: strconv.Itoa(docId),
  747. }
  748. // 执行请求
  749. res, err := getRequest.Do(context.Background(), es.es())
  750. if err != nil {
  751. logger.Error("es获取文档是否存在失败: %v", err)
  752. return
  753. }
  754. defer res.Body.Close()
  755. // 检查文档是否存在
  756. if res.IsError() {
  757. // 如果文档不存在,通常返回 404 Not Found
  758. if res.StatusCode == 404 {
  759. logger.Info("文档不存在.")
  760. err = errors.New("ES文档不存在")
  761. return
  762. } else {
  763. // 其他错误
  764. var e map[string]interface{}
  765. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  766. logger.Error("解析es应答失败: %v", err)
  767. return
  768. } else {
  769. // Print the response status and error information.
  770. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["原因"])
  771. err = errors.New("获取ES记录失败")
  772. return
  773. }
  774. }
  775. } else {
  776. // 如果文档存在
  777. body, readErr := io.ReadAll(res.Body)
  778. if readErr != nil {
  779. logger.Error("获取es应答失败: %v", err)
  780. err = readErr
  781. return
  782. }
  783. err = json.Unmarshal(body, &doc)
  784. if err != nil {
  785. logger.Error("反序列化es应答失败: %v", err)
  786. return
  787. }
  788. return
  789. }
  790. }
  791. //
  792. //func CreateIndex(indexName string) error {
  793. // resp, err := esClient.es().Indices.
  794. // Create(indexName).
  795. // Do(context.Background())
  796. // if err != nil {
  797. // logger.Error("创建ES索引失败:%v", err)
  798. // return err
  799. // }
  800. // fmt.Printf("index:%#v\n", resp.Index)
  801. // return nil
  802. //}
  803. // DeleteIndex 删除索引
  804. //
  805. // func DeleteIndex(indexName string) error {
  806. // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
  807. // Delete(indexName).
  808. // Do(context.Background())
  809. // if err != nil {
  810. // fmt.Printf("delete index failed,err:%v\n", err)
  811. // return err
  812. // }
  813. // fmt.Printf("delete index successed,indexName:%s", indexName)
  814. // return nil
  815. // }
  816. //
  817. // CreateDocument 创建文档
  818. func (es *ESClient) CreateDocument(indexName string, id int, doc interface{}) (success bool) {
  819. jsonDoc, _ := json.Marshal(doc)
  820. logger.Info("查询语句: %s", string(jsonDoc))
  821. // 添加文档
  822. indexRequest := esapi.IndexRequest{
  823. Index: indexName,
  824. DocumentID: strconv.Itoa(id),
  825. Body: strings.NewReader(string(jsonDoc)),
  826. Refresh: "true",
  827. }
  828. // 执行请求
  829. res, err := indexRequest.Do(context.Background(), es.es())
  830. if err != nil {
  831. logger.Error("ES创建文档失败: %s", err)
  832. return false
  833. }
  834. defer res.Body.Close()
  835. // 检查文档是否成功创建
  836. if res.IsError() {
  837. var e map[string]interface{}
  838. if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
  839. logger.Error("解析ES应答失败: %s", err)
  840. } else {
  841. // Print the response status and error information.
  842. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["类型"], e["错误"].(map[string]interface{})["原因"])
  843. }
  844. return false
  845. } else {
  846. // 如果文档成功创建
  847. logger.Info("创建文档成功")
  848. return true
  849. }
  850. }