es.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. package es
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "eta/eta_mini_ht_api/common/component/config"
  7. logger "eta/eta_mini_ht_api/common/component/log"
  8. "eta/eta_mini_ht_api/common/contants"
  9. "fmt"
  10. "github.com/elastic/go-elasticsearch/v7"
  11. "github.com/elastic/go-elasticsearch/v7/esapi"
  12. "io"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. )
  17. type ESBase interface {
  18. GetId() string
  19. }
  20. var (
  21. esOnce sync.Once
  22. esClient *ESClient
  23. )
  24. type ESClient struct {
  25. esOp *elasticsearch.Client
  26. }
  27. type SearchType string
  28. const (
  29. MatchAll = "match_all"
  30. Match = "match"
  31. Range = "range"
  32. MatchAllByCondition = "match_all_by_condition"
  33. RangeByCondition = "range_by_condition"
  34. )
  35. func GetInstance() *ESClient {
  36. esOnce.Do(func() {
  37. // 检查是否成功获取到RedisConfig实例,没有配置则不进行redis初始化
  38. if esConf, ok := config.GetConfig(contants.ES).(*config.ESConfig); ok {
  39. logger.Info("初始化es")
  40. // 这里可以添加初始化Redis的逻辑
  41. esClient = newEs(esConf)
  42. }
  43. })
  44. return esClient
  45. }
  46. func (es *ESClient) es() *elasticsearch.Client {
  47. return es.esOp
  48. }
  49. func newEs(config *config.ESConfig) *ESClient {
  50. elasticsearch.NewDefaultClient()
  51. client, err := elasticsearch.NewClient(
  52. elasticsearch.Config{
  53. Addresses: []string{config.GetUrl()},
  54. // A list of Elasticsearch nodes to use.
  55. Username: config.GetUserName(),
  56. Password: config.GetPassword(), // Password for HTTP Basic Authentication.
  57. },
  58. )
  59. if err != nil {
  60. logger.Error("连接ES失败:%v", err)
  61. panic("启动es失败")
  62. }
  63. return &ESClient{esOp: client}
  64. }
  65. func init() {
  66. if GetInstance() == nil {
  67. panic("初始化es失败")
  68. }
  69. logger.Info("es初始化成功")
  70. }
  71. // BulkInsert 批量创建文档
  72. func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
  73. // 创建批量请求
  74. bulkBody := new(bytes.Buffer)
  75. for _, doc := range docs {
  76. enc := json.NewEncoder(bulkBody)
  77. if err = enc.Encode(map[string]interface{}{
  78. "index": map[string]interface{}{
  79. "_index": indexName,
  80. "_id": doc.GetId(),
  81. },
  82. }); err != nil {
  83. logger.Error("生成es批处理请求参数失败: %s", err)
  84. }
  85. if err = enc.Encode(doc); err != nil {
  86. logger.Error("生成es批处理文档失败: %s", err)
  87. }
  88. }
  89. bulkReq := esapi.BulkRequest{
  90. Body: bytes.NewReader(bulkBody.Bytes()),
  91. Refresh: "true",
  92. }
  93. res, err := bulkReq.Do(context.Background(), es.esOp)
  94. if err != nil {
  95. logger.Error("es批处理创建失败: %s", err)
  96. }
  97. defer res.Body.Close()
  98. if res.IsError() {
  99. var e map[string]interface{}
  100. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  101. logger.Error("解析es应答失败: %v", err)
  102. } else {
  103. // Print the response status and error information.
  104. logger.Error("es请求失败: %s: %v\n", res.Status(), err)
  105. }
  106. }
  107. return
  108. }
  109. type ESResponse struct {
  110. Took int `json:"took"`
  111. Count int `json:"count"`
  112. TimedOut bool `json:"timedOut"`
  113. Hits Hits `json:"hits"`
  114. _Shards ShardsInfo `json:"_shards"`
  115. }
  116. type Hits struct {
  117. Total TotalHits `json:"total"`
  118. MaxScore float64 `json:"maxScore"`
  119. Hits []Hit `json:"hits"`
  120. }
  121. type TotalHits struct {
  122. Value int `json:"value"`
  123. Relation string `json:"relation"`
  124. }
  125. type Hit struct {
  126. Index string `json:"_index"`
  127. Type string `json:"_type"`
  128. ID string `json:"_id"`
  129. Score float64 `json:"_score"`
  130. Source json.RawMessage `json:"_source"`
  131. Highlight json.RawMessage `json:"highlight"`
  132. }
  133. type ShardsInfo struct {
  134. Total int `json:"total"`
  135. Successful int `json:"successful"`
  136. Skipped int `json:"skipped"`
  137. Failed int `json:"failed"`
  138. }
  139. type ESQueryRequest struct {
  140. IndexName string
  141. From int
  142. Size int
  143. Key string
  144. Column string
  145. Condition string
  146. ConditionValue string
  147. Sorts []string
  148. Type SearchType
  149. RangeColumn string
  150. Max interface{}
  151. Min interface{}
  152. }
  153. func (req *ESQueryRequest) CreateESQueryRequest(index string, column string, key string, from int, size int, sorts []string, searchType SearchType) *ESQueryRequest {
  154. return &ESQueryRequest{
  155. IndexName: index,
  156. Type: searchType,
  157. From: from,
  158. Size: size,
  159. Key: key,
  160. Column: column,
  161. Sorts: sorts,
  162. }
  163. }
  164. func (req *ESQueryRequest) Range(from int64, to int64, column string) *ESQueryRequest {
  165. req.RangeColumn = column
  166. req.Max = to
  167. req.Min = from
  168. return req
  169. }
  170. func (req *ESQueryRequest) ByCondition(column string, value string) *ESQueryRequest {
  171. req.Condition = column
  172. req.ConditionValue = value
  173. return req
  174. }
  175. func (req *ESQueryRequest) parseJsonQuery() (queryMap map[string]interface{}) {
  176. switch req.Type {
  177. case MatchAll:
  178. queryMap = map[string]interface{}{
  179. "query": map[string]interface{}{
  180. "match": map[string]interface{}{
  181. req.Column: req.Key,
  182. },
  183. },
  184. }
  185. return
  186. case MatchAllByCondition:
  187. queryMap = map[string]interface{}{
  188. "query": map[string]interface{}{
  189. "bool": map[string]interface{}{
  190. "must": []map[string]interface{}{
  191. {
  192. "match": map[string]interface{}{
  193. req.Column: req.Key,
  194. },
  195. },
  196. {
  197. "term": map[string]interface{}{
  198. req.Condition: req.ConditionValue,
  199. },
  200. },
  201. },
  202. },
  203. },
  204. }
  205. return
  206. case Match:
  207. queryMap = map[string]interface{}{
  208. "query": map[string]interface{}{
  209. "match": map[string]interface{}{
  210. req.Column: req.Key,
  211. },
  212. },
  213. "highlight": map[string]interface{}{
  214. "fields": map[string]interface{}{
  215. req.Column: map[string]interface{}{},
  216. },
  217. "pre_tags": []string{"<span style='color:#0078E8'>"},
  218. "post_tags": []string{"</span>"},
  219. },
  220. }
  221. return
  222. case Range:
  223. queryMap = map[string]interface{}{
  224. "query": map[string]interface{}{
  225. "match": map[string]interface{}{
  226. req.Column: req.Key,
  227. },
  228. },
  229. "highlight": map[string]interface{}{
  230. "fields": map[string]interface{}{
  231. req.Column: map[string]interface{}{},
  232. },
  233. "pre_tags": []string{"<span style='color:#0078E8'>"},
  234. "post_tags": []string{"</span>"},
  235. },
  236. "post_filter": map[string]interface{}{
  237. "range": map[string]interface{}{
  238. req.RangeColumn: map[string]interface{}{
  239. "gte": req.Min,
  240. "lte": req.Max,
  241. },
  242. },
  243. },
  244. }
  245. return
  246. case RangeByCondition:
  247. queryMap = map[string]interface{}{
  248. "query": map[string]interface{}{
  249. "match": map[string]interface{}{
  250. req.Column: req.Key,
  251. },
  252. },
  253. "highlight": map[string]interface{}{
  254. "fields": map[string]interface{}{
  255. req.Column: map[string]interface{}{},
  256. },
  257. "pre_tags": []string{"<span style='color:#0078E8'>"},
  258. "post_tags": []string{"</span>"},
  259. },
  260. "post_filter": map[string]interface{}{
  261. "bool": map[string]interface{}{
  262. "must": []map[string]interface{}{
  263. {
  264. "range": map[string]interface{}{
  265. "mediaId": map[string]interface{}{
  266. "gte": req.Min,
  267. "lte": req.Max,
  268. },
  269. },
  270. },
  271. {
  272. "term": map[string]interface{}{
  273. "mediaType": "video",
  274. },
  275. },
  276. },
  277. },
  278. },
  279. }
  280. return
  281. default:
  282. queryMap = map[string]interface{}{}
  283. return
  284. }
  285. }
  286. // /*
  287. // *
  288. //
  289. // 搜索
  290. //
  291. // indexName 访问索引名
  292. // query 搜索条件
  293. // from 开始搜索位置
  294. // size 搜索条数
  295. // sort 排序
  296. // */
  297. func (es *ESClient) Count(params *ESQueryRequest) (response ESResponse, err error) {
  298. queryMap := params.parseJsonQuery()
  299. jsonQuery, _ := json.Marshal(queryMap)
  300. request := esapi.CountRequest{
  301. Index: []string{params.IndexName},
  302. Body: strings.NewReader(string(jsonQuery)),
  303. }
  304. res, err := request.Do(context.Background(), esClient.esOp)
  305. defer res.Body.Close()
  306. if err != nil {
  307. logger.Error("es查询失败: %s", err)
  308. }
  309. if res.IsError() {
  310. var e map[string]interface{}
  311. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  312. logger.Error("解析es应答失败: %v", err)
  313. } else {
  314. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  315. }
  316. }
  317. body, err := io.ReadAll(res.Body)
  318. if err != nil {
  319. logger.Error("获取es应答失败: %v", err)
  320. }
  321. return parseESResponse(body)
  322. }
  323. func (es *ESClient) Search(params *ESQueryRequest) (response ESResponse, err error) {
  324. queryMap := params.parseJsonQuery()
  325. jsonQuery, _ := json.Marshal(queryMap)
  326. request := esapi.SearchRequest{
  327. Index: []string{params.IndexName},
  328. Body: strings.NewReader(string(jsonQuery)),
  329. From: &params.From,
  330. Size: &params.Size,
  331. Sort: params.Sorts,
  332. }
  333. res, err := request.Do(context.Background(), esClient.esOp)
  334. defer res.Body.Close()
  335. if err != nil {
  336. logger.Error("es查询失败: %s", err)
  337. }
  338. if res.IsError() {
  339. var e map[string]interface{}
  340. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  341. logger.Error("解析es应答失败: %v", err)
  342. } else {
  343. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  344. }
  345. }
  346. body, err := io.ReadAll(res.Body)
  347. if err != nil {
  348. logger.Error("获取es应答失败: %v", err)
  349. }
  350. return parseESResponse(body)
  351. }
  352. func parseESResponse(body []byte) (ESResponse, error) {
  353. var response ESResponse
  354. if err := json.Unmarshal(body, &response); err != nil {
  355. return ESResponse{}, err
  356. }
  357. for _, hit := range response.Hits.Hits {
  358. var source map[string]interface{}
  359. if err := json.Unmarshal(hit.Source, &source); err != nil {
  360. return ESResponse{}, err
  361. }
  362. }
  363. return response, nil
  364. }
  365. func (es *ESClient) GetSource(hits Hits) []Hit {
  366. return hits.Hits
  367. }
  368. func (es *ESClient) GetCount(hits Hits) []Hit {
  369. return hits.Hits
  370. }
  371. // /*
  372. // *
  373. // 添加es
  374. // indexName 索引名
  375. // id es的id
  376. // body es的值
  377. // */
  378. func (es *ESClient) Add(indexName string, id int, doc interface{}) bool {
  379. jsonDoc, _ := json.Marshal(doc)
  380. req := esapi.IndexRequest{
  381. Index: indexName,
  382. DocumentID: strconv.Itoa(id),
  383. Body: strings.NewReader(string(jsonDoc)),
  384. Refresh: "true",
  385. }
  386. res, err := req.Do(context.Background(), es.es())
  387. defer res.Body.Close()
  388. if err != nil {
  389. logger.Error("es查询失败: %s", err)
  390. }
  391. if res.IsError() {
  392. var e map[string]interface{}
  393. if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
  394. logger.Error("解析es应答失败: %v", err)
  395. } else {
  396. logger.Error("es请求失败: %s: %v\n", res.Status(), e)
  397. }
  398. }
  399. body, err := io.ReadAll(res.Body)
  400. if err != nil {
  401. logger.Error("获取es应答失败: %v", err)
  402. }
  403. fmt.Printf("%s\n", string(body))
  404. return true
  405. }
  406. //
  407. ///*
  408. //*
  409. //修改es
  410. //indexName 索引名
  411. //id es的id
  412. //body es的值
  413. //*/
  414. //func EsUpdate(indexName string, id string, body map[string]interface{}) bool {
  415. // bodyData := map[string]interface{}{
  416. // "doc": body,
  417. // }
  418. // req := httplib.Post(esUrl + indexName + "/_doc/" + id + "/_update")
  419. // req.JSONBody(bodyData)
  420. // _, err := req.String()
  421. // if err != nil {
  422. // fmt.Println("elasticsearch is error ", err)
  423. // return false
  424. // }
  425. // return true
  426. //}
  427. //
  428. ///*
  429. //*
  430. //删除
  431. //indexName 索引名
  432. //id es的id
  433. //*/
  434. //func EsDelete(indexName string, id string) bool {
  435. // req := httplib.Delete(esUrl + indexName + "/_doc/" + id)
  436. // _, err := req.String()
  437. // if err != nil {
  438. // fmt.Println("elasticsearch is error ", err)
  439. // return false
  440. // }
  441. // return true
  442. //
  443. //}
  444. //
  445. //func CreateIndex(indexName string) error {
  446. // resp, err := esClient.es().Indices.
  447. // Create(indexName).
  448. // Do(context.Background())
  449. // if err != nil {
  450. // logger.Error("创建ES索引失败:%v", err)
  451. // return err
  452. // }
  453. // fmt.Printf("index:%#v\n", resp.Index)
  454. // return nil
  455. //}
  456. // DeleteIndex 删除索引
  457. //func DeleteIndex(indexName string) error {
  458. // _, err := esClient.es().Indices. // 表明是对索引的操作,而Index则表示是要操作具体索引下的文档
  459. // Delete(indexName).
  460. // Do(context.Background())
  461. // if err != nil {
  462. // fmt.Printf("delete index failed,err:%v\n", err)
  463. // return err
  464. // }
  465. // fmt.Printf("delete index successed,indexName:%s", indexName)
  466. // return nil
  467. //}
  468. //
  469. //// CreateDocument 创建文档
  470. //func CreateDocument(indexName string, id string, doc interface{}) {
  471. // // 添加文档
  472. // resp, err := esClient.esOp.Index(indexName).Id(id).Document(doc).Do(context.Background())
  473. // if err != nil {
  474. // logger.Error("indexing document failed, err:%v\n", err)
  475. // return
  476. // }
  477. // logger.Info("result:%#v\n", resp.Result)
  478. // return
  479. //}