report_rai.go 13 KB


  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_api/cache"
  5. "eta/eta_api/models"
  6. "eta/eta_api/models/report"
  7. "eta/eta_api/services/alarm_msg"
  8. "eta/eta_api/utils"
  9. "fmt"
  10. "html"
  11. "io/ioutil"
  12. "net/http"
  13. "strconv"
  14. "strings"
  15. "time"
  16. )
  17. func AutoInsertRaiReport() {
  18. for {
  19. utils.Rc.Brpop(utils.FICC_ARTICLE_UPDATE_KEY, func(b []byte) {
  20. var log models.RaiReportNotifyRedis
  21. if err := json.Unmarshal(b, &log); err != nil {
  22. utils.FileLog.Info("获取权益报告并更新处理Redis队列消息失败: json unmarshal wrong!", err.Error())
  23. go alarm_msg.SendAlarmMsg(fmt.Sprintf("获取权益报告并更新处理Redis队列消息失败: json unmarshal wrong! %s", err.Error()), 2)
  24. }
  25. // 这里直接go出去会出现并发,导致文章md5ID唯一索引限制报错
  26. err := HandleInsertRaiReport(log.ArticleId)
  27. if err != nil {
  28. utils.FileLog.Info("获取权益报告并更新处理Redis队列消息失败: HandleInsertRaiReport ", err.Error())
  29. go alarm_msg.SendAlarmMsg(fmt.Sprintf("获取权益报告并更新处理Redis队列消息失败: HandleInsertRaiReport %s", err.Error()), 2)
  30. }
  31. })
  32. }
  33. }
  34. func HandleInsertRaiReport(raiReportId int) (err error) {
  35. // 设置缓存,防止重复处理
  36. defer func() {
  37. if err != nil {
  38. msg := fmt.Sprintf("处理同步过来的文章失败"+"HandleArticleListByApi ErrMsg:%s artcleId:%d", err.Error(), raiReportId)
  39. utils.FileLog.Info(msg, 2)
  40. go alarm_msg.SendAlarmMsg(msg, 2)
  41. }
  42. }()
  43. body, err := getRaiReportLib(fmt.Sprintf("%s/articles/%d", utils.RaiReportLibUrl, raiReportId))
  44. if err != nil {
  45. fmt.Println(err)
  46. err = fmt.Errorf("获取权益报告失败, Err: %s", err.Error())
  47. return
  48. }
  49. var articleResultDate models.ArticleDetailResultApi
  50. err = json.Unmarshal(body, &articleResultDate)
  51. if err != nil {
  52. fmt.Println("Getres.PublicGetDate Err:", err.Error())
  53. return err
  54. }
  55. articleResult := articleResultDate.Data
  56. err = handleInsertRaiReport(articleResult)
  57. if err != nil {
  58. return err
  59. }
  60. return
  61. }
  62. func handleInsertRaiReport(articleResult models.ArticleResultApidate) (err error) {
  63. raiReportId := articleResult.ArticleId
  64. // 设置缓存,防止重复处理
  65. cacheKey := fmt.Sprintf("rai_report_notify_redis_%d", raiReportId)
  66. cacheValue := utils.Rc.GetStr(cacheKey)
  67. if cacheValue != "" {
  68. return nil
  69. }
  70. utils.Rc.SetNX(cacheKey, "1", 10*time.Second)
  71. defer func() {
  72. if err != nil {
  73. msg := fmt.Sprintf("处理同步过来的文章失败"+"HandleArticleListByApi ErrMsg:%s artcleId:%d", err.Error(), raiReportId)
  74. utils.FileLog.Info(msg, 2)
  75. go alarm_msg.SendAlarmMsg(msg, 2)
  76. }
  77. utils.Rc.Delete(cacheKey)
  78. }()
  79. // var clueApiUrl string
  80. // clueApiUrl = fmt.Sprint(utils.RaiReportLibUrl, "articles/", raiReportId)
  81. // fmt.Println(clueApiUrl)
  82. // body, err := getRaiReportLib(clueApiUrl)
  83. // if err != nil {
  84. // fmt.Println(err)
  85. // err = fmt.Errorf("获取权益报告失败, Err: %s", err.Error())
  86. // return
  87. // }
  88. // var articleResultDate models.ArticleDetailResultApi
  89. // err = json.Unmarshal(body, &articleResultDate)
  90. // if err != nil {
  91. // fmt.Println("Getres.PublicGetDate Err:", err.Error())
  92. // return err
  93. // }
  94. // articleResult := articleResultDate.Data
  95. // 判断是否是固收研究
  96. if articleResult.IndustrId != 12 {
  97. return nil
  98. }
  99. // 根据分类名称查找分类信息
  100. classifyItemList, e := models.GetReportClassifyByClassifyName([]string{articleResult.Industry.Name, articleResult.Series.Name})
  101. if e != nil {
  102. err = fmt.Errorf("GetReportClassifyByClassifyName err: %s", e.Error())
  103. return err
  104. }
  105. classifyMap := make(map[string]*models.Classify)
  106. for _, v := range classifyItemList {
  107. classifyMap[v.ClassifyName] = v
  108. }
  109. classifyFirst, ok := classifyMap[articleResult.Industry.Name]
  110. if !ok {
  111. err = fmt.Errorf("一级分类不存在")
  112. return err
  113. }
  114. classifySecond, ok := classifyMap[articleResult.Series.Name]
  115. if !ok {
  116. // 新增二级分类
  117. err, _, _ = AddReportClassify(articleResult.Series.Name, classifyFirst.Id, []int{})
  118. if err != nil {
  119. err = fmt.Errorf("添加二级分类失败, Err: %s", err.Error())
  120. return err
  121. }
  122. item, err := models.GetClassifyByName(articleResult.Series.Name, classifyFirst.Id)
  123. if err != nil {
  124. err = fmt.Errorf("添加二级分类失败, Err: %s", err.Error())
  125. return err
  126. }
  127. classifySecond = item
  128. }
  129. // 判断分类的层级关系是否合理
  130. if classifyFirst.Id != classifySecond.ParentId {
  131. err = fmt.Errorf("分类层级关系不合理")
  132. return err
  133. }
  134. // 判断报告是否已存在, 如果存在则更新报告,如果不存在则创建报告
  135. reportInfo, err := models.GetReportByRaiReportId(articleResult.ArticleId)
  136. if err != nil && err.Error() != utils.ErrNoRow() {
  137. return err
  138. }
  139. if err == nil && reportInfo.Id > 0 {
  140. var contentSub string
  141. if articleResult.Content.Body != "" {
  142. contentSub, err = GetReportContentSub(articleResult.Content.Body)
  143. if err != nil {
  144. go alarm_msg.SendAlarmMsg("ContentSub 失败,Err:"+err.Error(), 3)
  145. }
  146. }
  147. state := reportInfo.State
  148. publishSource := `publish` //同步至知识库
  149. // 报告已存在,更新报告
  150. if (articleResult.PublishStatus == 2 || articleResult.PublishStatus == 4) && articleResult.IsActive {
  151. // 报告状态为未发布,则更新报告
  152. state = models.ReportStatePublished
  153. reportInfo.PublishTime = articleResult.PublishDate
  154. } else if !articleResult.IsActive {
  155. publishSource = `un_publish` //同步至知识库
  156. // 删除报告
  157. err = models.DeleteReport(reportInfo.Id)
  158. if err != nil {
  159. err = fmt.Errorf("删除报告失败, Err: %s", err.Error())
  160. return
  161. }
  162. go UpdateReportEs(reportInfo.Id, 1)
  163. return
  164. } else {
  165. publishSource = `un_publish` //同步至知识库
  166. // 报告状态为未发布,则更新报告
  167. state = models.ReportStateUnpublished
  168. reportInfo.PublishTime = articleResult.PublishDate
  169. }
  170. // 过滤Abstracthtml标签,把<p>标签去掉
  171. abstract := strings.ReplaceAll(articleResult.Content.Abstract, "<p>", "")
  172. abstract = strings.ReplaceAll(abstract, "</p>", "")
  173. reportInfo.ClassifyIdFirst = classifyFirst.Id
  174. reportInfo.ClassifyNameFirst = articleResult.Industry.Name
  175. reportInfo.ClassifyIdSecond = classifySecond.Id
  176. reportInfo.ClassifyNameSecond = articleResult.Series.Name
  177. reportInfo.Title = articleResult.Title
  178. reportInfo.Abstract = abstract
  179. reportInfo.Author = articleResult.Author.Name
  180. reportInfo.Frequency = articleResult.Frequency
  181. reportInfo.State = state
  182. reportInfo.Content = html.EscapeString(articleResult.Content.Body)
  183. reportInfo.ContentSub = html.EscapeString(contentSub)
  184. //updateTime, _ := time.ParseInLocation(utils.FormatDateTime, articleResult.UpdateDate, time.Local)
  185. reportInfo.ModifyTime = articleResult.UpdateDate
  186. // 报告更新
  187. updateCols := []string{"ClassifyIdFirst", "ClassifyNameFirst", "ClassifyIdSecond", "ClassifyNameSecond", "Title", "Abstract", "Author", "Frequency", "State", "Content", "ContentSub", "ModifyTime", "PublishTime"}
  188. err = reportInfo.UpdateReport(updateCols)
  189. if err != nil {
  190. err = fmt.Errorf("更新报告失败, Err: %s", err.Error())
  191. return
  192. }
  193. go UpdateReportEs(reportInfo.Id, state)
  194. if state == models.ReportStatePublished {
  195. // 报告权限处理
  196. go handleReportPermission(int64(reportInfo.Id), reportInfo.ClassifyIdSecond)
  197. } else {
  198. // 重置小程序详情页海报
  199. _ = ResetMiniProgramReportDetailCover(reportInfo.Id)
  200. }
  201. // 报告发布成功后,需要将相关信息入知识库
  202. go cache.RagEtaReportOpToCache(reportInfo.Id, 0, publishSource)
  203. } else if reportInfo.Id == 0 {
  204. // 报告不存在,创建报告
  205. // 判断状态
  206. if (articleResult.PublishStatus == 2 || articleResult.PublishStatus == 4) && articleResult.IsActive {
  207. var contentSub string
  208. if articleResult.Content.Body != "" {
  209. contentSub, err = GetReportContentSub(articleResult.Content.Body)
  210. if err != nil {
  211. go alarm_msg.SendAlarmMsg("ContentSub 失败,Err:"+err.Error(), 3)
  212. }
  213. }
  214. // 已发布状态
  215. state := models.ReportStatePublished
  216. // 协作方式,1:个人,2:多人协作。默认:1
  217. collaborateType := 1
  218. // 报告布局,1:常规布局,2:智能布局。默认:1
  219. reportLayout := 1
  220. // 是否公开发布,1:是,2:否
  221. isPublicPublish := 1
  222. abstract := strings.ReplaceAll(articleResult.Content.Abstract, "<p>", "")
  223. abstract = strings.ReplaceAll(abstract, "</p>", "")
  224. item := new(models.Report)
  225. item.AddType = 1
  226. item.ReportVersion = 2
  227. item.ClassifyIdFirst = classifyFirst.Id
  228. item.ClassifyNameFirst = articleResult.Industry.Name
  229. item.ClassifyIdSecond = classifySecond.Id
  230. item.ClassifyNameSecond = articleResult.Series.Name
  231. item.Title = articleResult.Title
  232. item.Abstract = abstract
  233. item.Author = articleResult.Author.Name
  234. item.Frequency = articleResult.Frequency
  235. item.State = state
  236. item.Content = html.EscapeString(articleResult.Content.Body)
  237. item.Stage = 0
  238. item.ContentSub = html.EscapeString(contentSub)
  239. item.CreateTime = time.Now().Format(utils.FormatDateTime)
  240. //updateTime, _ := time.ParseInLocation(utils.FormatDateTime, articleResult.UpdateDate, time.Local)
  241. item.ModifyTime = articleResult.UpdateDate
  242. item.ReportVersion = 2
  243. item.AdminId = 0
  244. item.AdminRealName = ""
  245. item.PublishTime = articleResult.PublishDate
  246. item.ClassifyIdThird = 0
  247. item.ClassifyNameThird = ""
  248. item.LastModifyAdminId = 0
  249. item.LastModifyAdminName = ""
  250. item.ContentModifyTime = time.Now()
  251. item.NeedSplice = 1
  252. item.ContentStruct = ""
  253. item.HeadImg = ""
  254. item.EndImg = ""
  255. item.CanvasColor = ""
  256. item.HeadResourceId = 0
  257. item.EndResourceId = 0
  258. item.CollaborateType = int8(collaborateType)
  259. item.ReportLayout = int8(reportLayout)
  260. item.IsPublicPublish = int8(isPublicPublish)
  261. createTime, _ := time.ParseInLocation(utils.FormatDate, articleResult.CreateDate, time.Local)
  262. item.ReportCreateTime = createTime
  263. item.RaiReportId = articleResult.ArticleId
  264. // 新增报告及章节
  265. var reportId int64
  266. allGrantUserList := make([]*report.ReportGrant, 0)
  267. reportId, err = models.AddReportAndChapter(item, allGrantUserList, []models.AddReportChapter{})
  268. if err != nil {
  269. err = fmt.Errorf("新增报告及章节失败, Err: " + err.Error())
  270. return
  271. }
  272. reportCode := utils.MD5(strconv.Itoa(int(reportId)))
  273. item.ReportCode = reportCode
  274. // 修改唯一编码
  275. {
  276. go models.ModifyReportCode(reportId, reportCode)
  277. }
  278. // 报告权限处理
  279. go handleReportPermission(reportId, item.ClassifyIdSecond)
  280. // 更新报告Es
  281. _ = UpdateReportEs(int(reportId), 2)
  282. // 报告发布成功后,需要将相关信息入知识库
  283. go cache.RagEtaReportOpToCache(int(reportId), 0, `publish`)
  284. }
  285. }
  286. return
  287. }
  288. // get公共请求方法
  289. func getRaiReportLib(url string) (body []byte, err error) {
  290. if url == "" {
  291. err = fmt.Errorf("url is empty")
  292. return
  293. }
  294. if utils.RaiReportLibAuthorization == "" {
  295. err = fmt.Errorf("authorization is empty")
  296. return
  297. }
  298. defer func() {
  299. if err != nil {
  300. go utils.SendEmail(utils.APPNAME+"【"+utils.RunMode+"】"+"失败提醒", url+"Get ErrMsg:"+err.Error(), utils.EmailSendToUsers)
  301. }
  302. }()
  303. method := "GET"
  304. client := &http.Client{}
  305. req, err := http.NewRequest(method, url, nil)
  306. if err != nil {
  307. return
  308. }
  309. req.Header.Add("Authorization", utils.RaiReportLibAuthorization)
  310. res, err := client.Do(req)
  311. if err != nil {
  312. return
  313. }
  314. defer res.Body.Close()
  315. body, err = ioutil.ReadAll(res.Body)
  316. if err != nil {
  317. return
  318. }
  319. return
  320. }
  321. // 导入固收研究的历史报告,前提是需要确认报告的分类都已经建好,并且已绑定固定收益品种
  322. func RaiReportInit() (err error) {
  323. fmt.Println("开始导入固收研究的历史报告")
  324. defer func() {
  325. if err != nil {
  326. msg := fmt.Sprintf("导入固收研究的历史报告失败, Err: %s", err.Error())
  327. fmt.Println(msg)
  328. go alarm_msg.SendAlarmMsg(msg, 2)
  329. }
  330. }()
  331. // 分页调用接口获取历史报告
  332. totalPage := 1
  333. pageSize := 10
  334. successCount := 0
  335. failCount := 0
  336. for i := 0; i <= totalPage; i++ {
  337. pageindex := i * pageSize
  338. body, er := getRaiReportLib(fmt.Sprintf("%s/articles/mp?take=%d&skip=%d&publish_status=2&industry_id=12", utils.RaiReportLibUrl, pageSize, pageindex))
  339. if er != nil {
  340. err = fmt.Errorf("获取权益报告失败, Err: %s", er.Error())
  341. return
  342. }
  343. // 解析内容,获取真实的分页总数
  344. var articleResultDate models.RaiArticleListResultApi
  345. err = json.Unmarshal(body, &articleResultDate)
  346. if err != nil {
  347. err = fmt.Errorf("获取权益报告失败json解析失败, Err: %s", err.Error())
  348. return
  349. }
  350. if articleResultDate.Code != 0 {
  351. err = fmt.Errorf("获取权益报告失败, Err: %s", articleResultDate.Msg)
  352. return
  353. }
  354. totalPage = articleResultDate.Pagination.PageTotal
  355. articleResultList := articleResultDate.Data
  356. for _, articleResult := range articleResultList {
  357. err = handleInsertRaiReport(articleResult)
  358. if err != nil {
  359. fmt.Printf("导入固收研究的历史报告失败%d, Err: %s\n", articleResult.ArticleId, err.Error())
  360. failCount++
  361. continue
  362. } else {
  363. fmt.Printf("导入固收研究的历史报告成功%d\n", articleResult.ArticleId)
  364. successCount++
  365. }
  366. }
  367. }
  368. fmt.Printf("导入固收研究的历史报告完成,成功%d,失败%d\n", successCount, failCount)
  369. return nil
  370. }