// es.go

  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "eta/eta_mini_ht_api/common/component/config"
  8. logger "eta/eta_mini_ht_api/common/component/log"
  9. "eta/eta_mini_ht_api/common/contants"
  10. "fmt"
  11. "github.com/elastic/go-elasticsearch/v7"
  12. "github.com/elastic/go-elasticsearch/v7/esapi"
  13. "io"
  14. "strconv"
  15. "strings"
  16. "sync"
  17. )
  18. type ESBase interface {
  19. GetId() string
  20. }
  21. var (
  22. esOnce sync.Once
  23. esClient *ESClient
  24. )
  25. type ESClient struct {
  26. esOp *elasticsearch.Client
  27. }
  28. type SearchType string
  29. const (
  30. MatchAll = "match_all"
  31. Match = "match"
  32. CountWithDocIds = "count_with_doc_ids"
  33. Range = "range"
  34. MatchAllByCondition = "match_all_by_condition"
  35. RangeByCondition = "range_by_condition"
  36. RangeByConditionWithDocIds = "range_by_condition_with_doc_ids"
  37. RangeByConditionWithDocIdsNoLimit = "range_by_condition_with_doc_ids_no_limit"
  38. RangeWithDocIds = "range_with_doc_ids"
  39. LimitByScore = "limit_by_score"
  40. HomeSearch = "home_search"
  41. )
  42. func GetInstance() *ESClient {
  43. esOnce.Do(func() {
  44. // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
  45. if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
  46. logger.Info("初始化es")
  47. // 这里可以添加初始化Redis的逻辑
  48. esClient = newEs(esConf)
  49. logger.Info("es地址:%v", esConf.GetUrl())
  50. }
  51. })
  52. return esClient
  53. }
  54. func (es *ESClient) es() *elasticsearch.Client {
  55. return es.esOp
  56. }
  57. func newEs(config *config.ESConfig) *ESClient {
  58. elasticsearch.NewDefaultClient()
  59. client, err := elasticsearch.NewClient(
  60. elasticsearch.Config{
  61. Addresses: []string{config.GetUrl()},
  62. // A list of Elasticsearch nodes to use.
  63. Username: config.GetUserName(),
  64. Password: config.GetPassword(), // Password for HTTP Basic Authentication.
  65. },
  66. )
  67. if err != nil {
  68. logger.Error("连接ES失败:%v", err)
  69. panic("启动es失败")
  70. }
  71. return &ESClient{esOp: client}
  72. }
  73. func init() {
  74. if GetInstance() == nil {
  75. panic("初始化es失败")
  76. }
  77. logger.Info("es初始化成功")
  78. }
  79. // BulkInsert 批量创建文档
  80. func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
  81. // 创建批量请求
  82. bulkBody := new(bytes.Buffer)
  83. for _, doc := range docs {
  84. enc := json.NewEncoder(bulkBody)
  85. if err = enc.Encode(map[string]interface{}{
  86. "index": map[string]interface{}{
  87. "_index": indexName,
  88. "_id": doc.GetId(),
  89. },
  90. }); err != nil {
  91. logger.Error("生成es批处理请求参数失败: %s", err)
  92. }
  93. if err = enc.Encode(doc); err != nil {
  94. logger.Error("生成es批处理文档失败: %s", err)
  95. }
  96. }
  97. bulkReq := esapi.BulkRequest{
  98. Body: bytes.NewReader(bulkBody.Bytes()),
  99. Refresh: "true",
  100. }
  101. res, err := bulkReq.Do(context.Background(), es.esOp)
  102. if err != nil {
  103. logger.Error("es批处理创建失败: %s", err)
  104. }
  105. defer res.Body.Close()
  106. if res.IsError() {
  107. var e map[string]interface{}
  108. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  109. logger.Error("解析es应答失败: %v", err)
  110. } else {
  111. logger.Error("es请求失败: %s: %v\n", res.Status(), err)
  112. }
  113. }
  114. return
  115. }
  116. type ESResponse struct {
  117. Took int `json:"took"`
  118. Count int `json:"count"`
  119. TimedOut bool `json:"timedOut"`
  120. Hits Hits `json:"hits"`
  121. _Shards ShardsInfo `json:"_shards"`
  122. }
  123. type Hits struct {
  124. Total TotalHits `json:"total"`
  125. MaxScore float64 `json:"maxScore"`
  126. Hits []Hit `json:"hits"`
  127. }
  128. type TotalHits struct {
  129. Value int `json:"value"`
  130. Relation string `json:"relation"`
  131. }
  132. type Hit struct {
  133. Index string `json:"_index"`
  134. Type string `json:"_type"`
  135. ID string `json:"_id"`
  136. Score float64 `json:"_score"`
  137. Source json.RawMessage `json:"_source"`
  138. Highlight json.RawMessage `json:"highlight"`
  139. }
  140. type Doc struct {
  141. Index string `json:"_index"`
  142. Type string `json:"_type"`
  143. ID string `json:"_id"`
  144. Version float64 `json:"_version"`
  145. SeqNo float64 `json:"_seq_no"`
  146. PrimaryTerm float64 `json:"_primary_term"`
  147. Found bool `json:"found"`
  148. Source json.RawMessage `json:"_source"`
  149. }
  150. type ShardsInfo struct {
  151. Total int `json:"total"`
  152. Successful int `json:"successful"`
  153. Skipped int `json:"skipped"`
  154. Failed int `json:"failed"`
  155. }
  156. type ESQueryRequest struct {
  157. IndexName string
  158. From int
  159. Size int
  160. Key string
  161. Column string
  162. Condition string
  163. ConditionValue string
  164. Sorts []string
  165. Type SearchType
  166. RangeColumn string
  167. DocIds []string
  168. Max interface{}
  169. Min interface{}
  170. MinScore float64
  171. }
  172. func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
  173. return &ESQueryRequest{
  174. IndexName: index,
  175. Type: searchType,
  176. From: from,
  177. Size: size,
  178. Key: key,
  179. Column: column,
  180. Sorts: sorts,
  181. }
  182. }
  183. func (req *ESQueryRequest) Limit(limit int) *ESQueryRequest {
  184. req.Size = limit
  185. return req
  186. }
  187. func (req *ESQueryRequest) WithScore(score float64) *ESQueryRequest {
  188. req.MinScore = score
  189. return req
  190. }
  191. func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
  192. req.RangeColumn = column
  193. req.Max = to
  194. req.Min = from
  195. return req
  196. }
  197. func (req *ESQueryRequest) Before(max interface{}, column string) *ESQueryRequest {
  198. req.RangeColumn = column
  199. req.Max = max
  200. return req
  201. }
  202. func (req *ESQueryRequest) ByCondition(column string, value string) *ESQueryRequest {
  203. req.Condition = column
  204. req.ConditionValue = value
  205. return req
  206. }
  207. func (req *ESQueryRequest) WithDocs(docIds []string) *ESQueryRequest {
  208. req.DocIds = docIds
  209. return req
  210. }
  211. func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
  212. switch req.Type {
  213. case MatchAll:
  214. queryMap = map[string]interface{}{
  215. "query": map[string]interface{}{
  216. "match": map[string]interface{}{
  217. req.Column: req.Key,
  218. },
  219. },
  220. }
  221. return
  222. case MatchAllByCondition:
  223. queryMap = map[string]interface{}{
  224. "query": map[string]interface{}{
  225. "bool": map[string]interface{}{
  226. "must": []map[string]interface{}{
  227. {
  228. "match": map[string]interface{}{
  229. req.Column: req.Key,
  230. },
  231. },
  232. {
  233. "term": map[string]interface{}{
  234. req.Condition: req.ConditionValue,
  235. },
  236. },
  237. },
  238. },
  239. },
  240. }
  241. return
  242. case Match:
  243. queryMap = map[string]interface{}{
  244. "query": map[string]interface{}{
  245. "match": map[string]interface{}{
  246. req.Column: req.Key,
  247. },
  248. },
  249. "highlight": map[string]interface{}{
  250. "fields": map[string]interface{}{
  251. req.Column: map[string]interface{}{},
  252. },
  253. "pre_tags": []string{"<span style='color:#0078E8'>"},
  254. "post_tags": []string{"</span>"},
  255. },
  256. }
  257. return
  258. case CountWithDocIds:
  259. queryMap = map[string]interface{}{
  260. "query": map[string]interface{}{
  261. "bool": map[string]interface{}{
  262. "must": []map[string]interface{}{
  263. {
  264. "match": map[string]interface{}{
  265. req.Column: req.Key,
  266. },
  267. },
  268. },
  269. "filter": map[string]interface{}{
  270. "terms": map[string]interface{}{
  271. "_id": req.DocIds,
  272. },
  273. },
  274. },
  275. },
  276. }
  277. return
  278. case Range:
  279. queryMap = map[string]interface{}{
  280. "query": map[string]interface{}{
  281. "match": map[string]interface{}{
  282. req.Column: req.Key,
  283. },
  284. },
  285. "highlight": map[string]interface{}{
  286. "fields": map[string]interface{}{
  287. req.Column: map[string]interface{}{},
  288. },
  289. "pre_tags": []string{"<span style='color:#0078E8'>"},
  290. "post_tags": []string{"</span>"},
  291. },
  292. "post_filter": map[string]interface{}{
  293. "range": map[string]interface{}{
  294. req.RangeColumn: map[string]interface{}{
  295. "gte": req.Min,
  296. "lte": req.Max,
  297. },
  298. },
  299. },
  300. }
  301. return
  302. case RangeWithDocIds:
  303. queryMap = map[string]interface{}{
  304. "query": map[string]interface{}{
  305. "match": map[string]interface{}{
  306. req.Column: req.Key,
  307. },
  308. },
  309. "highlight": map[string]interface{}{
  310. "fields": map[string]interface{}{
  311. req.Column: map[string]interface{}{},
  312. },
  313. "pre_tags": []string{"<span style='color:#0078E8'>"},
  314. "post_tags": []string{"</span>"},
  315. },
  316. "post_filter": map[string]interface{}{
  317. "bool": map[string]interface{}{
  318. "must": []map[string]interface{}{
  319. {
  320. "range": map[string]interface{}{
  321. req.RangeColumn: map[string]interface{}{
  322. "gte": req.Min,
  323. "lte": req.Max,
  324. },
  325. },
  326. },
  327. {
  328. "terms": map[string]interface{}{
  329. "_id": req.DocIds,
  330. },
  331. },
  332. },
  333. },
  334. },
  335. }
  336. return
  337. case RangeByCondition:
  338. queryMap = map[string]interface{}{
  339. "query": map[string]interface{}{
  340. "match": map[string]interface{}{
  341. req.Column: req.Key,
  342. },
  343. },
  344. "highlight": map[string]interface{}{
  345. "fields": map[string]interface{}{
  346. req.Column: map[string]interface{}{},
  347. },
  348. "pre_tags": []string{"<span style='color:#0078E8'>"},
  349. "post_tags": []string{"</span>"},
  350. },
  351. "post_filter": map[string]interface{}{
  352. "bool": map[string]interface{}{
  353. "must": []map[string]interface{}{
  354. {
  355. "range": map[string]interface{}{
  356. req.RangeColumn: map[string]interface{}{
  357. "gte": req.Min,
  358. "lte": req.Max,
  359. },
  360. },
  361. },
  362. {
  363. "term": map[string]interface{}{
  364. req.Condition: req.ConditionValue,
  365. },
  366. },
  367. },
  368. },
  369. },
  370. }
  371. return
  372. case RangeByConditionWithDocIds:
  373. queryMap = map[string]interface{}{
  374. "query": map[string]interface{}{
  375. "match": map[string]interface{}{
  376. req.Column: req.Key,
  377. },
  378. },
  379. "highlight": map[string]interface{}{
  380. "fields": map[string]interface{}{
  381. req.Column: map[string]interface{}{},
  382. },
  383. "pre_tags": []string{"<span style='color:#0078E8'>"},
  384. "post_tags": []string{"</span>"},
  385. },
  386. "post_filter": map[string]interface{}{
  387. "bool": map[string]interface{}{
  388. "must": []map[string]interface{}{
  389. {
  390. "range": map[string]interface{}{
  391. req.RangeColumn: map[string]interface{}{
  392. "gte": req.Min,
  393. "lte": req.Max,
  394. },
  395. },
  396. },
  397. {
  398. "term": map[string]interface{}{
  399. req.Condition: req.ConditionValue,
  400. },
  401. },
  402. {
  403. "terms": map[string]interface{}{
  404. "_id": req.DocIds,
  405. },
  406. },
  407. },
  408. },
  409. },
  410. }
  411. return
  412. case RangeByConditionWithDocIdsNoLimit:
  413. queryMap = map[string]interface{}{
  414. "query": map[string]interface{}{
  415. "match": map[string]interface{}{
  416. req.Column: req.Key,
  417. },
  418. },
  419. "highlight": map[string]interface{}{
  420. "fields": map[string]interface{}{
  421. req.Column: map[string]interface{}{},
  422. },
  423. "pre_tags": []string{"<span style='color:#0078E8'>"},
  424. "post_tags": []string{"</span>"},
  425. },
  426. "post_filter": map[string]interface{}{
  427. "bool": map[string]interface{}{
  428. "must": []map[string]interface{}{
  429. {
  430. "terms": map[string]interface{}{
  431. "_id": req.DocIds,
  432. },
  433. },
  434. },
  435. },
  436. },
  437. }
  438. return
  439. case LimitByScore:
  440. queryMap = map[string]interface{}{
  441. "query": map[string]interface{}{
  442. "match": map[string]interface{}{
  443. req.Column: req.Key,
  444. },
  445. },
  446. "highlight": map[string]interface{}{
  447. "fields": map[string]interface{}{
  448. req.Column: map[string]interface{}{},
  449. },
  450. "pre_tags": []string{"<span style='color:#0078E8'>"},
  451. "post_tags": []string{"</span>"},
  452. },
  453. "post_filter": map[string]interface{}{
  454. "bool": map[string]interface{}{
  455. "must": []map[string]interface{}{
  456. {
  457. "terms": map[string]interface{}{
  458. "_id": req.DocIds,
  459. },
  460. },
  461. },
  462. },
  463. },
  464. "min_score": req.MinScore,
  465. }
  466. return
  467. case HomeSearch:
  468. queryMap = map[string]interface{}{
  469. "query": map[string]interface{}{
  470. "bool": map[string]interface{}{
  471. "should": []map[string]interface{}{
  472. {
  473. "bool": map[string]interface{}{
  474. "must": []map[string]interface{}{
  475. {
  476. "match": map[string]interface{}{
  477. "title": req.Key,
  478. },
  479. },
  480. {
  481. "term": map[string]interface{}{
  482. "status": "PUBLISH",
  483. },
  484. },
  485. {
  486. "range": map[string]interface{}{
  487. "publishedTime": map[string]interface{}{
  488. "lte": req.Max,
  489. },
  490. },
  491. },
  492. },
  493. },
  494. },
  495. {
  496. "bool": map[string]interface{}{
  497. "must": []map[string]interface{}{
  498. {
  499. "match": map[string]interface{}{
  500. "mediaName": req.Key,
  501. },
  502. },
  503. {
  504. "range": map[string]interface{}{
  505. req.RangeColumn: map[string]interface{}{
  506. "lte": req.Max,
  507. },
  508. },
  509. },
  510. },
  511. },
  512. },
  513. },
  514. },
  515. },
  516. "highlight": map[string]interface{}{
  517. "fields": map[string]interface{}{
  518. "title": map[string]interface{}{},
  519. "mediaName": map[string]interface{}{},
  520. },
  521. "pre_tags": []string{"<span style='color:#0078E8'>"},
  522. "post_tags": []string{"</span>"},
  523. },
  524. }
  525. return
  526. default:
  527. queryMap = map[string]interface{}{}
  528. return
  529. }
  530. }
  531. // /*
  532. // *
  533. //
  534. // 搜索
  535. //
  536. // indexName 访问索引名
  537. // query 搜索条件
  538. // from 开始搜索位置
  539. // size 搜索条数
  540. // sort 排序
  541. // */
  542. func (es *ESClient) Count(params *ESQueryRequest) (response ESResponse, err error) {
  543. queryMap := params.parseJsonQuery()
  544. jsonQuery, _ := json.Marshal(queryMap)
  545. logger.Info("查询语句: %s", string(jsonQuery))
  546. request := esapi.CountRequest{
  547. Index: []string{params.IndexName},
  548. Body: strings.NewReader(string(jsonQuery)),
  549. }
  550. res, err := request.Do(context.Background(), esClient.esOp)
  551. defer res.Body.Close()
  552. if err != nil {
  553. logger.Error("es查询失败: %s", err)
  554. }
  555. if res.IsError() {
  556. var e map[string]interface{}
  557. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  558. logger.Error("解析es应答失败: %v", err)
  559. } else {
  560. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  561. }
  562. }
  563. body, err := io.ReadAll(res.Body)
  564. if err != nil {
  565. logger.Error("获取es应答失败: %v", err)
  566. }
  567. return parseESResponse(body)
  568. }
  569. func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
  570. queryMap := params.parseJsonQuery()
  571. jsonQuery, _ := json.Marshal(queryMap)
  572. logger.Info("查询语句: %s", string(jsonQuery))
  573. indexes := strings.Split(params.IndexName, ",")
  574. request := esapi.SearchRequest{
  575. Index: indexes,
  576. Body: strings.NewReader(string(jsonQuery)),
  577. From: &params.From,
  578. Size: &params.Size,
  579. Sort: params.Sorts,
  580. }
  581. res, err := request.Do(context.Background(), esClient.esOp)
  582. defer res.Body.Close()
  583. if err != nil {
  584. logger.Error("es查询失败: %s", err)
  585. }
  586. if res.IsError() {
  587. var e map[string]interface{}
  588. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  589. logger.Error("解析es应答失败: %v", err)
  590. } else {
  591. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  592. }
  593. }
  594. body, err := io.ReadAll(res.Body)
  595. if err != nil {
  596. logger.Error("获取es应答失败: %v", err)
  597. }
  598. return parseESResponse(body)
  599. }
  600. func parseESResponse(body []byte) (ESResponse, error) {
  601. var response ESResponse
  602. if err := json.Unmarshal(body, &response); err != nil {
  603. return ESResponse{}, err
  604. }
  605. for _, hit := range response.Hits.Hits {
  606. var source map[string]interface{}
  607. if err := json.Unmarshal(hit.Source, &source); err != nil {
  608. return ESResponse{}, err
  609. }
  610. }
  611. return response, nil
  612. }
  613. func (es *ESClient) GetSource(hits Hits) []Hit {
  614. return hits.Hits
  615. }
  616. func (es *ESClient) GetCount(hits Hits) []Hit {
  617. return hits.Hits
  618. }
  619. // /*
  620. // *
  621. // 添加es
  622. // indexName 索引名
  623. // id es的id
  624. // body es的值
  625. // */
  626. func (es *ESClient) Update(indexName string, id int, doc interface{}) bool {
  627. jsonUpdate := map[string]interface{}{
  628. "doc": doc,
  629. }
  630. jsonDoc, _ := json.Marshal(jsonUpdate)
  631. logger.Info("查询语句: %s", string(jsonDoc))
  632. req := esapi.UpdateRequest{
  633. Index: indexName,
  634. DocumentID: strconv.Itoa(id),
  635. Body: strings.NewReader(string(jsonDoc)),
  636. Refresh: "true",
  637. }
  638. res, err := req.Do(context.Background(), es.es())
  639. defer res.Body.Close()
  640. if err != nil {
  641. logger.Error("es查询失败: %s", err)
  642. }
  643. if res.IsError() {
  644. var e map[string]interface{}
  645. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  646. logger.Error("解析es应答失败: %v", err)
  647. } else {
  648. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  649. }
  650. }
  651. body, err := io.ReadAll(res.Body)
  652. if err != nil {
  653. logger.Error("获取es应答失败: %v", err)
  654. }
  655. fmt.Printf("%s\n", string(body))
  656. return true
  657. }
  658. func (es *ESClient) InsertOrUpdate(indexName string, id int, doc interface{}) (success bool) {
  659. if exist, existErr := es.Exist(indexName, id); existErr == nil && exist {
  660. return es.Update(indexName, id, doc)
  661. } else {
  662. return es.CreateDocument(indexName, id, doc)
  663. }
  664. }
  665. // Delete *
  666. // 删除
  667. // indexName 索引名
  668. // id es的id
  669. // */
  670. func (es *ESClient) Delete(indexName string, query interface{}) bool {
  671. jsonQuery, _ := json.Marshal(query)
  672. refresh := new(bool)
  673. *refresh = true
  674. logger.Info("查询语句: %s", string(jsonQuery))
  675. req := esapi.DeleteByQueryRequest{
  676. Index: []string{indexName},
  677. Body: strings.NewReader(string(jsonQuery)),
  678. Refresh: refresh,
  679. }
  680. res, err := req.Do(context.Background(), es.es())
  681. defer res.Body.Close()
  682. if err != nil {
  683. logger.Error("es查询失败: %s", err)
  684. }
  685. if res.IsError() {
  686. var e map[string]interface{}
  687. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  688. logger.Error("解析es应答失败: %v", err)
  689. } else {
  690. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  691. }
  692. }
  693. body, err := io.ReadAll(res.Body)
  694. if err != nil {
  695. logger.Error("获取es应答失败: %v", err)
  696. }
  697. fmt.Printf("%s\n", string(body))
  698. return true
  699. }
  700. func (es *ESClient) Exist(indexName string, docId int) (exist bool, err error) {
  701. getRequest := esapi.GetRequest{
  702. Index: indexName,
  703. DocumentID: strconv.Itoa(docId),
  704. }
  705. // 执行请求
  706. res, err := getRequest.Do(context.Background(), es.es())
  707. if err != nil {
  708. logger.Error("es获取文档是否存在失败: %v", err)
  709. return
  710. }
  711. defer res.Body.Close()
  712. // 检查文档是否存在
  713. if res.IsError() {
  714. // 如果文档不存在,通常返回 404 Not Found
  715. if res.StatusCode == 404 {
  716. logger.Info("文档不存在.")
  717. err = errors.New("ES文档不存在")
  718. return
  719. } else {
  720. // 其他错误
  721. var e map[string]interface{}
  722. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  723. logger.Error("解析es应答失败: %v", err)
  724. return
  725. } else {
  726. // Print the response status and error information.
  727. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["原因"])
  728. err = errors.New("获取ES记录失败")
  729. return
  730. }
  731. }
  732. } else {
  733. // 如果文档存在
  734. logger.Info("doc存在")
  735. return true, nil
  736. }
  737. }
  738. func (es *ESClient) Get(indexName string, docId int) (doc Doc, err error) {
  739. getRequest := esapi.GetRequest{
  740. Index: indexName,
  741. DocumentID: strconv.Itoa(docId),
  742. }
  743. // 执行请求
  744. res, err := getRequest.Do(context.Background(), es.es())
  745. if err != nil {
  746. logger.Error("es获取文档是否存在失败: %v", err)
  747. return
  748. }
  749. defer res.Body.Close()
  750. // 检查文档是否存在
  751. if res.IsError() {
  752. // 如果文档不存在,通常返回 404 Not Found
  753. if res.StatusCode == 404 {
  754. logger.Info("文档不存在.")
  755. err = errors.New("ES文档不存在")
  756. return
  757. } else {
  758. // 其他错误
  759. var e map[string]interface{}
  760. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  761. logger.Error("解析es应答失败: %v", err)
  762. return
  763. } else {
  764. // Print the response status and error information.
  765. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["原因"])
  766. err = errors.New("获取ES记录失败")
  767. return
  768. }
  769. }
  770. } else {
  771. // 如果文档存在
  772. body, readErr := io.ReadAll(res.Body)
  773. if readErr != nil {
  774. logger.Error("获取es应答失败: %v", err)
  775. err = readErr
  776. return
  777. }
  778. err = json.Unmarshal(body, &doc)
  779. if err != nil {
  780. logger.Error("反序列化es应答失败: %v", err)
  781. return
  782. }
  783. return
  784. }
  785. }
  786. //
  787. //func CreateIndex(indexName string) error {
  788. // resp, err := esClient.es().Indices.
  789. // Create(indexName).
  790. // Do(context.Background())
  791. // if err != nil {
  792. // logger.Error("创建ES索引失败:%v", err)
  793. // return err
  794. // }
  795. // fmt.Printf("index:%#v\n", resp.Index)
  796. // return nil
  797. //}
  798. // DeleteIndex 删除索引
  799. //
  800. // func DeleteIndex(indexName string) error {
  801. // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
  802. // Delete(indexName).
  803. // Do(context.Background())
  804. // if err != nil {
  805. // fmt.Printf("delete index failed,err:%v\n", err)
  806. // return err
  807. // }
  808. // fmt.Printf("delete index successed,indexName:%s", indexName)
  809. // return nil
  810. // }
  811. //
  812. // CreateDocument 创建文档
  813. func (es *ESClient) CreateDocument(indexName string, id int, doc interface{}) (success bool) {
  814. jsonDoc, _ := json.Marshal(doc)
  815. logger.Info("查询语句: %s", string(jsonDoc))
  816. // 添加文档
  817. indexRequest := esapi.IndexRequest{
  818. Index: indexName,
  819. DocumentID: strconv.Itoa(id),
  820. Body: strings.NewReader(string(jsonDoc)),
  821. Refresh: "true",
  822. }
  823. // 执行请求
  824. res, err := indexRequest.Do(context.Background(), es.es())
  825. if err != nil {
  826. logger.Error("ES创建文档失败: %s", err)
  827. return false
  828. }
  829. defer res.Body.Close()
  830. // 检查文档是否成功创建
  831. if res.IsError() {
  832. var e map[string]interface{}
  833. if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
  834. logger.Error("解析ES应答失败: %s", err)
  835. } else {
  836. // Print the response status and error information.
  837. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["类型"], e["错误"].(map[string]interface{})["原因"])
  838. }
  839. return false
  840. } else {
  841. // 如果文档成功创建
  842. logger.Info("创建文档成功")
  843. return true
  844. }
  845. }