Browse Source

一期研报

kobe6258 8 months ago
parent
commit
f29e1edac1

+ 15 - 0
common/utils/silce/slice_utils.go

@@ -0,0 +1,15 @@
+package silce_utils
+
+// RemoveDuplicates 去除整数切片中的重复元素
+func RemoveDuplicates(nums []int) []int {
+	seen := make(map[int]bool)
+	var result []int
+	for _, num := range nums {
+		if _, ok := seen[num]; !ok {
+			seen[num] = true
+			result = append(result, num)
+		}
+	}
+
+	return result
+}

+ 75 - 32
domian/report/report_service.go

@@ -322,11 +322,18 @@ func SyncETAReportList(list []eta.ETAReport) (err error) {
 			reports = append(reports, destRp)
 		}
 	}
-	err = reportDao.BatchInsertReport(&reports)
+	var ids []int
+	ids, err = reportDao.InsertOrUpdateReport(&reports, SourceETA)
 	if err != nil {
 		logger.Error("同步ETA研报失败:%v", err)
 		return
 	}
+	for _, id := range ids {
+		success := elastic().Delete(htConfig.GetReportIndex(), id)
+		if !success {
+			logger.Error("删除es失败,reportId::%d,err:%v", id, err)
+		}
+	}
 	return syncESAndSendMessage(reports)
 }
 func syncESAndSendMessage(reports []reportDao.Report) (err error) {
@@ -454,33 +461,69 @@ func InitHTReportList(list []ht.HTReport) (noRecord bool, err error) {
 		return false, syncES(reports)
 	}
 }
-func SyncHTReportList(list []ht.HTReport) (err error) {
-	//logger.Info("同步研报数量%d", len(list))
-	//var reports []reportDao.Report
-	//for _, htRp := range list {
-	//	var authorStr string
-	//	authorStr, err = reportDao.GetGLAuthorNames(htRp.Plate, htRp.Permission)
-	//	if err != nil {
-	//		logger.Error("获取钢联研报作者失败:%v", err)
-	//	}
-	//	if authorStr != "" {
-	//		htRp.Author = authorStr
-	//	}
-	//	authorNames := strings.Split(htRp.Author, ",")
-	//	authorNamesWithOutEmpty := stringUtils.RemoveEmptyStrings(authorNames)
-	//	for _, authorName := range authorNamesWithOutEmpty {
-	//		destRp := convertHTReport(htRp)
-	//		destRp.Author = authorName
-	//		reports = append(reports, destRp)
-	//	}
-	//}
-	//err = reportDao.BatchInsertReport(&reports)
-	//if err != nil {
-	//	logger.Error("同步HT研报失败:%v", err)
-	//	return
-	//}
-	//return syncESAndSendMessage(reports)
-	return
+func SyncHTReportList(list []ht.HTReport) (noRecord bool, err error) {
+	var reports []reportDao.Report
+	permissions, err := reportDao.GetGLAuthorNames()
+	if err != nil {
+		logger.Error("获取钢联研报作者失败:%v", err)
+		return
+	}
+	for _, htRp := range list {
+		for _, permission := range permissions {
+			if htRp.PermissionName == permission.Permission {
+				if permission.AuthorNames != "" {
+					htRp.PublishUserName = permission.AuthorNames
+				}
+				authorNames := strings.Split(htRp.PublishUserName, ",")
+				authorNamesWithOutEmpty := stringUtils.RemoveEmptyStrings(authorNames)
+				for _, authorName := range authorNamesWithOutEmpty {
+					destRp := convertHTReport(htRp)
+					destRp.Author = authorName
+					var coverSrc int
+					permissionId, err := etaDao.GetPermissionIdByName(htRp.PermissionName)
+					if err != nil {
+						logger.Error("HT获取eta品种id失败:%v", err)
+						coverSrc = 0
+					}
+					ids, err := mediaDao.GetIdsByPermissionId(permissionId)
+					if err != nil {
+						logger.Error("获取图片资源失败:%v", err)
+						coverSrc = 0
+					}
+					if ids == nil || len(ids) == 0 {
+						coverSrc = 0
+					} else {
+						src := rand.NewSource(time.Now().UnixNano())
+						r := rand.New(src)
+						// 从切片中随机选择一个元素
+						randomIndex := r.Intn(len(ids))
+						coverSrc = ids[randomIndex]
+					}
+					destRp.CoverSrc = coverSrc
+					destRp.PlateName = htRp.PermissionName
+					reports = append(reports, destRp)
+				}
+			}
+		}
+	}
+	if len(reports) == 0 {
+		return true, nil
+	} else {
+		logger.Info("同步研报数量%d", len(list))
+	}
+	var ids []int
+	ids, err = reportDao.InsertOrUpdateReport(&reports, SourceHT)
+	if err != nil {
+		logger.Error("同步HT研报失败:%v", err)
+		return
+	}
+	for _, id := range ids {
+		success := elastic().Delete(htConfig.GetReportIndex(), id)
+		if !success {
+			logger.Error("删除es失败,reportId::%d,err:%v", id, err)
+		}
+	}
+	return false, syncESAndSendMessage(reports)
 }
 func GetListOrderByConditionWeekly(week bool, column string, limit int, order models.Order) (dtoList []ReportDTO, err error) {
 	reports, err := reportDao.GetListOrderByCondition(week, column, limit, order)
@@ -488,8 +531,8 @@ func GetListOrderByConditionWeekly(week bool, column string, limit int, order mo
 		logger.Error("获取研报失败:%v", err)
 		return
 	}
-	for _, report := range reports {
-		dto := convertReportDTO(report)
+	for _, reportItem := range reports {
+		dto := convertReportDTO(reportItem)
 		dtoList = append(dtoList, dto)
 	}
 	return
@@ -505,8 +548,8 @@ func GetListByCondition[T any](column string, ids []T) (dtoList []ReportDTO, err
 		logger.Error("获取研报失败:%v", err)
 		return
 	}
-	for _, report := range reports {
-		dto := convertReportDTO(report)
+	for _, reportItem := range reports {
+		dto := convertReportDTO(reportItem)
 		dtoList = append(dtoList, dto)
 	}
 	return

+ 27 - 21
main.go

@@ -66,30 +66,36 @@ func initReport() {
 			if err != nil {
 				logger.Error("获取ETA研报列表失败:%v", err)
 			}
-			for i := 0; i < len(htReportList); i++ {
-				timestamp := int64(htReportList[i].PublishTime)
-				t := time.UnixMilli(timestamp)
-				htReportList[i].PublishedTime = t.Format(time.DateTime)
-				plateId := htReportList[i].PlateId
-				plate, err := ht.GetPermissionNameById(plateId)
-				if err != nil || plate.ParentId == 0 {
-					htReportList[i].PermissionName = htReportList[i].PlateName
-				} else {
-					PermissionName, err := getPermissionNameById(plate.ParentId)
-					if err != nil {
-						logger.Error("获取ETA研报列表失败:%v", err)
-						htReportList[i].PermissionName = ""
+			if len(htReportList) > 0 {
+				for i := 0; i < len(htReportList); i++ {
+					timestamp := int64(htReportList[i].PublishTime)
+					t := time.UnixMilli(timestamp)
+					htReportList[i].PublishedTime = t.Format(time.DateTime)
+					plateId := htReportList[i].PlateId
+					plate, err := ht.GetPermissionNameById(plateId)
+					if err != nil || plate.ParentId == 0 {
+						htReportList[i].PermissionName = htReportList[i].PlateName
 					} else {
-						htReportList[i].PermissionName = PermissionName
+						PermissionName, err := getPermissionNameById(plate.ParentId)
+						if err != nil {
+							logger.Error("获取ETA研报列表失败:%v", err)
+							htReportList[i].PermissionName = ""
+						} else {
+							htReportList[i].PermissionName = PermissionName
+						}
 					}
 				}
-			}
-			stop, err := report.InitHTReportList(htReportList)
-			if err != nil {
-				logger.Error("同步ETA研报列表失败:%v", err)
-				break
-			}
-			if stop {
+				var stop bool
+				stop, err = report.InitHTReportList(htReportList)
+				if err != nil {
+					logger.Error("同步ETA研报列表失败:%v", err)
+					break
+				}
+				if stop {
+					logger.Info(contants.TaskFormat, "同步HT研报库结束")
+					break
+				}
+			} else {
 				logger.Info(contants.TaskFormat, "同步HT研报库结束")
 				break
 			}

+ 3 - 2
models/eta/eta_report.go

@@ -66,9 +66,10 @@ func GetETAReports(id int) (reports []ETAReport, err error) {
 	return
 }
 
-func GetUpdateETAReports(id int) (reports []ETAReport, err error) {
+func GetUpdateETAReports() (reports []ETAReport, err error) {
 	duration := time.Now().Add(-10 * time.Minute)
-	err = models.ETA().Table("report").Select(colunms+strings.Join(classifyIds, ",")).Where("state =? or state=? and modify_time >=?", published, passed, duration).Where("id <= ?", id).Order("id asc").Find(&reports).Error
+	modifyTime := duration.Format(time.DateTime)
+	err = models.ETA().Table("report").Select(colunms+strings.Join(classifyIds, ",")).Where("(state =? or state=?) and modify_time >=?", published, passed, modifyTime).Order("id asc").Find(&reports).Error
 	if reports != nil {
 		for _, report := range reports {
 			setClassifyIdValue(&report)

+ 12 - 2
models/ht/ht_report.go

@@ -1,6 +1,9 @@
 package ht
 
-import "eta/eta_mini_ht_api/models"
+import (
+	"eta/eta_mini_ht_api/models"
+	"time"
+)
 
 type HTReport struct {
 	Id              int    `gorm:"primary_key;auto_increment"`
@@ -19,7 +22,14 @@ func GetHTReports(id int) (reports []HTReport, err error) {
 	err = db.Model(&HTReport{}).Raw(sql, id).Scan(&reports).Error
 	return
 }
-
+func GetUpdateHTReports() (reports []HTReport, err error) {
+	duration := time.Now().Add(-10 * time.Minute)
+	updateTime := duration.UnixMilli()
+	db := models.HT()
+	sql := "select tirtp.plate_id as plate_id, tip.plate_name as plate_name, t.id as id ,t.report_name as report_name,t.publish_user_name as publish_user_name,t.publish_time as publish_time from t_iirp_report_to_plate tirtp left join t_iirp_report_info t on t.id=tirtp.report_id left JOIN t_iirp_plate tip on tip.id=tirtp.plate_id  where t.is_delete =0 and t.`status` =3 and t.report_file_path<>'' and t.update_time > ? order by t.id asc"
+	err = db.Model(&HTReport{}).Raw(sql, updateTime).Scan(&reports).Error
+	return
+}
 func GetPDFUrl(id int) (url string, err error) {
 	db := models.HT()
 	sql := "select report_file_path from t_iirp_report_info where id=?"

+ 31 - 1
models/report/report.go

@@ -4,6 +4,7 @@ import (
 	"errors"
 	logger "eta/eta_mini_ht_api/common/component/log"
 	"eta/eta_mini_ht_api/common/utils/date"
+	silce_utils "eta/eta_mini_ht_api/common/utils/silce"
 	"eta/eta_mini_ht_api/models"
 	"fmt"
 	"gorm.io/gorm"
@@ -53,7 +54,7 @@ func BatchInsertReport(list *[]Report) (err error) {
 	db := models.Main()
 	//手动事务
 	tx := db.Begin()
-	err = db.CreateInBatches(list, MaxBatchNum).Error
+	err = tx.CreateInBatches(list, MaxBatchNum).Error
 	if err != nil {
 		logger.Error("批量插入研报失败:%v", err)
 		tx.Rollback()
@@ -63,6 +64,35 @@ func BatchInsertReport(list *[]Report) (err error) {
 	return nil
 }
 
+func InsertOrUpdateReport(list *[]Report, source string) (ids []int, err error) {
+	var orgIds []int
+	for _, report := range *list {
+		orgIds = append(orgIds, report.OrgID)
+	}
+	orgIds = silce_utils.RemoveDuplicates(orgIds)
+	db := models.Main()
+	//手动事务
+	err = db.Select("distinct id").Where("org_id in ? and source =? ", orgIds, source).Scan(&ids).Error
+	if err != nil {
+		logger.Error("查询研报失败:%v", err)
+		return
+	}
+	tx := db.Begin()
+	err = tx.Where("org_id in ? and source =? ", orgIds, source).Delete(&Report{}).Error
+	if err != nil {
+		logger.Error("批量删除研报失败:%v", err)
+		tx.Rollback()
+		return
+	}
+	err = tx.CreateInBatches(list, MaxBatchNum).Error
+	if err != nil {
+		logger.Error("批量插入研报失败:%v", err)
+		tx.Rollback()
+		return
+	}
+	tx.Commit()
+	return
+}
 func (t *Report) BeforeCreate(_ *gorm.DB) (err error) {
 	t.CreatedTime = time.Now()
 	return

+ 77 - 76
task/report/report_task.go

@@ -1,78 +1,79 @@
 package report
 
-import (
-	"encoding/json"
-	logger "eta/eta_mini_ht_api/common/component/log"
-	"eta/eta_mini_ht_api/common/contants"
-	"eta/eta_mini_ht_api/domian/report"
-	"eta/eta_mini_ht_api/models/eta"
-	"eta/eta_mini_ht_api/models/ht"
-	"eta/eta_mini_ht_api/task/base"
-	"sync"
-)
-
-var (
-	taskName base.TaskType = "ETAReportSyncTask"
-	cron                   = "0/10 * * * * *"
-)
-
-// Execute Task ETA取研报的数据
-func (re *ReportTask) Execute(taskDetail *base.TaskDetail) error {
-	logger.Info(contants.TaskFormat, "同步ETA研报库开始")
-	var wg sync.WaitGroup
-	wg.Add(2)
-	//ETA报告
-	go func() {
-		defer wg.Done()
-		id, err := report.GetETALatestReportId()
-		var etaReportList []eta.ETAReport
-		etaReportList, err = eta.GetETAReports(id)
-		if err != nil {
-			logger.Error("获取ETA研报列表失败:%v", err)
-		}
-		if len(etaReportList) > 0 {
-			var list []byte
-			list, err = json.Marshal(etaReportList)
-			if err == nil {
-				taskDetail.Content = string(list)
-			}
-			err = report.SyncETAReportList(etaReportList)
-			if err != nil {
-				logger.Error("同步ETA研报列表失败:%v", err)
-			}
-		}
-		logger.Info(contants.TaskFormat, "同步ETA研报库结束")
-	}()
-	//HT报告
-	go func() {
-		defer wg.Done()
-		id, err := report.GetHTLatestReportId()
-		var htReportList []ht.HTReport
-		htReportList, err = ht.GetHTReports(id)
-		if err != nil {
-			logger.Error("获取ETA研报列表失败:%v", err)
-		}
-		if len(htReportList) > 0 {
-			var list []byte
-			list, err = json.Marshal(htReportList)
-			if err == nil {
-				taskDetail.Content = string(list)
-			}
-			err = report.SyncHTReportList(htReportList)
-			if err != nil {
-				logger.Error("同步ETA研报列表失败:%v", err)
-			}
-		}
-		logger.Info(contants.TaskFormat, "同步ETA研报库结束")
-	}()
-	wg.Wait()
-	return nil
-}
-
-type ReportTask struct {
-}
-
-func init() {
-	reportTask := base.NewTask(taskName, cron, new(ReportTask), base.FORBIDDEN)
-	base.RegisterTask(&reportTask)
-}
+//
+//import (
+//	"encoding/json"
+//	logger "eta/eta_mini_ht_api/common/component/log"
+//	"eta/eta_mini_ht_api/common/contants"
+//	"eta/eta_mini_ht_api/domian/report"
+//	"eta/eta_mini_ht_api/models/eta"
+//	"eta/eta_mini_ht_api/models/ht"
+//	"eta/eta_mini_ht_api/task/base"
+//	"sync"
+//)
+//
+//var (
+//	taskName base.TaskType = "ETAReportSyncTask"
+//	cron                   = "0/10 * * * * *"
+//)
+//
+//// Execute Task ETA取研报的数据
+//func (re *ReportTask) Execute(taskDetail *base.TaskDetail) error {
+//	logger.Info(contants.TaskFormat, "同步ETA研报库开始")
+//	var wg sync.WaitGroup
+//	wg.Add(2)
+//	//ETA报告
+//	go func() {
+//		defer wg.Done()
+//		id, err := report.GetETALatestReportId()
+//		var etaReportList []eta.ETAReport
+//		etaReportList, err = eta.GetETAReports(id)
+//		if err != nil {
+//			logger.Error("获取ETA研报列表失败:%v", err)
+//		}
+//		if len(etaReportList) > 0 {
+//			var list []byte
+//			list, err = json.Marshal(etaReportList)
+//			if err == nil {
+//				taskDetail.Content = string(list)
+//			}
+//			err = report.SyncETAReportList(etaReportList)
+//			if err != nil {
+//				logger.Error("同步ETA研报列表失败:%v", err)
+//			}
+//		}
+//		logger.Info(contants.TaskFormat, "同步ETA研报库结束")
+//	}()
+//	//HT报告
+//	go func() {
+//		defer wg.Done()
+//		id, err := report.GetHTLatestReportId()
+//		var htReportList []ht.HTReport
+//		htReportList, err = ht.GetHTReports(id)
+//		if err != nil {
+//			logger.Error("获取ETA研报列表失败:%v", err)
+//		}
+//		if len(htReportList) > 0 {
+//			var list []byte
+//			list, err = json.Marshal(htReportList)
+//			if err == nil {
+//				taskDetail.Content = string(list)
+//			}
+//			bool,err = report.SyncHTReportList(htReportList)
+//			if err != nil {
+//				logger.Error("同步ETA研报列表失败:%v", err)
+//			}
+//		}
+//		logger.Info(contants.TaskFormat, "同步ETA研报库结束")
+//	}()
+//	wg.Wait()
+//	return nil
+//}
+//
+//type ReportTask struct {
+//}
+//
+//func init() {
+//	reportTask := base.NewTask(taskName, cron, new(ReportTask), base.FORBIDDEN)
+//	base.RegisterTask(&reportTask)
+//}

+ 55 - 4
task/report/report_update_task.go

@@ -6,8 +6,10 @@ import (
 	"eta/eta_mini_ht_api/common/contants"
 	"eta/eta_mini_ht_api/domian/report"
 	"eta/eta_mini_ht_api/models/eta"
+	"eta/eta_mini_ht_api/models/ht"
 	"eta/eta_mini_ht_api/task/base"
 	"sync"
+	"time"
 )
 
 var (
@@ -19,12 +21,11 @@ var (
 func (re *ReportUpdateTask) Execute(taskDetail *base.TaskDetail) error {
 	logger.Info(contants.TaskFormat, "更新研报库开始")
 	var wg sync.WaitGroup
-	wg.Add(1)
+	wg.Add(2)
 	go func() {
 		defer wg.Done()
-		id, err := report.GetETALatestReportId()
 		var etaReportList []eta.ETAReport
-		etaReportList, err = eta.GetUpdateETAReports(id)
+		etaReportList, err := eta.GetUpdateETAReports()
 		if err != nil {
 			logger.Error("获取ETA研报列表失败:%v", err)
 		}
@@ -41,14 +42,64 @@ func (re *ReportUpdateTask) Execute(taskDetail *base.TaskDetail) error {
 		}
 		logger.Info(contants.TaskFormat, "同步ETA研报库结束")
 	}()
+	go func() {
+		defer wg.Done()
+		var htReportList []ht.HTReport
+		etaReportList, err := ht.GetUpdateHTReports()
+		if err != nil {
+			logger.Error("获取ETA研报列表失败:%v", err)
+		}
+		if len(etaReportList) > 0 {
+			for i := 0; i < len(htReportList); i++ {
+				timestamp := int64(htReportList[i].PublishTime)
+				t := time.UnixMilli(timestamp)
+				htReportList[i].PublishedTime = t.Format(time.DateTime)
+				plateId := htReportList[i].PlateId
+				plate, err := ht.GetPermissionNameById(plateId)
+				if err != nil || plate.ParentId == 0 {
+					htReportList[i].PermissionName = htReportList[i].PlateName
+				} else {
+					PermissionName, err := getPermissionNameById(plate.ParentId)
+					if err != nil {
+						logger.Error("获取ETA研报列表失败:%v", err)
+						htReportList[i].PermissionName = ""
+					} else {
+						htReportList[i].PermissionName = PermissionName
+					}
+				}
+			}
+			var stop bool
+			stop, err = report.SyncHTReportList(htReportList)
+			if err != nil {
+				logger.Error("同步ETA研报列表失败:%v", err)
+			}
+			if stop {
+				logger.Info(contants.TaskFormat, "同步HT研报库结束")
+			}
+		} else {
+			logger.Info(contants.TaskFormat, "同步ETA研报库结束")
+		}
+	}()
 	wg.Wait()
 	return nil
 }
 
+func getPermissionNameById(id int) (name string, err error) {
+	plate, err := ht.GetPermissionNameById(id)
+	if err != nil {
+		return
+	}
+	if plate.ParentId != 0 {
+		return getPermissionNameById(plate.ParentId)
+	} else {
+		return plate.PlateName, nil
+	}
+}
+
 type ReportUpdateTask struct {
 }
 
 func init() {
-	reportTask := base.NewTask(updateTaskName, updateCron, new(ReportUpdateTask), base.FORBIDDEN)
+	reportTask := base.NewTask(updateTaskName, updateCron, new(ReportUpdateTask), base.DEV)
 	base.RegisterTask(&reportTask)
 }