es.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "eta/eta_mini_ht_api/common/component/config"
  7. logger "eta/eta_mini_ht_api/common/component/log"
  8. "eta/eta_mini_ht_api/common/contants"
  9. "fmt"
  10. "github.com/elastic/go-elasticsearch/v7"
  11. "github.com/elastic/go-elasticsearch/v7/esapi"
  12. "io"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. )
// ESBase is implemented by documents that can be bulk-indexed through this
// package. GetId supplies the value BulkInsert sends as the document "_id".
type ESBase interface {
	GetId() string
}
var (
	// esOnce ensures the initialisation closure in GetInstance runs only once.
	esOnce sync.Once
	// esClient is the package-wide client returned by GetInstance. It stays
	// nil when no ES configuration could be obtained.
	esClient *ESClient
)
// ESClient wraps the go-elasticsearch client used by every operation in this
// package.
type ESClient struct {
	// esOp is the underlying Elasticsearch client that requests are sent through.
	esOp *elasticsearch.Client
}
// SearchType selects which query body ESQueryRequest.parseJsonQuery builds.
type SearchType string

// Supported search types. They are untyped string constants, so they can be
// used both as SearchType values and as plain strings.
const (
	// MatchAll builds a plain "match" query on Column/Key without highlighting.
	MatchAll = "match_all"
	// Match builds a "match" query on Column/Key with highlighting of Column.
	Match = "match"
	// Range is Match plus a "post_filter" range on RangeColumn (Min..Max).
	Range = "range"
	// MatchAllByCondition combines the "match" query with a "term" clause on
	// Condition/ConditionValue inside a bool "must".
	MatchAllByCondition = "match_all_by_condition"
	// RangeByCondition is Range with an additional "term" clause on
	// Condition/ConditionValue inside the "post_filter".
	RangeByCondition = "range_by_condition"
)
  35. func GetInstance() *ESClient {
  36. esOnce.Do(func() {
  37. // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
  38. if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
  39. logger.Info("初始化es")
  40. // 这里可以添加初始化Redis的逻辑
  41. esClient = newEs(esConf)
  42. }
  43. })
  44. return esClient
  45. }
// es returns the underlying go-elasticsearch client held by the wrapper.
func (es *ESClient) es() *elasticsearch.Client {
	return es.esOp
}
  49. func newEs(config *config.ESConfig) *ESClient {
  50. elasticsearch.NewDefaultClient()
  51. client, err := elasticsearch.NewClient(
  52. elasticsearch.Config{
  53. Addresses: []string{config.GetUrl()},
  54. // A list of Elasticsearch nodes to use.
  55. Username: config.GetUserName(),
  56. Password: config.GetPassword(), // Password for HTTP Basic Authentication.
  57. },
  58. )
  59. if err != nil {
  60. logger.Error("连接ES失败:%v", err)
  61. panic("启动es失败")
  62. }
  63. return &ESClient{esOp: client}
  64. }
  65. func init() {
  66. if GetInstance() == nil {
  67. panic("初始化es失败")
  68. }
  69. logger.Info("es初始化成功")
  70. }
  71. // BulkInsert 批量创建文档
  72. func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
  73. // 创建批量请求
  74. bulkBody := new(bytes.Buffer)
  75. for _, doc := range docs {
  76. enc := json.NewEncoder(bulkBody)
  77. if err = enc.Encode(map[string]interface{}{
  78. "index": map[string]interface{}{
  79. "_index": indexName,
  80. "_id": doc.GetId(),
  81. },
  82. }); err != nil {
  83. logger.Error("生成es批处理请求参数失败: %s", err)
  84. }
  85. if err = enc.Encode(doc); err != nil {
  86. logger.Error("生成es批处理文档失败: %s", err)
  87. }
  88. }
  89. bulkReq := esapi.BulkRequest{
  90. Body: bytes.NewReader(bulkBody.Bytes()),
  91. Refresh: "true",
  92. }
  93. res, err := bulkReq.Do(context.Background(), es.esOp)
  94. if err != nil {
  95. logger.Error("es批处理创建失败: %s", err)
  96. }
  97. defer res.Body.Close()
  98. if res.IsError() {
  99. var e map[string]interface{}
  100. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  101. logger.Error("解析es应答失败: %v", err)
  102. } else {
  103. logger.Error("es请求失败: %s: %v\n", res.Status(), err)
  104. }
  105. }
  106. return
  107. }
  108. type ESResponse struct {
  109. Took int `json:"took"`
  110. Count int `json:"count"`
  111. TimedOut bool `json:"timedOut"`
  112. Hits Hits `json:"hits"`
  113. _Shards ShardsInfo `json:"_shards"`
  114. }
  115. type Hits struct {
  116. Total TotalHits `json:"total"`
  117. MaxScore float64 `json:"maxScore"`
  118. Hits []Hit `json:"hits"`
  119. }
  120. type TotalHits struct {
  121. Value int `json:"value"`
  122. Relation string `json:"relation"`
  123. }
  124. type Hit struct {
  125. Index string `json:"_index"`
  126. Type string `json:"_type"`
  127. ID string `json:"_id"`
  128. Score float64 `json:"_score"`
  129. Source json.RawMessage `json:"_source"`
  130. Highlight json.RawMessage `json:"highlight"`
  131. }
  132. type ShardsInfo struct {
  133. Total int `json:"total"`
  134. Successful int `json:"successful"`
  135. Skipped int `json:"skipped"`
  136. Failed int `json:"failed"`
  137. }
// ESQueryRequest carries the parameters from which Search and Count build
// their Elasticsearch requests.
type ESQueryRequest struct {
	// IndexName is the index the request is sent to.
	IndexName string
	// From and Size are passed to the search request as its paging window.
	From int
	Size int
	// Key is the text matched against Column in the "match" clause.
	Key string
	// Column is the field that is matched and, for highlighting types,
	// highlighted.
	Column string
	// Condition and ConditionValue form the "term" clause used by the
	// *ByCondition search types.
	Condition      string
	ConditionValue string
	// Sorts is passed to the search request as its sort list.
	Sorts []string
	// Type selects the query body built by parseJsonQuery.
	Type SearchType
	// RangeColumn, Min and Max form the range filter ("gte" Min, "lte" Max)
	// used by the range search types.
	RangeColumn string
	Max         interface{}
	Min         interface{}
}
  152. func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
  153. return &ESQueryRequest{
  154. IndexName: index,
  155. Type: searchType,
  156. From: from,
  157. Size: size,
  158. Key: key,
  159. Column: column,
  160. Sorts: sorts,
  161. }
  162. }
  163. func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
  164. req.RangeColumn = column
  165. req.Max = to
  166. req.Min = from
  167. return req
  168. }
  169. func (req *ESQueryRequest) ByCondition(column string, value string) *ESQueryRequest {
  170. req.Condition = column
  171. req.ConditionValue = value
  172. return req
  173. }
  174. func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
  175. switch req.Type {
  176. case MatchAll:
  177. queryMap = map[string]interface{}{
  178. "query": map[string]interface{}{
  179. "match": map[string]interface{}{
  180. req.Column: req.Key,
  181. },
  182. },
  183. }
  184. return
  185. case MatchAllByCondition:
  186. queryMap = map[string]interface{}{
  187. "query": map[string]interface{}{
  188. "bool": map[string]interface{}{
  189. "must": []map[string]interface{}{
  190. {
  191. "match": map[string]interface{}{
  192. req.Column: req.Key,
  193. },
  194. },
  195. {
  196. "term": map[string]interface{}{
  197. req.Condition: req.ConditionValue,
  198. },
  199. },
  200. },
  201. },
  202. },
  203. }
  204. return
  205. case Match:
  206. queryMap = map[string]interface{}{
  207. "query": map[string]interface{}{
  208. "match": map[string]interface{}{
  209. req.Column: req.Key,
  210. },
  211. },
  212. "highlight": map[string]interface{}{
  213. "fields": map[string]interface{}{
  214. req.Column: map[string]interface{}{},
  215. },
  216. "pre_tags": []string{"<span style='color:#0078E8'>"},
  217. "post_tags": []string{"</span>"},
  218. },
  219. }
  220. return
  221. case Range:
  222. queryMap = map[string]interface{}{
  223. "query": map[string]interface{}{
  224. "match": map[string]interface{}{
  225. req.Column: req.Key,
  226. },
  227. },
  228. "highlight": map[string]interface{}{
  229. "fields": map[string]interface{}{
  230. req.Column: map[string]interface{}{},
  231. },
  232. "pre_tags": []string{"<span style='color:#0078E8'>"},
  233. "post_tags": []string{"</span>"},
  234. },
  235. "post_filter": map[string]interface{}{
  236. "range": map[string]interface{}{
  237. req.RangeColumn: map[string]interface{}{
  238. "gte": req.Min,
  239. "lte": req.Max,
  240. },
  241. },
  242. },
  243. }
  244. return
  245. case RangeByCondition:
  246. queryMap = map[string]interface{}{
  247. "query": map[string]interface{}{
  248. "match": map[string]interface{}{
  249. req.Column: req.Key,
  250. },
  251. },
  252. "highlight": map[string]interface{}{
  253. "fields": map[string]interface{}{
  254. req.Column: map[string]interface{}{},
  255. },
  256. "pre_tags": []string{"<span style='color:#0078E8'>"},
  257. "post_tags": []string{"</span>"},
  258. },
  259. "post_filter": map[string]interface{}{
  260. "bool": map[string]interface{}{
  261. "must": []map[string]interface{}{
  262. {
  263. "range": map[string]interface{}{
  264. req.RangeColumn: map[string]interface{}{
  265. "gte": req.Min,
  266. "lte": req.Max,
  267. },
  268. },
  269. },
  270. {
  271. "term": map[string]interface{}{
  272. req.Condition: req.ConditionValue,
  273. },
  274. },
  275. },
  276. },
  277. },
  278. }
  279. return
  280. default:
  281. queryMap = map[string]interface{}{}
  282. return
  283. }
  284. }
  285. // /*
  286. // *
  287. //
  288. // 搜索
  289. //
  290. // indexName 访问索引名
  291. // query 搜索条件
  292. // from 开始搜索位置
  293. // size 搜索条数
  294. // sort 排序
  295. // */
  296. func (es *ESClient) Count(params *ESQueryRequest) (response ESResponse, err error) {
  297. queryMap := params.parseJsonQuery()
  298. jsonQuery, _ := json.Marshal(queryMap)
  299. logger.Info("查询语句: %s", string(jsonQuery))
  300. request := esapi.CountRequest{
  301. Index: []string{params.IndexName},
  302. Body: strings.NewReader(string(jsonQuery)),
  303. }
  304. res, err := request.Do(context.Background(), esClient.esOp)
  305. defer res.Body.Close()
  306. if err != nil {
  307. logger.Error("es查询失败: %s", err)
  308. }
  309. if res.IsError() {
  310. var e map[string]interface{}
  311. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  312. logger.Error("解析es应答失败: %v", err)
  313. } else {
  314. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  315. }
  316. }
  317. body, err := io.ReadAll(res.Body)
  318. if err != nil {
  319. logger.Error("获取es应答失败: %v", err)
  320. }
  321. return parseESResponse(body)
  322. }
  323. func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
  324. queryMap := params.parseJsonQuery()
  325. jsonQuery, _ := json.Marshal(queryMap)
  326. logger.Info("查询语句: %s", string(jsonQuery))
  327. request := esapi.SearchRequest{
  328. Index: []string{params.IndexName},
  329. Body: strings.NewReader(string(jsonQuery)),
  330. From: &params.From,
  331. Size: &params.Size,
  332. Sort: params.Sorts,
  333. }
  334. res, err := request.Do(context.Background(), esClient.esOp)
  335. defer res.Body.Close()
  336. if err != nil {
  337. logger.Error("es查询失败: %s", err)
  338. }
  339. if res.IsError() {
  340. var e map[string]interface{}
  341. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  342. logger.Error("解析es应答失败: %v", err)
  343. } else {
  344. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  345. }
  346. }
  347. body, err := io.ReadAll(res.Body)
  348. if err != nil {
  349. logger.Error("获取es应答失败: %v", err)
  350. }
  351. return parseESResponse(body)
  352. }
  353. func parseESResponse(body []byte) (ESResponse, error) {
  354. var response ESResponse
  355. if err := json.Unmarshal(body, &response); err != nil {
  356. return ESResponse{}, err
  357. }
  358. for _, hit := range response.Hits.Hits {
  359. var source map[string]interface{}
  360. if err := json.Unmarshal(hit.Source, &source); err != nil {
  361. return ESResponse{}, err
  362. }
  363. }
  364. return response, nil
  365. }
// GetSource returns the individual hits contained in hits.
func (es *ESClient) GetSource(hits Hits) []Hit {
	return hits.Hits
}
// GetCount returns the individual hits contained in hits.
// NOTE(review): despite its name this returns the same slice as GetSource and
// not a number — confirm whether callers expect a count here.
func (es *ESClient) GetCount(hits Hits) []Hit {
	return hits.Hits
}
  372. // /*
  373. // *
  374. // 添加es
  375. // indexName 索引名
  376. // id es的id
  377. // body es的值
  378. // */
  379. func (es *ESClient) Update(indexName string, id int, doc interface{}) bool {
  380. jsonUpdate := map[string]interface{}{
  381. "doc": doc,
  382. }
  383. jsonDoc, _ := json.Marshal(jsonUpdate)
  384. logger.Info("查询语句: %s", string(jsonDoc))
  385. req := esapi.UpdateRequest{
  386. Index: indexName,
  387. DocumentID: strconv.Itoa(id),
  388. Body: strings.NewReader(string(jsonDoc)),
  389. Refresh: "true",
  390. }
  391. res, err := req.Do(context.Background(), es.es())
  392. defer res.Body.Close()
  393. if err != nil {
  394. logger.Error("es查询失败: %s", err)
  395. }
  396. if res.IsError() {
  397. var e map[string]interface{}
  398. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  399. logger.Error("解析es应答失败: %v", err)
  400. } else {
  401. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  402. }
  403. }
  404. body, err := io.ReadAll(res.Body)
  405. if err != nil {
  406. logger.Error("获取es应答失败: %v", err)
  407. }
  408. fmt.Printf("%s\n", string(body))
  409. return true
  410. }
  411. // Delete *
  412. // 删除
  413. // indexName 索引名
  414. // id es的id
  415. // */
  416. func (es *ESClient) Delete(indexName string, id int) bool {
  417. req := esapi.DeleteRequest{
  418. Index: indexName,
  419. DocumentID: strconv.Itoa(id),
  420. Refresh: "true",
  421. }
  422. res, err := req.Do(context.Background(), es.es())
  423. defer res.Body.Close()
  424. if err != nil {
  425. logger.Error("es查询失败: %s", err)
  426. }
  427. if res.IsError() {
  428. var e map[string]interface{}
  429. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  430. logger.Error("解析es应答失败: %v", err)
  431. } else {
  432. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  433. }
  434. }
  435. body, err := io.ReadAll(res.Body)
  436. if err != nil {
  437. logger.Error("获取es应答失败: %v", err)
  438. }
  439. fmt.Printf("%s\n", string(body))
  440. return true
  441. }
  442. func (es *ESClient) Exist(indexName string, docId int) (exist bool, err error) {
  443. getRequest := esapi.GetRequest{
  444. Index: indexName,
  445. DocumentID: strconv.Itoa(docId),
  446. }
  447. // 执行请求
  448. res, err := getRequest.Do(context.Background(), es.es())
  449. if err != nil {
  450. logger.Error("es获取文档是否存在失败: %v", err)
  451. }
  452. defer res.Body.Close()
  453. // 检查文档是否存在
  454. if res.IsError() {
  455. // 如果文档不存在,通常返回 404 Not Found
  456. if res.StatusCode == 404 {
  457. logger.Info("文档不存在.")
  458. return false, nil
  459. } else {
  460. // 其他错误
  461. var e map[string]interface{}
  462. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  463. logger.Error("解析es应答失败: %v", err)
  464. return false, err
  465. } else {
  466. // Print the response status and error information.
  467. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["原因"])
  468. return false, nil
  469. }
  470. }
  471. } else {
  472. // 如果文档存在
  473. logger.Info("doc存在")
  474. return true, nil
  475. }
  476. }
  477. //
  478. //func CreateIndex(indexName string) error {
  479. // resp, err := esClient.es().Indices.
  480. // Create(indexName).
  481. // Do(context.Background())
  482. // if err != nil {
  483. // logger.Error("创建ES索引失败:%v", err)
  484. // return err
  485. // }
  486. // fmt.Printf("index:%#v\n", resp.Index)
  487. // return nil
  488. //}
  489. // DeleteIndex 删除索引
  490. //
  491. // func DeleteIndex(indexName string) error {
  492. // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
  493. // Delete(indexName).
  494. // Do(context.Background())
  495. // if err != nil {
  496. // fmt.Printf("delete index failed,err:%v\n", err)
  497. // return err
  498. // }
  499. // fmt.Printf("delete index successed,indexName:%s", indexName)
  500. // return nil
  501. // }
  502. //
  503. // CreateDocument 创建文档
  504. func (es *ESClient) CreateDocument(indexName string, id int, doc interface{}) (success bool) {
  505. jsonDoc, _ := json.Marshal(doc)
  506. logger.Info("查询语句: %s", string(jsonDoc))
  507. // 添加文档
  508. indexRequest := esapi.IndexRequest{
  509. Index: indexName,
  510. DocumentID: strconv.Itoa(id),
  511. Body: strings.NewReader(string(jsonDoc)),
  512. Refresh: "true",
  513. }
  514. // 执行请求
  515. res, err := indexRequest.Do(context.Background(), es.es())
  516. if err != nil {
  517. logger.Error("ES创建文档失败: %s", err)
  518. return false
  519. }
  520. defer res.Body.Close()
  521. // 检查文档是否成功创建
  522. if res.IsError() {
  523. var e map[string]interface{}
  524. if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
  525. logger.Error("解析ES应答失败: %s", err)
  526. } else {
  527. // Print the response status and error information.
  528. logger.Error("[%s] %s: %s\n", res.Status(), e["error"].(map[string]interface{})["类型"], e["错误"].(map[string]interface{})["原因"])
  529. }
  530. return false
  531. } else {
  532. // 如果文档成功创建
  533. logger.Info("创建文档成功")
  534. return true
  535. }
  536. }