es.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "eta/eta_mini_ht_api/common/component/config"
  7. logger "eta/eta_mini_ht_api/common/component/log"
  8. "eta/eta_mini_ht_api/common/contants"
  9. "fmt"
  10. "github.com/elastic/go-elasticsearch/v7"
  11. "github.com/elastic/go-elasticsearch/v7/esapi"
  12. "io"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. )
// ESBase is the contract a document must satisfy to be passed to BulkInsert,
// which uses GetId as the document's Elasticsearch _id.
type ESBase interface {
	// GetId returns the identifier written into the bulk action's "_id" field.
	GetId() string
}
var (
	// esOnce guards the one-time construction of esClient in GetInstance.
	esOnce sync.Once
	// esClient is the package-wide client returned by GetInstance; it stays
	// nil when no ES configuration could be obtained.
	esClient *ESClient
)
// ESClient wraps the go-elasticsearch v7 client used by this package.
type ESClient struct {
	// esOp is the underlying client the esapi requests are executed against.
	esOp *elasticsearch.Client
}
// SearchType selects which query shape ESQueryRequest.parseJsonQuery builds.
type SearchType string

// Supported search types. These are untyped string constants; they are
// compared against a SearchType value in parseJsonQuery.
const (
	// MatchAll builds a match query on the request column with highlighting
	// (the generated query is the same as for Match).
	MatchAll = "match_all"
	// Match builds a match query on the request column with highlighting.
	Match = "match"
	// Range builds a match query plus a post_filter range on RangeColumn.
	Range = "range"
	// MatchAllByCondition builds a bool query: match on the column AND a term
	// clause on Condition/ConditionValue.
	MatchAllByCondition = "match_all_by_condition"
	// RangeByCondition builds a match query plus a post_filter combining a
	// range clause and a term clause.
	RangeByCondition = "range_by_condition"
)
  35. func GetInstance() *ESClient {
  36. esOnce.Do(func() {
  37. // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
  38. if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
  39. logger.Info("初始化es")
  40. // 这里可以添加初始化Redis的逻辑
  41. esClient = newEs(esConf)
  42. }
  43. })
  44. return esClient
  45. }
// es returns the underlying go-elasticsearch client held by the wrapper.
func (es *ESClient) es() *elasticsearch.Client {
	return es.esOp
}
  49. func newEs(config *config.ESConfig) *ESClient {
  50. elasticsearch.NewDefaultClient()
  51. client, err := elasticsearch.NewClient(
  52. elasticsearch.Config{
  53. Addresses: []string{config.GetUrl()},
  54. // A list of Elasticsearch nodes to use.
  55. Username: config.GetUserName(),
  56. Password: config.GetPassword(), // Password for HTTP Basic Authentication.
  57. },
  58. )
  59. if err != nil {
  60. logger.Error("连接ES失败:%v", err)
  61. panic("启动es失败")
  62. }
  63. return &ESClient{esOp: client}
  64. }
  65. func init() {
  66. if GetInstance() == nil {
  67. panic("初始化es失败")
  68. }
  69. logger.Info("es初始化成功")
  70. }
  71. // BulkInsert 批量创建文档
  72. func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
  73. // 创建批量请求
  74. bulkBody := new(bytes.Buffer)
  75. for _, doc := range docs {
  76. enc := json.NewEncoder(bulkBody)
  77. if err = enc.Encode(map[string]interface{}{
  78. "index": map[string]interface{}{
  79. "_index": indexName,
  80. "_id": doc.GetId(),
  81. },
  82. }); err != nil {
  83. logger.Error("生成es批处理请求参数失败: %s", err)
  84. }
  85. if err = enc.Encode(doc); err != nil {
  86. logger.Error("生成es批处理文档失败: %s", err)
  87. }
  88. }
  89. bulkReq := esapi.BulkRequest{
  90. Body: bytes.NewReader(bulkBody.Bytes()),
  91. Refresh: "true",
  92. }
  93. res, err := bulkReq.Do(context.Background(), es.esOp)
  94. if err != nil {
  95. logger.Error("es批处理创建失败: %s", err)
  96. }
  97. defer res.Body.Close()
  98. if res.IsError() {
  99. var e map[string]interface{}
  100. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  101. logger.Error("解析es应答失败: %v", err)
  102. } else {
  103. // Print the response status and error information.
  104. logger.Error("es请求失败: %s: %v\n", res.Status(), err)
  105. }
  106. }
  107. return
  108. }
  109. type ESResponse struct {
  110. Took int `json:"took"`
  111. Count int `json:"count"`
  112. TimedOut bool `json:"timedOut"`
  113. Hits Hits `json:"hits"`
  114. _Shards ShardsInfo `json:"_shards"`
  115. }
  116. type Hits struct {
  117. Total TotalHits `json:"total"`
  118. MaxScore float64 `json:"maxScore"`
  119. Hits []Hit `json:"hits"`
  120. }
  121. type TotalHits struct {
  122. Value int `json:"value"`
  123. Relation string `json:"relation"`
  124. }
  125. type Hit struct {
  126. Index string `json:"_index"`
  127. Type string `json:"_type"`
  128. ID string `json:"_id"`
  129. Score float64 `json:"_score"`
  130. Source json.RawMessage `json:"_source"`
  131. Highlight json.RawMessage `json:"highlight"`
  132. }
  133. type ShardsInfo struct {
  134. Total int `json:"total"`
  135. Successful int `json:"successful"`
  136. Skipped int `json:"skipped"`
  137. Failed int `json:"failed"`
  138. }
// ESQueryRequest describes a search or count request; parseJsonQuery turns it
// into the JSON query body according to Type.
type ESQueryRequest struct {
	// IndexName is the index the request is sent to.
	IndexName string
	// From and Size are passed to the search request as its paging window.
	From int
	Size int
	// Key is the text matched against Column.
	Key string
	// Column is the field used by the match query and by highlighting.
	Column string
	// Condition and ConditionValue are the field and value of the term
	// clause built for MatchAllByCondition.
	Condition      string
	ConditionValue string
	// Sorts is passed to the search request as its sort list.
	Sorts []string
	// Type selects which query shape is generated.
	Type SearchType
	// RangeColumn is the field of the range clause built for Range.
	RangeColumn string
	// Max and Min are the "lte" and "gte" bounds of the range clause.
	Max interface{}
	Min interface{}
}
  153. func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
  154. return &ESQueryRequest{
  155. IndexName: index,
  156. Type: searchType,
  157. From: from,
  158. Size: size,
  159. Key: key,
  160. Column: column,
  161. Sorts: sorts,
  162. }
  163. }
  164. func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
  165. req.RangeColumn = column
  166. req.Max = to
  167. req.Min = from
  168. return req
  169. }
  170. func (req *ESQueryRequest) ByCondition(column string, value string) *ESQueryRequest {
  171. req.Condition = column
  172. req.ConditionValue = value
  173. return req
  174. }
  175. func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
  176. switch req.Type {
  177. case MatchAll:
  178. queryMap = map[string]interface{}{
  179. "query": map[string]interface{}{
  180. "match": map[string]interface{}{
  181. req.Column: req.Key,
  182. },
  183. },
  184. "highlight": map[string]interface{}{
  185. "fields": map[string]interface{}{
  186. req.Column: map[string]interface{}{},
  187. },
  188. "pre_tags": []string{"<span style='color:#0078E8'>"},
  189. "post_tags": []string{"</span>"},
  190. },
  191. }
  192. return
  193. case MatchAllByCondition:
  194. queryMap = map[string]interface{}{
  195. "query": map[string]interface{}{
  196. "bool": map[string]interface{}{
  197. "must": []map[string]interface{}{
  198. {
  199. "match": map[string]interface{}{
  200. req.Column: req.Key,
  201. },
  202. },
  203. {
  204. "term": map[string]interface{}{
  205. req.Condition: req.ConditionValue,
  206. },
  207. },
  208. },
  209. },
  210. },
  211. "highlight": map[string]interface{}{
  212. "fields": map[string]interface{}{
  213. req.Column: map[string]interface{}{},
  214. },
  215. "pre_tags": []string{"<span style='color:#0078E8'>"},
  216. "post_tags": []string{"</span>"},
  217. },
  218. }
  219. return
  220. case Match:
  221. queryMap = map[string]interface{}{
  222. "query": map[string]interface{}{
  223. "match": map[string]interface{}{
  224. req.Column: req.Key,
  225. },
  226. },
  227. "highlight": map[string]interface{}{
  228. "fields": map[string]interface{}{
  229. req.Column: map[string]interface{}{},
  230. },
  231. "pre_tags": []string{"<span style='color:#0078E8'>"},
  232. "post_tags": []string{"</span>"},
  233. },
  234. }
  235. return
  236. case Range:
  237. queryMap = map[string]interface{}{
  238. "query": map[string]interface{}{
  239. "match": map[string]interface{}{
  240. req.Column: req.Key,
  241. },
  242. },
  243. "highlight": map[string]interface{}{
  244. "fields": map[string]interface{}{
  245. req.Column: map[string]interface{}{},
  246. },
  247. "pre_tags": []string{"<span style='color:#0078E8'>"},
  248. "post_tags": []string{"</span>"},
  249. },
  250. "post_filter": map[string]interface{}{
  251. "range": map[string]interface{}{
  252. req.RangeColumn: map[string]interface{}{
  253. "gte": req.Min,
  254. "lte": req.Max,
  255. },
  256. },
  257. },
  258. }
  259. return
  260. case RangeByCondition:
  261. queryMap = map[string]interface{}{
  262. "query": map[string]interface{}{
  263. "match": map[string]interface{}{
  264. req.Column: req.Key,
  265. },
  266. },
  267. "highlight": map[string]interface{}{
  268. "fields": map[string]interface{}{
  269. req.Column: map[string]interface{}{},
  270. },
  271. "pre_tags": []string{"<span style='color:#0078E8'>"},
  272. "post_tags": []string{"</span>"},
  273. },
  274. "post_filter": map[string]interface{}{
  275. "bool": map[string]interface{}{
  276. "must": []map[string]interface{}{
  277. {
  278. "range": map[string]interface{}{
  279. "mediaId": map[string]interface{}{
  280. "gte": req.Min,
  281. "lte": req.Max,
  282. },
  283. },
  284. },
  285. {
  286. "term": map[string]interface{}{
  287. "mediaType": "video",
  288. },
  289. },
  290. },
  291. },
  292. },
  293. }
  294. return
  295. default:
  296. queryMap = map[string]interface{}{}
  297. return
  298. }
  299. }
  300. // /*
  301. // *
  302. //
  303. // 搜索
  304. //
  305. // indexName 访问索引名
  306. // query 搜索条件
  307. // from 开始搜索位置
  308. // size 搜索条数
  309. // sort 排序
  310. // */
  311. func (es *ESClient) Count(params *ESQueryRequest) (response ESResponse, err error) {
  312. queryMap := params.parseJsonQuery()
  313. jsonQuery, _ := json.Marshal(queryMap)
  314. request := esapi.CountRequest{
  315. Index: []string{params.IndexName},
  316. Body: strings.NewReader(string(jsonQuery)),
  317. }
  318. res, err := request.Do(context.Background(), esClient.esOp)
  319. defer res.Body.Close()
  320. if err != nil {
  321. logger.Error("es查询失败: %s", err)
  322. }
  323. if res.IsError() {
  324. var e map[string]interface{}
  325. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  326. logger.Error("解析es应答失败: %v", err)
  327. } else {
  328. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  329. }
  330. }
  331. body, err := io.ReadAll(res.Body)
  332. if err != nil {
  333. logger.Error("获取es应答失败: %v", err)
  334. }
  335. return parseESResponse(body)
  336. }
  337. func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
  338. queryMap := params.parseJsonQuery()
  339. jsonQuery, _ := json.Marshal(queryMap)
  340. request := esapi.SearchRequest{
  341. Index: []string{params.IndexName},
  342. Body: strings.NewReader(string(jsonQuery)),
  343. From: &params.From,
  344. Size: &params.Size,
  345. Sort: params.Sorts,
  346. }
  347. res, err := request.Do(context.Background(), esClient.esOp)
  348. defer res.Body.Close()
  349. if err != nil {
  350. logger.Error("es查询失败: %s", err)
  351. }
  352. if res.IsError() {
  353. var e map[string]interface{}
  354. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  355. logger.Error("解析es应答失败: %v", err)
  356. } else {
  357. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  358. }
  359. }
  360. body, err := io.ReadAll(res.Body)
  361. if err != nil {
  362. logger.Error("获取es应答失败: %v", err)
  363. }
  364. return parseESResponse(body)
  365. }
  366. func parseESResponse(body []byte) (ESResponse, error) {
  367. var response ESResponse
  368. if err := json.Unmarshal(body, &response); err != nil {
  369. return ESResponse{}, err
  370. }
  371. for _, hit := range response.Hits.Hits {
  372. var source map[string]interface{}
  373. if err := json.Unmarshal(hit.Source, &source); err != nil {
  374. return ESResponse{}, err
  375. }
  376. }
  377. return response, nil
  378. }
// GetSource returns the individual hit entries of hits.
func (es *ESClient) GetSource(hits Hits) []Hit {
	return hits.Hits
}
// GetCount returns the individual hit entries of hits.
//
// NOTE(review): despite its name this returns the same slice as GetSource,
// not a number; confirm what callers expect before relying on it.
func (es *ESClient) GetCount(hits Hits) []Hit {
	return hits.Hits
}
  385. // /*
  386. // *
  387. // 添加es
  388. // indexName 索引名
  389. // id es的id
  390. // body es的值
  391. // */
  392. func (es *ESClient) Add(indexName string, id int, doc interface{}) bool {
  393. jsonDoc, _ := json.Marshal(doc)
  394. req := esapi.IndexRequest{
  395. Index: indexName,
  396. DocumentID: strconv.Itoa(id),
  397. Body: strings.NewReader(string(jsonDoc)),
  398. Refresh: "true",
  399. }
  400. res, err := req.Do(context.Background(), es.es())
  401. defer res.Body.Close()
  402. if err != nil {
  403. logger.Error("es查询失败: %s", err)
  404. }
  405. if res.IsError() {
  406. var e map[string]interface{}
  407. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  408. logger.Error("解析es应答失败: %v", err)
  409. } else {
  410. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  411. }
  412. }
  413. body, err := io.ReadAll(res.Body)
  414. if err != nil {
  415. logger.Error("获取es应答失败: %v", err)
  416. }
  417. fmt.Printf("%s\n", string(body))
  418. return true
  419. }
  420. //
  421. ///*
  422. //*
  423. //修改es
  424. //indexName 索引名
  425. //id es的id
  426. //body es的值
  427. //*/
  428. //func EsUpdate(indexName string, id string, body map[string]interface{}) bool {
  429. // bodyData := map[string]interface{}{
  430. // "doc": body,
  431. // }
  432. // req := httplib.Post(esUrl + indexName + "/_doc/" + id + "/_update")
  433. // req.JSONBody(bodyData)
  434. // _, err := req.String()
  435. // if err != nil {
  436. // fmt.Println("elasticsearch is error ", err)
  437. // return false
  438. // }
  439. // return true
  440. //}
  441. //
  442. ///*
  443. //*
  444. //删除
  445. //indexName 索引名
  446. //id es的id
  447. //*/
  448. //func EsDelete(indexName string, id string) bool {
  449. // req := httplib.Delete(esUrl + indexName + "/_doc/" + id)
  450. // _, err := req.String()
  451. // if err != nil {
  452. // fmt.Println("elasticsearch is error ", err)
  453. // return false
  454. // }
  455. // return true
  456. //
  457. //}
  458. //
  459. //func CreateIndex(indexName string) error {
  460. // resp, err := esClient.es().Indices.
  461. // Create(indexName).
  462. // Do(context.Background())
  463. // if err != nil {
  464. // logger.Error("创建ES索引失败:%v", err)
  465. // return err
  466. // }
  467. // fmt.Printf("index:%#v\n", resp.Index)
  468. // return nil
  469. //}
  470. // DeleteIndex 删除索引
  471. //func DeleteIndex(indexName string) error {
  472. // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
  473. // Delete(indexName).
  474. // Do(context.Background())
  475. // if err != nil {
  476. // fmt.Printf("delete index failed,err:%v\n", err)
  477. // return err
  478. // }
  479. // fmt.Printf("delete index successed,indexName:%s", indexName)
  480. // return nil
  481. //}
  482. //
  483. //// CreateDocument 创建文档
  484. //func CreateDocument(indexName string, id string, doc interface{}) {
  485. // // 添加文档
  486. // resp, err := esClient.esOp.Index(indexName).Id(id).Document(doc).Do(context.Background())
  487. // if err != nil {
  488. // logger.Error("indexing document failed, err:%v\n", err)
  489. // return
  490. // }
  491. // logger.Info("result:%#v\n", resp.Result)
  492. // return
  493. //}