Browse Source

增加一期研报同步

kobe6258 7 months ago
parent
commit
bf443d13b4

+ 125 - 2
domian/report/report_service.go

@@ -16,6 +16,7 @@ import (
 	"eta/eta_mini_ht_api/models/ht"
 	reportDao "eta/eta_mini_ht_api/models/report"
 	"github.com/google/uuid"
+	"math/rand"
 	"strconv"
 	"strings"
 	"time"
@@ -49,7 +50,7 @@ type ESReport struct {
 	Author        string                 `json:"author"`
 	Source        reportDao.ReportSource `json:"source"`
 	Abstract      string                 `json:"abstract"`
-	CoverSrc      string                 `json:"coverSrc"`
+	CoverSrc      int                    `json:"coverSrc"`
 	Status        reportDao.ReportStatus `json:"status"`
 	PublishedTime string                 `json:"publishedTime"`
 }
@@ -67,7 +68,7 @@ type ReportDTO struct {
 	PermissionNames  interface{}     `json:"permissionNames,omitempty"`
 	Highlight        []string        `json:"highlight,omitempty"`
 	Detail           json.RawMessage `json:"detail,omitempty"`
-	CoverSrc         string          `json:"coverSrc"`
+	CoverSrc         int             `json:"coverSrc"`
 }
 
 type Detail struct {
@@ -261,7 +262,49 @@ func GetETALatestReportId() (id int, err error) {
 func GetHTLatestReportId() (id int, err error) {
 	return reportDao.GetLatestReportIdBySource(reportDao.SourceHT)
 }
+func InitETAReportList(list []eta.ETAReport) (err error) {
+	logger.Info("同步研报数量%d", len(list))
+	var reports []reportDao.Report
+	for _, etaRp := range list {
+		authorNames := strings.Split(etaRp.Author, ",")
+		authorNamesWithOutEmpty := stringUtils.RemoveEmptyStrings(authorNames)
+		for _, authorName := range authorNamesWithOutEmpty {
+			var permissions []etaDao.ChartPermission
+			permissions, err = etaDao.GetSecondPermissionsByClassifyID(etaRp.ClassifyID)
+			permissionsId := permissions[0].ChartPermissionID
+			if err != nil {
+				coverSrc = 0
+			} else {
+				var ids []int
+				ids, err = models.GetImageIdByPermissionId(permissionsId)
+				if err != nil {
+					br.Msg = "上传视频失败"
+					br.ErrMsg = "获取封面图片失败"
+					return
+				}
+				if ids == nil || len(ids) == 0 {
+					coverSrc = 0
+				} else {
+					rand.Seed(time.Now().UnixNano())
+					// 从切片中随机选择一个元素
+					randomIndex := rand.Intn(len(ids))
+					coverSrc = ids[randomIndex]
+				}
+			}
+			destRp := convertEtaReport(etaRp)
+			destRp.Author = authorName
+			reports = append(reports, destRp)
+		}
+	}
+	err = reportDao.BatchInsertReport(&reports)
+	if err != nil {
+		logger.Error("同步ETA研报失败:%v", err)
+		return
+	}
 
+	return syncES(reports)
+
+}
 func SyncETAReportList(list []eta.ETAReport) (err error) {
 	logger.Info("同步研报数量%d", len(list))
 	var reports []reportDao.Report
@@ -379,6 +422,85 @@ func syncESAndSendMessage(reports []reportDao.Report) (err error) {
 	}
 	return
 }
+
+func syncES(reports []reportDao.Report) (err error) {
+	var esReports []es.ESBase
+	for _, etaRp := range reports {
+		esRp := convertEsReport(etaRp)
+		esReports = append(esReports, esRp)
+	}
+	//同步es
+	err = elastic().BulkInsert(htConfig.GetReportIndex(), esReports)
+	if err != nil {
+		logger.Error("同步ETA研报到es失败:%v", err)
+		return
+	}
+	////生产meta信息
+	//logger.Info("生成推送META信息")
+	//for _, report := range reports {
+	//	userIds := userService.GetPostUser(report.Author, report.PublishedTime)
+	//	var author analystService.FinancialAnalystDTO
+	//	author, err = analystService.GetAnalystByName(report.Author)
+	//	if err != nil {
+	//		logger.Error("获取研报作者失败:%v", err)
+	//		continue
+	//	}
+	//	if len(userIds) > 0 {
+	//		usersStr := stringUtils.IntToStringSlice(userIds)
+	//		Meta := userService.MetaData{
+	//			AuthorName:    report.Author,
+	//			AuthorId:      author.Id,
+	//			SourceId:      report.ID,
+	//			PublishedTime: report.PublishedTime,
+	//		}
+	//		metaStr, _ := json.Marshal(Meta)
+	//		toStr := strings.Join(usersStr, ",")
+	//		UUID := uuid.New()
+	//		uuidStr := UUID.String()
+	//		metaContent := userService.MetaInfoDTO{
+	//			From:       "HT",
+	//			Uid:        "report:" + uuidStr,
+	//			Meta:       string(metaStr),
+	//			MetaType:   "USER_NOTICE",
+	//			SourceType: "REPORT",
+	//			To:         toStr,
+	//		}
+	//		err = userService.CreateMetaInfo(metaContent)
+	//		if err != nil {
+	//			logger.Error("创建Meta信息失败:%v", err)
+	//			return err
+	//		}
+	//	}
+	//}
+	return
+}
+func InitHTReportList(list []ht.HTReport) (err error) {
+	logger.Info("同步研报数量%d", len(list))
+	var reports []reportDao.Report
+	for _, htRp := range list {
+		var authorStr string
+		authorStr, err = reportDao.GetGLAuthorNames(htRp.Plate, htRp.Permission)
+		if err != nil {
+			logger.Error("获取钢联研报作者失败:%v", err)
+		}
+		if authorStr != "" {
+			htRp.Author = authorStr
+		}
+		authorNames := strings.Split(htRp.Author, ",")
+		authorNamesWithOutEmpty := stringUtils.RemoveEmptyStrings(authorNames)
+		for _, authorName := range authorNamesWithOutEmpty {
+			destRp := convertHTReport(htRp)
+			destRp.Author = authorName
+			reports = append(reports, destRp)
+		}
+	}
+	err = reportDao.BatchInsertReport(&reports)
+	if err != nil {
+		logger.Error("同步HT研报失败:%v", err)
+		return
+	}
+	return syncES(reports)
+}
 func SyncHTReportList(list []ht.HTReport) (err error) {
 	logger.Info("同步研报数量%d", len(list))
 	var reports []reportDao.Report
@@ -459,6 +581,7 @@ func convertEtaReport(etaRp eta.ETAReport) reportDao.Report {
 		Title:         etaRp.Title,
 		Abstract:      etaRp.Abstract,
 		Author:        etaRp.Author,
+		CoverSrc:      0,
 		PublishedTime: etaRp.PublishTime,
 		Source:        reportDao.SourceETA,
 		SendStatus:    reportDao.UNSEND,

+ 51 - 1
main.go

@@ -3,10 +3,15 @@ package main
 import (
 	_ "eta/eta_mini_ht_api/common/component"
 	logger "eta/eta_mini_ht_api/common/component/log"
+	"eta/eta_mini_ht_api/common/contants"
 	"eta/eta_mini_ht_api/common/exception"
+	"eta/eta_mini_ht_api/domian/report"
+	"eta/eta_mini_ht_api/models/eta"
+	"eta/eta_mini_ht_api/models/ht"
 	_ "eta/eta_mini_ht_api/routers"
 	_ "eta/eta_mini_ht_api/task"
 	"github.com/beego/beego/v2/server/web"
+	"sync"
 )
 
 func main() {
@@ -20,8 +25,53 @@ func main() {
 	go func() {
 		//内存数据预热预加载
 		logger.Info("开始预加载数据")
-
+		//初始化研报库
+		initReport()
 	}()
 	logger.Info("初始化成功")
 	web.Run()
 }
+
+func initReport() {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	logger.Info("开始初始化研报库")
+	go func() {
+		defer wg.Done()
+		for {
+			id, err := report.GetETALatestReportId()
+			var etaReportList []eta.ETAReport
+			etaReportList, err = eta.GetETAReports(id)
+			if err != nil {
+				logger.Error("获取ETA研报列表失败:%v", err)
+			}
+			if len(etaReportList) > 0 {
+				err = report.InitETAReportList(etaReportList)
+				if err != nil {
+					logger.Error("同步ETA研报列表失败:%v", err)
+				}
+			} else {
+				logger.Info(contants.TaskFormat, "同步ETA研报库结束")
+				break
+			}
+		}
+	}()
+	go func() {
+		defer wg.Done()
+		id, err := report.GetHTLatestReportId()
+		var htReportList []ht.HTReport
+		htReportList, err = ht.GetHTReports(id)
+		if err != nil {
+			logger.Error("获取ETA研报列表失败:%v", err)
+		}
+		if len(htReportList) > 0 {
+			err = report.InitHTReportList(htReportList)
+			if err != nil {
+				logger.Error("同步ETA研报列表失败:%v", err)
+			}
+		}
+		logger.Info(contants.TaskFormat, "同步ETA研报库结束")
+	}()
+	wg.Wait()
+	logger.Info("初始化研报库完成")
+}

+ 5 - 0
models/eta/eta_permission.go

@@ -30,6 +30,11 @@ func GetChartPermissionList() (chartPermissionList []ChartPermission, err error)
 	}
 	return
 }
+func GetPermissionNamesByClassifyID(ids []int) (chartPermissionNames []string, err error) {
+	sql := "select chart_permission_id, permission_name from chart_permission WHERE chart_permission_id in ?"
+	err = doSql(sql, &chartPermissionNames, ids)
+	return
+}
 func GetPermissionNamesByPermissionIds(ids []int) (chartPermissionNames []string, err error) {
 	sql := "select chart_permission_id, permission_name from chart_permission WHERE chart_permission_id in ?"
 	err = doSql(sql, &chartPermissionNames, ids)

+ 1 - 1
models/eta/eta_report.go

@@ -67,7 +67,7 @@ func GetETAReports(id int) (reports []ETAReport, err error) {
 }
 
 func GetUpdateETAReports(id int) (reports []ETAReport, err error) {
-	duration := time.Now().Add(-1 * time.Minute)
+	duration := time.Now().Add(-10 * time.Minute)
 	err = models.ETA().Table("report").Select(colunms+strings.Join(classifyIds, ",")).Where("state =? or state=? and modify_time >=?", published, passed, duration).Where("id <= ?", id).Order("id asc").Find(&reports).Error
 	if reports != nil {
 		for _, report := range reports {

+ 19 - 0
models/media/images_sources.go

@@ -0,0 +1,19 @@
+package media
+
+import (
+	"time"
+)
+
+type ImageSource struct {
+	Id           int       `gorm:"column:id;primary_key;autoIncrement:'id'"`
+	ImgName      string    `gorm:"column:img_name"`
+	SrcUrl       string    `gorm:"column:src_url"`
+	PermissionId int       `gorm:"column:permission_id"`
+	Deleted      bool      `gorm:"deleted"`
+	CreatedTime  time.Time `gorm:"column:created_time"`
+	UpdatedTime  time.Time `gorm:"column:updated_time"`
+}
+
+func (i *ImageSource) TableName() string {
+	return "image_sources"
+}

+ 1 - 1
models/report/report.go

@@ -35,7 +35,7 @@ type Report struct {
 	Title         string       `gorm:"column:title;comment:'标题'" json:"title"`
 	Abstract      string       `gorm:"column:abstract;comment:'摘要'" json:"abstract"`
 	Author        string       `gorm:"column:author;comment:'作者'" json:"author"`
-	CoverSrc      string       `gorm:"column:cover_src;comment:'封面图片'" json:"cover_src"`
+	CoverSrc      int          `gorm:"column:cover_src;comment:'封面图片'" json:"cover_src"`
 	Status        ReportStatus `gorm:"column:status;comment:'报告状态 init:初始化 pending:同步中 done:完成同步'" json:"status"`
 	SendStatus    SendStatus   `gorm:"column:send_status;comment:'发送状态'" json:"send_status"`
 	PublishedTime string       `gorm:"column:published_time;comment:'发布时间'" json:"published_time"`

+ 0 - 75
task/eta/media/eta_media_task.go

@@ -1,75 +0,0 @@
-package report
-
-import (
-	"eta/eta_mini_ht_api/common/component/es"
-	"eta/eta_mini_ht_api/models/media"
-	"eta/eta_mini_ht_api/task/base"
-	"strconv"
-	"strings"
-	"time"
-)
-
-var (
-	taskName base.TaskType = "ETAMediaDataTask"
-	cron                   = "0/10 * * * * *"
-)
-
-// Execute Task ETA取研报的数据
-func (re *MediaTask) Execute(taskDetail *base.TaskDetail) error {
-	lists := media.GetlIST()
-	for _, list := range lists {
-		ids := strings.Split(list.PermissionIDs, ",")
-		for _, item := range ids {
-			x, _ := strconv.Atoi(item)
-			aa := media.MediaPermissionMapping{
-				MediaID:      list.Id,
-				MediaType:    list.MediaType,
-				PermissionID: x,
-			}
-			media.InsertMediaPermissionMapping(aa)
-		}
-	}
-	es1 := convert(lists)
-	for _, item := range es1 {
-		es.GetInstance().AddOrUpdate("media_index", item.MediaId, item)
-	}
-	return nil
-}
-func convert(list []media.Media) []ESMedia {
-	var esMedia []ESMedia
-	for _, item := range list {
-		esMedia = append(esMedia, ESMedia{
-			MediaId:               item.Id,
-			AuthorId:              item.AuthorID,
-			AuthorName:            item.AuthorName,
-			MediaType:             string(item.MediaType),
-			Src:                   item.Src,
-			MediaName:             item.MediaName,
-			SourceType:            item.SourceType,
-			MediaPlayMilliseconds: item.MediaPlayMilliseconds,
-			PermissionIDs:         item.PermissionIDs,
-			PublishedTime:         item.PublishedTime.Format(time.DateTime),
-		})
-	}
-	return esMedia
-}
-
-type ESMedia struct {
-	MediaId               int    `json:"mediaId"`
-	AuthorId              int    `json:"authorId"`
-	AuthorName            string `json:"authorName"`
-	MediaType             string `json:"mediaType"`
-	Src                   string `json:"src"`
-	MediaName             string `json:"mediaName"`
-	SourceType            string `json:"sourceType"`
-	MediaPlayMilliseconds int    `json:"mediaPlayMilliseconds"`
-	PermissionIDs         string `json:"permissionIds"`
-	PublishedTime         string `json:"publishedTime"`
-}
-type MediaTask struct {
-}
-
-func init() {
-	reportTask := base.NewTask(taskName, cron, new(MediaTask), base.FORBIDDEN)
-	base.RegisterTask(&reportTask)
-}

+ 1 - 1
task/report/report_task.go

@@ -73,6 +73,6 @@ type ReportTask struct {
 }
 
 func init() {
-	reportTask := base.NewTask(taskName, cron, new(ReportTask), base.DEV)
+	reportTask := base.NewTask(taskName, cron, new(ReportTask), base.FORBIDDEN)
 	base.RegisterTask(&reportTask)
 }

+ 29 - 25
task/report/report_update_task.go

@@ -1,9 +1,13 @@
 package report
 
 import (
+	"encoding/json"
 	logger "eta/eta_mini_ht_api/common/component/log"
 	"eta/eta_mini_ht_api/common/contants"
+	"eta/eta_mini_ht_api/domian/report"
+	"eta/eta_mini_ht_api/models/eta"
 	"eta/eta_mini_ht_api/task/base"
+	"sync"
 )
 
 var (
@@ -14,30 +18,30 @@ var (
 // Execute Task ETA取研报的数据
 func (re *ReportUpdateTask) Execute(taskDetail *base.TaskDetail) error {
 	logger.Info(contants.TaskFormat, "更新研报库开始")
-	//var wg sync.WaitGroup
-	//wg.Add(1)
-	//go func() {
-	//	defer wg.Done()
-	//	id, err := report.GetETALatestReportId()
-	//	var etaReportList []eta.ETAReport
-	//	etaReportList, err = eta.GetUpdateETAReports(id)
-	//	if err != nil {
-	//		logger.Error("获取ETA研报列表失败:%v", err)
-	//	}
-	//	if len(etaReportList) > 0 {
-	//		var list []byte
-	//		list, err = json.Marshal(etaReportList)
-	//		if err == nil {
-	//			taskDetail.Content = string(list)
-	//		}
-	//		err = report.SyncETAReportList(etaReportList)
-	//		if err != nil {
-	//			logger.Error("同步ETA研报列表失败:%v", err)
-	//		}
-	//	}
-	//	logger.Info(contants.TaskFormat, "同步ETA研报库结束")
-	//}()
-	//wg.Wait()
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		id, err := report.GetETALatestReportId()
+		var etaReportList []eta.ETAReport
+		etaReportList, err = eta.GetUpdateETAReports(id)
+		if err != nil {
+			logger.Error("获取ETA研报列表失败:%v", err)
+		}
+		if len(etaReportList) > 0 {
+			var list []byte
+			list, err = json.Marshal(etaReportList)
+			if err == nil {
+				taskDetail.Content = string(list)
+			}
+			err = report.SyncETAReportList(etaReportList)
+			if err != nil {
+				logger.Error("同步ETA研报列表失败:%v", err)
+			}
+		}
+		logger.Info(contants.TaskFormat, "同步ETA研报库结束")
+	}()
+	wg.Wait()
 	return nil
 }
 
@@ -45,6 +49,6 @@ type ReportUpdateTask struct {
 }
 
 func init() {
-	reportTask := base.NewTask(updateTaskName, updateCron, new(ReportUpdateTask), base.DEV)
+	reportTask := base.NewTask(updateTaskName, updateCron, new(ReportUpdateTask), base.FORBIDDEN)
 	base.RegisterTask(&reportTask)
 }