package services import ( "encoding/json" "eta/eta_api/cache" "eta/eta_api/models" "eta/eta_api/models/report" "eta/eta_api/services/alarm_msg" "eta/eta_api/utils" "fmt" "html" "io/ioutil" "net/http" "strconv" "strings" "time" ) func AutoInsertRaiReport() { for { utils.Rc.Brpop(utils.FICC_ARTICLE_UPDATE_KEY, func(b []byte) { var log models.RaiReportNotifyRedis if err := json.Unmarshal(b, &log); err != nil { utils.FileLog.Info("获取权益报告并更新处理Redis队列消息失败: json unmarshal wrong!", err.Error()) go alarm_msg.SendAlarmMsg(fmt.Sprintf("获取权益报告并更新处理Redis队列消息失败: json unmarshal wrong! %s", err.Error()), 2) } // 这里直接go出去会出现并发,导致文章md5ID唯一索引限制报错 err := HandleInsertRaiReport(log.ArticleId) if err != nil { utils.FileLog.Info("获取权益报告并更新处理Redis队列消息失败: HandleInsertRaiReport ", err.Error()) go alarm_msg.SendAlarmMsg(fmt.Sprintf("获取权益报告并更新处理Redis队列消息失败: HandleInsertRaiReport %s", err.Error()), 2) } }) } } func HandleInsertRaiReport(raiReportId int) (err error) { // 设置缓存,防止重复处理 defer func() { if err != nil { msg := fmt.Sprintf("处理同步过来的文章失败"+"HandleArticleListByApi ErrMsg:%s artcleId:%d", err.Error(), raiReportId) utils.FileLog.Info(msg, 2) go alarm_msg.SendAlarmMsg(msg, 2) } }() body, err := getRaiReportLib(fmt.Sprintf("%s/articles/%d", utils.RaiReportLibUrl, raiReportId)) if err != nil { fmt.Println(err) err = fmt.Errorf("获取权益报告失败, Err: %s", err.Error()) return } var articleResultDate models.ArticleDetailResultApi err = json.Unmarshal(body, &articleResultDate) if err != nil { fmt.Println("Getres.PublicGetDate Err:", err.Error()) return err } articleResult := articleResultDate.Data err = handleInsertRaiReport(articleResult) if err != nil { return err } return } func handleInsertRaiReport(articleResult models.ArticleResultApidate) (err error) { raiReportId := articleResult.ArticleId // 设置缓存,防止重复处理 cacheKey := fmt.Sprintf("rai_report_notify_redis_%d", raiReportId) cacheValue := utils.Rc.GetStr(cacheKey) if cacheValue != "" { return nil } utils.Rc.SetNX(cacheKey, "1", 10*time.Second) defer func() { if err != nil { msg := fmt.Sprintf("处理同步过来的文章失败"+"HandleArticleListByApi ErrMsg:%s artcleId:%d", err.Error(), raiReportId) utils.FileLog.Info(msg, 2) go alarm_msg.SendAlarmMsg(msg, 2) } utils.Rc.Delete(cacheKey) }() // var clueApiUrl string // clueApiUrl = fmt.Sprint(utils.RaiReportLibUrl, "articles/", raiReportId) // fmt.Println(clueApiUrl) // body, err := getRaiReportLib(clueApiUrl) // if err != nil { // fmt.Println(err) // err = fmt.Errorf("获取权益报告失败, Err: %s", err.Error()) // return // } // var articleResultDate models.ArticleDetailResultApi // err = json.Unmarshal(body, &articleResultDate) // if err != nil { // fmt.Println("Getres.PublicGetDate Err:", err.Error()) // return err // } // articleResult := articleResultDate.Data // 判断是否是固收研究 if articleResult.IndustrId != 12 { return nil } // 根据分类名称查找分类信息 classifyItemList, e := models.GetReportClassifyByClassifyName([]string{articleResult.Industry.Name, articleResult.Series.Name}) if e != nil { err = fmt.Errorf("GetReportClassifyByClassifyName err: %s", e.Error()) return err } classifyMap := make(map[string]*models.Classify) for _, v := range classifyItemList { classifyMap[v.ClassifyName] = v } classifyFirst, ok := classifyMap[articleResult.Industry.Name] if !ok { err = fmt.Errorf("一级分类不存在") return err } classifySecond, ok := classifyMap[articleResult.Series.Name] if !ok { // 新增二级分类 err, _, _ = AddReportClassify(articleResult.Series.Name, classifyFirst.Id, []int{}) if err != nil { err = fmt.Errorf("添加二级分类失败, Err: %s", err.Error()) return err } item, err := models.GetClassifyByName(articleResult.Series.Name, classifyFirst.Id) if err != nil { err = fmt.Errorf("添加二级分类失败, Err: %s", err.Error()) return err } classifySecond = item } // 判断分类的层级关系是否合理 if classifyFirst.Id != classifySecond.ParentId { err = fmt.Errorf("分类层级关系不合理") return err } // 判断报告是否已存在, 如果存在则更新报告,如果不存在则创建报告 reportInfo, err := models.GetReportByRaiReportId(articleResult.ArticleId) if err != nil && err.Error() != utils.ErrNoRow() { return err } if err == nil && reportInfo.Id > 0 { var contentSub string if articleResult.Content.Body != "" { contentSub, err = GetReportContentSub(articleResult.Content.Body) if err != nil { go alarm_msg.SendAlarmMsg("ContentSub 失败,Err:"+err.Error(), 3) } } state := reportInfo.State publishSource := `publish` //同步至知识库 // 报告已存在,更新报告 if (articleResult.PublishStatus == 2 || articleResult.PublishStatus == 4) && articleResult.IsActive { // 报告状态为未发布,则更新报告 state = models.ReportStatePublished reportInfo.PublishTime = articleResult.PublishDate } else if !articleResult.IsActive { publishSource = `un_publish` //同步至知识库 // 删除报告 err = models.DeleteReport(reportInfo.Id) if err != nil { err = fmt.Errorf("删除报告失败, Err: %s", err.Error()) return } go UpdateReportEs(reportInfo.Id, 1) return } else { publishSource = `un_publish` //同步至知识库 // 报告状态为未发布,则更新报告 state = models.ReportStateUnpublished reportInfo.PublishTime = articleResult.PublishDate } // 过滤Abstracthtml标签,把

标签去掉 abstract := strings.ReplaceAll(articleResult.Content.Abstract, "

", "") abstract = strings.ReplaceAll(abstract, "

", "") reportInfo.ClassifyIdFirst = classifyFirst.Id reportInfo.ClassifyNameFirst = articleResult.Industry.Name reportInfo.ClassifyIdSecond = classifySecond.Id reportInfo.ClassifyNameSecond = articleResult.Series.Name reportInfo.Title = articleResult.Title reportInfo.Abstract = abstract reportInfo.Author = articleResult.Author.Name reportInfo.Frequency = articleResult.Frequency reportInfo.State = state reportInfo.Content = html.EscapeString(articleResult.Content.Body) reportInfo.ContentSub = html.EscapeString(contentSub) //updateTime, _ := time.ParseInLocation(utils.FormatDateTime, articleResult.UpdateDate, time.Local) reportInfo.ModifyTime = articleResult.UpdateDate // 报告更新 updateCols := []string{"ClassifyIdFirst", "ClassifyNameFirst", "ClassifyIdSecond", "ClassifyNameSecond", "Title", "Abstract", "Author", "Frequency", "State", "Content", "ContentSub", "ModifyTime", "PublishTime"} err = reportInfo.UpdateReport(updateCols) if err != nil { err = fmt.Errorf("更新报告失败, Err: %s", err.Error()) return } go UpdateReportEs(reportInfo.Id, state) if state == models.ReportStatePublished { // 报告权限处理 go handleReportPermission(int64(reportInfo.Id), reportInfo.ClassifyIdSecond) } else { // 重置小程序详情页海报 _ = ResetMiniProgramReportDetailCover(reportInfo.Id) } // 报告发布成功后,需要将相关信息入知识库 go cache.RagEtaReportOpToCache(reportInfo.Id, 0, publishSource) } else if reportInfo.Id == 0 { err = nil // 报告不存在,创建报告 // 判断状态 if (articleResult.PublishStatus == 2 || articleResult.PublishStatus == 4) && articleResult.IsActive { var contentSub string if articleResult.Content.Body != "" { contentSub, err = GetReportContentSub(articleResult.Content.Body) if err != nil { go alarm_msg.SendAlarmMsg("ContentSub 失败,Err:"+err.Error(), 3) } } // 已发布状态 state := models.ReportStatePublished // 协作方式,1:个人,2:多人协作。默认:1 collaborateType := 1 // 报告布局,1:常规布局,2:智能布局。默认:1 reportLayout := 1 // 是否公开发布,1:是,2:否 isPublicPublish := 1 abstract := strings.ReplaceAll(articleResult.Content.Abstract, "

", "") abstract = strings.ReplaceAll(abstract, "

", "") item := new(models.Report) item.AddType = 1 item.ReportVersion = 2 item.ClassifyIdFirst = classifyFirst.Id item.ClassifyNameFirst = articleResult.Industry.Name item.ClassifyIdSecond = classifySecond.Id item.ClassifyNameSecond = articleResult.Series.Name item.Title = articleResult.Title item.Abstract = abstract item.Author = articleResult.Author.Name item.Frequency = articleResult.Frequency item.State = state item.Content = html.EscapeString(articleResult.Content.Body) item.Stage = 0 item.ContentSub = html.EscapeString(contentSub) item.CreateTime = time.Now().Format(utils.FormatDateTime) //updateTime, _ := time.ParseInLocation(utils.FormatDateTime, articleResult.UpdateDate, time.Local) item.ModifyTime = articleResult.UpdateDate item.ReportVersion = 2 item.AdminId = 0 item.AdminRealName = "" item.PublishTime = articleResult.PublishDate item.ClassifyIdThird = 0 item.ClassifyNameThird = "" item.LastModifyAdminId = 0 item.LastModifyAdminName = "" item.ContentModifyTime = time.Now() item.NeedSplice = 1 item.ContentStruct = "" item.HeadImg = "" item.EndImg = "" item.CanvasColor = "" item.HeadResourceId = 0 item.EndResourceId = 0 item.CollaborateType = int8(collaborateType) item.ReportLayout = int8(reportLayout) item.IsPublicPublish = int8(isPublicPublish) createTime, _ := time.ParseInLocation(utils.FormatDate, articleResult.CreateDate, time.Local) item.ReportCreateTime = createTime item.RaiReportId = articleResult.ArticleId // 新增报告及章节 var reportId int64 allGrantUserList := make([]*report.ReportGrant, 0) reportId, err = models.AddReportAndChapter(item, allGrantUserList, []models.AddReportChapter{}) if err != nil { err = fmt.Errorf("新增报告及章节失败, Err: " + err.Error()) return } reportCode := utils.MD5(strconv.Itoa(int(reportId))) item.ReportCode = reportCode // 修改唯一编码 { go models.ModifyReportCode(reportId, reportCode) } // 报告权限处理 go handleReportPermission(reportId, item.ClassifyIdSecond) // 更新报告Es _ = UpdateReportEs(int(reportId), 2) // 报告发布成功后,需要将相关信息入知识库 go cache.RagEtaReportOpToCache(int(reportId), 0, `publish`) } } return } // get公共请求方法 func getRaiReportLib(url string) (body []byte, err error) { if url == "" { err = fmt.Errorf("url is empty") return } if utils.RaiReportLibAuthorization == "" { err = fmt.Errorf("authorization is empty") return } defer func() { if err != nil { go utils.SendEmail(utils.APPNAME+"【"+utils.RunMode+"】"+"失败提醒", url+"Get ErrMsg:"+err.Error(), utils.EmailSendToUsers) } }() method := "GET" client := &http.Client{} req, err := http.NewRequest(method, url, nil) if err != nil { return } req.Header.Add("Authorization", utils.RaiReportLibAuthorization) res, err := client.Do(req) if err != nil { return } defer res.Body.Close() body, err = ioutil.ReadAll(res.Body) if err != nil { return } return } // 导入固收研究的历史报告,前提是需要确认报告的分类都已经建好,并且已绑定固定收益品种 func RaiReportInit() (err error) { fmt.Println("开始导入固收研究的历史报告") defer func() { if err != nil { msg := fmt.Sprintf("导入固收研究的历史报告失败, Err: %s", err.Error()) fmt.Println(msg) go alarm_msg.SendAlarmMsg(msg, 2) } }() // 分页调用接口获取历史报告 totalPage := 1 pageSize := 10 successCount := 0 failCount := 0 for i := 0; i <= totalPage; i++ { pageindex := i * pageSize body, er := getRaiReportLib(fmt.Sprintf("%s/articles/mp?take=%d&skip=%d&publish_status=2&industry_id=12", utils.RaiReportLibUrl, pageSize, pageindex)) if er != nil { err = fmt.Errorf("获取权益报告失败, Err: %s", er.Error()) return } // 解析内容,获取真实的分页总数 var articleResultDate models.RaiArticleListResultApi err = json.Unmarshal(body, &articleResultDate) if err != nil { err = fmt.Errorf("获取权益报告失败json解析失败, Err: %s", err.Error()) return } if articleResultDate.Code != 0 { err = fmt.Errorf("获取权益报告失败, Err: %s", articleResultDate.Msg) return } totalPage = articleResultDate.Pagination.PageTotal articleResultList := articleResultDate.Data for _, articleResult := range articleResultList { err = handleInsertRaiReport(articleResult) if err != nil { fmt.Printf("导入固收研究的历史报告失败%d, Err: %s\n", articleResult.ArticleId, err.Error()) failCount++ continue } else { fmt.Printf("导入固收研究的历史报告成功%d\n", articleResult.ArticleId) successCount++ } } } fmt.Printf("导入固收研究的历史报告完成,成功%d,失败%d\n", successCount, failCount) return nil }