浏览代码

报告同步

kobe6258 7 月之前
父节点
当前提交
7a882a6e48

+ 7 - 0
common/component/cache/redis.go

@@ -115,6 +115,13 @@ func (r *RedisCache) IsExistWithContext(ctx context.Context, key string) bool {
 	return result > 0
 }
 
+func (r *RedisCache) SetIfNotExist(key string, val string, expired int) bool {
+	return r.SetIfNotExistWithContext(context.Background(), key, val, expired)
+}
+func (r *RedisCache) SetIfNotExistWithContext(ctx context.Context, key string, val string, expired int) bool {
+	return r.redisTemplate.SetNX(ctx, key, val, time.Duration(expired)*time.Second).Val()
+}
+
 // Delete 删除
 func (r *RedisCache) Delete(key string) error {
 	return r.DeleteWithContext(context.Background(), key)

+ 4 - 0
common/utils/redis/key_generator.go

@@ -18,3 +18,7 @@ func GenerateSmsKey(mobile string) string {
 func GenerateTokenKey(mobile string) string {
 	return fmt.Sprint(LoginTokenPrefix, mobile)
 }
+
+func GenerateReportRefreshKey(source string, id int, modifyTime int64) string {
+	return fmt.Sprintf("%s:%d:%d", source, id, modifyTime)
+}

+ 9 - 6
common/utils/redis/redis_client.go

@@ -9,16 +9,19 @@ import (
 type RedisClient interface {
 	GetString(key string) string
 	GetStringWithContext(ctx context.Context, key string) string
-	SetString(key string, val string, timeout time.Duration) error
-	SetStringWithContext(ctx context.Context, key string, val string, timeout time.Duration) error
-	GetHSet(key string) string
-	GetHSetWithContext(ctx context.Context, key string) string
-	SetHSet(key string, val string, timeout time.Duration) error
-	SetHSetWithContext(ctx context.Context, key string, val string, timeout time.Duration) error
+	SetString(key string, val string, expired int) error
+	SetStringWithContext(ctx context.Context, key string, val string, expired int) error
+	GetHSet(key string) map[string]string
+	GetHSetWithContext(ctx context.Context, key string) map[string]string
+	SetHSet(key string, timeout time.Duration, val ...interface{}) error
+	SetHSetWithContext(ctx context.Context, key string, timeout time.Duration, val ...interface{}) error
 	Delete(key string) error
 	DeleteWithContext(ctx context.Context, key string) error
 	IsExist(key string) bool
 	IsExistWithContext(ctx context.Context, key string) bool
 	Do(commandName string, args ...interface{}) (reply interface{}, err error)
 	DoWithContext(ctx context.Context, commandName string, args ...interface{}) (reply interface{}, err error)
+
+	SetIfNotExist(key string, val string, expired int) bool // 0:设置失败,1:设置成功
+	SetIfNotExistWithContext(ctx context.Context, key string, val string, expired int) error
 }

+ 42 - 51
domian/report/report_service.go

@@ -17,7 +17,6 @@ import (
 	mediaDao "eta/eta_mini_ht_api/models/media"
 	reportDao "eta/eta_mini_ht_api/models/report"
 	userDao "eta/eta_mini_ht_api/models/user"
-	"github.com/google/uuid"
 	"math/rand"
 	"strconv"
 	"strings"
@@ -349,28 +348,12 @@ func SyncETAReportList(list []eta.ETAReport) (err error) {
 				coverSrc = ids[randomIndex]
 				break
 			}
-			//permissionsId := permissions[0].ChartPermissionID
-			//var ids []int
-			//ids, err = mediaDao.GetIdsByPermissionId(permissionsId)
-			//if err != nil {
-			//	logger.Error("获取图片资源失败:%v", err)
-			//}
-			//if ids == nil || len(ids) == 0 {
-			//	coverSrc = 0
-			//} else {
-			//	src := rand.NewSource(time.Now().UnixNano())
-			//	r := rand.New(src)
-			//	// 从切片中随机选择一个元素
-			//	randomIndex := r.Intn(len(ids))
-			//	coverSrc = ids[randomIndex]
-			//}
 		}
 		status := etaStatus(etaRp.State)
 		destRp := convertEtaReport(etaRp, status)
 		destRp.CoverSrc = coverSrc
 		reports = append(reports, destRp)
 	}
-	//}
 	esList, err := reportDao.InsertOrUpdateReport(reports, SourceETA)
 	if esList == nil {
 		return
@@ -445,41 +428,53 @@ func syncESAndSendMessage(reports []reportDao.Report) (err error) {
 	//生产meta信息
 	logger.Info("生成推送META信息")
 	for _, report := range reports {
-		userIds := userService.GetPostUser(report.Author, report.PublishedTime)
-		var author analystService.FinancialAnalystDTO
-		author, err = analystService.GetAnalystByName(report.Author)
-		if err != nil {
-			logger.Error("获取研报作者失败:%v", err)
+		if report.Status == reportDao.StatusUnPublish {
+			logger.Info("报告取消发布,不需要生成推送消息")
 			continue
 		}
+		var From string
+		switch report.Source {
+		case SourceETA:
+			From = "ETA"
+		case SourceHT:
+			From = "HT"
+		default:
+			From = "UNKNOWN"
+		}
 		authors := strings.Split(report.Author, ",")
 		authors = stringUtils.RemoveEmptyStrings(authors)
-		if len(authors) > 0 && len(userIds) > 0 {
-			logger.Info("推送META信息,用户ID:%v", userIds)
+		if len(authors) > 0 {
 			for _, authorName := range authors {
-				usersStr := stringUtils.IntToStringSlice(userIds)
-				Meta := userService.MetaData{
-					AuthorName:    authorName,
-					AuthorId:      author.Id,
-					SourceId:      report.ID,
-					PublishedTime: report.PublishedTime,
-				}
-				metaStr, _ := json.Marshal(Meta)
-				toStr := strings.Join(usersStr, ",")
-				UUID := uuid.New()
-				uuidStr := UUID.String()
-				metaContent := userService.MetaInfoDTO{
-					From:       "HT",
-					Uid:        "report:" + uuidStr,
-					Meta:       string(metaStr),
-					MetaType:   "USER_NOTICE",
-					SourceType: "REPORT",
-					To:         toStr,
-				}
-				err = userService.CreateMetaInfo(metaContent)
-				if err != nil {
-					logger.Error("创建Meta信息失败:%v", err)
-					return err
+				userIds := userService.GetPostUser(authorName, report.PublishedTime)
+				if len(userIds) > 0 {
+					logger.Info("推送META信息,用户ID:%v", userIds)
+					var author analystService.FinancialAnalystDTO
+					author, err = analystService.GetAnalystByName(authorName)
+					if err != nil {
+						logger.Error("获取研报作者失败:%v", err)
+						continue
+					}
+					usersStr := stringUtils.IntToStringSlice(userIds)
+					Meta := userService.MetaData{
+						AuthorName:    author.Name,
+						AuthorId:      author.Id,
+						SourceId:      report.ID,
+						PublishedTime: report.PublishedTime,
+					}
+					metaStr, _ := json.Marshal(Meta)
+					toStr := strings.Join(usersStr, ",")
+					metaContent := userService.MetaInfoDTO{
+						From:       From,
+						Meta:       string(metaStr),
+						MetaType:   "USER_NOTICE",
+						SourceType: "REPORT",
+						To:         toStr,
+					}
+					err = userService.CreateMetaInfo(metaContent)
+					if err != nil {
+						logger.Error("创建Meta信息失败:%v", err)
+						return err
+					}
 				}
 			}
 		}
@@ -582,12 +577,8 @@ func SyncHTReportList(list []ht.HTReport) (noRecord bool, err error) {
 				if permission.AuthorNames != "" {
 					htRp.PublishUserName = permission.AuthorNames
 				}
-				//authorNames := strings.Split(htRp.PublishUserName, ",")
-				//authorNamesWithOutEmpty := stringUtils.RemoveEmptyStrings(authorNames)
-				//for _, authorName := range authorNamesWithOutEmpty {
 				status := htStatus(htRp.Status, htRp.IsDelete)
 				destRp := convertHTReport(htRp, status)
-				//destRp.Author = authorName
 				var coverSrc int
 				var permissionId int
 				permissionId, err = etaDao.GetPermissionIdByName(htRp.PermissionName)

+ 0 - 3
domian/user/meta_info.go

@@ -6,7 +6,6 @@ import (
 
 type MetaInfoDTO struct {
 	Id         int    `json:"id"`
-	Uid        string `json:"Uid,omitempty"`
 	Meta       string `json:"Meta,omitempty"`
 	From       string `json:"From,omitempty"`
 	To         string `json:"To,omitempty"`
@@ -26,7 +25,6 @@ func CreateMetaInfo(dto MetaInfoDTO) (err error) {
 }
 func convertToMetaInfo(dto MetaInfoDTO) userDao.MetaInfo {
 	return userDao.MetaInfo{
-		Uid:        dto.Uid,
 		Meta:       dto.Meta,
 		From:       dto.From,
 		To:         dto.To,
@@ -37,7 +35,6 @@ func convertToMetaInfo(dto MetaInfoDTO) userDao.MetaInfo {
 func convertToMetaDTO(dto userDao.MetaInfo) MetaInfoDTO {
 	return MetaInfoDTO{
 		Id:         dto.Id,
-		Uid:        dto.Uid,
 		Meta:       dto.Meta,
 		From:       dto.From,
 		To:         dto.To,

+ 22 - 10
models/eta/eta_report.go

@@ -1,15 +1,17 @@
 package eta
 
 import (
+	"eta/eta_mini_ht_api/common/component/cache"
 	logger "eta/eta_mini_ht_api/common/component/log"
+	"eta/eta_mini_ht_api/common/utils/redis"
 	"eta/eta_mini_ht_api/models"
 	"strings"
 	"time"
 )
 
 const (
-	colunms      = "id,author,abstract,title,publish_time,state,"
-	detailColumn = "id,author,abstract,title,publish_time,content,collaborate_type,report_layout,video_url,video_name,video_play_seconds,head_resource_id,end_resource_id,has_chapter,need_splice,state"
+	colunms      = "id,author,abstract,title,publish_time,state,modify_time,"
+	detailColumn = "id,author,abstract,title,publish_time,content,collaborate_type,report_layout,video_url,video_name,video_play_seconds,head_resource_id,end_resource_id,has_chapter,need_splice,state,modify_time,"
 	Published    = 2
 	Passed       = 6
 
@@ -19,8 +21,16 @@ const (
 
 var (
 	classifyIds = []string{"classify_id_third", "classify_id_second", "classify_id_first"}
+
+	redisCache *cache.RedisCache
 )
 
+func rd() *cache.RedisCache {
+	if redisCache == nil {
+		redisCache = cache.GetInstance()
+	}
+	return redisCache
+}
 func (ETAReport) TableName() string {
 	return "report"
 }
@@ -45,6 +55,7 @@ type ETAReport struct {
 	EndResourceId    int       `gorm:"column:end_resource_id"`
 	HasChapter       bool      `gorm:"column:has_chapter"`
 	NeedSplice       bool      `gorm:"column:need_splice"`
+	ModifyTime       time.Time `gorm:"column:modify_time"`
 	State            int       `gorm:"column:state"`
 }
 
@@ -68,23 +79,24 @@ func GetETAReports(id int) (reports []ETAReport, err error) {
 	return
 }
 
-func GetUpdateETAReports() (reports []ETAReport, err error) {
+func GetUpdateETAReports() (filterList []ETAReport, err error) {
 	duration := time.Now().Add(-30 * time.Second)
 	modifyTime := duration.Format(time.DateTime)
+	var reports []ETAReport
 	err = models.ETA().Table("report").Select(colunms+strings.Join(classifyIds, ",")).Where("modify_time >=?", modifyTime).Order("id asc").Find(&reports).Error
 	if err != nil {
 		logger.Error("同步eta研报数据失败:%v", err)
 	}
 	if reports != nil {
 		for i := 0; i < len(reports); i++ {
-			//var date time.Time
-			//date, err = time.Parse(time.DateTime, reports[i].PublishTime)
-			//if err != nil {
-			//	logger.Error("时间转换错误:%v", err)
-			//} else {
-			//	reports[i].PublishedTime = date
-			//}
 			setClassifyIdValue(&reports[i])
+			key := redis.GenerateReportRefreshKey("ETA", reports[i].ID, reports[i].ModifyTime.UnixMilli())
+			if rd().SetIfNotExist(key, "processed", 60) {
+				logger.Info("同步eta报告 ID :%d,修改时间:%v", reports[i].ID, reports[i].ModifyTime)
+				filterList = append(filterList, reports[i])
+			} else {
+				logger.Info("过滤ETA重复的消息 ID :%d,修改时间:%v", reports[i].ID, reports[i].ModifyTime)
+			}
 		}
 	}
 	return

+ 0 - 1
models/user/meta_info.go

@@ -26,7 +26,6 @@ const (
 // MetaInfo 表示 meta_infos 表的模型
 type MetaInfo struct {
 	Id          int        `gorm:"primaryKey;autoIncrement;column:id"`
-	Uid         string     `gorm:"column:uid"`
 	Meta        string     `gorm:"column:meta"`
 	From        string     `gorm:"column:from"`
 	To          string     `gorm:"column:to"`

+ 30 - 10
task/report/report_update_task.go

@@ -2,8 +2,10 @@ package report
 
 import (
 	"encoding/json"
+	"eta/eta_mini_ht_api/common/component/cache"
 	logger "eta/eta_mini_ht_api/common/component/log"
 	"eta/eta_mini_ht_api/common/contants"
+	"eta/eta_mini_ht_api/common/utils/redis"
 	"eta/eta_mini_ht_api/domian/report"
 	"eta/eta_mini_ht_api/models/eta"
 	"eta/eta_mini_ht_api/models/ht"
@@ -15,8 +17,16 @@ import (
 var (
 	updateTaskName base.TaskType = "ReportUpdateSyncTask"
 	updateCron                   = "0/10 * * * * *"
+	redisCache     *cache.RedisCache
 )
 
+func rd() *cache.RedisCache {
+	if redisCache == nil {
+		redisCache = cache.GetInstance()
+	}
+	return redisCache
+}
+
 // Execute Task ETA取研报的数据
 func (re *ReportUpdateTask) Execute(taskDetail *base.TaskDetail) error {
 	logger.Info(contants.TaskFormat, "更新研报库开始")
@@ -45,31 +55,41 @@ func (re *ReportUpdateTask) Execute(taskDetail *base.TaskDetail) error {
 	go func() {
 		defer wg.Done()
 		var htReportList []ht.HTReport
-		etaReportList, err := ht.GetUpdateHTReports()
+		var filterReportList []ht.HTReport
+		htReportList, err := ht.GetUpdateHTReports()
+		for i := 0; i < len(htReportList); i++ {
+			key := redis.GenerateReportRefreshKey("HT", htReportList[i].Id, int64(htReportList[i].PublishTime))
+			if rd().SetIfNotExist(key, "processed", 60) {
+				logger.Info("同步eta报告 ID :%d,修改时间:%v", htReportList[i].Id, int64(htReportList[i].PublishTime))
+				filterReportList = append(filterReportList, htReportList[i])
+			} else {
+				logger.Info("过滤ETA重复的消息 ID :%d,修改时间:%v", htReportList[i].Id, int64(htReportList[i].PublishTime))
+			}
+		}
 		if err != nil {
 			logger.Error("获取ETA研报列表失败:%v", err)
 		}
-		if len(etaReportList) > 0 {
-			for i := 0; i < len(htReportList); i++ {
-				timestamp := int64(htReportList[i].PublishTime)
+		if len(filterReportList) > 0 {
+			for i := 0; i < len(filterReportList); i++ {
+				timestamp := int64(filterReportList[i].PublishTime)
 				t := time.UnixMilli(timestamp)
-				htReportList[i].PublishedTime = t.Format(time.DateTime)
-				plateId := htReportList[i].PlateId
+				filterReportList[i].PublishedTime = t.Format(time.DateTime)
+				plateId := filterReportList[i].PlateId
 				plate, err := ht.GetPermissionNameById(plateId)
 				if err != nil || plate.ParentId == 0 {
-					htReportList[i].PermissionName = htReportList[i].PlateName
+					filterReportList[i].PermissionName = filterReportList[i].PlateName
 				} else {
 					PermissionName, err := getPermissionNameById(plate.ParentId)
 					if err != nil {
 						logger.Error("获取ETA研报列表失败:%v", err)
-						htReportList[i].PermissionName = ""
+						filterReportList[i].PermissionName = ""
 					} else {
-						htReportList[i].PermissionName = PermissionName
+						filterReportList[i].PermissionName = PermissionName
 					}
 				}
 			}
 			var stop bool
-			stop, err = report.SyncHTReportList(htReportList)
+			stop, err = report.SyncHTReportList(filterReportList)
 			if err != nil {
 				logger.Error("同步ETA研报列表失败:%v", err)
 			}