// report_service.go

  1. package report
  2. import (
  3. "encoding/json"
  4. "eta/eta_mini_ht_api/common/component/es"
  5. logger "eta/eta_mini_ht_api/common/component/log"
  6. "eta/eta_mini_ht_api/common/utils/page"
  7. stringUtils "eta/eta_mini_ht_api/common/utils/string"
  8. analystService "eta/eta_mini_ht_api/domian/financial_analyst"
  9. userService "eta/eta_mini_ht_api/domian/user"
  10. "eta/eta_mini_ht_api/models"
  11. "eta/eta_mini_ht_api/models/eta"
  12. etaDao "eta/eta_mini_ht_api/models/eta"
  13. reportDao "eta/eta_mini_ht_api/models/report"
  14. "github.com/google/uuid"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
const (
	// SourceETA and SourceHT are string labels for the two report sources.
	SourceETA = "ETA"
	SourceHT  = "HT"
	// DESC and ASC are sort directions typed as models.Order.
	DESC models.Order = "desc"
	ASC  models.Order = "asc"
	// ESIndex is the index name passed to every ES request built in this file.
	ESIndex = "report_index"
	// ESColumn is the field passed as the query column; it is also the key
	// used to read highlight fragments from a hit.
	ESColumn = "title"
	// ESRangeColumn is the field passed to Range by matchRange.
	ESRangeColumn = "reportId"
)
var (
	// sortField is the base sort list ("_score:desc") that SearchReportList
	// extends with a secondary sort key.
	sortField = []string{"_score:desc"}
)
  31. func elastic() *es.ESClient {
  32. return es.GetInstance()
  33. }
// ESReport is the research-report document mapping written to the ES index.
// It is built from a reportDao.Report by convertEsReport.
type ESReport struct {
	ReportID      int                    `json:"reportId"` // copied from Report.ID; also used as the document id (see GetId)
	OrgId         int                    `json:"orgId"`    // copied from Report.OrgID
	Title         string                 `json:"title"`
	Author        string                 `json:"author"`
	Source        reportDao.ReportSource `json:"source"`
	Abstract      string                 `json:"abstract"`
	CoverSrc      string                 `json:"coverSrc"`
	Status        reportDao.ReportStatus `json:"status"`
	PublishedTime string                 `json:"publishedTime"`
}
  46. type ReportDTO struct {
  47. ReportID int `json:"reportId"`
  48. OrgId int `json:"orgId"`
  49. Title string `json:"title"`
  50. Author string `json:"author"`
  51. Source string `json:"source"`
  52. Abstract string `json:"abstract"`
  53. PublishedTime string `json:"publishedTime"`
  54. Permissions map[int]string `json:"-"`
  55. PermissionNames interface{} `json:"permissionNames,omitempty"`
  56. Highlight []string `json:"highlight,omitempty"`
  57. Detail json.RawMessage `json:"detail,omitempty"`
  58. CoverSrc string `json:"coverSrc"`
  59. }
// Detail is an empty placeholder type; nothing in the visible part of this
// file references it.
type Detail struct {
}
// PermissionDTO carries one permission entry (id, name and parent id) as
// returned by getETAReportFirstPermissions.
type PermissionDTO struct {
	ID       int
	Name     string
	ParentID int
}
  67. func GetGetReportById(reportId int) (ReportDTO ReportDTO, err error) {
  68. report, err := reportDao.GetReportById(reportId)
  69. if err == nil {
  70. ReportDTO = convertReportDTO(report)
  71. }
  72. return
  73. }
  74. func GetTotalPageCount() (total int64) {
  75. return reportDao.GetTotalPageCount()
  76. }
  77. func GetTotalPageCountByAnalyst(analyst string) (total int64, latestId int64) {
  78. return reportDao.GetTotalPageCountByAnalyst(analyst)
  79. }
  80. func SearchMaxReportId(key string) (total int64, reportId int64) {
  81. sort := []string{"reportId:desc"}
  82. request := matchAll(sort, key)
  83. //同步es
  84. re, err := elastic().Count(request)
  85. if err != nil {
  86. logger.Error("es搜索异常:%v", err)
  87. }
  88. count := re.Count
  89. total = int64(count)
  90. if total > 0 {
  91. request = match(key, 0, count, sort)
  92. re, err = elastic().Search(request)
  93. if err != nil {
  94. logger.Error("es搜索异常:%v", err)
  95. }
  96. hits := elastic().GetSource(re.Hits)
  97. data := hits[0].Source
  98. report := ReportDTO{}
  99. err = json.Unmarshal(data, &report)
  100. if err != nil {
  101. logger.Error("获取当前最大研报id失败:%v", err)
  102. return
  103. }
  104. reportId = int64(report.ReportID)
  105. }
  106. return
  107. }
  108. func SearchReportList(key string, from int, size int, max int64) (reports []ReportDTO, err error) {
  109. //同步es
  110. sorts := append(sortField, "publishedTime:desc")
  111. request := matchRange(key, from, size, max, sorts)
  112. re, err := elastic().Search(request)
  113. if err != nil {
  114. logger.Error("es搜索异常:%v", err)
  115. }
  116. hits := elastic().GetSource(re.Hits)
  117. if len(hits) == 0 {
  118. reports = []ReportDTO{}
  119. return
  120. }
  121. for _, hit := range hits {
  122. var content map[string][]string
  123. err = json.Unmarshal(hit.Highlight, &content)
  124. report := ReportDTO{}
  125. err = json.Unmarshal(hit.Source, &report)
  126. if err != nil {
  127. logger.Error("解析研报数据失败:%v", err)
  128. continue
  129. }
  130. report.Highlight = content[ESColumn]
  131. report.Title = report.Highlight[0]
  132. report.PublishedTime = report.PublishedTime[:10]
  133. reports = append(reports, report)
  134. }
  135. return
  136. }
  137. func GetReportPageByAnalyst(pageInfo page.PageInfo, analyst string) (list []ReportDTO, err error) {
  138. offset := page.StartIndex(pageInfo.Current, pageInfo.PageSize)
  139. reports, err := reportDao.GetReportPageByAnalyst(pageInfo.LatestId, pageInfo.PageSize, offset, analyst)
  140. if err != nil {
  141. logger.Error("分页查询报告列表失败:%v", err)
  142. return
  143. }
  144. list = make([]ReportDTO, 0)
  145. if reports != nil {
  146. for _, report := range reports {
  147. dto := convertReportDTO(report)
  148. list = append(list, dto)
  149. }
  150. }
  151. return
  152. }
  153. func GetReportPageByOrgIds(pageInfo page.PageInfo, orgIds []int) (list []ReportDTO, err error) {
  154. offset := page.StartIndex(pageInfo.Current, pageInfo.PageSize)
  155. reports, err := reportDao.GetReportPageByOrgIds(pageInfo.LatestId, pageInfo.PageSize, offset, orgIds)
  156. if err != nil {
  157. logger.Error("分页查询报告列表失败:%v", err)
  158. return
  159. }
  160. list = make([]ReportDTO, 0)
  161. if reports != nil {
  162. for _, report := range reports {
  163. dto := convertReportDTO(report)
  164. list = append(list, dto)
  165. }
  166. }
  167. return
  168. }
  169. func GetNewReportByPublishTime(time time.Time) (reports []ReportDTO) {
  170. list := reportDao.GetNewReportByPublishTime(time)
  171. if list != nil {
  172. for _, report := range list {
  173. dto := convertReportDTO(report)
  174. reports = append(reports, dto)
  175. }
  176. }
  177. return
  178. }
  179. func GetReportPage(pageInfo page.PageInfo) (list []ReportDTO, err error) {
  180. offset := page.StartIndex(pageInfo.Current, pageInfo.PageSize)
  181. reports, err := reportDao.GetReportPage(pageInfo.LatestId, pageInfo.PageSize, offset)
  182. if err != nil {
  183. logger.Error("分页查询报告列表失败:%v", err)
  184. return
  185. }
  186. list = make([]ReportDTO, 0)
  187. if reports != nil {
  188. for _, report := range reports {
  189. dto := convertReportDTO(report)
  190. list = append(list, dto)
  191. }
  192. }
  193. return
  194. }
  195. func getETAReportFirstPermissions(id int) (permissionDTOs []PermissionDTO) {
  196. classifyId, err := etaDao.GetReportClassifyById(id)
  197. if err != nil || classifyId == 0 {
  198. logger.Error("获取研报分类信息失败:%v", err)
  199. return
  200. }
  201. permissions, err := etaDao.GetFirstPermissionsByClassifyID(classifyId)
  202. if err != nil {
  203. logger.Error("获取研报一级品种信息失败:%v", err)
  204. return
  205. }
  206. for _, permission := range permissions {
  207. permissionDTOs = append(permissionDTOs, convertPermissionDTO(permission))
  208. }
  209. return
  210. }
  211. func (es ESReport) GetId() string {
  212. return strconv.Itoa(es.ReportID)
  213. }
  214. func GetETALatestReportId() (id int, err error) {
  215. return reportDao.GetETALatestReportId()
  216. }
  217. func SyncETAReportList(list []eta.ETAReport) (err error) {
  218. logger.Info("同步研报数量%d", len(list))
  219. var reports []reportDao.Report
  220. var esReports []es.ESBase
  221. for _, etaRp := range list {
  222. authorNames := strings.Split(etaRp.Author, ",")
  223. authorNamesWithOutEmpty := stringUtils.RemoveEmptyStrings(authorNames)
  224. for _, authorName := range authorNamesWithOutEmpty {
  225. destRp := convertEtaReport(etaRp)
  226. destRp.Author = authorName
  227. reports = append(reports, destRp)
  228. }
  229. }
  230. err = reportDao.BatchInsertReport(&reports)
  231. if err != nil {
  232. logger.Error("同步ETA研报失败:%v", err)
  233. return
  234. }
  235. for _, etaRp := range reports {
  236. esRp := convertEsReport(etaRp)
  237. esReports = append(esReports, esRp)
  238. }
  239. //同步es
  240. err = elastic().BulkInsert(ESIndex, esReports)
  241. if err != nil {
  242. logger.Error("同步ETA研报到es失败:%v", err)
  243. return
  244. }
  245. //生产meta信息
  246. logger.Info("生成推送META信息")
  247. for _, report := range reports {
  248. userIds := userService.GetPostUser(report.Author, report.PublishedTime)
  249. author, _ := analystService.GetAnalystByName(report.Author)
  250. if len(userIds) > 0 {
  251. usersStr := stringUtils.IntToStringSlice(userIds)
  252. Meta := userService.MetaData{
  253. AuthorName: report.Author,
  254. AuthorId: author.Id,
  255. SourceId: report.ID,
  256. PublishedTime: report.PublishedTime,
  257. }
  258. metaStr, _ := json.Marshal(Meta)
  259. toStr := strings.Join(usersStr, ",")
  260. UUID := uuid.New()
  261. uuidStr := UUID.String()
  262. metaContent := userService.MetaInfoDTO{
  263. From: "ETA",
  264. Uid: "report:" + uuidStr,
  265. Meta: string(metaStr),
  266. MetaType: "USER_NOTICE",
  267. SourceType: "REPORT",
  268. To: toStr,
  269. }
  270. err = userService.CreateMetaInfo(metaContent)
  271. if err != nil {
  272. logger.Error("创建Meta信息失败:%v", err)
  273. return err
  274. }
  275. }
  276. }
  277. return
  278. }
  279. func GetListOrderByConditionWeekly(week bool, column string, limit int, order models.Order) (dtoList []ReportDTO, err error) {
  280. reports, err := reportDao.GetListOrderByCondition(week, column, limit, order)
  281. if err != nil {
  282. logger.Error("获取研报失败:%v", err)
  283. return
  284. }
  285. for _, report := range reports {
  286. dto := convertReportDTO(report)
  287. dtoList = append(dtoList, dto)
  288. }
  289. return
  290. }
  291. func GetListByCondition[T any](column string, ids []T) (dtoList []ReportDTO, err error) {
  292. var values []interface{}
  293. for _, id := range ids {
  294. values = append(values, id)
  295. }
  296. reports, err := reportDao.GetListByCondition(column, ids)
  297. if err != nil {
  298. logger.Error("获取研报失败:%v", err)
  299. return
  300. }
  301. for _, report := range reports {
  302. dto := convertReportDTO(report)
  303. dtoList = append(dtoList, dto)
  304. }
  305. return
  306. }
  307. func GetTotalPageCountByPermissionIds(permissionIds []int) (total int64, latestId int64, ids []int) {
  308. //TODO 一期品种筛选reportIds
  309. htIds := []int{}
  310. etaIds, err := GetETAReportIdsByPermissionIds(permissionIds)
  311. if err != nil {
  312. logger.Error("品种筛选eta报告id失败:%v", err)
  313. etaIds = []int{}
  314. }
  315. total = int64(len(etaIds) + len(htIds))
  316. ids = append(etaIds, htIds...)
  317. latestId = reportDao.GetMaxIdByPermissionIds(ids)
  318. return
  319. }
  320. func convertEtaReport(etaRp eta.ETAReport) reportDao.Report {
  321. return reportDao.Report{
  322. OrgID: etaRp.ID,
  323. Title: etaRp.Title,
  324. Abstract: etaRp.Abstract,
  325. Author: etaRp.Author,
  326. PublishedTime: etaRp.PublishTime,
  327. Source: reportDao.SourceETA,
  328. Status: reportDao.StatusInit,
  329. }
  330. }
  331. func convertEsReport(report reportDao.Report) ESReport {
  332. return ESReport{
  333. ReportID: report.ID,
  334. Title: report.Title,
  335. OrgId: report.OrgID,
  336. Author: report.Author,
  337. Source: report.Source,
  338. Abstract: report.Abstract,
  339. Status: report.Status,
  340. CoverSrc: report.CoverSrc,
  341. PublishedTime: report.PublishedTime,
  342. }
  343. }
  344. func convertReportDTO(report reportDao.Report) (reportDTO ReportDTO) {
  345. reportDTO = ReportDTO{
  346. ReportID: report.ID,
  347. Title: report.Title,
  348. OrgId: report.OrgID,
  349. Author: report.Author,
  350. Source: string(report.Source),
  351. Abstract: report.Abstract,
  352. PublishedTime: report.PublishedTime,
  353. }
  354. publishDate, err := time.Parse(time.DateTime, reportDTO.PublishedTime)
  355. if err == nil {
  356. reportDTO.PublishedTime = publishDate.Format(time.DateOnly)
  357. }
  358. return
  359. }
  360. func matchAll(sorts []string, key string) (request *es.ESQueryRequest) {
  361. req := new(es.ESQueryRequest)
  362. return req.CreateESQueryRequest(ESIndex, ESColumn, key, 0, 1, sorts, es.MatchAll)
  363. }
  364. func match(key string, from int, to int, sorts []string) (request *es.ESQueryRequest) {
  365. req := new(es.ESQueryRequest)
  366. return req.CreateESQueryRequest(ESIndex, ESColumn, key, from, to, sorts, es.Match)
  367. }
  368. func matchRange(key string, from int, to int, max int64, sorts []string) (request *es.ESQueryRequest) {
  369. req := new(es.ESQueryRequest)
  370. return req.CreateESQueryRequest(ESIndex, ESColumn, key, from, to, sorts, es.Range).Range(0, max, ESRangeColumn)
  371. }