report_service.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955
  1. package report
  2. import (
  3. "encoding/json"
  4. "eta/eta_mini_ht_api/common/component/config"
  5. "eta/eta_mini_ht_api/common/component/es"
  6. logger "eta/eta_mini_ht_api/common/component/log"
  7. "eta/eta_mini_ht_api/common/contants"
  8. "eta/eta_mini_ht_api/common/utils/page"
  9. stringUtils "eta/eta_mini_ht_api/common/utils/string"
  10. configService "eta/eta_mini_ht_api/domian/config"
  11. analystService "eta/eta_mini_ht_api/domian/financial_analyst"
  12. userService "eta/eta_mini_ht_api/domian/user"
  13. "eta/eta_mini_ht_api/models"
  14. permissionDao "eta/eta_mini_ht_api/models/config"
  15. "eta/eta_mini_ht_api/models/eta"
  16. etaDao "eta/eta_mini_ht_api/models/eta"
  17. "eta/eta_mini_ht_api/models/ht"
  18. mediaDao "eta/eta_mini_ht_api/models/media"
  19. reportDao "eta/eta_mini_ht_api/models/report"
  20. userDao "eta/eta_mini_ht_api/models/user"
  21. "math/rand"
  22. "strconv"
  23. "strings"
  24. "time"
  25. )
  26. const (
  27. SourceETA = "ETA"
  28. SourceHT = "HT"
  29. DESC models.Order = "desc"
  30. ASC models.Order = "asc"
  31. ESColumn = "title"
  32. ESRangeColumn = "reportId"
  33. )
  34. var (
  35. sortField = []string{"_score:desc"}
  36. htConfig = config.GetConfig(contants.HT).(*config.HTBizConfig)
  37. )
  38. func elastic() *es.ESClient {
  39. return es.GetInstance()
  40. }
  41. // ESReport Report ES研报mapping
  42. type ESReport struct {
  43. ReportID int `json:"reportId"`
  44. OrgId int `json:"orgId"`
  45. Title string `json:"title"`
  46. Author string `json:"author"`
  47. Source reportDao.ReportSource `json:"source"`
  48. Abstract string `json:"abstract"`
  49. CoverSrc int `json:"coverSrc"`
  50. Status reportDao.ReportStatus `json:"status"`
  51. PublishedTime string `json:"publishedTime"`
  52. }
  53. type ReportDTO struct {
  54. ReportID int `json:"reportId"`
  55. OrgId int `json:"orgId"`
  56. Title string `json:"title"`
  57. Author string `json:"author"`
  58. AuthorInfo []Anthor `json:"authorInfo"`
  59. Source string `json:"source"`
  60. Abstract string `json:"abstract"`
  61. PublishedTime string `json:"publishedTime"`
  62. RiskLevel string `json:"riskLevel"`
  63. PlateName string `json:"-"`
  64. ClassifyId int `json:"-"`
  65. SecondPermission map[int]string `json:"-"`
  66. Permissions map[int]string `json:"-"`
  67. PermissionNames interface{} `json:"permissionNames"`
  68. Highlight []string `json:"highlight"`
  69. Detail json.RawMessage `json:"detail"`
  70. PdfUrl string `json:"pdfUrl"`
  71. CoverSrc int `json:"coverSrc"`
  72. CoverUrl string `json:"coverUrl"`
  73. Login bool `json:"login"`
  74. RiskLevelStatus string `json:"riskLevelStatus"`
  75. IsFree bool `json:"isFree"`
  76. IsSubscribe bool `json:"isSubscribe"`
  77. Price string `json:"price"`
  78. }
  79. type Detail struct {
  80. }
  81. type Anthor struct {
  82. Id int `json:"id"`
  83. Name string `json:"name"`
  84. HeadImgUrl string `json:"headImgUrl"`
  85. }
  86. func GetReportById(reportId int) (ReportDTO ReportDTO, err error) {
  87. report, err := reportDao.GetReportById(reportId)
  88. if err != nil {
  89. return
  90. }
  91. ReportDTO = convertReportDTO(report, true)
  92. authorStr := ReportDTO.Author
  93. authorNames := strings.Split(authorStr, ",")
  94. authorNames = stringUtils.RemoveEmptyStrings(authorNames)
  95. var authorList []Anthor
  96. if len(authorNames) > 0 {
  97. for _, name := range authorNames {
  98. var author analystService.FinancialAnalystDTO
  99. author, err = analystService.GetAnalystByName(name)
  100. var item Anthor
  101. if err != nil {
  102. item = Anthor{
  103. Id: 0,
  104. Name: name,
  105. HeadImgUrl: "",
  106. }
  107. } else {
  108. item = Anthor{
  109. Id: author.Id,
  110. Name: author.Name,
  111. HeadImgUrl: author.HeadImgUrl,
  112. }
  113. }
  114. authorList = append(authorList, item)
  115. }
  116. }
  117. ReportDTO.AuthorInfo = authorList
  118. return
  119. }
  120. func GetTotalPageCount() (total int64, latestId int64, err error) {
  121. return reportDao.GetTotalPageCount()
  122. }
  123. func GetTotalPageCountByAnalyst(analyst string, permissionIds []int, riskLevel string) (total int64, latestId int64, ids []int) {
  124. ids, err := reportDao.GetReportsByAnalyst(analyst)
  125. if err != nil {
  126. logger.Error("查询研究研报告列表id失败:%v", err)
  127. return
  128. }
  129. //查询这些包含在列表中的权限的报告ids
  130. htOrgIds, err := GetHTReportIdsByPermissionIds(permissionIds)
  131. if err != nil {
  132. logger.Error("品种筛选ht报告id失败:%v", err)
  133. htOrgIds = []int{}
  134. }
  135. etaOrgIds, err := GetETAReportIdsByPermissionIds(permissionIds)
  136. if err != nil {
  137. logger.Error("品种筛选eta报告id失败:%v", err)
  138. etaOrgIds = []int{}
  139. }
  140. if len(etaOrgIds) == 0 && len(htOrgIds) == 0 {
  141. logger.Info("没有符合权限的研报")
  142. return
  143. }
  144. orgIds := make(map[string][]int, 2)
  145. if len(etaOrgIds) == 0 {
  146. orgIds["ETA"] = []int{}
  147. } else {
  148. orgIds["ETA"] = etaOrgIds
  149. }
  150. if len(htOrgIds) == 0 {
  151. orgIds["HT"] = []int{}
  152. } else {
  153. orgIds["HT"] = htOrgIds
  154. }
  155. permitReportIds, err := reportDao.GetReportIdListByOrgIds(orgIds)
  156. if err != nil {
  157. logger.Error("根据原始报告id获取报告id列表失败:%v", err)
  158. return
  159. }
  160. var filterReportIds []int
  161. for _, id := range ids {
  162. for _, permitReportId := range permitReportIds {
  163. if id == permitReportId {
  164. filterReportIds = append(filterReportIds, id)
  165. }
  166. }
  167. }
  168. if len(filterReportIds) == 0 {
  169. logger.Info("没有符合权限的研究员研报")
  170. return
  171. }
  172. ids = filterReportIds
  173. total = int64(len(filterReportIds))
  174. latestId = int64(findMax(filterReportIds))
  175. return
  176. }
  177. // findMaxWithError 函数用于找到整型数组中的最大值,并返回错误信息
  178. func findMax(nums []int) (max int) {
  179. if len(nums) == 0 {
  180. return 0
  181. }
  182. // 初始化最大值为数组的第一个元素
  183. max = nums[0]
  184. // 遍历数组,找到最大值
  185. for _, num := range nums {
  186. if num > max {
  187. max = num
  188. }
  189. }
  190. return
  191. }
  192. func SearchMaxReportId(key string) (total int64, reportId int64) {
  193. sort := []string{"reportId:desc"}
  194. request := matchAll(sort, key)
  195. //同步es
  196. re, err := elastic().Count(request)
  197. if err != nil {
  198. logger.Error("es搜索异常:%v", err)
  199. }
  200. count := re.Count
  201. total = int64(count)
  202. if total > 0 {
  203. request = match(key, 0, count, sort)
  204. re, err = elastic().Search(request)
  205. if err != nil {
  206. logger.Error("es搜索异常:%v", err)
  207. }
  208. hits := elastic().GetSource(re.Hits)
  209. data := hits[0].Source
  210. report := ReportDTO{}
  211. err = json.Unmarshal(data, &report)
  212. if err != nil {
  213. logger.Error("获取当前最大研报id失败:%v", err)
  214. return
  215. }
  216. reportId = int64(report.ReportID)
  217. }
  218. return
  219. }
  220. func SearchMaxReportIdWithRange(key string, reportIds []int) (total int64) {
  221. sort := []string{"reportId:desc"}
  222. var docIds []string
  223. for _, reportId := range reportIds {
  224. docIds = append(docIds, strconv.Itoa(reportId))
  225. }
  226. request := CountByDocId(key, sort, docIds)
  227. //同步es
  228. re, err := elastic().Count(request)
  229. if err != nil {
  230. logger.Error("es搜索异常:%v", err)
  231. }
  232. count := re.Count
  233. total = int64(count)
  234. if total > 0 {
  235. request = match(key, 0, count, sort)
  236. re, err = elastic().Search(request)
  237. if err != nil {
  238. logger.Error("es搜索异常:%v", err)
  239. }
  240. hits := elastic().GetSource(re.Hits)
  241. data := hits[0].Source
  242. report := ReportDTO{}
  243. err = json.Unmarshal(data, &report)
  244. if err != nil {
  245. logger.Error("获取当前最大研报id失败:%v", err)
  246. return
  247. }
  248. }
  249. return
  250. }
  251. func SearchReportList(key string, ids []int, from int, size int, max int64) (reports []ReportDTO, err error) {
  252. //同步es
  253. var docIds []string
  254. for _, id := range ids {
  255. docIds = append(docIds, strconv.Itoa(id))
  256. }
  257. sorts := append(sortField, "publishedTime:desc")
  258. request := matchRangeByDocId(key, from, size, max, sorts, docIds)
  259. re, err := elastic().Search(request)
  260. if err != nil {
  261. logger.Error("es搜索异常:%v", err)
  262. }
  263. hits := elastic().GetSource(re.Hits)
  264. if len(hits) == 0 {
  265. reports = []ReportDTO{}
  266. return
  267. }
  268. for _, hit := range hits {
  269. var content map[string][]string
  270. err = json.Unmarshal(hit.Highlight, &content)
  271. report := ReportDTO{}
  272. err = json.Unmarshal(hit.Source, &report)
  273. if err != nil {
  274. logger.Error("解析研报数据失败:%v", err)
  275. continue
  276. }
  277. report.Highlight = content[ESColumn]
  278. report.Title = report.Highlight[0]
  279. report.PublishedTime = report.PublishedTime[:10]
  280. reports = append(reports, report)
  281. }
  282. return
  283. }
  284. func GetReportPageByAnalyst(pageInfo page.PageInfo, analyst string, reportIds []int) (list []ReportDTO, err error) {
  285. offset := page.StartIndex(pageInfo.Current, pageInfo.PageSize)
  286. reports, err := reportDao.GetReportPageByAnalyst(pageInfo.LatestId, pageInfo.PageSize, offset, analyst, reportIds)
  287. if err != nil {
  288. logger.Error("分页查询报告列表失败:%v", err)
  289. return
  290. }
  291. list = make([]ReportDTO, 0)
  292. if reports != nil {
  293. for _, report := range reports {
  294. dto := convertReportDTO(report, false)
  295. list = append(list, dto)
  296. }
  297. }
  298. return
  299. }
  300. func GetReportPageByOrgIds(pageInfo page.PageInfo, orgIds map[string][]int, searchAll bool) (list []ReportDTO, err error) {
  301. offset := page.StartIndex(pageInfo.Current, pageInfo.PageSize)
  302. reports, err := reportDao.GetReportPageByOrgIds(pageInfo.LatestId, pageInfo.PageSize, offset, orgIds, searchAll)
  303. if err != nil {
  304. logger.Error("分页查询报告列表失败:%v", err)
  305. return
  306. }
  307. list = make([]ReportDTO, 0)
  308. if reports != nil {
  309. for _, report := range reports {
  310. dto := convertReportDTO(report, false)
  311. list = append(list, dto)
  312. }
  313. }
  314. return
  315. }
  316. func GetNewReportByPublishTime(time time.Time) (reports []ReportDTO) {
  317. list := reportDao.GetNewReportByPublishTime(time)
  318. if list != nil {
  319. for _, report := range list {
  320. dto := convertReportDTO(report, false)
  321. reports = append(reports, dto)
  322. }
  323. }
  324. return
  325. }
  326. func GetReportPage(pageInfo page.PageInfo) (list []ReportDTO, err error) {
  327. offset := page.StartIndex(pageInfo.Current, pageInfo.PageSize)
  328. reports, err := reportDao.GetReportPage(pageInfo.LatestId, pageInfo.PageSize, offset)
  329. if err != nil {
  330. logger.Error("分页查询报告列表失败:%v", err)
  331. return
  332. }
  333. list = make([]ReportDTO, 0)
  334. if reports != nil {
  335. for _, report := range reports {
  336. dto := convertReportDTO(report, false)
  337. list = append(list, dto)
  338. }
  339. }
  340. return
  341. }
  342. func getETAReportFirstPermissions(id int) (permissionDTOs []configService.PermissionDTO) {
  343. classifyId, err := etaDao.GetReportClassifyById(id)
  344. if err != nil || classifyId == 0 {
  345. logger.Error("获取研报分类信息失败:%v", err)
  346. return
  347. }
  348. permissions, err := permissionDao.GetFirstPermissionsByClassifyID(classifyId)
  349. //permissions, err := etaDao.GetFirstPermissionsByClassifyID(classifyId)
  350. if err != nil {
  351. logger.Error("获取研报一级品种信息失败:%v", err)
  352. return
  353. }
  354. for _, permission := range permissions {
  355. permissionDTOs = append(permissionDTOs, convertPermissionDTO(permission))
  356. }
  357. return
  358. }
  359. func getHTReportFirstPermissions(id int) (permissionDTOs []configService.PermissionDTO) {
  360. report, err := reportDao.GetReportByOrgId(id, SourceHT)
  361. if err != nil {
  362. logger.Error("获取报告失败:%v", err)
  363. }
  364. permissionName := report.PlateName
  365. plateName, err := reportDao.GetPlateNameByPermissionName(permissionName)
  366. if err != nil {
  367. return []configService.PermissionDTO{}
  368. }
  369. return []configService.PermissionDTO{
  370. {
  371. PermissionId: 0,
  372. PermissionName: plateName,
  373. ParentId: 0,
  374. },
  375. }
  376. }
  377. func getETAReportSecondPermissions(id int) (permissionDTOs []configService.PermissionDTO) {
  378. classifyId, err := reportDao.GetReportClassifyById(id)
  379. //classifyId, err := etaDao.GetReportClassifyById(id)
  380. if err != nil || classifyId == 0 {
  381. logger.Error("获取研报分类信息失败:%v", err)
  382. return
  383. }
  384. permissions, err := permissionDao.GetSecondPermissionsByClassifyID(classifyId)
  385. //permissions, err := eta.GetSecondPermissionsByClassifyID(classifyId)
  386. if err != nil {
  387. logger.Error("获取研报二级品种信息失败:%v", err)
  388. return
  389. }
  390. for _, permission := range permissions {
  391. permissionDTOs = append(permissionDTOs, convertPermissionDTO(permission))
  392. }
  393. return
  394. }
  395. func getHTReportSecondPermissions(id int) (permissionDTOs []configService.PermissionDTO) {
  396. report, err := reportDao.GetReportByOrgId(id, SourceHT)
  397. if err != nil {
  398. logger.Error("获取报告失败:%v", err)
  399. }
  400. var permission permissionDao.Permission
  401. if report.PlateName != "" {
  402. permission, err = permissionDao.GetPermissionByName(report.PlateName)
  403. if err != nil {
  404. logger.Error("获取品种信息失败:%v", err)
  405. }
  406. }
  407. return []configService.PermissionDTO{
  408. {
  409. PermissionId: permission.PermissionId,
  410. PermissionName: report.PlateName,
  411. ParentId: permission.ParentId,
  412. RiskLevel: permission.RiskLevel,
  413. },
  414. }
  415. }
  416. func (es ESReport) GetId() string {
  417. return strconv.Itoa(es.ReportID)
  418. }
  419. func GetETALatestReportId() (id int, err error) {
  420. return reportDao.GetLatestReportIdBySource(reportDao.SourceETA)
  421. }
  422. func GetHTLatestReportId() (id int, err error) {
  423. return reportDao.GetLatestReportIdBySource(reportDao.SourceHT)
  424. }
  425. func InitETAReportList(list []eta.ETAReport) (err error) {
  426. logger.Info("同步研报数量%d", len(list))
  427. var reports []reportDao.Report
  428. for _, etaRp := range list {
  429. var coverSrc int
  430. //var permissions []etaDao.ChartPermission
  431. //permissions, err = etaDao.GetSecondPermissionsByClassifyID(etaRp.ClassifyID)
  432. var permissions []permissionDao.Permission
  433. permissions, err = permissionDao.GetSecondPermissionsByClassifyID(etaRp.ClassifyID)
  434. if err != nil || len(permissions) == 0 {
  435. logger.Error("获取研报二级品种信息失败:%v", err)
  436. coverSrc = 0
  437. } else {
  438. coverSrc = 0
  439. for _, permission := range permissions {
  440. permissionsId := permission.PermissionId
  441. var ids []int
  442. ids, err = mediaDao.GetIdsByPermissionId(permissionsId)
  443. if err != nil {
  444. logger.Error("获取图片资源失败:%v", err)
  445. continue
  446. }
  447. if ids == nil || len(ids) == 0 {
  448. continue
  449. }
  450. src := rand.NewSource(time.Now().UnixNano())
  451. r := rand.New(src)
  452. // 从切片中随机选择一个元素
  453. randomIndex := r.Intn(len(ids))
  454. coverSrc = ids[randomIndex]
  455. break
  456. }
  457. }
  458. destRp := convertEtaReport(etaRp, reportDao.StatusPublish)
  459. //destRp.Author = authorName
  460. destRp.CoverSrc = coverSrc
  461. reports = append(reports, destRp)
  462. //}
  463. }
  464. err = reportDao.BatchInsertReport(&reports)
  465. if err != nil {
  466. logger.Error("同步ETA研报失败:%v", err)
  467. return
  468. }
  469. return initES(reports)
  470. }
  471. func etaStatus(status int) reportDao.ReportStatus {
  472. if status == etaDao.Passed || status == etaDao.Published {
  473. return reportDao.StatusPublish
  474. } else {
  475. return reportDao.StatusUnPublish
  476. }
  477. }
  478. func SyncETAReportList(list []eta.ETAReport) (err error) {
  479. logger.Info("同步研报数量%d", len(list))
  480. var reports []reportDao.Report
  481. for _, etaRp := range list {
  482. var coverSrc int
  483. //var permissions []etaDao.ChartPermission
  484. //permissions, err = etaDao.GetSecondPermissionsByClassifyID(etaRp.ClassifyID)
  485. var permissions []permissionDao.Permission
  486. permissions, err = permissionDao.GetSecondPermissionsByClassifyID(etaRp.ClassifyID)
  487. if err != nil || len(permissions) == 0 {
  488. logger.Error("获取研报二级品种信息失败:%v", err)
  489. coverSrc = 0
  490. } else {
  491. coverSrc = 0
  492. for _, permission := range permissions {
  493. permissionsId := permission.PermissionId
  494. var ids []int
  495. ids, err = mediaDao.GetIdsByPermissionId(permissionsId)
  496. if err != nil {
  497. logger.Error("获取图片资源失败:%v", err)
  498. continue
  499. }
  500. if ids == nil || len(ids) == 0 {
  501. continue
  502. }
  503. src := rand.NewSource(time.Now().UnixNano())
  504. r := rand.New(src)
  505. // 从切片中随机选择一个元素
  506. randomIndex := r.Intn(len(ids))
  507. coverSrc = ids[randomIndex]
  508. break
  509. }
  510. }
  511. status := etaStatus(etaRp.State)
  512. destRp := convertEtaReport(etaRp, status)
  513. destRp.CoverSrc = coverSrc
  514. reports = append(reports, destRp)
  515. }
  516. esList, err := reportDao.InsertOrUpdateReport(reports, SourceETA)
  517. if esList == nil {
  518. return
  519. }
  520. return syncESAndSendMessage(esList)
  521. }
  522. type UpdateESReport struct {
  523. Title string `json:"title"`
  524. Author string `json:"author"`
  525. Abstract string `json:"abstract"`
  526. PublishedTime string `json:"publishedTime"`
  527. Status string `json:"status"`
  528. }
  529. func syncESAndSendMessage(reports []reportDao.Report) (err error) {
  530. var esReports []es.ESBase
  531. for _, etaRp := range reports {
  532. esRp := convertEsReport(etaRp)
  533. esReports = append(esReports, esRp)
  534. }
  535. //同步es
  536. for _, report := range reports {
  537. var exist bool
  538. exist, err = elastic().Exist(htConfig.GetReportIndex(), report.ID)
  539. if err != nil {
  540. logger.Error("查询es失败,reportId::%d,err:%v", report.ID, err)
  541. }
  542. if exist {
  543. update := UpdateESReport{
  544. Title: report.Title,
  545. Author: report.Author,
  546. PublishedTime: report.PublishedTime,
  547. Abstract: report.Abstract,
  548. Status: string(report.Status),
  549. }
  550. success := elastic().Update(htConfig.GetReportIndex(), report.ID, update)
  551. if !success {
  552. logger.Error("更新es失败,reportId::%d,err:%v", report.ID, err)
  553. }
  554. if report.Status == reportDao.StatusUnPublish {
  555. //隐藏热度搜索
  556. err = userDao.HiddenFlows(report.ID, userDao.ReportSourceType)
  557. if err != nil {
  558. logger.Error("隐藏热度搜索失败,reportId::%d,err:%v", report.ID, err)
  559. }
  560. } else {
  561. err = userDao.ShowFlows(report.ID, userDao.ReportSourceType)
  562. if err != nil {
  563. logger.Error("重置热度搜索失败,reportId::%d,err:%v", report.ID, err)
  564. }
  565. }
  566. } else {
  567. insert := ESReport{
  568. ReportID: report.ID,
  569. OrgId: report.OrgID,
  570. Title: report.Title,
  571. Author: report.Author,
  572. Source: report.Source,
  573. Abstract: report.Abstract,
  574. CoverSrc: report.CoverSrc,
  575. Status: report.Status,
  576. PublishedTime: report.PublishedTime,
  577. }
  578. success := elastic().CreateDocument(htConfig.GetReportIndex(), report.ID, insert)
  579. if !success {
  580. logger.Error("创建es文档失败,reportId::%d,err:%v", report.ID, err)
  581. }
  582. }
  583. }
  584. //err = elastic().BulkInsert(htConfig.GetReportIndex(), esReports)
  585. if err != nil {
  586. logger.Error("同步ETA研报到es失败:%v", err)
  587. return
  588. }
  589. //生产meta信息
  590. logger.Info("生成推送META信息")
  591. for _, report := range reports {
  592. if report.Status == reportDao.StatusUnPublish {
  593. logger.Info("报告取消发布,不需要生成推送消息")
  594. continue
  595. }
  596. var From string
  597. switch report.Source {
  598. case SourceETA:
  599. From = "ETA"
  600. case SourceHT:
  601. From = "HT"
  602. default:
  603. From = "UNKNOWN"
  604. }
  605. authors := strings.Split(report.Author, ",")
  606. authors = stringUtils.RemoveEmptyStrings(authors)
  607. if len(authors) > 0 {
  608. for _, authorName := range authors {
  609. userIds := userService.GetPostUser(authorName, report.PublishedTime)
  610. if len(userIds) > 0 {
  611. logger.Info("推送META信息,用户ID:%v", userIds)
  612. var author analystService.FinancialAnalystDTO
  613. author, err = analystService.GetAnalystByName(authorName)
  614. if err != nil {
  615. logger.Error("获取研报作者失败:%v", err)
  616. continue
  617. }
  618. usersStr := stringUtils.IntToStringSlice(userIds)
  619. Meta := userService.MetaData{
  620. AuthorName: author.Name,
  621. AuthorId: author.Id,
  622. SourceId: report.ID,
  623. PublishedTime: report.PublishedTime,
  624. }
  625. metaStr, _ := json.Marshal(Meta)
  626. toStr := strings.Join(usersStr, ",")
  627. metaContent := userService.MetaInfoDTO{
  628. From: From,
  629. Meta: string(metaStr),
  630. MetaType: "USER_NOTICE",
  631. SourceType: "REPORT",
  632. To: toStr,
  633. }
  634. err = userService.CreateMetaInfo(metaContent)
  635. if err != nil {
  636. logger.Error("创建Meta信息失败:%v", err)
  637. return err
  638. }
  639. }
  640. }
  641. }
  642. }
  643. return
  644. }
  645. func initES(reports []reportDao.Report) (err error) {
  646. var esReports []es.ESBase
  647. for _, etaRp := range reports {
  648. esRp := convertEsReport(etaRp)
  649. esReports = append(esReports, esRp)
  650. }
  651. //同步es
  652. err = elastic().BulkInsert(htConfig.GetReportIndex(), esReports)
  653. if err != nil {
  654. logger.Error("同步ETA研报到es失败:%v", err)
  655. return
  656. }
  657. return
  658. }
  659. func InitHTReportList(list []ht.HTReport) (noRecord bool, err error) {
  660. var reports []reportDao.Report
  661. //获取系统中ht品种名
  662. permissions, err := reportDao.GetGLAuthorNames()
  663. if err != nil {
  664. logger.Error("获取钢联研报作者失败:%v", err)
  665. return
  666. }
  667. for _, htRp := range list {
  668. for _, permission := range permissions {
  669. if htRp.PermissionName == permission.Permission {
  670. if permission.AuthorNames != "" {
  671. htRp.PublishUserName = permission.AuthorNames
  672. }
  673. destRp := convertHTReport(htRp, reportDao.StatusPublish)
  674. var coverSrc int
  675. var permissionId int
  676. permissionId, err = permissionDao.GetPermissionIdByName(htRp.PermissionName)
  677. //permissionId, err = etaDao.GetPermissionIdByName(htRp.PermissionName)
  678. if err != nil {
  679. logger.Error("HT获取eta品种id失败:%v", err)
  680. coverSrc = 0
  681. }
  682. var ids []int
  683. ids, err = mediaDao.GetIdsByPermissionId(permissionId)
  684. if err != nil {
  685. logger.Error("获取图片资源失败:%v", err)
  686. coverSrc = 0
  687. }
  688. if ids == nil || len(ids) == 0 {
  689. coverSrc = 0
  690. } else {
  691. src := rand.NewSource(time.Now().UnixNano())
  692. r := rand.New(src)
  693. // 从切片中随机选择一个元素
  694. randomIndex := r.Intn(len(ids))
  695. coverSrc = ids[randomIndex]
  696. }
  697. destRp.CoverSrc = coverSrc
  698. destRp.PlateName = htRp.PermissionName
  699. reports = append(reports, destRp)
  700. }
  701. //}
  702. }
  703. }
  704. if len(reports) == 0 {
  705. return true, nil
  706. } else {
  707. logger.Info("同步研报数量%d", len(reports))
  708. err = reportDao.BatchInsertReport(&reports)
  709. if err != nil {
  710. logger.Error("同步HT研报失败:%v", err)
  711. return false, err
  712. }
  713. return false, initES(reports)
  714. }
  715. }
  716. func htStatus(status int, isDelete int) reportDao.ReportStatus {
  717. if isDelete == 1 || status != ht.Publish {
  718. return reportDao.StatusUnPublish
  719. }
  720. return reportDao.StatusPublish
  721. }
  722. func SyncHTReportList(list []ht.HTReport) (noRecord bool, err error) {
  723. var reports []reportDao.Report
  724. permissions, err := reportDao.GetGLAuthorNames()
  725. if err != nil {
  726. logger.Error("获取钢联研报作者失败:%v", err)
  727. return
  728. }
  729. for _, htRp := range list {
  730. for _, permission := range permissions {
  731. if htRp.PermissionName == permission.Permission {
  732. if permission.AuthorNames != "" {
  733. htRp.PublishUserName = permission.AuthorNames
  734. }
  735. status := htStatus(htRp.Status, htRp.IsDelete)
  736. destRp := convertHTReport(htRp, status)
  737. var coverSrc int
  738. var permissionId int
  739. permissionId, err = permissionDao.GetPermissionIdByName(htRp.PermissionName)
  740. //permissionId, err = etaDao.GetPermissionIdByName(htRp.PermissionName)
  741. if err != nil {
  742. logger.Error("HT获取eta品种id失败:%v", err)
  743. coverSrc = 0
  744. }
  745. var ids []int
  746. ids, err = mediaDao.GetIdsByPermissionId(permissionId)
  747. if err != nil {
  748. logger.Error("获取图片资源失败:%v", err)
  749. coverSrc = 0
  750. }
  751. if ids == nil || len(ids) == 0 {
  752. coverSrc = 0
  753. } else {
  754. src := rand.NewSource(time.Now().UnixNano())
  755. r := rand.New(src)
  756. // 从切片中随机选择一个元素
  757. randomIndex := r.Intn(len(ids))
  758. coverSrc = ids[randomIndex]
  759. }
  760. destRp.CoverSrc = coverSrc
  761. destRp.PlateName = htRp.PermissionName
  762. reports = append(reports, destRp)
  763. }
  764. //}
  765. }
  766. }
  767. if len(reports) == 0 {
  768. return true, nil
  769. } else {
  770. logger.Info("同步研报数量%d", len(list))
  771. }
  772. esList, err := reportDao.InsertOrUpdateReport(reports, SourceHT)
  773. if esList == nil {
  774. return false, err
  775. }
  776. return false, syncESAndSendMessage(reports)
  777. }
  778. func GetListOrderByConditionWeekly(week bool, column string, limit int, order models.Order) (dtoList []ReportDTO, err error) {
  779. reports, err := reportDao.GetListOrderByCondition(week, column, limit, order)
  780. if err != nil {
  781. logger.Error("获取研报失败:%v", err)
  782. return
  783. }
  784. for _, reportItem := range reports {
  785. dto := convertReportDTO(reportItem, false)
  786. dtoList = append(dtoList, dto)
  787. }
  788. return
  789. }
  790. func GetListByCondition[T any](column string, ids []T) (dtoList []ReportDTO, err error) {
  791. var values []interface{}
  792. for _, id := range ids {
  793. values = append(values, id)
  794. }
  795. reports, err := reportDao.GetListByCondition(column, ids)
  796. if err != nil {
  797. logger.Error("获取研报失败:%v", err)
  798. return
  799. }
  800. for _, reportItem := range reports {
  801. dto := convertReportDTO(reportItem, false)
  802. dtoList = append(dtoList, dto)
  803. }
  804. return
  805. }
  806. func GetReportByIdListByOrgIds(orgIds map[string][]int) (ids []int, err error) {
  807. return reportDao.GetReportIdListByOrgIds(orgIds)
  808. }
  809. func GetTotalPageCountByPermissionIds(permissionIds []int, riskLevel string) (total int64, latestId int64, ids map[string][]int) {
  810. htOrgIds, err := GetHTReportIdsByPermissionIdsWithRiskLevel(permissionIds, riskLevel)
  811. if err != nil {
  812. logger.Error("品种筛选ht报告id失败:%v", err)
  813. htOrgIds = []int{}
  814. }
  815. etaOrgIds, err := GetETAReportIdsByPermissionIdsWithRiskLevel(permissionIds, riskLevel)
  816. if err != nil {
  817. logger.Error("品种筛选eta报告id失败:%v", err)
  818. etaOrgIds = []int{}
  819. }
  820. totalCol := int64(len(etaOrgIds) + len(htOrgIds))
  821. if totalCol == 0 {
  822. latestId = 0
  823. return
  824. }
  825. ids = make(map[string][]int, 2)
  826. if len(etaOrgIds) == 0 {
  827. ids["ETA"] = []int{}
  828. } else {
  829. ids["ETA"] = etaOrgIds
  830. }
  831. if len(htOrgIds) == 0 {
  832. ids["HT"] = []int{}
  833. } else {
  834. ids["HT"] = htOrgIds
  835. }
  836. total, latestId, err = reportDao.GetMaxIdByPermissionIds(ids)
  837. if err != nil {
  838. logger.Error("获取筛选报告的最大记录和记录数失败:%v", err)
  839. return
  840. }
  841. return
  842. }
  843. func convertEtaReport(etaRp eta.ETAReport, status reportDao.ReportStatus) reportDao.Report {
  844. return reportDao.Report{
  845. OrgID: etaRp.ID,
  846. Title: etaRp.Title,
  847. Abstract: etaRp.Abstract,
  848. Author: etaRp.Author,
  849. ClassifyId: etaRp.ClassifyID,
  850. CoverSrc: 0,
  851. PublishedTime: etaRp.PublishTime.Format(time.DateTime),
  852. Source: reportDao.SourceETA,
  853. SendStatus: reportDao.UNSEND,
  854. Status: status,
  855. }
  856. }
  857. func convertHTReport(etaRp ht.HTReport, status reportDao.ReportStatus) reportDao.Report {
  858. return reportDao.Report{
  859. OrgID: etaRp.Id,
  860. Title: etaRp.ReportName,
  861. Author: etaRp.PublishUserName,
  862. PublishedTime: etaRp.PublishedTime,
  863. CoverSrc: 0,
  864. Source: reportDao.SourceHT,
  865. SendStatus: reportDao.UNSEND,
  866. Status: status,
  867. }
  868. }
  869. func convertEsReport(report reportDao.Report) ESReport {
  870. return ESReport{
  871. ReportID: report.ID,
  872. Title: report.Title,
  873. OrgId: report.OrgID,
  874. Author: report.Author,
  875. Source: report.Source,
  876. Abstract: report.Abstract,
  877. Status: report.Status,
  878. CoverSrc: report.CoverSrc,
  879. PublishedTime: report.PublishedTime,
  880. }
  881. }
  882. func convertReportDTO(report reportDao.Report, fullTime bool) (reportDTO ReportDTO) {
  883. reportDTO = ReportDTO{
  884. ReportID: report.ID,
  885. Title: report.Title,
  886. OrgId: report.OrgID,
  887. Author: report.Author,
  888. Source: string(report.Source),
  889. CoverSrc: report.CoverSrc,
  890. Abstract: report.Abstract,
  891. PublishedTime: report.PublishedTime,
  892. PlateName: report.PlateName,
  893. ClassifyId: report.ClassifyId,
  894. }
  895. publishDate, err := time.Parse(time.DateTime, reportDTO.PublishedTime)
  896. if err == nil && !fullTime {
  897. reportDTO.PublishedTime = publishDate.Format(time.DateOnly)
  898. }
  899. return
  900. }
  901. func matchAll(sorts []string, key string) (request *es.ESQueryRequest) {
  902. req := new(es.ESQueryRequest)
  903. return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, 0, 1, sorts, es.MatchAllByCondition).ByCondition("status", "PUBLISH")
  904. }
  905. func match(key string, from int, to int, sorts []string) (request *es.ESQueryRequest) {
  906. req := new(es.ESQueryRequest)
  907. return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, from, to, sorts, es.MatchAllByCondition).ByCondition("status", "PUBLISH")
  908. }
  909. func matchRange(key string, from int, to int, max int64, sorts []string) (request *es.ESQueryRequest) {
  910. req := new(es.ESQueryRequest)
  911. return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, from, to, sorts, es.RangeByCondition).Range(0, max, ESRangeColumn).ByCondition("status", "PUBLISH")
  912. }
  913. func matchRangeByDocId(key string, from int, to int, max int64, sorts []string, docIds []string) (request *es.ESQueryRequest) {
  914. req := new(es.ESQueryRequest)
  915. return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, from, to, sorts, es.RangeByConditionWithDocIds).Range(0, max, ESRangeColumn).ByCondition("status", "PUBLISH").WithDocs(docIds)
  916. }
  917. func CountByDocId(key string, sorts []string, docIds []string) (request *es.ESQueryRequest) {
  918. req := new(es.ESQueryRequest)
  919. return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, 0, 1, sorts, es.CountWithDocIds).WithDocs(docIds)
  920. }