task.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package services
  2. import (
  3. "eta/eta_mini_crm/models"
  4. "eta/eta_mini_crm/services/elastic"
  5. "eta/eta_mini_crm/utils"
  6. "fmt"
  7. "time"
  8. )
  9. func InitTask() {
  10. fmt.Println("start task")
  11. CreateIndex()
  12. InitReportPushStatus()
  13. fmt.Println("end task!")
  14. }
  15. func CreateIndex() {
  16. var mappingJson = `{
  17. "mappings": {
  18. "properties": {
  19. "ReportPdfId": {"type": "integer"},
  20. "PdfUrl": {"type": "text"},
  21. "PdfName": {"type": "text"},
  22. "Title": {"type": "text"},
  23. "Author": {"type": "text"},
  24. "Abstract": {"type": "text"},
  25. "ClassifyIdFirst": {"type": "integer"},
  26. "ClassifyNameFirst": {"type": "text"},
  27. "ClassifyIdSecond": {"type": "integer"},
  28. "ClassifyNameSecond": {"type": "text"},
  29. "Stage": {"type": "integer"},
  30. "PublishTime": {
  31. "type": "text",
  32. "fields": {
  33. "keyword": {
  34. "type": "keyword",
  35. "ignore_above": 256
  36. }
  37. }
  38. },
  39. "ModifyTime": {"type": "date"},
  40. "Pv": {"type": "integer"},
  41. "Uv": {"type": "integer"},
  42. "SysUserId": {"type": "integer"},
  43. "SysRealName": {"type": "text"},
  44. "State": {"type": "integer"}
  45. }
  46. }
  47. }`
  48. err := elastic.EsCreateIndex(utils.MINI_REPORT_INDEX_NAME, mappingJson)
  49. if err != nil {
  50. fmt.Println(err)
  51. }
  52. }
  53. func InitReportPushStatus() {
  54. for {
  55. maxId, err := models.GetMaxSyncIdReportPush(1)
  56. fmt.Println("同步研报开始, maxId:", maxId)
  57. if err != nil {
  58. fmt.Println("同步研报失败, Err:", err)
  59. }
  60. reportList, err := models.GetBatchReport(maxId, 100)
  61. if err != nil {
  62. fmt.Println("同步研报失败, Err:", err)
  63. }
  64. var reportIds []int
  65. for _, v := range reportList {
  66. reportIds = append(reportIds, v.Id)
  67. }
  68. reportPushList, err := models.GetReportPushStatusByReportIds(reportIds)
  69. if err != nil {
  70. return
  71. }
  72. reportPushMap := make(map[int]struct{})
  73. for _, v := range reportPushList {
  74. reportPushMap[v.ReportId] = struct{}{}
  75. }
  76. var insertReportPushList []*models.ReportPushStatus
  77. for _, v := range reportList {
  78. if _, ok := reportPushMap[v.Id]; !ok {
  79. insertReportPushList = append(insertReportPushList, &models.ReportPushStatus{
  80. ReportId: v.Id,
  81. State: 0,
  82. Title: v.Title,
  83. Abstract: v.Abstract,
  84. Stage: v.Stage,
  85. ClassifyIdFirst: v.ClassifyIdFirst,
  86. ClassifyNameFirst: v.ClassifyNameFirst,
  87. ClassifyIdSecond: v.ClassifyIdSecond,
  88. ClassifyNameSecond: v.ClassifyNameSecond,
  89. ClassifyIdThird: v.ClassifyIdThird,
  90. ClassifyNameThird: v.ClassifyNameThird,
  91. Author: v.Author,
  92. ReportType: 1,
  93. PublishTime: v.PublishTime,
  94. CreateTime: time.Now(),
  95. ModifyTime: time.Now(),
  96. })
  97. }
  98. }
  99. err = models.BatchAddReportPushStatus(insertReportPushList)
  100. if err != nil {
  101. return
  102. }
  103. if len(reportList) != 100 {
  104. fmt.Println("同步研报完成")
  105. return
  106. }
  107. }
  108. }