report_rai.go 13 KB


  1. package services
  2. import (
  3. "encoding/json"
  4. "eta/eta_api/models"
  5. "eta/eta_api/models/report"
  6. "eta/eta_api/services/alarm_msg"
  7. "eta/eta_api/utils"
  8. "fmt"
  9. "html"
  10. "io/ioutil"
  11. "net/http"
  12. "strconv"
  13. "strings"
  14. "time"
  15. )
  16. func AutoInsertRaiReport() {
  17. for {
  18. utils.Rc.Brpop(utils.FICC_ARTICLE_UPDATE_KEY, func(b []byte) {
  19. var log models.RaiReportNotifyRedis
  20. if err := json.Unmarshal(b, &log); err != nil {
  21. utils.FileLog.Info("获取权益报告并更新处理Redis队列消息失败: json unmarshal wrong!", err.Error())
  22. go alarm_msg.SendAlarmMsg(fmt.Sprintf("获取权益报告并更新处理Redis队列消息失败: json unmarshal wrong! %s", err.Error()), 2)
  23. }
  24. // 这里直接go出去会出现并发,导致文章md5ID唯一索引限制报错
  25. err := HandleInsertRaiReport(log.ArticleId)
  26. if err != nil {
  27. utils.FileLog.Info("获取权益报告并更新处理Redis队列消息失败: HandleInsertRaiReport ", err.Error())
  28. go alarm_msg.SendAlarmMsg(fmt.Sprintf("获取权益报告并更新处理Redis队列消息失败: HandleInsertRaiReport %s", err.Error()), 2)
  29. }
  30. })
  31. }
  32. }
  33. func HandleInsertRaiReport(raiReportId int) (err error) {
  34. // 设置缓存,防止重复处理
  35. defer func() {
  36. if err != nil {
  37. msg := fmt.Sprintf("处理同步过来的文章失败"+"HandleArticleListByApi ErrMsg:%s artcleId:%d", err.Error(), raiReportId)
  38. utils.FileLog.Info(msg, 2)
  39. go alarm_msg.SendAlarmMsg(msg, 2)
  40. }
  41. }()
  42. body, err := getRaiReportLib(fmt.Sprintf("%s/articles/%d", utils.RaiReportLibUrl, raiReportId))
  43. if err != nil {
  44. fmt.Println(err)
  45. err = fmt.Errorf("获取权益报告失败, Err: %s", err.Error())
  46. return
  47. }
  48. var articleResultDate models.ArticleDetailResultApi
  49. err = json.Unmarshal(body, &articleResultDate)
  50. if err != nil {
  51. fmt.Println("Getres.PublicGetDate Err:", err.Error())
  52. return err
  53. }
  54. articleResult := articleResultDate.Data
  55. err = handleInsertRaiReport(articleResult)
  56. if err != nil {
  57. return err
  58. }
  59. return
  60. }
  61. func handleInsertRaiReport(articleResult models.ArticleResultApidate) (err error) {
  62. raiReportId := articleResult.ArticleId
  63. // 设置缓存,防止重复处理
  64. cacheKey := fmt.Sprintf("rai_report_notify_redis_%d", raiReportId)
  65. cacheValue := utils.Rc.GetStr(cacheKey)
  66. if cacheValue != "" {
  67. return nil
  68. }
  69. utils.Rc.SetNX(cacheKey, "1", 10*time.Second)
  70. defer func() {
  71. if err != nil {
  72. msg := fmt.Sprintf("处理同步过来的文章失败"+"HandleArticleListByApi ErrMsg:%s artcleId:%d", err.Error(), raiReportId)
  73. utils.FileLog.Info(msg, 2)
  74. go alarm_msg.SendAlarmMsg(msg, 2)
  75. }
  76. utils.Rc.Delete(cacheKey)
  77. }()
  78. // var clueApiUrl string
  79. // clueApiUrl = fmt.Sprint(utils.RaiReportLibUrl, "articles/", raiReportId)
  80. // fmt.Println(clueApiUrl)
  81. // body, err := getRaiReportLib(clueApiUrl)
  82. // if err != nil {
  83. // fmt.Println(err)
  84. // err = fmt.Errorf("获取权益报告失败, Err: %s", err.Error())
  85. // return
  86. // }
  87. // var articleResultDate models.ArticleDetailResultApi
  88. // err = json.Unmarshal(body, &articleResultDate)
  89. // if err != nil {
  90. // fmt.Println("Getres.PublicGetDate Err:", err.Error())
  91. // return err
  92. // }
  93. // articleResult := articleResultDate.Data
  94. // 判断是否是固收研究
  95. if articleResult.IndustrId != 12 {
  96. return nil
  97. }
  98. // 根据分类名称查找分类信息
  99. classifyItemList, e := models.GetReportClassifyByClassifyName([]string{articleResult.Industry.Name, articleResult.Series.Name})
  100. if e != nil {
  101. err = fmt.Errorf("GetReportClassifyByClassifyName err: %s", e.Error())
  102. return err
  103. }
  104. classifyMap := make(map[string]*models.Classify)
  105. for _, v := range classifyItemList {
  106. classifyMap[v.ClassifyName] = v
  107. }
  108. classifyFirst, ok := classifyMap[articleResult.Industry.Name]
  109. if !ok {
  110. err = fmt.Errorf("一级分类不存在")
  111. return err
  112. }
  113. classifySecond, ok := classifyMap[articleResult.Series.Name]
  114. if !ok {
  115. // 新增二级分类
  116. err, _, _ = AddReportClassify(articleResult.Series.Name, classifyFirst.Id, []int{})
  117. if err != nil {
  118. err = fmt.Errorf("添加二级分类失败, Err: %s", err.Error())
  119. return err
  120. }
  121. item, err := models.GetClassifyByName(articleResult.Series.Name, classifyFirst.Id)
  122. if err != nil {
  123. err = fmt.Errorf("添加二级分类失败, Err: %s", err.Error())
  124. return err
  125. }
  126. classifySecond = item
  127. }
  128. // 判断分类的层级关系是否合理
  129. if classifyFirst.Id != classifySecond.ParentId {
  130. err = fmt.Errorf("分类层级关系不合理")
  131. return err
  132. }
  133. // 判断报告是否已存在, 如果存在则更新报告,如果不存在则创建报告
  134. reportInfo, err := models.GetReportByRaiReportId(articleResult.ArticleId)
  135. if err != nil && err.Error() != utils.ErrNoRow() {
  136. return err
  137. }
  138. if err == nil && reportInfo.Id > 0 {
  139. var contentSub string
  140. if articleResult.Content.Body != "" {
  141. contentSub, err = GetReportContentSub(articleResult.Content.Body)
  142. if err != nil {
  143. go alarm_msg.SendAlarmMsg("ContentSub 失败,Err:"+err.Error(), 3)
  144. }
  145. }
  146. state := reportInfo.State
  147. // 报告已存在,更新报告
  148. if (articleResult.PublishStatus == 2 || articleResult.PublishStatus == 4) && articleResult.IsActive {
  149. // 报告状态为未发布,则更新报告
  150. state = models.ReportStatePublished
  151. reportInfo.PublishTime = articleResult.PublishDate
  152. }else if !articleResult.IsActive {
  153. // 删除报告
  154. err = models.DeleteReport(reportInfo.Id)
  155. if err != nil {
  156. err = fmt.Errorf("删除报告失败, Err: %s", err.Error())
  157. return
  158. }
  159. go UpdateReportEs(reportInfo.Id, 1)
  160. return
  161. }else {
  162. // 报告状态为未发布,则更新报告
  163. state = models.ReportStateUnpublished
  164. reportInfo.PublishTime = articleResult.PublishDate
  165. }
  166. // 过滤Abstracthtml标签,把<p>标签去掉
  167. abstract := strings.ReplaceAll(articleResult.Content.Abstract, "<p>", "")
  168. abstract = strings.ReplaceAll(abstract, "</p>", "")
  169. reportInfo.ClassifyIdFirst = classifyFirst.Id
  170. reportInfo.ClassifyNameFirst = articleResult.Industry.Name
  171. reportInfo.ClassifyIdSecond = classifySecond.Id
  172. reportInfo.ClassifyNameSecond = articleResult.Series.Name
  173. reportInfo.Title = articleResult.Title
  174. reportInfo.Abstract = abstract
  175. reportInfo.Author = articleResult.Author.Name
  176. reportInfo.Frequency = articleResult.Frequency
  177. reportInfo.State = state
  178. reportInfo.Content = html.EscapeString(articleResult.Content.Body)
  179. reportInfo.ContentSub = html.EscapeString(contentSub)
  180. //updateTime, _ := time.ParseInLocation(utils.FormatDateTime, articleResult.UpdateDate, time.Local)
  181. reportInfo.ModifyTime = articleResult.UpdateDate
  182. // 报告更新
  183. updateCols := []string{"ClassifyIdFirst","ClassifyNameFirst","ClassifyIdSecond","ClassifyNameSecond","Title","Abstract","Author","Frequency","State","Content","ContentSub","ModifyTime","PublishTime"}
  184. err = reportInfo.UpdateReport(updateCols)
  185. if err != nil {
  186. err = fmt.Errorf("更新报告失败, Err: %s", err.Error())
  187. return
  188. }
  189. go UpdateReportEs(reportInfo.Id, state)
  190. if state == models.ReportStatePublished {
  191. // 报告权限处理
  192. go handleReportPermission(int64(reportInfo.Id), reportInfo.ClassifyIdSecond)
  193. }else {
  194. // 重置小程序详情页海报
  195. _ = ResetMiniProgramReportDetailCover(reportInfo.Id)
  196. }
  197. }else if reportInfo.Id == 0 {
  198. // 报告不存在,创建报告
  199. // 判断状态
  200. if (articleResult.PublishStatus == 2 || articleResult.PublishStatus == 4) && articleResult.IsActive {
  201. var contentSub string
  202. if articleResult.Content.Body != "" {
  203. contentSub, err = GetReportContentSub(articleResult.Content.Body)
  204. if err != nil {
  205. go alarm_msg.SendAlarmMsg("ContentSub 失败,Err:"+err.Error(), 3)
  206. }
  207. }
  208. // 已发布状态
  209. state := models.ReportStatePublished
  210. // 协作方式,1:个人,2:多人协作。默认:1
  211. collaborateType := 1
  212. // 报告布局,1:常规布局,2:智能布局。默认:1
  213. reportLayout := 1
  214. // 是否公开发布,1:是,2:否
  215. isPublicPublish := 1
  216. abstract := strings.ReplaceAll(articleResult.Content.Abstract, "<p>", "")
  217. abstract = strings.ReplaceAll(abstract, "</p>", "")
  218. item := new(models.Report)
  219. item.AddType = 1
  220. item.ReportVersion = 2
  221. item.ClassifyIdFirst = classifyFirst.Id
  222. item.ClassifyNameFirst = articleResult.Industry.Name
  223. item.ClassifyIdSecond = classifySecond.Id
  224. item.ClassifyNameSecond = articleResult.Series.Name
  225. item.Title = articleResult.Title
  226. item.Abstract = abstract
  227. item.Author = articleResult.Author.Name
  228. item.Frequency = articleResult.Frequency
  229. item.State = state
  230. item.Content = html.EscapeString(articleResult.Content.Body)
  231. item.Stage = 0
  232. item.ContentSub = html.EscapeString(contentSub)
  233. item.CreateTime = time.Now().Format(utils.FormatDateTime)
  234. //updateTime, _ := time.ParseInLocation(utils.FormatDateTime, articleResult.UpdateDate, time.Local)
  235. item.ModifyTime = articleResult.UpdateDate
  236. item.ReportVersion = 2
  237. item.AdminId = 0
  238. item.AdminRealName = ""
  239. item.PublishTime = articleResult.PublishDate
  240. item.ClassifyIdThird = 0
  241. item.ClassifyNameThird = ""
  242. item.LastModifyAdminId = 0
  243. item.LastModifyAdminName = ""
  244. item.ContentModifyTime = time.Now()
  245. item.NeedSplice = 1
  246. item.ContentStruct = ""
  247. item.HeadImg = ""
  248. item.EndImg = ""
  249. item.CanvasColor = ""
  250. item.HeadResourceId = 0
  251. item.EndResourceId = 0
  252. item.CollaborateType = int8(collaborateType)
  253. item.ReportLayout = int8(reportLayout)
  254. item.IsPublicPublish = int8(isPublicPublish)
  255. createTime, _ := time.ParseInLocation(utils.FormatDate, articleResult.CreateDate, time.Local)
  256. item.ReportCreateTime = createTime
  257. item.RaiReportId = articleResult.ArticleId
  258. // 新增报告及章节
  259. var reportId int64
  260. allGrantUserList := make([]*report.ReportGrant, 0)
  261. reportId, err = models.AddReportAndChapter(item, allGrantUserList, []models.AddReportChapter{})
  262. if err != nil {
  263. err = fmt.Errorf("新增报告及章节失败, Err: " + err.Error())
  264. return
  265. }
  266. reportCode := utils.MD5(strconv.Itoa(int(reportId)))
  267. item.ReportCode = reportCode
  268. // 修改唯一编码
  269. {
  270. go models.ModifyReportCode(reportId, reportCode)
  271. }
  272. // 报告权限处理
  273. go handleReportPermission(reportId, item.ClassifyIdSecond)
  274. // 更新报告Es
  275. _ = UpdateReportEs(int(reportId), 2)
  276. }
  277. }
  278. return
  279. }
  280. // get公共请求方法
  281. func getRaiReportLib(url string) (body []byte, err error) {
  282. if url == "" {
  283. err = fmt.Errorf("url is empty")
  284. return
  285. }
  286. if utils.RaiReportLibAuthorization == "" {
  287. err = fmt.Errorf("authorization is empty")
  288. return
  289. }
  290. defer func() {
  291. if err != nil {
  292. go utils.SendEmail(utils.APPNAME+"【"+utils.RunMode+"】"+"失败提醒", url+"Get ErrMsg:"+err.Error(), utils.EmailSendToUsers)
  293. }
  294. }()
  295. method := "GET"
  296. client := &http.Client{}
  297. req, err := http.NewRequest(method, url, nil)
  298. if err != nil {
  299. return
  300. }
  301. req.Header.Add("Authorization", utils.RaiReportLibAuthorization)
  302. res, err := client.Do(req)
  303. if err != nil {
  304. return
  305. }
  306. defer res.Body.Close()
  307. body, err = ioutil.ReadAll(res.Body)
  308. if err != nil {
  309. return
  310. }
  311. return
  312. }
  313. // 导入固收研究的历史报告,前提是需要确认报告的分类都已经建好,并且已绑定固定收益品种
  314. func RaiReportInit() (err error) {
  315. fmt.Println("开始导入固收研究的历史报告")
  316. defer func() {
  317. if err != nil {
  318. msg := fmt.Sprintf("导入固收研究的历史报告失败, Err: %s", err.Error())
  319. fmt.Println(msg)
  320. go alarm_msg.SendAlarmMsg(msg, 2)
  321. }
  322. }()
  323. // 分页调用接口获取历史报告
  324. totalPage := 1
  325. pageSize := 10
  326. successCount := 0
  327. failCount := 0
  328. for i := 0; i <= totalPage; i++ {
  329. pageindex := i*pageSize
  330. body, er := getRaiReportLib(fmt.Sprintf("%s/articles/mp?take=%d&skip=%d&publish_status=2&industry_id=12", utils.RaiReportLibUrl, pageSize, pageindex))
  331. if er != nil {
  332. err = fmt.Errorf("获取权益报告失败, Err: %s", er.Error())
  333. return
  334. }
  335. // 解析内容,获取真实的分页总数
  336. var articleResultDate models.RaiArticleListResultApi
  337. err = json.Unmarshal(body, &articleResultDate)
  338. if err != nil {
  339. err = fmt.Errorf("获取权益报告失败json解析失败, Err: %s", err.Error())
  340. return
  341. }
  342. if articleResultDate.Code != 0 {
  343. err = fmt.Errorf("获取权益报告失败, Err: %s", articleResultDate.Msg)
  344. return
  345. }
  346. totalPage = articleResultDate.Pagination.PageTotal
  347. articleResultList := articleResultDate.Data
  348. for _, articleResult := range articleResultList {
  349. err = handleInsertRaiReport(articleResult)
  350. if err != nil {
  351. fmt.Printf("导入固收研究的历史报告失败%d, Err: %s\n", articleResult.ArticleId, err.Error())
  352. failCount++
  353. continue
  354. }else {
  355. fmt.Printf("导入固收研究的历史报告成功%d\n", articleResult.ArticleId)
  356. successCount++
  357. }
  358. }
  359. }
  360. fmt.Printf("导入固收研究的历史报告完成,成功%d,失败%d\n", successCount, failCount)
  361. return nil
  362. }