Browse Source

fix:调整研报同步逻辑

zqbao 8 months ago
parent
commit
6378d5a123
4 changed files with 277 additions and 113 deletions
  1. 31 99
      controllers/report_push_status.go
  2. 78 14
      models/report_push_status.go
  3. 108 0
      scheduler/task.go
  4. 60 0
      services/task.go

+ 31 - 99
controllers/report_push_status.go

@@ -66,7 +66,7 @@ func (this *ReportPushStatusController) List() {
 	var condition string
 	var pars []interface{}
 	if publishStartDate != "" && publishEndDate != "" {
-		condition += " AND a.publish_time >= ?"
+		condition += " AND publish_time >= ?"
 		publishStartTime, err := time.Parse(utils.FormatDate, publishStartDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -76,7 +76,7 @@ func (this *ReportPushStatusController) List() {
 		publishStartDateStr := publishStartTime.Format(utils.FormatDateTime)
 		pars = append(pars, publishStartDateStr)
 
-		condition += " AND a.publish_time <= ?"
+		condition += " AND publish_time <= ?"
 		publishEndTime, err := time.Parse(utils.FormatDate, publishEndDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -88,7 +88,7 @@ func (this *ReportPushStatusController) List() {
 		pars = append(pars, publishEndDateStr)
 	}
 	if pushStartDate != "" && pushEndDate != "" {
-		condition += " AND b.push_time >= ?"
+		condition += " AND push_time >= ?"
 		pushStartTime, err := time.Parse(utils.FormatDate, pushStartDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -98,7 +98,7 @@ func (this *ReportPushStatusController) List() {
 		pushStartDateStr := pushStartTime.Format(utils.FormatDateTime)
 		pars = append(pars, pushStartDateStr)
 
-		condition += " AND b.push_time <= ?"
+		condition += " AND push_time <= ?"
 		pushEndTime, err := time.Parse(utils.FormatDate, pushEndDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -110,7 +110,7 @@ func (this *ReportPushStatusController) List() {
 		pars = append(pars, pushEndDateStr)
 	}
 	if keyWord != "" {
-		condition += ` AND a.title like ?  `
+		condition += ` AND title like ?  `
 		pars = utils.GetLikeKeywordPars(pars, keyWord, 1)
 	}
 	var sortCondition string
@@ -119,9 +119,9 @@ func (this *ReportPushStatusController) List() {
 		var param, sort string
 		switch sortParam {
 		case "PublishTime":
-			param = "a.publish_time"
+			param = "publish_time"
 		case "PushTime":
-			param = "b.push_time"
+			param = "push_time"
 		}
 		switch sortType {
 		case "asc":
@@ -136,7 +136,7 @@ func (this *ReportPushStatusController) List() {
 		}
 	}
 	if sortCondition == "" {
-		sortCondition = ` ORDER BY a.publish_time DESC `
+		sortCondition = ` ORDER BY publish_time DESC `
 	}
 
 	classifyIdList := make([]int, 0)
@@ -166,7 +166,7 @@ func (this *ReportPushStatusController) List() {
 	}
 	if len(classifyIdList) > 0 {
 		classifyIdList = utils.Unique(classifyIdList)
-		condition += ` AND (a.classify_id_first IN (%s) AND a.classify_id_second IN (%s) AND a.classify_id_third IN (%s))`
+		condition += ` AND (classify_id_first IN (%s) AND classify_id_second IN (%s) AND classify_id_third IN (%s))`
 		condition = fmt.Sprintf(condition, utils.GetOrmReplaceHolder(len(classifyIdList)), utils.GetOrmReplaceHolder(len(classifyIdList)), utils.GetOrmReplaceHolder(len(classifyIdList)))
 		pars = append(pars, classifyIdList, classifyIdList, classifyIdList)
 	}
@@ -174,7 +174,7 @@ func (this *ReportPushStatusController) List() {
 		if selectedIds != "" {
 			selectIdStrs := strings.Split(selectedIds, ",")
 			if len(selectIdStrs) > 0 {
-				condition += ` AND a.id NOT IN (` + utils.GetOrmReplaceHolder(len(selectIdStrs)) + `)`
+				condition += ` AND report_id NOT IN (` + utils.GetOrmReplaceHolder(len(selectIdStrs)) + `)`
 				pars = append(pars, selectedIds)
 			}
 		}
@@ -182,7 +182,7 @@ func (this *ReportPushStatusController) List() {
 		if selectedIds != "" {
 			selectIdStrs := strings.Split(selectedIds, ",")
 			if len(selectIdStrs) > 0 {
-				condition += ` AND a.id IN (` + utils.GetOrmReplaceHolder(len(selectIdStrs)) + `)`
+				condition += ` AND report_id IN (` + utils.GetOrmReplaceHolder(len(selectIdStrs)) + `)`
 				pars = append(pars, selectedIds)
 			}
 		}
@@ -245,19 +245,6 @@ func (this *ReportPushStatusController) PushCancel() {
 			br.ErrMsg = "取消推送失败,Err:" + err.Error()
 			return
 		}
-	} else {
-		reportPush := &models.ReportPushStatus{}
-		reportPush.ReportId = req.ReportId
-		reportPush.ReportType = 1
-		reportPush.State = 0
-		reportPush.CreateTime = time.Now()
-		reportPush.ModifyTime = time.Now()
-		_, err = reportPush.Insert()
-		if err != nil {
-			br.Msg = "取消推送失败"
-			br.ErrMsg = "新增推送记录失败,Err:" + err.Error()
-			return
-		}
 	}
 
 	br.Msg = "取消推送成功"
@@ -311,20 +298,6 @@ func (this *ReportPushStatusController) Push() {
 			return
 		}
 
-	} else {
-		reportPush := &models.ReportPushStatus{}
-		reportPush.ReportId = req.ReportId
-		reportPush.ReportType = 1
-		reportPush.State = 1
-		reportPush.PushTime = time.Now()
-		reportPush.CreateTime = time.Now()
-		reportPush.ModifyTime = time.Now()
-		_, err = reportPush.Insert()
-		if err != nil {
-			br.Msg = "推送失败"
-			br.ErrMsg = "推送失败,Err:" + err.Error()
-			return
-		}
 	}
 
 	br.Msg = "推送成功"
@@ -353,7 +326,7 @@ func (this *ReportPushStatusController) BatchPush() {
 	var condition string
 	var pars []interface{}
 	if req.PublishStartDate != "" && req.PublishEndDate != "" {
-		condition += " AND a.publish_time >= ?"
+		condition += " AND publish_time >= ?"
 		publishStartTime, err := time.Parse(utils.FormatDate, req.PublishStartDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -363,7 +336,7 @@ func (this *ReportPushStatusController) BatchPush() {
 		publishStartDateStr := publishStartTime.Format(utils.FormatDateTime)
 		pars = append(pars, publishStartDateStr)
 
-		condition += " AND a.publish_time <= ?"
+		condition += " AND publish_time <= ?"
 		publishEndTime, err := time.Parse(utils.FormatDate, req.PublishEndDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -375,7 +348,7 @@ func (this *ReportPushStatusController) BatchPush() {
 		pars = append(pars, publishEndDateStr)
 	}
 	if req.PushStartDate != "" && req.PushEndDate != "" {
-		condition += " AND b.push_time >= ?"
+		condition += " AND push_time >= ?"
 		pushStartTime, err := time.Parse(utils.FormatDate, req.PushStartDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -385,7 +358,7 @@ func (this *ReportPushStatusController) BatchPush() {
 		pushStartDateStr := pushStartTime.Format(utils.FormatDateTime)
 		pars = append(pars, pushStartDateStr)
 
-		condition += " AND b.push_time <= ?"
+		condition += " AND push_time <= ?"
 		pushEndTime, err := time.Parse(utils.FormatDate, req.PushEndDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -397,7 +370,7 @@ func (this *ReportPushStatusController) BatchPush() {
 		pars = append(pars, pushEndDateStr)
 	}
 	if req.KeyWord != "" {
-		condition += ` AND a.title like ?  `
+		condition += ` AND title like ?  `
 		pars = utils.GetLikeKeywordPars(pars, req.KeyWord, 1)
 	}
 	classifyIdList := make([]int, 0)
@@ -411,18 +384,18 @@ func (this *ReportPushStatusController) BatchPush() {
 	classifyIdList = append(classifyIdList, tmpClassifyList...)
 	if len(classifyIdList) > 0 {
 		classifyIdList = utils.Unique(classifyIdList)
-		condition += ` AND (a.classify_id_first IN (%s) AND a.classify_id_second IN (%s) AND a.classify_id_third IN (%s))`
+		condition += ` AND (classify_id_first IN (%s) AND classify_id_second IN (%s) AND classify_id_third IN (%s))`
 		condition = fmt.Sprintf(condition, utils.GetOrmReplaceHolder(len(classifyIdList)), utils.GetOrmReplaceHolder(len(classifyIdList)), utils.GetOrmReplaceHolder(len(classifyIdList)))
 		pars = append(pars, classifyIdList, classifyIdList, classifyIdList)
 	}
 	if req.IsSelectAll {
 		if len(req.SelectedIds) > 0 {
-			condition += ` AND a.id NOT IN (` + utils.GetOrmReplaceHolder(len(req.SelectedIds)) + `)`
+			condition += ` AND report_id NOT IN (` + utils.GetOrmReplaceHolder(len(req.SelectedIds)) + `)`
 			pars = append(pars, req.SelectedIds)
 		}
 	} else {
 		if len(req.SelectedIds) > 0 {
-			condition += ` AND a.id IN (` + utils.GetOrmReplaceHolder(len(req.SelectedIds)) + `)`
+			condition += ` AND report_id IN (` + utils.GetOrmReplaceHolder(len(req.SelectedIds)) + `)`
 			pars = append(pars, req.SelectedIds)
 		}
 	}
@@ -432,7 +405,7 @@ func (this *ReportPushStatusController) BatchPush() {
 		br.ErrMsg = "查询研报失败,Err:" + err.Error()
 		return
 	}
-	reportPush, err := models.GetReportPushStatusByReportIds(reportIds, 1)
+	reportPush, err := models.GetReportPushStatusByReportIdAndState(reportIds, 0)
 	if err != nil {
 		br.Msg = "批量推送失败"
 		br.ErrMsg = "查询推送状态失败,Err:" + err.Error()
@@ -443,11 +416,8 @@ func (this *ReportPushStatusController) BatchPush() {
 		existReportMap[v.ReportId] = struct{}{}
 	}
 	existReportIds := make([]int, 0)
-	noExistReportIds := make([]int, 0)
 	for _, v := range reportIds {
-		if _, ok := existReportMap[v]; !ok {
-			noExistReportIds = append(noExistReportIds, v)
-		} else {
+		if _, ok := existReportMap[v]; ok {
 			existReportIds = append(existReportIds, v)
 		}
 	}
@@ -458,24 +428,6 @@ func (this *ReportPushStatusController) BatchPush() {
 		br.ErrMsg = "批量修改推送失败,Err:" + err.Error()
 		return
 	}
-	insertReportPushList := make([]*models.ReportPushStatus, 0)
-	for _, v := range noExistReportIds {
-		insertReportPushList = append(insertReportPushList, &models.ReportPushStatus{
-			ReportId:   v,
-			State:      1,
-			ReportType: 1,
-			CreateTime: time.Now(),
-			ModifyTime: time.Now(),
-			PushTime:   time.Now(),
-		})
-	}
-	obj := &models.ReportPushStatus{}
-	err = obj.MultiInsert(insertReportPushList)
-	if err != nil {
-		br.Msg = "批量推送失败"
-		br.ErrMsg = "批量插入推送状态失败,Err:" + err.Error()
-		return
-	}
 
 	br.Msg = "推送成功"
 	br.Success = true
@@ -503,7 +455,7 @@ func (this *ReportPushStatusController) BatchPushCancel() {
 	var condition string
 	var pars []interface{}
 	if req.PublishStartDate != "" && req.PublishEndDate != "" {
-		condition += " AND a.publish_time >= ?"
+		condition += " AND publish_time >= ?"
 		publishStartTime, err := time.Parse(utils.FormatDate, req.PublishStartDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -513,7 +465,7 @@ func (this *ReportPushStatusController) BatchPushCancel() {
 		publishStartDateStr := publishStartTime.Format(utils.FormatDateTime)
 		pars = append(pars, publishStartDateStr)
 
-		condition += " AND a.publish_time <= ?"
+		condition += " AND publish_time <= ?"
 		publishEndTime, err := time.Parse(utils.FormatDate, req.PublishEndDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -525,7 +477,7 @@ func (this *ReportPushStatusController) BatchPushCancel() {
 		pars = append(pars, publishEndDateStr)
 	}
 	if req.PushStartDate != "" && req.PushEndDate != "" {
-		condition += " AND b.push_time >= ?"
+		condition += " AND push_time >= ?"
 		pushStartTime, err := time.Parse(utils.FormatDate, req.PushStartDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -535,7 +487,7 @@ func (this *ReportPushStatusController) BatchPushCancel() {
 		pushStartDateStr := pushStartTime.Format(utils.FormatDateTime)
 		pars = append(pars, pushStartDateStr)
 
-		condition += " AND b.push_time <= ?"
+		condition += " AND push_time <= ?"
 		pushEndTime, err := time.Parse(utils.FormatDate, req.PushEndDate)
 		if err != nil {
 			br.Msg = "日期格式有误"
@@ -547,7 +499,7 @@ func (this *ReportPushStatusController) BatchPushCancel() {
 		pars = append(pars, pushEndDateStr)
 	}
 	if req.KeyWord != "" {
-		condition += ` AND a.title like ?  `
+		condition += ` AND title like ?  `
 		pars = utils.GetLikeKeywordPars(pars, req.KeyWord, 1)
 	}
 	classifyIdList := make([]int, 0)
@@ -561,18 +513,18 @@ func (this *ReportPushStatusController) BatchPushCancel() {
 	classifyIdList = append(classifyIdList, tmpClassifyList...)
 	if len(classifyIdList) > 0 {
 		classifyIdList = utils.Unique(classifyIdList)
-		condition += ` AND (a.classify_id_first IN (%s) AND a.classify_id_second IN (%s) AND a.classify_id_third IN (%s))`
+		condition += ` AND (classify_id_first IN (%s) AND classify_id_second IN (%s) AND classify_id_third IN (%s))`
 		condition = fmt.Sprintf(condition, utils.GetOrmReplaceHolder(len(classifyIdList)), utils.GetOrmReplaceHolder(len(classifyIdList)), utils.GetOrmReplaceHolder(len(classifyIdList)))
 		pars = append(pars, classifyIdList, classifyIdList, classifyIdList)
 	}
 	if req.IsSelectAll {
 		if len(req.SelectedIds) > 0 {
-			condition += ` AND a.id NOT IN (` + utils.GetOrmReplaceHolder(len(req.SelectedIds)) + `)`
+			condition += ` AND report_id NOT IN (` + utils.GetOrmReplaceHolder(len(req.SelectedIds)) + `)`
 			pars = append(pars, req.SelectedIds)
 		}
 	} else {
 		if len(req.SelectedIds) > 0 {
-			condition += ` AND a.id IN (` + utils.GetOrmReplaceHolder(len(req.SelectedIds)) + `)`
+			condition += ` AND report_id IN (` + utils.GetOrmReplaceHolder(len(req.SelectedIds)) + `)`
 			pars = append(pars, req.SelectedIds)
 		}
 	}
@@ -582,7 +534,7 @@ func (this *ReportPushStatusController) BatchPushCancel() {
 		br.ErrMsg = "查询研报失败,Err:" + err.Error()
 		return
 	}
-	reportPush, err := models.GetReportPushStatusByReportIds(reportIds, 1)
+	reportPush, err := models.GetReportPushStatusByReportIdAndState(reportIds, 1)
 	if err != nil {
 		br.Msg = "批量推送失败"
 		br.ErrMsg = "查询推送状态失败,Err:" + err.Error()
@@ -593,11 +545,8 @@ func (this *ReportPushStatusController) BatchPushCancel() {
 		existReportMap[v.ReportId] = struct{}{}
 	}
 	existReportIds := make([]int, 0)
-	noExistReportIds := make([]int, 0)
 	for _, v := range reportIds {
-		if _, ok := existReportMap[v]; !ok {
-			noExistReportIds = append(noExistReportIds, v)
-		} else {
+		if _, ok := existReportMap[v]; ok {
 			existReportIds = append(existReportIds, v)
 		}
 	}
@@ -608,23 +557,6 @@ func (this *ReportPushStatusController) BatchPushCancel() {
 		br.ErrMsg = "批量修改推送失败,Err:" + err.Error()
 		return
 	}
-	insertReportPushList := make([]*models.ReportPushStatus, 0)
-	for _, v := range noExistReportIds {
-		insertReportPushList = append(insertReportPushList, &models.ReportPushStatus{
-			ReportId:   v,
-			State:      0,
-			ReportType: 1,
-			CreateTime: time.Now(),
-			ModifyTime: time.Now(),
-		})
-	}
-	obj := &models.ReportPushStatus{}
-	err = obj.MultiInsert(insertReportPushList)
-	if err != nil {
-		br.Msg = "批量撤销推送失败"
-		br.ErrMsg = "批量插入推送状态失败,Err:" + err.Error()
-		return
-	}
 
 	br.Msg = "撤销推送成功"
 	br.Success = true

+ 78 - 14
models/report_push_status.go

@@ -11,12 +11,42 @@ type ReportPushStatus struct {
 	ReportPushStatusId int       `orm:"pk"`
 	ReportId           int       `description:"报告id"`
 	State              int       `description:"报告状态:0-未推送,1-已推送"`
+	Title              string    `description:"报告标题"`
+	Abstract           string    `description:"报告摘要"`
+	Stage              int       `description:"期数"`
+	ClassifyIdFirst    int       `description:"一级分类id"`
+	ClassifyNameFirst  string    `description:"一级分类名称"`
+	ClassifyIdSecond   int       `description:"二级分类id"`
+	ClassifyNameSecond string    `description:"二级分类名称"`
+	ClassifyIdThird    int       `description:"三级分类id"`
+	ClassifyNameThird  string    `description:"三级分类名称"`
+	Author             string    `description:"报告作者"`
 	ReportType         int       `description:"报告类型:1-eta报告"`
+	PublishTime        time.Time `description:"报告发布时间"`
 	CreateTime         time.Time `description:"创建时间"`
 	ModifyTime         time.Time `description:"修改时间"`
 	PushTime           time.Time `description:"推送时间"`
 }
 
+type Report struct {
+	Id                 int       `description:"报告id"`   // id
+	ClassifyIdFirst    int       `description:"一级分类id"` // 一级分类id
+	ClassifyNameFirst  string    `description:"一级分类名称"` // 一级分类名称
+	ClassifyIdSecond   int       `description:"二级分类id"` // 二级分类id
+	ClassifyNameSecond string    `description:"二级分类名称"` // 二级分类名称
+	ClassifyIdThird    int       `description:"三级分类id"` // 三级分类id
+	ClassifyNameThird  string    `description:"三级分类名称"` // 三级分类名称
+	Title              string    `description:"报告标题"`   // 标题
+	Abstract           string    `description:"摘要"`     // 摘要
+	Author             string    `description:"作者"`     // 作者
+	CreateTime         time.Time `description:"创建时间"`   // 创建时间
+	ModifyTime         time.Time `description:"修改时间"`   // 修改时间
+	PublishTime        time.Time `description:"发布时间"`   // 发布时间
+	Stage              int       `description:"期数"`     // 期数
+	ContentModifyTime  time.Time `description:"内容更新时间"` // 内容更新时间
+	ReportCreateTime   time.Time `description:"创建时间"`   // 报告时间创建时间
+}
+
 type ReportPushView struct {
 	ReportPushStatusId int    `orm:"pk"`
 	ReportId           int    `description:"报告id"`
@@ -62,7 +92,7 @@ func GetReportPushStatusByReportId(reportId, state int) (item *ReportPushStatus,
 	return
 }
 
-func GetReportPushStatusByReportIds(reportId []int, state int) (items []*ReportPushStatus, err error) {
+func GetReportPushStatusByReportIdAndState(reportId []int, state int) (items []*ReportPushStatus, err error) {
 	if len(reportId) == 0 {
 		return
 	}
@@ -72,13 +102,19 @@ func GetReportPushStatusByReportIds(reportId []int, state int) (items []*ReportP
 	return
 }
 
+func GetReportPushStatusByReportIds(reportId []int) (items []*ReportPushStatus, err error) {
+	if len(reportId) == 0 {
+		return
+	}
+	o := orm.NewOrm()
+	sql := `SELECT * FROM report_push_status WHERE report_id IN (` + utils.GetOrmReplaceHolder(len(reportId)) + `) `
+	_, err = o.Raw(sql, reportId).QueryRows(&items)
+	return
+}
+
 func GetReportPushStatusListByCondition(condition string, pars []interface{}, startSize, pageSize int) (items []*ReportPushView, err error) {
 	o := orm.NewOrm()
-	sql := `SELECT a.id AS report_id, a.title, a.abstract, a.classify_id_first, a.classify_name_first, a.classify_id_second, a.classify_name_second, a.classify_id_third, a.classify_name_third,
-		a.author, b.report_push_status_id, b.create_time, b.state, b.modify_time, b.push_time, b.create_time
-		FROM test_v2_hongze_rddp.report AS a
-		LEFT JOIN report_push_status AS b ON a.id=b.report_id	
-		WHERE 1=1 AND (a.state=2 OR a.state=6) `
+	sql := `SELECT * FROM report_push_status WHERE 1=1  `
 	if condition != "" {
 		sql += condition
 	}
@@ -87,12 +123,19 @@ func GetReportPushStatusListByCondition(condition string, pars []interface{}, st
 	return
 }
 
+func GetReportByCondition(condition string) (items []*Report, err error) {
+	o := orm.NewOrmUsingDB("rddp")
+	sql := `SELECT * FROM report WHERE 1=1 AND (state=2 OR state=6) `
+	if condition != "" {
+		sql += condition
+	}
+	_, err = o.Raw(sql).QueryRows(&items)
+	return
+}
+
 func GetReportIdListByCondition(condition string, pars []interface{}) (reportId []int, err error) {
 	o := orm.NewOrm()
-	sql := `SELECT a.id AS report_id
-		FROM test_v2_hongze_rddp.report AS a
-		LEFT JOIN report_push_status AS b ON a.id=b.report_id	
-		WHERE 1=1 AND (a.state=2 OR a.state=6) `
+	sql := `SELECT report_id FROM report_push_status WHERE 1=1 `
 	if condition != "" {
 		sql += condition
 	}
@@ -109,10 +152,7 @@ func GetReportCountById(id int) (count int, err error) {
 
 func GetReportCountByCondition(condition string, pars []interface{}) (count int, err error) {
 	o := orm.NewOrm()
-	sql := `SELECT COUNT(*) AS count
-		FROM test_v2_hongze_rddp.report AS a
-		LEFT JOIN report_push_status AS b ON a.id=b.report_id	
-		WHERE 1=1 AND (a.state=2 OR a.state=6) `
+	sql := `SELECT COUNT(*) AS count FROM report_push_status WHERE 1=1 `
 	if condition != "" {
 		sql += condition
 	}
@@ -135,3 +175,27 @@ func BatchPushCancelReport(reportId []int) (err error) {
 	_, err = o.Raw(sql, reportId).Exec()
 	return
 }
+
+// BatchAddReportPushStatus 批量添加报告
+func BatchAddReportPushStatus(items []*ReportPushStatus) (err error) {
+	if len(items) == 0 {
+		return
+	}
+	o := orm.NewOrm()
+	_, err = o.InsertMulti(100, items)
+	return
+}
+
+func GetMaxSyncIdReportPush(report_type int) (count int, err error) {
+	o := orm.NewOrm()
+	sql := `SELECT MAX(report_id) AS max_id FROM report_push_status WHERE report_type=?`
+	err = o.Raw(sql, report_type).QueryRow(&count)
+	return
+}
+
+func GetBatchReport(maxId, batchSize int) (items []*Report, err error) {
+	o := orm.NewOrmUsingDB("rddp")
+	sql := `SELECT * FROM report WHERE id>? AND (state=2 OR state=6) LIMIT ?`
+	o.Raw(sql, maxId, batchSize).QueryRows(&items)
+	return
+}

+ 108 - 0
scheduler/task.go

@@ -18,9 +18,117 @@ func InitJob() {
 	// 每天凌晨12点检测, 修改用户状态
 	tk2 := task.NewTask("ModifyUserStatus", "5 0 0 * * *", ModifyUserStatus)
 	task.AddTask("定时修改用户状态", tk2)
+	// 每隔1分钟,同步报告状态
+	tk3 := task.NewTask("SyncReportPushStatus", "0 0/1 * * * *", SyncReportPushStatus)
+	task.AddTask("定时同步报告推送状态", tk3)
 	task.StartTask()
 }
 
+func SyncReportPushStatus(ctx context.Context) (err error) {
+	defer func() {
+		if err != nil {
+			utils.FileLog.Info("同步研报推送状态出错,Err: %s", err)
+		}
+		if err := recover(); err != nil {
+			utils.FileLog.Warn("同步研报推送状态出错,定时任务出错,Err: %s", err)
+		}
+	}()
+
+	// 获取前三分钟修改的数据
+	conditon := " AND modify_time >= NOW() - INTERVAL 3 MINUTE"
+	reportList, err := models.GetReportByCondition(conditon)
+	var reportIds []int
+	for _, v := range reportList {
+		reportIds = append(reportIds, v.Id)
+	}
+	reportPushList, err := models.GetReportPushStatusByReportIds(reportIds)
+	if err != nil {
+		return
+	}
+	reportPushMap := make(map[int]*models.ReportPushStatus)
+	for _, v := range reportPushList {
+		reportPushMap[v.ReportId] = v
+	}
+	var insertReportPushList []*models.ReportPushStatus
+	for _, v := range reportList {
+		var updateCols []string
+		if reportPush, ok := reportPushMap[v.Id]; ok {
+			if reportPush.Title != v.Title {
+				reportPush.Title = v.Title
+				updateCols = append(updateCols, "title")
+			}
+			if reportPush.Abstract != v.Abstract {
+				reportPush.Abstract = v.Abstract
+				updateCols = append(updateCols, "abstract")
+			}
+			if reportPush.Stage != v.Stage {
+				reportPush.Stage = v.Stage
+				updateCols = append(updateCols, "stage")
+			}
+			if reportPush.ClassifyIdFirst != v.ClassifyIdFirst {
+				reportPush.ClassifyIdFirst = v.ClassifyIdFirst
+				updateCols = append(updateCols, "classify_id_first")
+			}
+			if reportPush.ClassifyNameFirst != v.ClassifyNameFirst {
+				reportPush.ClassifyNameFirst = v.ClassifyNameFirst
+				updateCols = append(updateCols, "classify_name_first")
+			}
+			if reportPush.ClassifyIdSecond != v.ClassifyIdSecond {
+				reportPush.ClassifyIdSecond = v.ClassifyIdSecond
+				updateCols = append(updateCols, "classify_id_second")
+			}
+			if reportPush.ClassifyNameSecond != v.ClassifyNameSecond {
+				reportPush.ClassifyNameSecond = v.ClassifyNameSecond
+				updateCols = append(updateCols, "classify_name_second")
+			}
+			if reportPush.ClassifyIdThird != v.ClassifyIdThird {
+				reportPush.ClassifyIdThird = v.ClassifyIdThird
+				updateCols = append(updateCols, "classify_id_third")
+			}
+			if reportPush.ClassifyNameThird != v.ClassifyNameThird {
+				reportPush.ClassifyNameThird = v.ClassifyNameThird
+				updateCols = append(updateCols, "classify_name_third")
+			}
+			if reportPush.Author != v.Author {
+				reportPush.Author = v.Author
+				updateCols = append(updateCols, "author")
+			}
+			if reportPush.PublishTime != v.PublishTime {
+				reportPush.PublishTime = v.PublishTime
+				updateCols = append(updateCols, "publish_time")
+			}
+			if len(updateCols) > 0 {
+				reportPush.Update(updateCols)
+			}
+		} else {
+			insertReportPushList = append(insertReportPushList, &models.ReportPushStatus{
+				ReportId:           v.Id,
+				State:              0,
+				Title:              v.Title,
+				Abstract:           v.Abstract,
+				Stage:              v.Stage,
+				ClassifyIdFirst:    v.ClassifyIdFirst,
+				ClassifyNameFirst:  v.ClassifyNameFirst,
+				ClassifyIdSecond:   v.ClassifyIdSecond,
+				ClassifyNameSecond: v.ClassifyNameSecond,
+				ClassifyIdThird:    v.ClassifyIdThird,
+				ClassifyNameThird:  v.ClassifyNameThird,
+				Author:             v.Author,
+				ReportType:         1,
+				PublishTime:        v.PublishTime,
+				CreateTime:         time.Now(),
+				ModifyTime:         time.Now(),
+			})
+		}
+	}
+
+	err = models.BatchAddReportPushStatus(insertReportPushList)
+	if err != nil {
+		return
+	}
+	return
+}
+
 func ModifyUserStatus(ctx context.Context) (err error) {
 	defer func() {
 		if err != nil {

+ 60 - 0
services/task.go

@@ -1,14 +1,17 @@
 package services
 
 import (
+	"eta/eta_mini_crm/models"
 	"eta/eta_mini_crm/services/elastic"
 	"eta/eta_mini_crm/utils"
 	"fmt"
+	"time"
 )
 
 func InitTask() {
 	fmt.Println("start task")
 	CreateIndex()
+	InitReportPushStatus()
 	fmt.Println("end task!")
 }
 
@@ -50,3 +53,60 @@ func CreateIndex() {
 		fmt.Println(err)
 	}
 }
+
+func InitReportPushStatus() {
+	for {
+		maxId, err := models.GetMaxSyncIdReportPush(1)
+		fmt.Println("同步研报开始, maxId:", maxId)
+		if err != nil {
+			fmt.Println("同步研报失败, Err:", err)
+		}
+		reportList, err := models.GetBatchReport(maxId, 100)
+		if err != nil {
+			fmt.Println("同步研报失败, Err:", err)
+		}
+		var reportIds []int
+		for _, v := range reportList {
+			reportIds = append(reportIds, v.Id)
+		}
+		reportPushList, err := models.GetReportPushStatusByReportIds(reportIds)
+		if err != nil {
+			return
+		}
+		reportPushMap := make(map[int]struct{})
+		for _, v := range reportPushList {
+			reportPushMap[v.ReportId] = struct{}{}
+		}
+		var insertReportPushList []*models.ReportPushStatus
+		for _, v := range reportList {
+			if _, ok := reportPushMap[v.Id]; !ok {
+				insertReportPushList = append(insertReportPushList, &models.ReportPushStatus{
+					ReportId:           v.Id,
+					State:              0,
+					Title:              v.Title,
+					Abstract:           v.Abstract,
+					Stage:              v.Stage,
+					ClassifyIdFirst:    v.ClassifyIdFirst,
+					ClassifyNameFirst:  v.ClassifyNameFirst,
+					ClassifyIdSecond:   v.ClassifyIdSecond,
+					ClassifyNameSecond: v.ClassifyNameSecond,
+					ClassifyIdThird:    v.ClassifyIdThird,
+					ClassifyNameThird:  v.ClassifyNameThird,
+					Author:             v.Author,
+					ReportType:         1,
+					PublishTime:        v.PublishTime,
+					CreateTime:         time.Now(),
+					ModifyTime:         time.Now(),
+				})
+			}
+		}
+		err = models.BatchAddReportPushStatus(insertReportPushList)
+		if err != nil {
+			return
+		}
+		if len(reportList) != 100 {
+			fmt.Println("同步研报完成")
+			return
+		}
+	}
+}