report_service.go 34 KB


  1. package report
  2. import (
  3. "encoding/json"
  4. "eta/eta_mini_ht_api/common/component/config"
  5. "eta/eta_mini_ht_api/common/component/es"
  6. logger "eta/eta_mini_ht_api/common/component/log"
  7. "eta/eta_mini_ht_api/common/contants"
  8. "eta/eta_mini_ht_api/common/utils/page"
  9. stringUtils "eta/eta_mini_ht_api/common/utils/string"
  10. configService "eta/eta_mini_ht_api/domian/config"
  11. analystService "eta/eta_mini_ht_api/domian/financial_analyst"
  12. messageDomian "eta/eta_mini_ht_api/domian/message"
  13. userService "eta/eta_mini_ht_api/domian/user"
  14. "eta/eta_mini_ht_api/models"
  15. configDao "eta/eta_mini_ht_api/models/config"
  16. permissionDao "eta/eta_mini_ht_api/models/config"
  17. "eta/eta_mini_ht_api/models/eta"
  18. etaDao "eta/eta_mini_ht_api/models/eta"
  19. "eta/eta_mini_ht_api/models/ht"
  20. mediaDao "eta/eta_mini_ht_api/models/image"
  21. merchantDao "eta/eta_mini_ht_api/models/merchant"
  22. "eta/eta_mini_ht_api/models/message"
  23. messageDao "eta/eta_mini_ht_api/models/message"
  24. reportDao "eta/eta_mini_ht_api/models/report"
  25. userDao "eta/eta_mini_ht_api/models/user"
  26. "math/rand"
  27. "strconv"
  28. "strings"
  29. "sync"
  30. "time"
  31. )
  32. const (
  33. SourceETA = "ETA"
  34. SourceHT = "HT"
  35. DESC models.Order = "desc"
  36. ASC models.Order = "asc"
  37. ESColumn = "title"
  38. ESRangeColumn = "reportId"
  39. )
  40. var (
  41. sortField = []string{"_score:desc"}
  42. htConfig = config.GetConfig(contants.HT).(*config.HTBizConfig)
  43. )
  44. func elastic() *es.ESClient {
  45. return es.GetInstance()
  46. }
  47. // ESReport Report ES研报mapping
  48. type ESReport struct {
  49. ReportID int `json:"reportId"`
  50. OrgId int `json:"orgId"`
  51. Title string `json:"title"`
  52. Author string `json:"author"`
  53. Source reportDao.ReportSource `json:"source"`
  54. Abstract string `json:"abstract"`
  55. CoverSrc int `json:"coverSrc"`
  56. Status reportDao.ReportStatus `json:"status"`
  57. PublishedTime string `json:"publishedTime"`
  58. }
  59. type ReportDTO struct {
  60. Type string `json:"type"`
  61. ReportID int `json:"reportId"`
  62. OrgId int `json:"orgId"`
  63. Title string `json:"title"`
  64. Author string `json:"author"`
  65. AuthorInfo []Anthor `json:"authorInfo"`
  66. Source string `json:"source"`
  67. Abstract string `json:"abstract"`
  68. PublishedTime string `json:"publishedTime"`
  69. RiskLevel string `json:"riskLevel"`
  70. PlateName string `json:"-"`
  71. ClassifyId int `json:"-"`
  72. SecondPermission map[int]string `json:"-"`
  73. Permissions map[int]string `json:"-"`
  74. PermissionNames interface {
  75. } `json:"permissionNames"`
  76. Highlight []string `json:"highlight"`
  77. Detail json.RawMessage `json:"detail"`
  78. PdfUrl string `json:"pdfUrl"`
  79. CoverSrc int `json:"coverSrc"`
  80. CoverUrl string `json:"coverUrl"`
  81. Login bool `json:"login"`
  82. RiskLevelStatus string `json:"riskLevelStatus"`
  83. IsFree bool `json:"isFree"`
  84. IsSubscribe bool `json:"isSubscribe"`
  85. Price string `json:"price"`
  86. ProductId int `json:"productId"`
  87. IsPackage bool `json:"isPackage"`
  88. Score float64 `json:"score"`
  89. Show bool `json:"-"`
  90. }
  91. type Detail struct {
  92. }
  93. type Anthor struct {
  94. Id int `json:"id"`
  95. Name string `json:"name"`
  96. HeadImgUrl string `json:"headImgUrl"`
  97. }
  98. func GetReportById(reportId int) (ReportDTO ReportDTO, err error) {
  99. report, err := reportDao.GetReportById(reportId)
  100. if err != nil {
  101. return
  102. }
  103. ReportDTO = convertReportDTO(report, true)
  104. authorStr := ReportDTO.Author
  105. authorNames := strings.Split(authorStr, ",")
  106. authorNames = stringUtils.RemoveEmptyStrings(authorNames)
  107. var authorList []Anthor
  108. if len(authorNames) > 0 {
  109. for _, name := range authorNames {
  110. var author analystService.FinancialAnalystDTO
  111. author, err = analystService.GetAnalystByName(name)
  112. var item Anthor
  113. if err != nil {
  114. item = Anthor{
  115. Id: 0,
  116. Name: name,
  117. HeadImgUrl: "",
  118. }
  119. } else {
  120. item = Anthor{
  121. Id: author.Id,
  122. Name: author.Name,
  123. HeadImgUrl: author.HeadImgUrl,
  124. }
  125. }
  126. authorList = append(authorList, item)
  127. }
  128. }
  129. ReportDTO.AuthorInfo = authorList
  130. return
  131. }
  132. func GetTotalPageCountByAnalyst(analyst string, permissionIds []int) (total int64, latestId int64, ids []int) {
  133. ids, err := reportDao.GetReportsByAnalyst(analyst)
  134. if err != nil {
  135. logger.Error("查询研究研报告列表id失败:%v", err)
  136. return
  137. }
  138. var wg sync.WaitGroup
  139. wg.Add(2)
  140. var htOrgIds []int
  141. var etaOrgIds []int
  142. go func() {
  143. defer wg.Done()
  144. htOrgIds, err = getHtOrgIds(permissionIds)
  145. if err != nil {
  146. logger.Error("品种筛选ht报告id失败:%v", err)
  147. }
  148. }()
  149. go func() {
  150. defer wg.Done()
  151. etaOrgIds, err = getEtaOrgIds(permissionIds)
  152. if err != nil {
  153. logger.Error("品种筛选eta报告id失败:%v", err)
  154. }
  155. }()
  156. wg.Wait()
  157. totalCol := int64(len(etaOrgIds) + len(htOrgIds))
  158. if totalCol == 0 {
  159. latestId = 0
  160. return
  161. }
  162. if len(etaOrgIds) == 0 && len(htOrgIds) == 0 {
  163. logger.Info("没有符合权限的研报")
  164. return
  165. }
  166. orgIds := make(map[string][]int, 2)
  167. if len(etaOrgIds) == 0 {
  168. orgIds["ETA"] = []int{}
  169. } else {
  170. orgIds["ETA"] = etaOrgIds
  171. }
  172. if len(htOrgIds) == 0 {
  173. orgIds["HT"] = []int{}
  174. } else {
  175. orgIds["HT"] = htOrgIds
  176. }
  177. permitReportIds, err := reportDao.GetReportIdListByOrgIds(orgIds)
  178. if err != nil {
  179. logger.Error("根据原始报告id获取报告id列表失败:%v", err)
  180. return
  181. }
  182. var filterReportIds []int
  183. for _, id := range ids {
  184. for _, permitReportId := range permitReportIds {
  185. if id == permitReportId {
  186. filterReportIds = append(filterReportIds, id)
  187. }
  188. }
  189. }
  190. if len(filterReportIds) == 0 {
  191. logger.Info("没有符合权限的研究员研报")
  192. return
  193. }
  194. ids = filterReportIds
  195. //获取一下下架的报告产品
  196. var disCardReportIds []int
  197. offSaleProducts, err := merchantDao.GetOffSaleProducts([]merchantDao.MerchantProductType{merchantDao.Report, merchantDao.Package})
  198. if err != nil {
  199. logger.Error("获取下架的报告产品失败:%v", err)
  200. return
  201. }
  202. var ProductPermissionIds []int
  203. for _, product := range offSaleProducts {
  204. if product.Type == "package" {
  205. ProductPermissionIds = append(ProductPermissionIds, product.SourceId)
  206. }
  207. if product.Type == "report" {
  208. disCardReportIds = append(disCardReportIds, product.SourceId)
  209. }
  210. }
  211. if len(ProductPermissionIds) > 0 {
  212. wg.Add(2)
  213. var permissionNames []string
  214. var classifyIds []int
  215. go func() {
  216. defer wg.Done()
  217. var permissionErr error
  218. permissionNames, permissionErr = GetPermissionNamesByPermissionIds(ProductPermissionIds)
  219. if permissionErr != nil {
  220. logger.Error("获取ETA品种名称失败:%v", err)
  221. }
  222. }()
  223. go func() {
  224. defer wg.Done()
  225. var classifyErr error
  226. classifyIds, classifyErr = permissionDao.GetClassifyIdsByPermissionIds(ProductPermissionIds)
  227. if classifyErr != nil {
  228. logger.Error("获取ETA报告分类id失败:%v", err)
  229. }
  230. }()
  231. wg.Wait()
  232. disCardIds, _ := reportDao.GetHiddenReportIds(classifyIds, permissionNames)
  233. if len(disCardIds) > 0 {
  234. disCardReportIds = append(disCardReportIds, disCardIds...)
  235. }
  236. }
  237. //对数据去重
  238. disCardReportIds = uniqueArray(disCardReportIds)
  239. //获取报告中还包含上架套餐的id
  240. if len(disCardReportIds) > 0 {
  241. reportIdsSalePackage, _ := merchantDao.GetReportOnSalePackageIds(disCardReportIds)
  242. reportIdsSaleProduct, _ := merchantDao.GetOnSaleReportIds(disCardReportIds)
  243. showReportMap := make(map[int]bool)
  244. disCardMap := make(map[int]bool)
  245. for _, reportId := range reportIdsSalePackage {
  246. showReportMap[reportId] = true
  247. }
  248. for _, reportId := range reportIdsSaleProduct {
  249. showReportMap[reportId] = true
  250. }
  251. var filterDisCardReportIds []int
  252. for _, id := range disCardReportIds {
  253. if _, ok := showReportMap[id]; !ok {
  254. filterDisCardReportIds = append(filterDisCardReportIds, id)
  255. disCardMap[id] = true
  256. }
  257. }
  258. disCardReportIds = filterDisCardReportIds
  259. var cardReportIds []int
  260. for _, id := range filterReportIds {
  261. if _, ok := disCardMap[id]; !ok {
  262. cardReportIds = append(cardReportIds, id)
  263. }
  264. }
  265. filterReportIds = cardReportIds
  266. }
  267. total = int64(len(filterReportIds))
  268. latestId = int64(findMax(filterReportIds))
  269. return
  270. }
  271. // findMaxWithError 函数用于找到整型数组中的最大值,并返回错误信息
  272. func findMax(nums []int) (max int) {
  273. if len(nums) == 0 {
  274. return 0
  275. }
  276. // 初始化最大值为数组的第一个元素
  277. max = nums[0]
  278. // 遍历数组,找到最大值
  279. for _, num := range nums {
  280. if num > max {
  281. max = num
  282. }
  283. }
  284. return
  285. }
  286. func SearchMaxReportIdWithRange(key string, reportIds []int) (total int64) {
  287. sort := []string{"reportId:desc"}
  288. var docIds []string
  289. for _, reportId := range reportIds {
  290. docIds = append(docIds, strconv.Itoa(reportId))
  291. }
  292. request := CountByDocId(key, sort, docIds)
  293. //同步es
  294. re, err := elastic().Count(request)
  295. if err != nil {
  296. logger.Error("es搜索异常:%v", err)
  297. }
  298. count := re.Count
  299. total = int64(count)
  300. if total > 0 {
  301. request = match(key, 0, count, sort)
  302. re, err = elastic().Search(request)
  303. if err != nil {
  304. logger.Error("es搜索异常:%v", err)
  305. }
  306. hits := elastic().GetSource(re.Hits)
  307. data := hits[0].Source
  308. report := ReportDTO{}
  309. err = json.Unmarshal(data, &report)
  310. if err != nil {
  311. logger.Error("获取当前最大研报id失败:%v", err)
  312. return
  313. }
  314. }
  315. return
  316. }
  317. func SearchReportList(key string, ids []int, from int, size int, max int64) (reports []ReportDTO, err error) {
  318. //同步es
  319. var docIds []string
  320. for _, id := range ids {
  321. docIds = append(docIds, strconv.Itoa(id))
  322. }
  323. sorts := append(sortField, "publishedTime:desc")
  324. request := matchRangeByDocId(key, from, size, max, sorts, docIds)
  325. re, err := elastic().Search(request)
  326. if err != nil {
  327. logger.Error("es搜索异常:%v", err)
  328. }
  329. hits := elastic().GetSource(re.Hits)
  330. if len(hits) == 0 {
  331. reports = []ReportDTO{}
  332. return
  333. }
  334. for _, hit := range hits {
  335. var content map[string][]string
  336. err = json.Unmarshal(hit.Highlight, &content)
  337. report := ReportDTO{}
  338. err = json.Unmarshal(hit.Source, &report)
  339. if err != nil {
  340. logger.Error("解析研报数据失败:%v", err)
  341. continue
  342. }
  343. report.Highlight = content[ESColumn]
  344. report.Title = report.Highlight[0]
  345. report.PublishedTime = report.PublishedTime[:10]
  346. reports = append(reports, report)
  347. }
  348. return
  349. }
  350. func GetReportPageByAnalyst(pageInfo page.PageInfo, analyst string, reportIds []int) (list []ReportDTO, err error) {
  351. offset := page.StartIndex(pageInfo.Current, pageInfo.PageSize)
  352. reports, err := reportDao.GetReportPageByAnalyst(pageInfo.LatestId, pageInfo.PageSize, offset, analyst, reportIds)
  353. if err != nil {
  354. logger.Error("分页查询报告列表失败:%v", err)
  355. return
  356. }
  357. list = make([]ReportDTO, 0)
  358. if reports != nil {
  359. for _, report := range reports {
  360. dto := convertReportDTO(report, false)
  361. list = append(list, dto)
  362. }
  363. }
  364. return
  365. }
  366. func GetReportPageByOrgIds(pageInfo page.PageInfo, orgIds map[string][]int, discardIds []int) (list []ReportDTO, err error) {
  367. offset := page.StartIndex(pageInfo.Current, pageInfo.PageSize)
  368. reports, err := reportDao.GetReportPageByOrgIds(pageInfo.LatestId, pageInfo.PageSize, offset, orgIds, discardIds)
  369. if err != nil {
  370. logger.Error("分页查询报告列表失败:%v", err)
  371. return
  372. }
  373. list = make([]ReportDTO, 0)
  374. if reports != nil {
  375. for _, report := range reports {
  376. dto := convertReportDTO(report, false)
  377. list = append(list, dto)
  378. }
  379. }
  380. return
  381. }
  382. func getETAReportFirstPermissions(id int) (permissionDTOs []configService.PermissionDTO) {
  383. classifyId, err := etaDao.GetReportClassifyById(id)
  384. if err != nil || classifyId == 0 {
  385. logger.Error("获取研报分类信息失败:%v", err)
  386. return
  387. }
  388. permissions, err := permissionDao.GetFirstPermissionsByClassifyID(classifyId)
  389. if err != nil {
  390. logger.Error("获取研报一级品种信息失败:%v", err)
  391. return
  392. }
  393. for _, permission := range permissions {
  394. permissionDTOs = append(permissionDTOs, convertPermissionDTO(permission))
  395. }
  396. return
  397. }
  398. func getHTReportFirstPermissions(id int) (permissionDTOs []configService.PermissionDTO) {
  399. report, err := reportDao.GetReportByOrgId(id, SourceHT)
  400. if err != nil {
  401. logger.Error("获取报告失败:%v", err)
  402. }
  403. permissionName := report.PlateName
  404. plateName, err := reportDao.GetPlateNameByPermissionName(permissionName)
  405. if err != nil {
  406. return []configService.PermissionDTO{}
  407. }
  408. return []configService.PermissionDTO{
  409. {
  410. PermissionId: 0,
  411. PermissionName: plateName,
  412. ParentId: 0,
  413. },
  414. }
  415. }
  416. func getETAReportSecondPermissions(id int) (permissionDTOs []configService.PermissionDTO) {
  417. classifyId, err := reportDao.GetReportClassifyById(id)
  418. //classifyId, err := etaDao.GetReportClassifyById(id)
  419. if err != nil || classifyId == 0 {
  420. logger.Error("获取研报分类信息失败:%v", err)
  421. return
  422. }
  423. permissions, err := permissionDao.GetSecondPermissionsByClassifyID(classifyId)
  424. //permissions, err := eta.GetSecondPermissionsByClassifyID(classifyId)
  425. if err != nil {
  426. logger.Error("获取研报二级品种信息失败:%v", err)
  427. return
  428. }
  429. for _, permission := range permissions {
  430. permissionDTOs = append(permissionDTOs, convertPermissionDTO(permission))
  431. }
  432. return
  433. }
  434. func getHTReportSecondPermissions(id int) (permissionDTOs []configService.PermissionDTO) {
  435. report, err := reportDao.GetReportByOrgId(id, SourceHT)
  436. if err != nil {
  437. logger.Error("获取报告失败:%v", err)
  438. }
  439. var permission permissionDao.Permission
  440. if report.PlateName != "" {
  441. permission, err = permissionDao.GetPermissionByName(report.PlateName)
  442. if err != nil {
  443. logger.Error("获取品种信息失败:%v", err)
  444. }
  445. }
  446. return []configService.PermissionDTO{
  447. {
  448. PermissionId: permission.PermissionId,
  449. PermissionName: report.PlateName,
  450. ParentId: permission.ParentId,
  451. RiskLevel: permission.RiskLevel,
  452. },
  453. }
  454. }
  455. func (es ESReport) GetId() string {
  456. return strconv.Itoa(es.ReportID)
  457. }
  458. func GetETALatestReportId() (id int, err error) {
  459. return reportDao.GetLatestReportIdBySource(reportDao.SourceETA)
  460. }
  461. func GetHTLatestReportId() (id int, err error) {
  462. return reportDao.GetLatestReportIdBySource(reportDao.SourceHT)
  463. }
  464. func InitETAReportList(list []eta.ETAReport) (err error) {
  465. logger.Info("同步研报数量%d", len(list))
  466. var reports []reportDao.Report
  467. for _, etaRp := range list {
  468. var coverSrc int
  469. //var permissions []etaDao.ChartPermission
  470. //permissions, err = etaDao.GetSecondPermissionsByClassifyID(etaRp.ClassifyID)
  471. var permissions []permissionDao.Permission
  472. permissions, err = permissionDao.GetSecondPermissionsByClassifyID(etaRp.ClassifyID)
  473. if err != nil || len(permissions) == 0 {
  474. logger.Error("获取研报二级品种信息失败:%v", err)
  475. coverSrc = 0
  476. } else {
  477. coverSrc = 0
  478. for _, permission := range permissions {
  479. permissionsId := permission.PermissionId
  480. var ids []int
  481. ids, err = mediaDao.GetIdsByPermissionId(permissionsId)
  482. if err != nil {
  483. logger.Error("获取图片资源失败:%v", err)
  484. continue
  485. }
  486. if ids == nil || len(ids) == 0 {
  487. continue
  488. }
  489. src := rand.NewSource(time.Now().UnixNano())
  490. r := rand.New(src)
  491. // 从切片中随机选择一个元素
  492. randomIndex := r.Intn(len(ids))
  493. coverSrc = ids[randomIndex]
  494. break
  495. }
  496. }
  497. destRp := convertEtaReport(etaRp, reportDao.StatusPublish)
  498. //destRp.Author = authorName
  499. destRp.CoverSrc = coverSrc
  500. reports = append(reports, destRp)
  501. //}
  502. }
  503. err = reportDao.BatchInsertReport(&reports)
  504. if err != nil {
  505. logger.Error("同步ETA研报失败:%v", err)
  506. return
  507. }
  508. return initES(reports)
  509. }
  510. func etaStatus(status int) reportDao.ReportStatus {
  511. if status == etaDao.Passed || status == etaDao.Published {
  512. return reportDao.StatusPublish
  513. } else {
  514. return reportDao.StatusUnPublish
  515. }
  516. }
  517. func SyncETAReportList(list []eta.ETAReport) (err error) {
  518. logger.Info("同步研报数量%d", len(list))
  519. var reports []reportDao.Report
  520. for _, etaRp := range list {
  521. var coverSrc int
  522. //var permissions []etaDao.ChartPermission
  523. //permissions, err = etaDao.GetSecondPermissionsByClassifyID(etaRp.ClassifyID)
  524. var permissions []permissionDao.Permission
  525. permissions, err = permissionDao.GetSecondPermissionsByClassifyID(etaRp.ClassifyID)
  526. if err != nil || len(permissions) == 0 {
  527. logger.Error("获取研报二级品种信息失败:%v", err)
  528. coverSrc = 0
  529. } else {
  530. coverSrc = 0
  531. for _, permission := range permissions {
  532. permissionsId := permission.PermissionId
  533. var ids []int
  534. ids, err = mediaDao.GetIdsByPermissionId(permissionsId)
  535. if err != nil {
  536. logger.Error("获取图片资源失败:%v", err)
  537. continue
  538. }
  539. if ids == nil || len(ids) == 0 {
  540. continue
  541. }
  542. src := rand.NewSource(time.Now().UnixNano())
  543. r := rand.New(src)
  544. // 从切片中随机选择一个元素
  545. randomIndex := r.Intn(len(ids))
  546. coverSrc = ids[randomIndex]
  547. break
  548. }
  549. }
  550. status := etaStatus(etaRp.State)
  551. destRp := convertEtaReport(etaRp, status)
  552. destRp.CoverSrc = coverSrc
  553. reports = append(reports, destRp)
  554. }
  555. esList, err := reportDao.InsertOrUpdateReport(reports, SourceETA)
  556. if esList == nil {
  557. return
  558. }
  559. return syncESAndSendMessage(esList)
  560. }
  561. type UpdateESReport struct {
  562. Title string `json:"title"`
  563. Author string `json:"author"`
  564. Abstract string `json:"abstract"`
  565. PublishedTime string `json:"publishedTime"`
  566. Status string `json:"status"`
  567. }
  568. func syncESAndSendMessage(reports []reportDao.Report) (err error) {
  569. var esReports []es.ESBase
  570. for _, etaRp := range reports {
  571. esRp := convertEsReport(etaRp)
  572. esReports = append(esReports, esRp)
  573. }
  574. //同步es
  575. for _, report := range reports {
  576. var exist bool
  577. exist, err = elastic().Exist(htConfig.GetReportIndex(), report.ID)
  578. if err != nil {
  579. logger.Error("查询es失败,reportId::%d,err:%v", report.ID, err)
  580. }
  581. if exist {
  582. update := UpdateESReport{
  583. Title: report.Title,
  584. Author: report.Author,
  585. PublishedTime: report.PublishedTime,
  586. Abstract: report.Abstract,
  587. Status: string(report.Status),
  588. }
  589. success := elastic().Update(htConfig.GetReportIndex(), report.ID, update)
  590. if !success {
  591. logger.Error("更新es失败,reportId::%d,err:%v", report.ID, err)
  592. }
  593. if report.Status == reportDao.StatusUnPublish {
  594. //隐藏热度搜索
  595. err = userDao.HiddenFlows(report.ID, message.ReportSourceType)
  596. if err != nil {
  597. logger.Error("隐藏热度搜索失败,reportId::%d,err:%v", report.ID, err)
  598. }
  599. } else {
  600. err = userDao.ShowFlows(report.ID, message.ReportSourceType)
  601. if err != nil {
  602. logger.Error("重置热度搜索失败,reportId::%d,err:%v", report.ID, err)
  603. }
  604. }
  605. } else {
  606. insert := ESReport{
  607. ReportID: report.ID,
  608. OrgId: report.OrgID,
  609. Title: report.Title,
  610. Author: report.Author,
  611. Source: report.Source,
  612. Abstract: report.Abstract,
  613. CoverSrc: report.CoverSrc,
  614. Status: report.Status,
  615. PublishedTime: report.PublishedTime,
  616. }
  617. success := elastic().CreateDocument(htConfig.GetReportIndex(), report.ID, insert)
  618. if !success {
  619. logger.Error("创建es文档失败,reportId::%d,err:%v", report.ID, err)
  620. }
  621. }
  622. }
  623. //err = elastic().BulkInsert(htConfig.GetReportIndex(), esReports)
  624. if err != nil {
  625. logger.Error("同步ETA研报到es失败:%v", err)
  626. return
  627. }
  628. //生产meta信息
  629. logger.Info("生成推送META信息")
  630. for _, report := range reports {
  631. if report.Status == reportDao.StatusUnPublish {
  632. logger.Info("报告取消发布,不需要生成推送消息")
  633. continue
  634. }
  635. var From string
  636. switch report.Source {
  637. case SourceETA:
  638. From = "ETA"
  639. case SourceHT:
  640. From = "HT"
  641. default:
  642. From = "UNKNOWN"
  643. }
  644. authors := strings.Split(report.Author, ",")
  645. authors = stringUtils.RemoveEmptyStrings(authors)
  646. if len(authors) > 0 {
  647. for _, authorName := range authors {
  648. userIds := userService.GetPostUser(authorName, report.PublishedTime)
  649. if len(userIds) > 0 {
  650. logger.Info("推送META信息,用户ID:%v", userIds)
  651. var author analystService.FinancialAnalystDTO
  652. author, err = analystService.GetAnalystByName(authorName)
  653. if err != nil {
  654. logger.Error("获取研报作者失败:%v", err)
  655. continue
  656. }
  657. usersStr := stringUtils.IntToStringSlice(userIds)
  658. Meta := messageDao.MetaData{
  659. AuthorName: author.Name,
  660. AuthorId: author.Id,
  661. SourceId: report.ID,
  662. PublishedTime: report.PublishedTime,
  663. }
  664. metaStr, _ := json.Marshal(Meta)
  665. toStr := strings.Join(usersStr, ",")
  666. metaContent := messageDomian.MetaInfoDTO{
  667. From: From,
  668. Meta: string(metaStr),
  669. MetaType: "USER_NOTICE",
  670. SourceType: "REPORT",
  671. To: toStr,
  672. }
  673. err = messageDomian.CreateMetaInfo(metaContent)
  674. if err != nil {
  675. logger.Error("创建Meta信息失败:%v", err)
  676. return err
  677. }
  678. }
  679. }
  680. }
  681. }
  682. return
  683. }
  684. func initES(reports []reportDao.Report) (err error) {
  685. var esReports []es.ESBase
  686. for _, etaRp := range reports {
  687. esRp := convertEsReport(etaRp)
  688. esReports = append(esReports, esRp)
  689. }
  690. //同步es
  691. err = elastic().BulkInsert(htConfig.GetReportIndex(), esReports)
  692. if err != nil {
  693. logger.Error("同步ETA研报到es失败:%v", err)
  694. return
  695. }
  696. return
  697. }
  698. func InitHTReportList(list []ht.HTReport) (noRecord bool, err error) {
  699. var reports []reportDao.Report
  700. //获取系统中ht品种名
  701. permissions, err := reportDao.GetGLAuthorNames()
  702. if err != nil {
  703. logger.Error("获取钢联研报作者失败:%v", err)
  704. return
  705. }
  706. for _, htRp := range list {
  707. for _, permission := range permissions {
  708. if htRp.PermissionName == permission.Permission {
  709. if permission.AuthorNames != "" {
  710. htRp.PublishUserName = permission.AuthorNames
  711. }
  712. destRp := convertHTReport(htRp, reportDao.StatusPublish)
  713. var coverSrc int
  714. var permissionId int
  715. permissionId, err = permissionDao.GetPermissionIdByName(htRp.PermissionName)
  716. //permissionId, err = etaDao.GetPermissionIdByName(htRp.PermissionName)
  717. if err != nil {
  718. logger.Error("HT获取eta品种id失败:%v", err)
  719. coverSrc = 0
  720. }
  721. var ids []int
  722. ids, err = mediaDao.GetIdsByPermissionId(permissionId)
  723. if err != nil {
  724. logger.Error("获取图片资源失败:%v", err)
  725. coverSrc = 0
  726. }
  727. if ids == nil || len(ids) == 0 {
  728. coverSrc = 0
  729. } else {
  730. src := rand.NewSource(time.Now().UnixNano())
  731. r := rand.New(src)
  732. // 从切片中随机选择一个元素
  733. randomIndex := r.Intn(len(ids))
  734. coverSrc = ids[randomIndex]
  735. }
  736. destRp.CoverSrc = coverSrc
  737. destRp.PlateName = htRp.PermissionName
  738. reports = append(reports, destRp)
  739. }
  740. //}
  741. }
  742. }
  743. if len(reports) == 0 {
  744. return true, nil
  745. } else {
  746. logger.Info("同步研报数量%d", len(reports))
  747. err = reportDao.BatchInsertReport(&reports)
  748. if err != nil {
  749. logger.Error("同步HT研报失败:%v", err)
  750. return false, err
  751. }
  752. return false, initES(reports)
  753. }
  754. }
  755. func htStatus(status int, isDelete int) reportDao.ReportStatus {
  756. if isDelete == 1 || status != ht.Publish {
  757. return reportDao.StatusUnPublish
  758. }
  759. return reportDao.StatusPublish
  760. }
  761. func SyncHTReportList(list []ht.HTReport) (noRecord bool, err error) {
  762. var reports []reportDao.Report
  763. permissions, err := reportDao.GetGLAuthorNames()
  764. if err != nil {
  765. logger.Error("获取钢联研报作者失败:%v", err)
  766. return
  767. }
  768. for _, htRp := range list {
  769. for _, permission := range permissions {
  770. if htRp.PermissionName == permission.Permission {
  771. if permission.AuthorNames != "" {
  772. htRp.PublishUserName = permission.AuthorNames
  773. }
  774. status := htStatus(htRp.Status, htRp.IsDelete)
  775. destRp := convertHTReport(htRp, status)
  776. var coverSrc int
  777. var permissionId int
  778. permissionId, err = permissionDao.GetPermissionIdByName(htRp.PermissionName)
  779. //permissionId, err = etaDao.GetPermissionIdByName(htRp.PermissionName)
  780. if err != nil {
  781. logger.Error("HT获取eta品种id失败:%v", err)
  782. coverSrc = 0
  783. }
  784. var ids []int
  785. ids, err = mediaDao.GetIdsByPermissionId(permissionId)
  786. if err != nil {
  787. logger.Error("获取图片资源失败:%v", err)
  788. coverSrc = 0
  789. }
  790. if ids == nil || len(ids) == 0 {
  791. coverSrc = 0
  792. } else {
  793. src := rand.NewSource(time.Now().UnixNano())
  794. r := rand.New(src)
  795. // 从切片中随机选择一个元素
  796. randomIndex := r.Intn(len(ids))
  797. coverSrc = ids[randomIndex]
  798. }
  799. destRp.CoverSrc = coverSrc
  800. destRp.PlateName = htRp.PermissionName
  801. reports = append(reports, destRp)
  802. }
  803. //}
  804. }
  805. }
  806. if len(reports) == 0 {
  807. return true, nil
  808. } else {
  809. logger.Info("同步研报数量%d", len(list))
  810. }
  811. esList, err := reportDao.InsertOrUpdateReport(reports, SourceHT)
  812. if esList == nil {
  813. return false, err
  814. }
  815. return false, syncESAndSendMessage(reports)
  816. }
  817. func GetListOrderByConditionWeekly(week bool, column string, limit int, order models.Order) (dtoList []ReportDTO, err error) {
  818. reports, err := reportDao.GetListOrderByCondition(week, column, limit, order)
  819. if err != nil {
  820. logger.Error("获取研报失败:%v", err)
  821. return
  822. }
  823. for _, reportItem := range reports {
  824. dto := convertReportDTO(reportItem, false)
  825. dtoList = append(dtoList, dto)
  826. }
  827. return
  828. }
  829. func GetListByCondition[T any](column string, ids []T) (dtoList []ReportDTO, err error) {
  830. var values []interface{}
  831. for _, id := range ids {
  832. values = append(values, id)
  833. }
  834. reports, err := reportDao.GetListByCondition(column, ids)
  835. if err != nil {
  836. logger.Error("获取研报失败:%v", err)
  837. return
  838. }
  839. for _, reportItem := range reports {
  840. dto := convertReportDTO(reportItem, false)
  841. dtoList = append(dtoList, dto)
  842. }
  843. return
  844. }
  845. func GetReportByIdListByOrgIds(orgIds map[string][]int) (ids []int, err error) {
  846. return reportDao.GetReportIdListByOrgIds(orgIds)
  847. }
  848. func getHtOrgIds(permissionIds []int) (htOrgIds []int, err error) {
  849. return GetHTReportIdsByPermissionIdsWithRiskLevel(permissionIds)
  850. }
  851. func getEtaOrgIds(permissionIds []int) (htOrgIds []int, err error) {
  852. return GetETAReportIdsByPermissionIdsWithRiskLevel(permissionIds)
  853. }
  854. func GetTotalPageCountByPermissionIds(permissionIds []int) (total int64, latestId int64, ids map[string][]int, disCardReportIds []int) {
  855. var wg sync.WaitGroup
  856. wg.Add(2)
  857. var htOrgIds []int
  858. var etaOrgIds []int
  859. go func() {
  860. defer wg.Done()
  861. var err error
  862. htOrgIds, err = getHtOrgIds(permissionIds)
  863. if err != nil {
  864. logger.Error("品种筛选ht报告id失败:%v", err)
  865. }
  866. }()
  867. go func() {
  868. defer wg.Done()
  869. var err error
  870. etaOrgIds, err = getEtaOrgIds(permissionIds)
  871. if err != nil {
  872. logger.Error("品种筛选eta报告id失败:%v", err)
  873. }
  874. }()
  875. wg.Wait()
  876. totalCol := int64(len(etaOrgIds) + len(htOrgIds))
  877. if totalCol == 0 {
  878. latestId = 0
  879. return
  880. }
  881. ids = make(map[string][]int, 2)
  882. if len(etaOrgIds) == 0 {
  883. ids["ETA"] = []int{}
  884. } else {
  885. ids["ETA"] = etaOrgIds
  886. }
  887. if len(htOrgIds) == 0 {
  888. ids["HT"] = []int{}
  889. } else {
  890. ids["HT"] = htOrgIds
  891. }
  892. //获取一下下架的报告产品
  893. offSaleProducts, err := merchantDao.GetOffSaleProducts([]merchantDao.MerchantProductType{merchantDao.Report, merchantDao.Package})
  894. if err != nil {
  895. logger.Error("获取下架的报告产品失败:%v", err)
  896. return
  897. }
  898. var ProductPermissionIds []int
  899. for _, product := range offSaleProducts {
  900. if product.Type == "package" {
  901. ProductPermissionIds = append(ProductPermissionIds, product.SourceId)
  902. }
  903. if product.Type == "report" {
  904. disCardReportIds = append(disCardReportIds, product.SourceId)
  905. }
  906. }
  907. if len(ProductPermissionIds) > 0 {
  908. wg.Add(2)
  909. var permissionNames []string
  910. var classifyIds []int
  911. go func() {
  912. defer wg.Done()
  913. var permissionErr error
  914. permissionNames, permissionErr = GetPermissionNamesByPermissionIds(ProductPermissionIds)
  915. if permissionErr != nil {
  916. logger.Error("获取ETA品种名称失败:%v", err)
  917. }
  918. }()
  919. go func() {
  920. defer wg.Done()
  921. var classifyErr error
  922. classifyIds, classifyErr = permissionDao.GetClassifyIdsByPermissionIds(ProductPermissionIds)
  923. if classifyErr != nil {
  924. logger.Error("获取ETA报告分类id失败:%v", err)
  925. }
  926. }()
  927. wg.Wait()
  928. disCardIds, _ := reportDao.GetHiddenReportIds(classifyIds, permissionNames)
  929. if len(disCardIds) > 0 {
  930. disCardReportIds = append(disCardReportIds, disCardIds...)
  931. }
  932. }
  933. //对数据去重
  934. disCardReportIds = uniqueArray(disCardReportIds)
  935. //获取报告中还包含上架套餐的id
  936. if len(disCardReportIds) > 0 {
  937. reportIdsSalePackage, _ := merchantDao.GetReportOnSalePackageIds(disCardReportIds)
  938. reportIdsSaleProduct, _ := merchantDao.GetOnSaleReportIds(disCardReportIds)
  939. showReportMap := make(map[int]bool)
  940. for _, reportId := range reportIdsSalePackage {
  941. showReportMap[reportId] = true
  942. }
  943. for _, reportId := range reportIdsSaleProduct {
  944. showReportMap[reportId] = true
  945. }
  946. var filterDisCardReportIds []int
  947. for _, id := range disCardReportIds {
  948. if _, ok := showReportMap[id]; !ok {
  949. filterDisCardReportIds = append(filterDisCardReportIds, id)
  950. }
  951. }
  952. disCardReportIds = filterDisCardReportIds
  953. }
  954. //获取这些id的产品
  955. total, latestId, err = reportDao.GetMaxIdByPermissionIds(ids, disCardReportIds)
  956. if err != nil {
  957. logger.Error("获取筛选报告的最大记录和记录数失败:%v", err)
  958. return
  959. }
  960. return
  961. }
  962. type etaReport struct {
  963. Id int `json:"id"`
  964. ClassifyId int `json:"classifyId"`
  965. PermissionIds []int `json:"permissionIds"`
  966. }
  967. type htReport struct {
  968. Id int `json:"id"`
  969. PlateName string `json:"plateName"`
  970. PermissionId []int `json:"permissionId"`
  971. }
  972. func uniqueArray(arr []int) []int {
  973. uniqueMap := make(map[int]bool)
  974. var result []int
  975. for _, value := range arr {
  976. if _, exists := uniqueMap[value]; !exists {
  977. uniqueMap[value] = true
  978. result = append(result, value)
  979. }
  980. }
  981. return result
  982. }
  983. func convertEtaReport(etaRp eta.ETAReport, status reportDao.ReportStatus) reportDao.Report {
  984. return reportDao.Report{
  985. OrgID: etaRp.ID,
  986. Title: etaRp.Title,
  987. Abstract: etaRp.Abstract,
  988. Author: etaRp.Author,
  989. ClassifyId: etaRp.ClassifyID,
  990. CoverSrc: 0,
  991. PublishedTime: etaRp.PublishTime.Format(time.DateTime),
  992. Source: reportDao.SourceETA,
  993. SendStatus: reportDao.UNSEND,
  994. Status: status,
  995. }
  996. }
  997. func convertHTReport(etaRp ht.HTReport, status reportDao.ReportStatus) reportDao.Report {
  998. return reportDao.Report{
  999. OrgID: etaRp.Id,
  1000. Title: etaRp.ReportName,
  1001. Author: etaRp.PublishUserName,
  1002. PublishedTime: etaRp.PublishedTime,
  1003. CoverSrc: 0,
  1004. Source: reportDao.SourceHT,
  1005. SendStatus: reportDao.UNSEND,
  1006. Status: status,
  1007. }
  1008. }
  1009. func convertEsReport(report reportDao.Report) ESReport {
  1010. return ESReport{
  1011. ReportID: report.ID,
  1012. Title: report.Title,
  1013. OrgId: report.OrgID,
  1014. Author: report.Author,
  1015. Source: report.Source,
  1016. Abstract: report.Abstract,
  1017. Status: report.Status,
  1018. CoverSrc: report.CoverSrc,
  1019. PublishedTime: report.PublishedTime,
  1020. }
  1021. }
  1022. func convertReportDTO(report reportDao.Report, fullTime bool) (reportDTO ReportDTO) {
  1023. reportDTO = ReportDTO{
  1024. ReportID: report.ID,
  1025. Title: report.Title,
  1026. OrgId: report.OrgID,
  1027. Author: report.Author,
  1028. Source: string(report.Source),
  1029. CoverSrc: report.CoverSrc,
  1030. Abstract: report.Abstract,
  1031. PublishedTime: report.PublishedTime,
  1032. PlateName: report.PlateName,
  1033. ClassifyId: report.ClassifyId,
  1034. }
  1035. publishDate, err := time.Parse(time.DateTime, reportDTO.PublishedTime)
  1036. if err == nil && !fullTime {
  1037. reportDTO.PublishedTime = publishDate.Format(time.DateOnly)
  1038. }
  1039. return
  1040. }
  1041. func matchAll(sorts []string, key string) (request *es.ESQueryRequest) {
  1042. req := new(es.ESQueryRequest)
  1043. return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, 0, 1, sorts, es.MatchAllByCondition).ByCondition("status", "PUBLISH")
  1044. }
  1045. func match(key string, from int, to int, sorts []string) (request *es.ESQueryRequest) {
  1046. req := new(es.ESQueryRequest)
  1047. return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, from, to, sorts, es.MatchAllByCondition).ByCondition("status", "PUBLISH")
  1048. }
  1049. func matchRange(key string, from int, to int, max int64, sorts []string) (request *es.ESQueryRequest) {
  1050. req := new(es.ESQueryRequest)
  1051. return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, from, to, sorts, es.RangeByCondition).Range(0, max, ESRangeColumn).ByCondition("status", "PUBLISH")
  1052. }
  1053. func matchRangeByDocId(key string, from int, to int, max int64, sorts []string, docIds []string) (request *es.ESQueryRequest) {
  1054. req := new(es.ESQueryRequest)
  1055. return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, from, to, sorts, es.RangeByConditionWithDocIds).Range(0, max, ESRangeColumn).ByCondition("status", "PUBLISH").WithDocs(docIds)
  1056. }
  1057. func CountByDocId(key string, sorts []string, docIds []string) (request *es.ESQueryRequest) {
  1058. req := new(es.ESQueryRequest)
  1059. return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, 0, 1, sorts, es.CountWithDocIds).WithDocs(docIds)
  1060. }
  1061. func matchLimitByScore(key string, limit int, score float64, docIds []string) (request *es.ESQueryRequest) {
  1062. req := new(es.ESQueryRequest)
  1063. return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, 0, limit, sortField, es.LimitByScore).WithScore(score).WithDocs(docIds)
  1064. }
  1065. func SearchReportProduct(key string, limit int, score float64, docIds []int) (reports []ReportDTO, err error) {
  1066. var docStrIds []string
  1067. for _, id := range docIds {
  1068. docStrIds = append(docStrIds, strconv.Itoa(id))
  1069. }
  1070. request := matchLimitByScore(key, limit, score, docStrIds)
  1071. re, err := elastic().Search(request)
  1072. if err != nil {
  1073. logger.Error("es搜索异常:%v", err)
  1074. }
  1075. hits := elastic().GetSource(re.Hits)
  1076. if len(hits) == 0 {
  1077. reports = []ReportDTO{}
  1078. return
  1079. }
  1080. for _, hit := range hits {
  1081. var content map[string][]string
  1082. err = json.Unmarshal(hit.Highlight, &content)
  1083. report := ReportDTO{}
  1084. err = json.Unmarshal(hit.Source, &report)
  1085. if err != nil {
  1086. logger.Error("解析研报数据失败:%v", err)
  1087. continue
  1088. }
  1089. report.Score = hit.Score
  1090. report.Highlight = content[ESColumn]
  1091. report.Title = report.Highlight[0]
  1092. report.PublishedTime = report.PublishedTime[:10]
  1093. reports = append(reports, report)
  1094. }
  1095. return
  1096. }
  1097. type ProductSearchDTO struct {
  1098. HighLight string
  1099. SourceId int
  1100. SourceType string
  1101. Score float64
  1102. }
  1103. func CountPermissionWeight(ids []int) (list []configDao.PermissionWeight, err error) {
  1104. return reportDao.CountPermissionWeight(ids)
  1105. }