elastic.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. package services
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/PuerkitoBio/goquery"
  7. "github.com/olivere/elastic/v7"
  8. "hongze/hz_crm_api/models"
  9. "hongze/hz_crm_api/models/cygx"
  10. "hongze/hz_crm_api/services/alarm_msg"
  11. "hongze/hz_crm_api/utils"
  12. "html"
  13. "strconv"
  14. "strings"
  15. )
  16. func NewClient() (client *elastic.Client, err error) {
  17. client, err = elastic.NewClient(
  18. elastic.SetURL(ES_URL),
  19. elastic.SetBasicAuth(ES_USERNAME, ES_PASSWORD),
  20. elastic.SetSniff(false))
  21. return
  22. }
  23. // indexName:索引名称
  24. // mappingJson:表结构
  25. func EsCreateIndex(indexName, mappingJson string) (err error) {
  26. client, err := NewClient()
  27. if err != nil {
  28. return
  29. }
  30. //定义表结构
  31. exists, err := client.IndexExists(indexName).Do(context.Background()) //<5>
  32. if err != nil {
  33. return
  34. }
  35. if !exists {
  36. resp, err := client.CreateIndex(indexName).BodyJson(mappingJson).Do(context.Background())
  37. //BodyJson(bodyJson).Do(context.Background())
  38. if err != nil {
  39. fmt.Println("CreateIndex Err:" + err.Error())
  40. return err
  41. }
  42. fmt.Println(resp.Index, resp.ShardsAcknowledged, resp.Acknowledged)
  43. } else {
  44. fmt.Println(indexName + " 已存在")
  45. }
  46. return
  47. }
  48. // 新增和修改数据
  49. func EsAddOrEditData(item *cygx.CygxArticle) (err error) {
  50. defer func() {
  51. if err != nil {
  52. fmt.Println("EsAddOrEditData Err:", err.Error())
  53. }
  54. }()
  55. indexName := utils.IndexName
  56. content := html.UnescapeString(item.Body)
  57. doc, errDoc := goquery.NewDocumentFromReader(strings.NewReader(content))
  58. if errDoc != nil {
  59. return
  60. }
  61. doc.Find("a").Each(func(i int, a *goquery.Selection) {
  62. a.Remove()
  63. })
  64. bodyText := doc.Text()
  65. client, err := NewClient()
  66. if err != nil {
  67. fmt.Println(err, "err1")
  68. return
  69. }
  70. var Annotation string
  71. var Abstract string
  72. //Annotation, _ := cygxService.GetReportContentTextSubNew(item.Annotation)
  73. //Abstract, _ := cygxService.GetReportContentTextSubNew(item.Abstract)
  74. docId := strconv.Itoa(item.ArticleId)
  75. searchById, err := client.Get().Index(indexName).Id(docId).Do(context.Background())
  76. if err != nil && !strings.Contains(err.Error(), "404") {
  77. fmt.Println("Get Err" + err.Error())
  78. return
  79. }
  80. if err != nil && strings.Contains(err.Error(), "404") {
  81. err = nil
  82. }
  83. if searchById != nil && searchById.Found {
  84. resp, err := client.Update().Index(indexName).Id(docId).Doc(map[string]interface{}{
  85. "BodyText": bodyText,
  86. "Title": item.Title,
  87. "Annotation": Annotation,
  88. "Abstract": Abstract,
  89. "PublishDate": item.PublishDate.Format(utils.FormatDateTime),
  90. "CategoryId": strconv.Itoa(item.CategoryId), // es的坑 方便json转结构体这里做一个强制转换
  91. "ExpertBackground": item.ExpertBackground,
  92. }).Do(context.Background())
  93. if err != nil {
  94. fmt.Println(err, "err")
  95. return err
  96. }
  97. if resp.Status == 0 {
  98. fmt.Println("修改成功")
  99. } else {
  100. fmt.Println("EditData", resp.Status, resp.Result)
  101. }
  102. } else {
  103. itemEs := new(cygx.ElasticTestArticleDetail)
  104. itemEs.ArticleId = item.ArticleId
  105. itemEs.Title = item.Title
  106. itemEs.PublishDate = item.PublishDate.Format(utils.FormatDateTime)
  107. itemEs.BodyText = bodyText
  108. itemEs.CategoryId = strconv.Itoa(item.CategoryId)
  109. itemEs.ExpertBackground = item.ExpertBackground
  110. itemEs.Abstract = Abstract
  111. itemEs.Annotation = Annotation
  112. resp, err := client.Index().Index(indexName).Id(docId).BodyJson(itemEs).Do(context.Background())
  113. if err != nil {
  114. fmt.Println("新增失败:", err.Error())
  115. return err
  116. }
  117. if resp.Status == 0 && resp.Result == "created" {
  118. fmt.Println("新增成功")
  119. err = nil
  120. } else {
  121. fmt.Println("AddData", resp.Status, resp.Result)
  122. }
  123. }
  124. return
  125. }
  126. // 删除数据
  127. func EsDeleteData(indexName, docId string) (err error) {
  128. client, err := NewClient()
  129. if err != nil {
  130. return
  131. }
  132. resp, err := client.Delete().Index(indexName).Id(docId).Do(context.Background())
  133. fmt.Println(resp)
  134. if err != nil {
  135. return
  136. }
  137. if resp.Status == 0 {
  138. fmt.Println("删除成功")
  139. } else {
  140. fmt.Println("AddData", resp.Status, resp.Result)
  141. }
  142. return
  143. }
  144. func MappingModify(indexName, mappingJson string) {
  145. client, err := NewClient()
  146. if err != nil {
  147. return
  148. }
  149. result, err := client.PutMapping().Index(indexName).BodyString(mappingJson).Do(context.Background())
  150. fmt.Println(err)
  151. fmt.Println(result)
  152. return
  153. }
  154. // EsAddOrEditReport 新增编辑es报告
  155. //func EsAddOrEditReport(indexName, docId string, item *models.ElasticReportDetail) (err error) {
  156. // defer func() {
  157. // if err != nil {
  158. // fmt.Println("EsAddOrEditReport Err:", err.Error())
  159. // }
  160. // }()
  161. // client, err := NewClient()
  162. // if err != nil {
  163. // return
  164. // }
  165. // // docId为报告ID+章节ID
  166. // searchById, err := client.Get().Index(indexName).Id(docId).Do(context.Background())
  167. // if err != nil && !strings.Contains(err.Error(), "404") {
  168. // fmt.Println("Get Err" + err.Error())
  169. // return
  170. // }
  171. // if searchById != nil && searchById.Found {
  172. // resp, err := client.Update().Index(indexName).Id(docId).Doc(map[string]interface{}{
  173. // "ReportId": item.ReportId,
  174. // "ReportChapterId": item.ReportChapterId,
  175. // "Title": item.Title,
  176. // "Abstract": item.Abstract,
  177. // "BodyContent": item.BodyContent,
  178. // "PublishTime": item.PublishTime,
  179. // "PublishState": item.PublishState,
  180. // "Author": item.Author,
  181. // "ClassifyIdFirst": item.ClassifyIdFirst,
  182. // "ClassifyNameFirst": item.ClassifyNameFirst,
  183. // "ClassifyIdSecond": item.ClassifyIdSecond,
  184. // "ClassifyNameSecond": item.ClassifyNameSecond,
  185. // "Categories": item.Categories,
  186. // "StageStr": item.StageStr,
  187. // }).Do(context.Background())
  188. // if err != nil {
  189. // return err
  190. // }
  191. // //fmt.Println(resp.Status, resp.Result)
  192. // if resp.Status == 0 {
  193. // fmt.Println("修改成功" + docId)
  194. // err = nil
  195. // } else {
  196. // fmt.Println("EditData", resp.Status, resp.Result)
  197. // }
  198. // } else {
  199. // resp, err := client.Index().Index(indexName).Id(docId).BodyJson(item).Do(context.Background())
  200. // if err != nil {
  201. // fmt.Println("新增失败:", err.Error())
  202. // return err
  203. // }
  204. // if resp.Status == 0 && resp.Result == "created" {
  205. // fmt.Println("新增成功" + docId)
  206. // return nil
  207. // } else {
  208. // fmt.Println("AddData", resp.Status, resp.Result)
  209. // }
  210. // }
  211. // return
  212. //}
  213. // EsAddOrEditEnglishReport 新增编辑es英文报告
  214. func EsAddOrEditEnglishReport(indexName, docId string, item *models.ElasticEnglishReportDetail) (err error) {
  215. defer func() {
  216. if err != nil {
  217. fmt.Println("EsAddOrEditEnglishReport Err:", err.Error())
  218. go alarm_msg.SendAlarmMsg("新增编辑es英文报告 EsAddOrEditEnglishReport,Err:"+err.Error(), 3)
  219. }
  220. }()
  221. client, err := NewClient()
  222. if err != nil {
  223. return
  224. }
  225. // docId为报告ID
  226. searchById, err := client.Get().Index(indexName).Id(docId).Do(context.Background())
  227. if err != nil {
  228. if strings.Contains(err.Error(), "404") {
  229. err = nil
  230. } else {
  231. fmt.Println("Get Err" + err.Error())
  232. return
  233. }
  234. }
  235. if searchById != nil && searchById.Found {
  236. resp, e := client.Update().Index(indexName).Id(docId).Doc(map[string]interface{}{
  237. "Id": item.Id,
  238. "ReportId": item.ReportId,
  239. "VideoId": item.VideoId,
  240. "Title": item.Title,
  241. "Abstract": item.Abstract,
  242. "BodyContent": item.BodyContent,
  243. "PublishTime": item.PublishTime,
  244. "PublishState": item.PublishState,
  245. "Author": item.Author,
  246. "ClassifyIdFirst": item.ClassifyIdFirst,
  247. "ClassifyNameFirst": item.ClassifyNameFirst,
  248. "ClassifyIdSecond": item.ClassifyIdSecond,
  249. "ClassifyNameSecond": item.ClassifyNameSecond,
  250. "CreateTime": item.CreateTime,
  251. "Overview": item.Overview,
  252. "ReportCode": item.ReportCode,
  253. "Frequency": item.Frequency,
  254. "StageStr": item.StageStr,
  255. "ContentSub": item.ContentSub,
  256. }).Do(context.Background())
  257. if e != nil {
  258. err = e
  259. return
  260. }
  261. //fmt.Println(resp.Status, resp.Result)
  262. if resp.Status == 0 {
  263. fmt.Println("修改成功" + docId)
  264. err = nil
  265. } else {
  266. fmt.Println("EditData", resp.Status, resp.Result)
  267. }
  268. } else {
  269. resp, e := client.Index().Index(indexName).Id(docId).BodyJson(item).Do(context.Background())
  270. if e != nil {
  271. err = e
  272. fmt.Println("新增失败:", err.Error())
  273. return
  274. }
  275. if resp.Status == 0 && resp.Result == "created" {
  276. fmt.Println("新增成功" + docId)
  277. return
  278. } else {
  279. fmt.Println("AddData", resp.Status, resp.Result)
  280. }
  281. }
  282. return
  283. }
  284. // SearcCygxArticleHistoryData 查询查研观向的文章阅读记录
  285. func SearcCygxArticleHistoryData(indexName, keyword, startDate, endDate, companyIds string, from, size int) (total int64, list []*cygx.EsUserInteraction, err error) {
  286. list = make([]*cygx.EsUserInteraction, 0)
  287. defer func() {
  288. if err != nil {
  289. fmt.Println("EsUserInteraction Err:", err.Error())
  290. }
  291. }()
  292. client, err := NewClient()
  293. if err != nil {
  294. return
  295. }
  296. mustMap := make([]interface{}, 0)
  297. seliceid := []int{}
  298. //管理员权限||对应销售
  299. if companyIds != "" {
  300. silceCompanyId := strings.Split(companyIds, ",")
  301. for _, v := range silceCompanyId {
  302. companyId, _ := strconv.Atoi(v)
  303. seliceid = append(seliceid, companyId)
  304. }
  305. mustMap = append(mustMap, map[string]interface{}{
  306. "terms": map[string]interface{}{
  307. "CompanyId": seliceid,
  308. },
  309. })
  310. }
  311. //时间
  312. if startDate != "" && endDate != "" {
  313. mustMap = append(mustMap, map[string]interface{}{
  314. "range": map[string]interface{}{
  315. "CreateTime": map[string]interface{}{
  316. "gte": startDate,
  317. "lte": endDate,
  318. },
  319. },
  320. })
  321. }
  322. shouldMap := make(map[string]interface{}, 0)
  323. //关键字匹配
  324. if keyword != "" {
  325. shouldMap = map[string]interface{}{
  326. "should": []interface{}{
  327. map[string]interface{}{
  328. "wildcard": map[string]interface{}{
  329. "RealName": "*" + keyword + "*",
  330. },
  331. },
  332. map[string]interface{}{
  333. "wildcard": map[string]interface{}{
  334. "Email": "*" + keyword + "*",
  335. },
  336. },
  337. map[string]interface{}{
  338. "wildcard": map[string]interface{}{
  339. "Mobile": "*" + keyword + "*",
  340. },
  341. },
  342. map[string]interface{}{
  343. "wildcard": map[string]interface{}{
  344. "CompanyName": "*" + keyword + "*",
  345. },
  346. },
  347. },
  348. }
  349. }
  350. mustMap = append(mustMap, map[string]interface{}{
  351. "bool": shouldMap,
  352. })
  353. queryMap := map[string]interface{}{
  354. "query": map[string]interface{}{
  355. "bool": map[string]interface{}{
  356. "must": mustMap,
  357. },
  358. },
  359. }
  360. //排序
  361. //机构阅读数量
  362. sortMap := make([]interface{}, 0)
  363. //如果是一家公司就不按照这个字段排序
  364. //if len(seliceid) > 1 {
  365. // sortMap = append(sortMap, map[string]interface{}{
  366. // "CompanyArticleHistoryNum": map[string]interface{}{
  367. // "order": "desc",
  368. // },
  369. // })
  370. //}
  371. //
  372. ////用户阅读数量
  373. //sortMap = append(sortMap, map[string]interface{}{
  374. // "UserArticleHistoryNum": map[string]interface{}{
  375. // "order": "desc",
  376. // },
  377. //})
  378. //时间
  379. sortMap = append(sortMap, map[string]interface{}{
  380. "CreateTime": map[string]interface{}{
  381. "order": "desc",
  382. },
  383. })
  384. //根据条件数量统计
  385. requestTotalHits := client.Count(indexName).BodyJson(queryMap)
  386. total, err = requestTotalHits.Do(context.Background())
  387. if err != nil {
  388. return
  389. }
  390. queryMap["sort"] = sortMap
  391. queryMap["from"] = from
  392. queryMap["size"] = size
  393. jsonBytes, _ := json.Marshal(queryMap)
  394. fmt.Println(string(jsonBytes))
  395. request := client.Search(indexName).Source(queryMap) // sets the JSON request
  396. searchMap := make(map[string]string)
  397. searchResp, err := request.Do(context.Background())
  398. if err != nil {
  399. return
  400. }
  401. //fmt.Println(searchResp)
  402. //fmt.Println(searchResp.Status)
  403. if searchResp.Status != 0 {
  404. return
  405. }
  406. if searchResp.Hits != nil {
  407. for _, v := range searchResp.Hits.Hits {
  408. if _, ok := searchMap[v.Id]; !ok {
  409. itemJson, tmpErr := v.Source.MarshalJSON()
  410. if tmpErr != nil {
  411. err = tmpErr
  412. fmt.Println("movieJson err:", err)
  413. return
  414. }
  415. edbInfoItem := new(cygx.EsUserInteraction)
  416. tmpErr = json.Unmarshal(itemJson, &edbInfoItem)
  417. if tmpErr != nil {
  418. fmt.Println("json.Unmarshal movieJson err:", err)
  419. err = tmpErr
  420. return
  421. }
  422. list = append(list, edbInfoItem)
  423. searchMap[v.Id] = v.Id
  424. }
  425. }
  426. }
  427. return
  428. }
  429. // EsAddOrEditSaDoc 新增编辑语义分析文档
  430. //func EsAddOrEditSaDoc(indexName, docId string, item *saModel.ElasticSaDoc) (err error) {
  431. // defer func() {
  432. // if err != nil {
  433. // fmt.Println("EsAddOrEditSaDoc Err:", err.Error())
  434. // }
  435. // }()
  436. //
  437. // client, e := NewClient()
  438. // if e != nil {
  439. // err = e
  440. // return
  441. // }
  442. //
  443. // // docId为语义分析文档ID+段落ID
  444. // searchById, e := client.Get().Index(indexName).Id(docId).Do(context.Background())
  445. // if e != nil && !strings.Contains(e.Error(), "404") {
  446. // err = fmt.Errorf("query sa doc err: %s", e.Error())
  447. // return
  448. // }
  449. //
  450. // // 更新
  451. // if searchById != nil && searchById.Found {
  452. // docMap := map[string]interface{}{
  453. // "SaDocId": item.SaDocId,
  454. // "SaDocSectionId": item.SaDocSectionId,
  455. // "ClassifyId": item.ClassifyId,
  456. // "ClassifyName": item.ClassifyName,
  457. // "Title": item.Title,
  458. // "Theme": item.Theme,
  459. // "BodyContent": item.BodyContent,
  460. // "Author": item.Author,
  461. // "CoverImg": item.CoverImg,
  462. // "CreateTime": item.CreateTime,
  463. // }
  464. //
  465. // resp, e := client.Update().Index(indexName).Id(docId).Doc(docMap).Do(context.Background())
  466. // if e != nil {
  467. // err = fmt.Errorf("update sa doc err: %s", e.Error())
  468. // return
  469. // }
  470. // if resp.Status == 0 {
  471. // return
  472. // }
  473. // fmt.Println("EditData", resp.Status, resp.Result)
  474. // return
  475. // }
  476. //
  477. // // 新增
  478. // resp, e := client.Index().Index(indexName).Id(docId).BodyJson(item).Do(context.Background())
  479. // if e != nil {
  480. // err = fmt.Errorf("insert sa doc err: %s", e.Error())
  481. // return
  482. // }
  483. // if resp.Status == 0 && resp.Result == "created" {
  484. // return
  485. // }
  486. // fmt.Println("AddData", resp.Status, resp.Result)
  487. // return
  488. //}