es_comprehensive.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. package elastic
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/PuerkitoBio/goquery"
  7. "github.com/olivere/elastic/v7"
  8. "hongze/hz_crm_api/models/cygx"
  9. "hongze/hz_crm_api/services/alarm_msg"
  10. "hongze/hz_crm_api/utils"
  11. "html"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  16. type ElasticComprehensiveDetail struct {
  17. SourceId int `description:"资源ID"`
  18. IsSummary int `description:"是否是纪要"`
  19. Source string `description:"资源类型 报告 :article 、图表 :newchart、微路演 :roadshow、活动 :activity、活动视频:activityvideo、活动音频:activityvoice、专项调研活动:activityspecial 、 本周研究汇总: researchsummary 、 上周纪要汇总 :minutessummary 、晨会精华 :meetingreviewchapt 、 产品内测:productinterior 、 产业资源包:industrialsource"`
  20. Title string `description:"标题"`
  21. BodyText string `description:"内容"`
  22. PublishDate string `description:"发布时间"`
  23. Abstract string `description:"摘要"`
  24. Annotation string `description:"核心观点"`
  25. IndustryName string `description:"产业名称"`
  26. SubjectNames string `description:"标的名称"`
  27. }
  28. // 新增和修改数据
  29. func EsAddOrEditComprehensiveData(item *ElasticComprehensiveDetail) (err error) {
  30. indexName := utils.IndexNameComprehensive // 避免调用错别的项目的索引 ,这里写死
  31. //return
  32. defer func() {
  33. if err != nil {
  34. go alarm_msg.SendAlarmMsg("更新综合页面数据Es失败"+err.Error()+fmt.Sprint(item), 2)
  35. }
  36. }()
  37. client, err := NewClient()
  38. if err != nil {
  39. fmt.Println(err, "err1")
  40. return
  41. }
  42. mustMap := make([]interface{}, 0)
  43. mustMap = append(mustMap, map[string]interface{}{
  44. "term": map[string]interface{}{
  45. "SourceId": item.SourceId,
  46. },
  47. })
  48. mustMap = append(mustMap, map[string]interface{}{
  49. "term": map[string]interface{}{
  50. "Source": item.Source,
  51. },
  52. })
  53. //fmt.Println(item.SourceId)
  54. queryMap := map[string]interface{}{
  55. "query": map[string]interface{}{
  56. "bool": map[string]interface{}{
  57. "must": mustMap,
  58. },
  59. },
  60. }
  61. requestTotalHits := client.Count(indexName).BodyJson(queryMap)
  62. total, e := requestTotalHits.Do(context.Background())
  63. if e != nil {
  64. err = errors.New("requestTotalHits.Do(context.Background()), Err: " + e.Error())
  65. return
  66. }
  67. //return
  68. //根据来源以及ID ,判断内容是否存在,如果存在就新增,如果不存在就修改
  69. if total == 0 {
  70. resp, e := client.Index().Index(indexName).BodyJson(item).Do(context.Background())
  71. if e != nil {
  72. err = errors.New("client.Index().Index(indexName).BodyJson(item).Do(context.Background()), Err: " + e.Error())
  73. return
  74. }
  75. if resp.Status == 0 && resp.Result == "created" {
  76. //fmt.Println("新增成功")
  77. //err = nil
  78. return
  79. } else {
  80. err = errors.New(fmt.Sprint(resp))
  81. return
  82. }
  83. } else {
  84. //拼接需要改动的前置条件
  85. bool_query := elastic.NewBoolQuery()
  86. bool_query.Must(elastic.NewTermQuery("SourceId", item.SourceId))
  87. bool_query.Must(elastic.NewTermQuery("Source", item.Source))
  88. //设置需要改动的内容
  89. var script string
  90. script += fmt.Sprint("ctx._source['SubjectNames'] = '", item.SubjectNames, "';")
  91. script += fmt.Sprint("ctx._source['PublishDate'] = '", item.PublishDate, "';")
  92. script += fmt.Sprint("ctx._source['IsSummary'] = ", item.IsSummary, ";")
  93. script += fmt.Sprint("ctx._source['Abstract'] = '", item.Abstract, "';")
  94. script += fmt.Sprint("ctx._source['Title'] = '", item.Title, "';")
  95. script += fmt.Sprint("ctx._source['BodyText'] = '", item.BodyText, "';")
  96. script += fmt.Sprint("ctx._source['Annotation'] = '", item.Annotation, "';")
  97. script += fmt.Sprint("ctx._source['IndustryName'] = '", item.IndustryName, "'")
  98. _, e := client.UpdateByQuery(indexName).
  99. Query(bool_query).
  100. Script(elastic.NewScriptInline(script)).
  101. Refresh("true").
  102. Do(context.Background())
  103. if e != nil {
  104. err = errors.New(" client.UpdateByQuery(indexName), Err: " + e.Error())
  105. return
  106. }
  107. }
  108. return
  109. }
  110. // 删除数据
  111. func EsDeleteComprehensiveData(item *ElasticComprehensiveDetail) (err error) {
  112. indexName := utils.IndexNameComprehensive // 避免调用错别的项目的索引 ,这里写死
  113. defer func() {
  114. if err != nil {
  115. fmt.Println(err)
  116. go alarm_msg.SendAlarmMsg("删除数据综合页面数据Es失败"+err.Error()+fmt.Sprint(item), 2)
  117. }
  118. }()
  119. fmt.Println("删除", item.SourceId)
  120. client, err := NewClient()
  121. //拼接需要删除的前置条件
  122. bool_query := elastic.NewBoolQuery()
  123. bool_query.Must(elastic.NewTermQuery("SourceId", item.SourceId))
  124. bool_query.Must(elastic.NewTermQuery("Source", item.Source))
  125. _, e := client.DeleteByQuery(indexName).
  126. Query(bool_query).
  127. Do(context.Background())
  128. if e != nil {
  129. err = errors.New(" client.DeleteByQuery(indexName), Err: " + e.Error())
  130. return
  131. }
  132. return
  133. }
  134. // ES添加文章:报告、纪要
  135. func AddComprehensiveArticle(sourceId int) {
  136. time.Sleep(3 * time.Second) // 延迟三秒处理
  137. var err error
  138. defer func() {
  139. if err != nil {
  140. fmt.Println("err:", err)
  141. go alarm_msg.SendAlarmMsg("AddComprehensiveArticle,Err:"+err.Error(), 3)
  142. }
  143. }()
  144. v, e := cygx.GetArticleInfoOtherByArticleId(sourceId)
  145. if e != nil {
  146. err = errors.New("GetArticleInfoOtherByArticleId" + e.Error())
  147. return
  148. }
  149. content := html.UnescapeString(v.Body)
  150. doc, e := goquery.NewDocumentFromReader(strings.NewReader(content))
  151. if e != nil {
  152. err = errors.New("goquery.NewDocumentFromReader" + e.Error())
  153. return
  154. }
  155. bodyText := doc.Text()
  156. item := new(ElasticComprehensiveDetail)
  157. item.SourceId = v.ArticleId
  158. item.IsSummary = v.IsSummary
  159. item.Source = utils.CYGX_OBJ_ARTICLE
  160. item.Title = v.Title
  161. item.PublishDate = v.PublishDate.Format(utils.FormatDateTime)
  162. item.BodyText = bodyText
  163. item.Annotation = html.UnescapeString(v.Annotation)
  164. item.Abstract = html.UnescapeString(v.Abstract)
  165. if v.PublishStatus == 1 {
  166. EsAddOrEditComprehensiveData(item) //如果发布了就新增
  167. } else {
  168. EsDeleteComprehensiveData(item) // 没有发布就删除
  169. }
  170. return
  171. }
  172. // Es添加活动
  173. func AddComprehensiveActivity(sourceId int) {
  174. time.Sleep(3 * time.Second) // 延迟三秒处理
  175. var err error
  176. defer func() {
  177. if err != nil {
  178. fmt.Println("err:", err)
  179. go alarm_msg.SendAlarmMsg("AddComprehensiveActivity,Err:"+err.Error(), 3)
  180. }
  181. }()
  182. activityId := sourceId
  183. detail, e := cygx.GetAddActivityInfoById(sourceId)
  184. if e != nil {
  185. err = errors.New("GetAddActivityInfoById" + e.Error())
  186. return
  187. }
  188. mapActivityIndustrialManagement := make(map[int][]string)
  189. mapActivitySubject := make(map[int][]string)
  190. industrialList, e := cygx.GetIndustrialActivityGroupManagementList(activityId, 1)
  191. if e != nil && e.Error() != utils.ErrNoRow() {
  192. err = errors.New("GetIndustrialActivityGroupManagementList" + e.Error())
  193. return
  194. }
  195. if len(industrialList) > 0 {
  196. for _, v := range industrialList {
  197. mapActivityIndustrialManagement[activityId] = append(mapActivityIndustrialManagement[activityId], v.IndustryName)
  198. }
  199. subjectList, e := cygx.GetSubjectActivityGroupManagementList(activityId, 1)
  200. if e != nil && e.Error() != utils.ErrNoRow() {
  201. err = errors.New("GetSubjectActivityGroupManagementList" + e.Error())
  202. return
  203. }
  204. if len(subjectList) > 0 {
  205. for _, v := range subjectList {
  206. mapActivitySubject[activityId] = append(mapActivitySubject[activityId], v.SubjectName)
  207. }
  208. }
  209. }
  210. item := new(ElasticComprehensiveDetail)
  211. item.SourceId = detail.ActivityId
  212. item.Source = utils.CYGX_OBJ_ACTIVITY
  213. item.SubjectNames = strings.Join(mapActivitySubject[detail.ActivityId], ",")
  214. item.IndustryName = strings.Join(mapActivityIndustrialManagement[detail.ActivityId], ",")
  215. item.Title = detail.ActivityName
  216. item.PublishDate = detail.ActivityTime
  217. if detail.PublishStatus == 1 {
  218. EsAddOrEditComprehensiveData(item) //如果发布了就新增
  219. } else {
  220. EsDeleteComprehensiveData(item) // 没有发布就删除
  221. }
  222. return
  223. }
  224. // Es添加专项调研活动
  225. func AddComprehensiveActivitySpecial(sourceId int) {
  226. time.Sleep(3 * time.Second) // 延迟三秒处理
  227. var err error
  228. defer func() {
  229. if err != nil {
  230. fmt.Println("err:", err)
  231. go alarm_msg.SendAlarmMsg("AddComprehensiveActivitySpecial,Err:"+err.Error(), 3)
  232. }
  233. }()
  234. activityId := sourceId
  235. detail, e := cygx.GetAddActivityInfoSpecialById(sourceId)
  236. if e != nil {
  237. err = errors.New("GetAddActivityInfoSpecialById" + e.Error())
  238. return
  239. }
  240. mapActivityIndustrialManagement := make(map[int][]string)
  241. mapActivitySubject := make(map[int][]string)
  242. industrialList, e := cygx.GetIndustrialActivityGroupManagementList(activityId, 2)
  243. if e != nil && e.Error() != utils.ErrNoRow() {
  244. err = errors.New("GetIndustrialActivityGroupManagementList" + e.Error())
  245. return
  246. }
  247. if len(industrialList) > 0 {
  248. for _, v := range industrialList {
  249. mapActivityIndustrialManagement[activityId] = append(mapActivityIndustrialManagement[activityId], v.IndustryName)
  250. }
  251. subjectList, e := cygx.GetSubjectActivityGroupManagementList(activityId, 2)
  252. if e != nil && e.Error() != utils.ErrNoRow() {
  253. err = errors.New("GetSubjectActivityGroupManagementList" + e.Error())
  254. return
  255. }
  256. if len(subjectList) > 0 {
  257. for _, v := range subjectList {
  258. mapActivitySubject[activityId] = append(mapActivitySubject[activityId], v.SubjectName)
  259. }
  260. }
  261. }
  262. item := new(ElasticComprehensiveDetail)
  263. item.SourceId = detail.ActivityId
  264. item.Source = utils.CYGX_OBJ_ACTIVITYSPECIAL
  265. item.SubjectNames = strings.Join(mapActivitySubject[detail.ActivityId], ",")
  266. item.IndustryName = strings.Join(mapActivityIndustrialManagement[detail.ActivityId], ",")
  267. item.Title = detail.ResearchTheme
  268. if detail.ActivityTime == utils.EmptyDateTimeStr {
  269. item.PublishDate = detail.LastUpdatedTime.Format(utils.FormatDateTime)
  270. } else {
  271. item.PublishDate = detail.ActivityTime
  272. }
  273. if detail.PublishStatus == 1 {
  274. EsAddOrEditComprehensiveData(item) //如果发布了就新增
  275. } else {
  276. EsDeleteComprehensiveData(item) // 没有发布就删除
  277. }
  278. return
  279. }
  280. // Es添加活动视频
  281. func AddComprehensiveActivityVideo(activityId int) {
  282. time.Sleep(3 * time.Second) // 延迟三秒处理
  283. var err error
  284. defer func() {
  285. if err != nil {
  286. fmt.Println("err:", err)
  287. go alarm_msg.SendAlarmMsg("AddComprehensiveActivityVideo,Err:"+err.Error(), 3)
  288. }
  289. }()
  290. detailactivity, e := cygx.GetAddActivityInfoById(activityId)
  291. if e != nil {
  292. err = errors.New("GetAddActivityInfoById" + e.Error())
  293. return
  294. }
  295. detail, e := cygx.GetCygxActivityVideoReqDetail(activityId)
  296. if e != nil {
  297. err = errors.New("GetAddActivityInfoById" + e.Error())
  298. return
  299. }
  300. item := new(ElasticComprehensiveDetail)
  301. item.SourceId = detail.VideoId
  302. item.Source = utils.CYGX_OBJ_ACTIVITYVIDEO
  303. if detail == nil {
  304. EsDeleteComprehensiveData(item) //如果活动视频不存在 没有发布就删除
  305. }
  306. mapActivityIndustrialManagement := make(map[int][]string)
  307. mapActivitySubject := make(map[int][]string)
  308. industrialList, e := cygx.GetIndustrialActivityGroupManagementList(activityId, 1)
  309. if e != nil && e.Error() != utils.ErrNoRow() {
  310. err = errors.New("GetIndustrialActivityGroupManagementList" + e.Error())
  311. return
  312. }
  313. if len(industrialList) > 0 {
  314. for _, v := range industrialList {
  315. mapActivityIndustrialManagement[activityId] = append(mapActivityIndustrialManagement[activityId], v.IndustryName)
  316. }
  317. subjectList, e := cygx.GetSubjectActivityGroupManagementList(activityId, 1)
  318. if e != nil && e.Error() != utils.ErrNoRow() {
  319. err = errors.New("GetSubjectActivityGroupManagementList" + e.Error())
  320. return
  321. }
  322. if len(subjectList) > 0 {
  323. for _, v := range subjectList {
  324. mapActivitySubject[activityId] = append(mapActivitySubject[activityId], v.SubjectName)
  325. }
  326. }
  327. }
  328. item.SubjectNames = strings.Join(mapActivitySubject[detail.ActivityId], ",")
  329. item.IndustryName = strings.Join(mapActivityIndustrialManagement[detail.ActivityId], ",")
  330. item.Title = detail.VideoName
  331. item.BodyText = detailactivity.ActivityName
  332. item.PublishDate = detailactivity.ActivityTime
  333. EsAddOrEditComprehensiveData(item) //新增或者修改
  334. return
  335. }
  336. // Es添加活动音频
  337. func AddComprehensiveActivityVoice(activityId int) {
  338. time.Sleep(3 * time.Second) // 延迟三秒处理
  339. var err error
  340. defer func() {
  341. if err != nil {
  342. fmt.Println("err:", err)
  343. go alarm_msg.SendAlarmMsg("AddComprehensiveActivityVoice,Err:"+err.Error(), 3)
  344. }
  345. }()
  346. detailactivity, e := cygx.GetAddActivityInfoById(activityId)
  347. if e != nil {
  348. err = errors.New("GetAddActivityInfoById" + e.Error())
  349. return
  350. }
  351. detail, e := cygx.GetCygxActivityVoiceReqDetail(activityId)
  352. if e != nil {
  353. err = errors.New("GetAddActivityInfoById" + e.Error())
  354. return
  355. }
  356. item := new(ElasticComprehensiveDetail)
  357. item.SourceId = detail.ActivityVoiceId
  358. item.Source = utils.CYGX_OBJ_ACTIVITYVOICE
  359. if detail == nil {
  360. EsDeleteComprehensiveData(item) //如果不存在就先删除
  361. }
  362. mapActivityIndustrialManagement := make(map[int][]string)
  363. mapActivitySubject := make(map[int][]string)
  364. industrialList, e := cygx.GetIndustrialActivityGroupManagementList(activityId, 1)
  365. if e != nil && e.Error() != utils.ErrNoRow() {
  366. err = errors.New("GetIndustrialActivityGroupManagementList" + e.Error())
  367. return
  368. }
  369. if len(industrialList) > 0 {
  370. for _, v := range industrialList {
  371. mapActivityIndustrialManagement[activityId] = append(mapActivityIndustrialManagement[activityId], v.IndustryName)
  372. }
  373. subjectList, e := cygx.GetSubjectActivityGroupManagementList(activityId, 1)
  374. if e != nil && e.Error() != utils.ErrNoRow() {
  375. err = errors.New("GetSubjectActivityGroupManagementList" + e.Error())
  376. return
  377. }
  378. if len(subjectList) > 0 {
  379. for _, v := range subjectList {
  380. mapActivitySubject[activityId] = append(mapActivitySubject[activityId], v.SubjectName)
  381. }
  382. }
  383. }
  384. item.SubjectNames = strings.Join(mapActivitySubject[detail.ActivityId], ",")
  385. item.IndustryName = strings.Join(mapActivityIndustrialManagement[detail.ActivityId], ",")
  386. item.Title = detail.VoiceName
  387. item.BodyText = detailactivity.ActivityName
  388. item.PublishDate = detailactivity.ActivityTime
  389. EsAddOrEditComprehensiveData(item) //新增或者修改
  390. return
  391. }
  392. // Es添加微路演
  393. func AddComprehensiveRoadshow(sourceId int) {
  394. time.Sleep(3 * time.Second) // 延迟三秒处理
  395. var err error
  396. defer func() {
  397. if err != nil {
  398. fmt.Println("err:", err)
  399. go alarm_msg.SendAlarmMsg("AddComprehensiveActivityVoice,Err:"+err.Error(), 3)
  400. }
  401. }()
  402. v, e := cygx.GetMicroRoadshowVideoByVideoId(sourceId)
  403. if e != nil {
  404. err = errors.New("GetMicroRoadshowVideoByVideoId" + e.Error())
  405. return
  406. }
  407. item := new(ElasticComprehensiveDetail)
  408. item.SourceId = v.VideoId
  409. item.Source = utils.CYGX_OBJ_ROADSHOW
  410. item.IndustryName = v.IndustryName
  411. item.Title = v.VideoName
  412. item.PublishDate = v.PublishDate.Format(utils.FormatDateTime)
  413. if v.PublishStatus == 1 {
  414. EsAddOrEditComprehensiveData(item) //如果发布了就新增
  415. } else {
  416. EsDeleteComprehensiveData(item) // 没有发布就删除
  417. }
  418. return
  419. }
  420. // Es添加晨会精华
  421. func AddComprehensiveMeetingreviewchapt(sourceId int) {
  422. time.Sleep(3 * time.Second) // 延迟三秒处理
  423. var err error
  424. defer func() {
  425. if err != nil {
  426. fmt.Println("err:", err)
  427. go alarm_msg.SendAlarmMsg("AddComprehensiveMeetingreviewchapt,Err:"+err.Error(), 3)
  428. }
  429. }()
  430. v, e := cygx.GetCygxMorningMeetingReviewChapterDetail(sourceId)
  431. if e != nil {
  432. err = errors.New("GetCygxMorningMeetingReviewChapterDetail" + e.Error())
  433. return
  434. }
  435. content := html.UnescapeString(v.Content)
  436. doc, e := goquery.NewDocumentFromReader(strings.NewReader(content))
  437. if e != nil {
  438. err = errors.New("goquery.NewDocumentFromReader" + e.Error())
  439. return
  440. }
  441. bodyText := doc.Text()
  442. item := new(ElasticComprehensiveDetail)
  443. item.SourceId = sourceId
  444. item.Source = utils.CYGX_OBJ_MEETINGREVIEWCHAPT
  445. item.IndustryName = v.IndustryName
  446. item.PublishDate = v.MeetingTime.Format(utils.FormatDateTime)
  447. item.Abstract = bodyText
  448. EsAddOrEditComprehensiveData(item) //如果发布了就新增
  449. return
  450. }
  451. // Es删除晨会精华
  452. func DeleteComprehensiveMeetingreviewchapt(sourceId int) {
  453. time.Sleep(3 * time.Second) // 延迟三秒处理
  454. var err error
  455. defer func() {
  456. if err != nil {
  457. fmt.Println("err:", err)
  458. go alarm_msg.SendAlarmMsg("AddComprehensiveMeetingreviewchapt,Err:"+err.Error(), 3)
  459. }
  460. }()
  461. item := new(ElasticComprehensiveDetail)
  462. item.SourceId = sourceId
  463. item.Source = utils.CYGX_OBJ_MEETINGREVIEWCHAPT
  464. EsDeleteComprehensiveData(item) //删除
  465. return
  466. }
  467. // 添加产业资源包
  468. func AddComprehensiveIndustrialSource(sourceType string, articleId int) {
  469. time.Sleep(3 * time.Second) // 延迟三秒处理
  470. var err error
  471. defer func() {
  472. if err != nil {
  473. fmt.Println(err)
  474. go alarm_msg.SendAlarmMsg("AddComprehensiveIndustrialSource"+err.Error(), 2)
  475. }
  476. }()
  477. var condition string
  478. var pars []interface{}
  479. mapActivitySubject := make(map[int][]string, 0)
  480. listsubject, e := cygx.GetCygxIndustrialSubjectListCondition(condition, pars)
  481. if e != nil {
  482. err = errors.New("GetIndustrialManagementRepList, Err: " + e.Error())
  483. return
  484. }
  485. for _, v := range listsubject {
  486. mapActivitySubject[v.IndustrialManagementId] = append(mapActivitySubject[v.IndustrialManagementId], v.SubjectName)
  487. }
  488. var industrialsource string
  489. if sourceType == "Hz" {
  490. condition = " AND a.article_type_id = 0 " // 弘则资源包
  491. industrialsource = "industrialsourceHz"
  492. } else {
  493. condition = " AND a.article_type_id > 0 " //研选资源包
  494. industrialsource = "industrialsourceYx"
  495. }
  496. if articleId > 0 {
  497. condition += " AND a.article_id = " + strconv.Itoa(articleId)
  498. }
  499. list, err := cygx.GetSearchResourceList(0, condition, 0, 0)
  500. if err != nil {
  501. fmt.Println(err)
  502. return
  503. }
  504. if len(list) == 0 {
  505. return
  506. }
  507. for _, v := range list {
  508. item := new(ElasticComprehensiveDetail)
  509. item.SourceId = v.IndustrialManagementId
  510. item.Source = industrialsource
  511. item.IndustryName = v.IndustryName
  512. item.SubjectNames = strings.Join(mapActivitySubject[v.IndustrialManagementId], ",")
  513. item.PublishDate = v.PublishDate + " 00:00:00"
  514. EsAddOrEditComprehensiveData(item)
  515. }
  516. }
  517. // Es研选专栏
  518. func EsAddYanxuanSpecial(sourceId int) {
  519. var err error
  520. defer func() {
  521. if err != nil {
  522. fmt.Println("err:", err)
  523. go alarm_msg.SendAlarmMsg(fmt.Sprint("更新研选专栏失败sourceId: ", sourceId), 2)
  524. }
  525. }()
  526. detail, e := cygx.GetYanxuanSpecialItemById(sourceId)
  527. if e != nil {
  528. err = errors.New("GetArticleInfoOtherByArticleId" + e.Error())
  529. return
  530. }
  531. content := html.UnescapeString(detail.Content)
  532. doc, e := goquery.NewDocumentFromReader(strings.NewReader(content))
  533. if e != nil {
  534. err = errors.New("goquery.NewDocumentFromReader" + e.Error())
  535. return
  536. }
  537. bodyText := doc.Text()
  538. item := new(ElasticComprehensiveDetail)
  539. item.SourceId = detail.Id
  540. item.Source = utils.CYGX_OBJ_YANXUANSPECIAL
  541. item.Title = detail.Title
  542. item.PublishDate = detail.PublishTime
  543. item.BodyText = bodyText
  544. if detail.Status == 3 {
  545. EsAddOrEditComprehensiveData(item) //如果发布了就新增
  546. } else {
  547. EsDeleteComprehensiveData(item) // 没有发布就删除
  548. }
  549. return
  550. }