فهرست منبع

增加一期研报同步

kobe6258 8 ماه پیش
والد
کامیت
389f866322

+ 45 - 0
common/component/config/gl_config.go

@@ -0,0 +1,45 @@
+package config
+
+import (
+	"eta/eta_mini_ht_api/common/contants"
+	"sync"
+)
+
+var (
+	glOnce   sync.Once
+	glConfig *GLConfig
+)
+
+type GLOpts struct {
+	DBUrl string
+}
+type GLConfig struct {
+	BaseConfig
+	opts GLOpts
+}
+
+func (r *GLConfig) GetDBUrl() string {
+	return r.opts.DBUrl
+}
+
+func (r *GLConfig) InitConfig() {
+	opts := GLOpts{
+		DBUrl: r.GetString("database.url"),
+	}
+	r.opts = opts
+}
+func NewGLConfig() Config {
+	if glConfig == nil {
+		glOnce.Do(func() {
+			glConfig = &GLConfig{
+				BaseConfig: BaseConfig{prefix: contants.GL},
+				opts:       GLOpts{},
+			}
+		})
+	}
+	return glConfig
+}
+
+func init() {
+	Register(contants.GL, NewGLConfig)
+}

+ 39 - 0
common/component/config/ht_biz_config.go

@@ -0,0 +1,39 @@
+package config
+
+import "eta/eta_mini_ht_api/common/contants"
+
+// ESOpts es连接属性
+type HTOpts struct {
+	ReportIndex string
+	MediaIndex  string
+}
+type HTBizConfig struct {
+	BaseConfig
+	opts HTOpts
+}
+
+func (e *HTBizConfig) GetReportIndex() string {
+	return e.opts.ReportIndex
+}
+
+func (e *HTBizConfig) GetMediaIndex() string {
+	return e.opts.MediaIndex
+}
+
+func (e *HTBizConfig) InitConfig() {
+	opts := HTOpts{
+		ReportIndex: e.GetString("es_report_index"),
+		MediaIndex:  e.GetString("es_media_index"),
+	}
+	e.opts = opts
+}
+func NewHT() Config {
+	return &HTBizConfig{
+		BaseConfig: BaseConfig{prefix: contants.HT},
+		opts:       HTOpts{},
+	}
+}
+
+func init() {
+	Register(contants.HT, NewHT)
+}

+ 24 - 1
common/component/database/db_connector.go

@@ -14,13 +14,18 @@ const (
 	MAIN = "main"
 	EMAS = "emas"
 
-	ETA       = "eta"
+	ETA = "eta"
+
+	GL        = "gl"
 	etaDriver = "mysql"
+
+	glDriver = "mysql"
 )
 
 var (
 	dbOnce    sync.Once
 	etaDBOnce sync.Once
+	glDBOnce  sync.Once
 )
 
 var dbInsts = make(map[string]*gorm.DB)
@@ -65,6 +70,7 @@ func init() {
 
 func initDataBases() {
 	initEtaDatabase()
+	initGLDatabase()
 }
 func initEtaDatabase() {
 	etaDBOnce.Do(func() {
@@ -82,3 +88,20 @@ func initEtaDatabase() {
 		logger.Info("初始化ETA数据库成功")
 	})
 }
+
+func initGLDatabase() {
+	glDBOnce.Do(func() {
+		etaConfig, ok := config.GetConfig(contants.GL).(*config.GLConfig)
+		if !ok {
+			panic("初始化钢联数据库失败,配置文件格式不正确")
+		}
+		dns := etaConfig.GetDBUrl()
+		open, err := gorm.Open(dialector.GetGormDial(glDriver).GetDial(dns), &gorm.Config{})
+		if err != nil {
+			logger.Error("初始化钢联数据库失败:%v", err)
+			os.Exit(0)
+		}
+		Register(GL, open)
+		logger.Info("初始化钢联数据库成功")
+	})
+}

+ 34 - 40
common/component/es/es.go

@@ -110,7 +110,6 @@ func (es *ESClient) BulkInsert(indexName string, docs []ESBase) (err error) {
 		if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
 			logger.Error("解析es应答失败: %v", err)
 		} else {
-			// Print the response status and error information.
 			logger.Error("es请求失败: %s: %v\n", res.Status(), err)
 		}
 	}
@@ -399,7 +398,7 @@ func (es *ESClient) GetCount(hits Hits) []Hit {
 // id es的id
 // body es的值
 // */
-func (es *ESClient) Add(indexName string, id int, doc interface{}) bool {
+func (es *ESClient) AddOrUpdate(indexName string, id int, doc interface{}) bool {
 	jsonDoc, _ := json.Marshal(doc)
 	req := esapi.IndexRequest{
 		Index:      indexName,
@@ -428,44 +427,39 @@ func (es *ESClient) Add(indexName string, id int, doc interface{}) bool {
 	return true
 }
 
-//
-///*
-//*
-//修改es
-//indexName 索引名
-//id es的id
-//body es的值
-//*/
-//func EsUpdate(indexName string, id string, body map[string]interface{}) bool {
-//	bodyData := map[string]interface{}{
-//		"doc": body,
-//	}
-//	req := httplib.Post(esUrl + indexName + "/_doc/" + id + "/_update")
-//	req.JSONBody(bodyData)
-//	_, err := req.String()
-//	if err != nil {
-//		fmt.Println("elasticsearch is error ", err)
-//		return false
-//	}
-//	return true
-//}
-//
-///*
-//*
-//删除
-//indexName 索引名
-//id es的id
-//*/
-//func EsDelete(indexName string, id string) bool {
-//	req := httplib.Delete(esUrl + indexName + "/_doc/" + id)
-//	_, err := req.String()
-//	if err != nil {
-//		fmt.Println("elasticsearch is error ", err)
-//		return false
-//	}
-//	return true
-//
-//}
+// Delete *
+// 删除
+// indexName 索引名
+// id es的id
+// */
+func (es *ESClient) Delete(indexName string, id int) bool {
+	req := esapi.DeleteRequest{
+		Index:      indexName,
+		DocumentID: strconv.Itoa(id),
+		Refresh:    "true",
+	}
+	res, err := req.Do(context.Background(), es.es())
+	defer res.Body.Close()
+	if err != nil {
+		logger.Error("es查询失败: %s", err)
+	}
+	if res.IsError() {
+		var e map[string]interface{}
+		if err = json.NewDecoder(res.Body).Decode(&e); err != nil {
+			logger.Error("解析es应答失败: %v", err)
+		} else {
+			logger.Error("es请求失败: %s: %v\n", res.Status(), e)
+		}
+	}
+	body, err := io.ReadAll(res.Body)
+	if err != nil {
+		logger.Error("获取es应答失败: %v", err)
+	}
+	fmt.Printf("%s\n", string(body))
+	return true
+
+}
+
 //
 //func CreateIndex(indexName string) error {
 //	resp, err := esClient.es().Indices.

+ 3 - 2
common/contants/contants.go

@@ -6,8 +6,9 @@ const (
 	SMS      = "sms"
 	REDIS    = "redis"
 
-	WECHAT = "wechat"
-
+	WECHAT          = "wechat"
+	HT              = "ht"
+	GL              = "gl"
 	ETA             = "eta"
 	ES              = "es"
 	PageSizeDefault = 5 //列表页每页数据量

+ 14 - 10
domian/media/media_service.go

@@ -2,8 +2,10 @@ package media
 
 import (
 	"encoding/json"
+	"eta/eta_mini_ht_api/common/component/config"
 	"eta/eta_mini_ht_api/common/component/es"
 	logger "eta/eta_mini_ht_api/common/component/log"
+	"eta/eta_mini_ht_api/common/contants"
 	"eta/eta_mini_ht_api/common/utils/page"
 	reportService "eta/eta_mini_ht_api/domian/report"
 	"eta/eta_mini_ht_api/models"
@@ -12,14 +14,16 @@ import (
 	"time"
 )
 
-const (
-	DESC models.Order = "desc"
-	ASC  models.Order = "asc"
+var (
+	htConfig = config.GetConfig(contants.HT).(*config.HTBizConfig)
+)
 
-	ESIndex         = "media_index"
-	ESColumn        = "mediaName"
-	ESRangeColumn   = "mediaId"
-	ConditionColumn = "mediaType"
+const (
+	DESC            models.Order = "desc"
+	ASC             models.Order = "asc"
+	ESColumn                     = "mediaName"
+	ESRangeColumn                = "mediaId"
+	ConditionColumn              = "mediaType"
 )
 
 var (
@@ -172,14 +176,14 @@ func GetPermissionsByIds(ids []int) (permissionDTOs []reportService.PermissionDT
 
 func matchAllByCondition(sorts []string, key string, column string, value string) (request *es.ESQueryRequest) {
 	req := new(es.ESQueryRequest)
-	return req.CreateESQueryRequest(ESIndex, ESColumn, key, 0, 1, sorts, es.MatchAllByCondition).ByCondition(column, value)
+	return req.CreateESQueryRequest(htConfig.GetMediaIndex(), ESColumn, key, 0, 1, sorts, es.MatchAllByCondition).ByCondition(column, value)
 }
 
 func matchByCondition(sorts []string, key string, column string, value string, from int, to int) (request *es.ESQueryRequest) {
 	req := new(es.ESQueryRequest)
-	return req.CreateESQueryRequest(ESIndex, ESColumn, key, from, to, sorts, es.MatchAllByCondition).ByCondition(column, value)
+	return req.CreateESQueryRequest(htConfig.GetMediaIndex(), ESColumn, key, from, to, sorts, es.MatchAllByCondition).ByCondition(column, value)
 }
 func matchRangeByCondition(key string, from int, to int, max int64, sorts []string, column string, value string) (request *es.ESQueryRequest) {
 	req := new(es.ESQueryRequest)
-	return req.CreateESQueryRequest(ESIndex, ESColumn, key, from, to, sorts, es.RangeByCondition).Range(0, max, ESRangeColumn).ByCondition(column, value)
+	return req.CreateESQueryRequest(htConfig.GetMediaIndex(), ESColumn, key, from, to, sorts, es.RangeByCondition).Range(0, max, ESRangeColumn).ByCondition(column, value)
 }

+ 103 - 10
domian/report/report_service.go

@@ -2,8 +2,10 @@ package report
 
 import (
 	"encoding/json"
+	"eta/eta_mini_ht_api/common/component/config"
 	"eta/eta_mini_ht_api/common/component/es"
 	logger "eta/eta_mini_ht_api/common/component/log"
+	"eta/eta_mini_ht_api/common/contants"
 	"eta/eta_mini_ht_api/common/utils/page"
 	stringUtils "eta/eta_mini_ht_api/common/utils/string"
 	analystService "eta/eta_mini_ht_api/domian/financial_analyst"
@@ -25,13 +27,13 @@ const (
 	DESC models.Order = "desc"
 	ASC  models.Order = "asc"
 
-	ESIndex       = "report_index"
 	ESColumn      = "title"
 	ESRangeColumn = "reportId"
 )
 
 var (
 	sortField = []string{"_score:desc"}
+	htConfig  = config.GetConfig(contants.HT).(*config.HTBizConfig)
 )
 
 func elastic() *es.ESClient {
@@ -234,13 +236,16 @@ func (es ESReport) GetId() string {
 	return strconv.Itoa(es.ReportID)
 }
 func GetETALatestReportId() (id int, err error) {
-	return reportDao.GetETALatestReportId()
+	return reportDao.GetLatestReportIdBySource(reportDao.SourceETA)
+}
+
+func GetHTLatestReportId() (id int, err error) {
+	return reportDao.GetLatestReportIdBySource(reportDao.SourceHT)
 }
 
 func SyncETAReportList(list []eta.ETAReport) (err error) {
 	logger.Info("同步研报数量%d", len(list))
 	var reports []reportDao.Report
-	var esReports []es.ESBase
 	for _, etaRp := range list {
 		authorNames := strings.Split(etaRp.Author, ",")
 		authorNamesWithOutEmpty := stringUtils.RemoveEmptyStrings(authorNames)
@@ -255,12 +260,63 @@ func SyncETAReportList(list []eta.ETAReport) (err error) {
 		logger.Error("同步ETA研报失败:%v", err)
 		return
 	}
+	//for _, etaRp := range reports {
+	//	esRp := convertEsReport(etaRp)
+	//	esReports = append(esReports, esRp)
+	//}
+	return syncESAndSendMessage(reports)
+	////同步es
+	//err = elastic().BulkInsert(htConfig.GetReportIndex(), esReports)
+	//if err != nil {
+	//	logger.Error("同步ETA研报到es失败:%v", err)
+	//	return
+	//}
+	////生产meta信息
+	//logger.Info("生成推送META信息")
+	//for _, report := range reports {
+	//	userIds := userService.GetPostUser(report.Author, report.PublishedTime)
+	//	var author analystService.FinancialAnalystDTO
+	//	author, err = analystService.GetAnalystByName(report.Author)
+	//	if err != nil {
+	//		logger.Error("获取研报作者失败:%v", err)
+	//		continue
+	//	}
+	//	if len(userIds) > 0 {
+	//		usersStr := stringUtils.IntToStringSlice(userIds)
+	//		Meta := userService.MetaData{
+	//			AuthorName:    report.Author,
+	//			AuthorId:      author.Id,
+	//			SourceId:      report.ID,
+	//			PublishedTime: report.PublishedTime,
+	//		}
+	//		metaStr, _ := json.Marshal(Meta)
+	//		toStr := strings.Join(usersStr, ",")
+	//		UUID := uuid.New()
+	//		uuidStr := UUID.String()
+	//		metaContent := userService.MetaInfoDTO{
+	//			From:       "ETA",
+	//			Uid:        "report:" + uuidStr,
+	//			Meta:       string(metaStr),
+	//			MetaType:   "USER_NOTICE",
+	//			SourceType: "REPORT",
+	//			To:         toStr,
+	//		}
+	//		err = userService.CreateMetaInfo(metaContent)
+	//		if err != nil {
+	//			logger.Error("创建Meta信息失败:%v", err)
+	//			return err
+	//		}
+	//	}
+	//}
+}
+func syncESAndSendMessage(reports []reportDao.Report) (err error) {
+	var esReports []es.ESBase
 	for _, etaRp := range reports {
 		esRp := convertEsReport(etaRp)
 		esReports = append(esReports, esRp)
 	}
 	//同步es
-	err = elastic().BulkInsert(ESIndex, esReports)
+	err = elastic().BulkInsert(htConfig.GetReportIndex(), esReports)
 	if err != nil {
 		logger.Error("同步ETA研报到es失败:%v", err)
 		return
@@ -269,7 +325,12 @@ func SyncETAReportList(list []eta.ETAReport) (err error) {
 	logger.Info("生成推送META信息")
 	for _, report := range reports {
 		userIds := userService.GetPostUser(report.Author, report.PublishedTime)
-		author, _ := analystService.GetAnalystByName(report.Author)
+		var author analystService.FinancialAnalystDTO
+		author, err = analystService.GetAnalystByName(report.Author)
+		if err != nil {
+			logger.Error("获取研报作者失败:%v", err)
+			continue
+		}
 		if len(userIds) > 0 {
 			usersStr := stringUtils.IntToStringSlice(userIds)
 			Meta := userService.MetaData{
@@ -283,7 +344,7 @@ func SyncETAReportList(list []eta.ETAReport) (err error) {
 			UUID := uuid.New()
 			uuidStr := UUID.String()
 			metaContent := userService.MetaInfoDTO{
-				From:       "ETA",
+				From:       "HT",
 				Uid:        "report:" + uuidStr,
 				Meta:       string(metaStr),
 				MetaType:   "USER_NOTICE",
@@ -296,11 +357,29 @@ func SyncETAReportList(list []eta.ETAReport) (err error) {
 				return err
 			}
 		}
-
 	}
 	return
 }
+func SyncHTReportList(list []eta.HTReport) (err error) {
+	logger.Info("同步研报数量%d", len(list))
+	var reports []reportDao.Report
 
+	for _, etaRp := range list {
+		authorNames := strings.Split(etaRp.Author, ",")
+		authorNamesWithOutEmpty := stringUtils.RemoveEmptyStrings(authorNames)
+		for _, authorName := range authorNamesWithOutEmpty {
+			destRp := convertHTReport(etaRp)
+			destRp.Author = authorName
+			reports = append(reports, destRp)
+		}
+	}
+	err = reportDao.BatchInsertReport(&reports)
+	if err != nil {
+		logger.Error("同步HT研报失败:%v", err)
+		return
+	}
+	return syncESAndSendMessage(reports)
+}
 func GetListOrderByConditionWeekly(week bool, column string, limit int, order models.Order) (dtoList []ReportDTO, err error) {
 	reports, err := reportDao.GetListOrderByCondition(week, column, limit, order)
 	if err != nil {
@@ -352,6 +431,20 @@ func convertEtaReport(etaRp eta.ETAReport) reportDao.Report {
 		Author:        etaRp.Author,
 		PublishedTime: etaRp.PublishTime,
 		Source:        reportDao.SourceETA,
+		SendStatus:    reportDao.UNSEND,
+		Status:        reportDao.StatusInit,
+	}
+}
+
+func convertHTReport(etaRp eta.HTReport) reportDao.Report {
+	return reportDao.Report{
+		OrgID:         etaRp.ID,
+		Title:         etaRp.Title,
+		Abstract:      etaRp.Abstract,
+		Author:        etaRp.Author,
+		PublishedTime: etaRp.PublishTime,
+		Source:        reportDao.SourceHT,
+		SendStatus:    reportDao.UNSEND,
 		Status:        reportDao.StatusInit,
 	}
 }
@@ -388,13 +481,13 @@ func convertReportDTO(report reportDao.Report) (reportDTO ReportDTO) {
 
 func matchAll(sorts []string, key string) (request *es.ESQueryRequest) {
 	req := new(es.ESQueryRequest)
-	return req.CreateESQueryRequest(ESIndex, ESColumn, key, 0, 1, sorts, es.MatchAll)
+	return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, 0, 1, sorts, es.MatchAll)
 }
 func match(key string, from int, to int, sorts []string) (request *es.ESQueryRequest) {
 	req := new(es.ESQueryRequest)
-	return req.CreateESQueryRequest(ESIndex, ESColumn, key, from, to, sorts, es.Match)
+	return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, from, to, sorts, es.Match)
 }
 func matchRange(key string, from int, to int, max int64, sorts []string) (request *es.ESQueryRequest) {
 	req := new(es.ESQueryRequest)
-	return req.CreateESQueryRequest(ESIndex, ESColumn, key, from, to, sorts, es.Range).Range(0, max, ESRangeColumn)
+	return req.CreateESQueryRequest(htConfig.GetReportIndex(), ESColumn, key, from, to, sorts, es.Range).Range(0, max, ESRangeColumn)
 }

+ 4 - 0
models/base_db.go

@@ -23,3 +23,7 @@ func EMAS() *gorm.DB {
 func ETA() *gorm.DB {
 	return database.Select(database.ETA)
 }
+
+func HT() *gorm.DB {
+	return database.Select(database.GL)
+}

+ 23 - 0
models/eta/eta_report.go

@@ -3,6 +3,7 @@ package eta
 import (
 	"eta/eta_mini_ht_api/models"
 	"strings"
+	"time"
 )
 
 const (
@@ -45,6 +46,14 @@ type ETAReport struct {
 	NeedSplice       bool   `gorm:"column:need_splice"`
 }
 
+type HTReport struct {
+	ID          int    `gorm:"primary_key;auto_increment"`
+	Title       string `gorm:"column:title;size:125;"`
+	Abstract    string `gorm:"column:abstract;size:255;"`
+	Author      string `gorm:"column:author;size:50;"`
+	PublishTime string `gorm:"column:publish_time"`
+}
+
 //type ReportClassify struct {
 //	ClassifyID int `gorm:"column:classify_id" json:"classify_id"`
 //	ReportPermission
@@ -65,6 +74,20 @@ func GetETAReports(id int) (reports []ETAReport, err error) {
 	return
 }
 
+func GetHTReports(id int) (reports []HTReport, err error) {
+	err = models.HT().Table("report").Select(colunms+strings.Join(classifyIds, ",")).Where("state =? or state=?", published, passed).Where("id > ?", id).Order("id asc").Limit(limit).Find(&reports).Error
+	return
+}
+func GetUpdateETAReports(id int) (reports []ETAReport, err error) {
+	duration := time.Now().Add(-1 * time.Minute)
+	err = models.ETA().Table("report").Select(colunms+strings.Join(classifyIds, ",")).Where("state =? or state=? and modify_time >=?", published, passed, duration).Where("id <= ?", id).Order("id asc").Find(&reports).Error
+	if reports != nil {
+		for _, report := range reports {
+			setClassifyIdValue(&report)
+		}
+	}
+	return
+}
 func GetETAReportById(id int) (report ETAReport, err error) {
 	err = models.ETA().Table("report").Select(detailColumn).Where("id = ?", id).Where("state =? or state=?", published, passed).First(&report).Error
 	return

+ 15 - 13
models/media/media.go

@@ -4,6 +4,7 @@ import (
 	"errors"
 	logger "eta/eta_mini_ht_api/common/component/log"
 	"eta/eta_mini_ht_api/models"
+	"eta/eta_mini_ht_api/models/report"
 	"gorm.io/gorm"
 	"time"
 )
@@ -21,19 +22,20 @@ const (
 )
 
 type Media struct {
-	Id                    int       `gorm:"primary_key;auto_increment;column:id"`
-	AuthorID              int       `gorm:"column:author_id"`
-	AuthorName            string    `gorm:"column:author_name"`
-	MediaType             MediaType `gorm:"column:media_type"`
-	Src                   string    `gorm:"column:src"`
-	MediaName             string    `gorm:"column:media_name"`
-	SourceType            string    `gorm:"column:source_type"`
-	MediaPlayMilliseconds int       `gorm:"column:media_play_milliseconds"`
-	PermissionIDs         string    `gorm:"column:permission_ids"`
-	PublishedTime         time.Time `gorm:"column:published_time"`
-	Deleted               int       `gorm:"column:deleted"`
-	CreatedTime           time.Time `gorm:"column:created_time"`
-	UpdatedTime           time.Time `gorm:"column:updated_time"`
+	Id                    int               `gorm:"primary_key;auto_increment;column:id"`
+	AuthorID              int               `gorm:"column:author_id"`
+	AuthorName            string            `gorm:"column:author_name"`
+	MediaType             MediaType         `gorm:"column:media_type"`
+	Src                   string            `gorm:"column:src"`
+	MediaName             string            `gorm:"column:media_name"`
+	SourceType            string            `gorm:"column:source_type"`
+	MediaPlayMilliseconds int               `gorm:"column:media_play_milliseconds"`
+	PermissionIDs         string            `gorm:"column:permission_ids"`
+	PublishedTime         time.Time         `gorm:"column:published_time"`
+	SendStatus            report.SendStatus `gorm:"column:send_status"`
+	Deleted               int               `gorm:"column:deleted"`
+	CreatedTime           time.Time         `gorm:"column:created_time"`
+	UpdatedTime           time.Time         `gorm:"column:updated_time"`
 }
 
 func (m *Media) BeforeCreate(_ *gorm.DB) (err error) {

+ 12 - 2
models/report/report.go

@@ -12,8 +12,11 @@ import (
 
 type ReportStatus string
 type ReportSource string
+type SendStatus string
 
 const (
+	SEND          SendStatus   = "SEND"
+	UNSEND        SendStatus   = "UNSEND"
 	SourceETA     ReportSource = "ETA"
 	SourceHT      ReportSource = "HT"
 	StatusInit    ReportStatus = "INIT"
@@ -34,6 +37,7 @@ type Report struct {
 	Author        string       `gorm:"column:author;comment:'作者'" json:"author"`
 	CoverSrc      string       `gorm:"column:cover_src;comment:'封面图片'" json:"cover_src"`
 	Status        ReportStatus `gorm:"column:status;comment:'报告状态 init:初始化 pending:同步中 done:完成同步'" json:"status"`
+	SendStatus    SendStatus   `gorm:"column:send_status;comment:'发送状态'" json:"send_status"`
 	PublishedTime string       `gorm:"column:published_time;comment:'发布时间'" json:"published_time"`
 	CreatedTime   time.Time    `gorm:"column:created_time;comment:'创建时间'" json:"created_time"`
 	UpdatedTime   time.Time    `gorm:"column:updated_time;comment:'修改时间'" json:"updated_time"`
@@ -63,6 +67,12 @@ func GetAuthorByOrgId(orgId int, source string) (names []string, err error) {
 	err = db.Model(&Report{}).Select("author").Where("org_id = ? and source =? ", orgId, source).Scan(&names).Error
 	return
 }
+
+func GetReportByOrgId(orgId int, source string) (reports []Report, err error) {
+	db := models.Main()
+	err = db.Select(CommonColumns).Where("org_id = ? and source =? ", orgId, source).Find(&reports).Error
+	return
+}
 func GetReportById(reportId int) (report Report, err error) {
 	db := models.Main()
 	err = db.Select(CommonColumns).Where("id = ?", reportId).First(&report).Error
@@ -71,9 +81,9 @@ func GetReportById(reportId int) (report Report, err error) {
 	}
 	return
 }
-func GetETALatestReportId() (id int, err error) {
+func GetLatestReportIdBySource(source ReportSource) (id int, err error) {
 	sql := "select IFNULL(max(org_id),0)  from reports where source = ?"
-	err = DoSql(sql, &id, SourceETA)
+	err = DoSql(sql, &id, source)
 	return
 }
 func DoSql(sql string, result interface{}, values ...interface{}) (err error) {

+ 4 - 3
task/base/eta_task.go

@@ -8,9 +8,10 @@ import (
 )
 
 const (
-	DEV  = "dev"
-	TEST = "test"
-	PROD = "prod"
+	FORBIDDEN = "forbidden"
+	DEV       = "dev"
+	TEST      = "test"
+	PROD      = "prod"
 )
 
 type TaskType string

+ 2 - 2
task/eta/media/eta_media_task.go

@@ -31,7 +31,7 @@ func (re *MediaTask) Execute(taskDetail *base.TaskDetail) error {
 	}
 	es1 := convert(lists)
 	for _, item := range es1 {
-		es.GetInstance().Add("media_index", item.MediaId, item)
+		es.GetInstance().AddOrUpdate("media_index", item.MediaId, item)
 	}
 	return nil
 }
@@ -70,6 +70,6 @@ type MediaTask struct {
 }
 
 func init() {
-	reportTask := base.NewTask(taskName, cron, new(MediaTask), base.PROD)
+	reportTask := base.NewTask(taskName, cron, new(MediaTask), base.FORBIDDEN)
 	base.RegisterTask(&reportTask)
 }

+ 0 - 50
task/eta/report/eta_report_task.go

@@ -1,50 +0,0 @@
-package report
-
-import (
-	"encoding/json"
-	logger "eta/eta_mini_ht_api/common/component/log"
-	"eta/eta_mini_ht_api/common/contants"
-	"eta/eta_mini_ht_api/domian/report"
-	"eta/eta_mini_ht_api/models/eta"
-	"eta/eta_mini_ht_api/task/base"
-)
-
-var (
-	taskName base.TaskType = "ETAReportSyncTask"
-	cron                   = "0/10 * * * * *"
-)
-
-// Execute Task ETA取研报的数据
-func (re *ReportTask) Execute(taskDetail *base.TaskDetail) error {
-	logger.Info(contants.TaskFormat, "同步ETA研报库开始")
-	id, err := report.GetETALatestReportId()
-	var etaReportList []eta.ETAReport
-	etaReportList, err = eta.GetETAReports(id)
-	if err != nil {
-		logger.Error("获取ETA研报列表失败:%v", err)
-		return err
-	}
-	if len(etaReportList) > 0 {
-		var list []byte
-		list, err = json.Marshal(etaReportList)
-		if err == nil {
-			taskDetail.Content = string(list)
-		}
-		err = report.SyncETAReportList(etaReportList)
-		if err != nil {
-			logger.Error("同步ETA研报列表失败:%v", err)
-			return err
-		}
-	}
-	logger.Info(contants.TaskFormat, "同步ETA研报库结束")
-
-	return nil
-}
-
-type ReportTask struct {
-}
-
-func init() {
-	reportTask := base.NewTask(taskName, cron, new(ReportTask), base.DEV)
-	base.RegisterTask(&reportTask)
-}

+ 77 - 0
task/report/report_task.go

@@ -0,0 +1,77 @@
+package report
+
+import (
+	"encoding/json"
+	logger "eta/eta_mini_ht_api/common/component/log"
+	"eta/eta_mini_ht_api/common/contants"
+	"eta/eta_mini_ht_api/domian/report"
+	"eta/eta_mini_ht_api/models/eta"
+	"eta/eta_mini_ht_api/task/base"
+	"sync"
+)
+
+var (
+	taskName base.TaskType = "ETAReportSyncTask"
+	cron                   = "0/10 * * * * *"
+)
+
+// Execute Task ETA取研报的数据
+func (re *ReportTask) Execute(taskDetail *base.TaskDetail) error {
+	logger.Info(contants.TaskFormat, "同步ETA研报库开始")
+	var wg sync.WaitGroup
+	wg.Add(2)
+	//ETA报告
+	go func() {
+		defer wg.Done()
+		id, err := report.GetETALatestReportId()
+		var etaReportList []eta.ETAReport
+		etaReportList, err = eta.GetETAReports(id)
+		if err != nil {
+			logger.Error("获取ETA研报列表失败:%v", err)
+		}
+		if len(etaReportList) > 0 {
+			var list []byte
+			list, err = json.Marshal(etaReportList)
+			if err == nil {
+				taskDetail.Content = string(list)
+			}
+			err = report.SyncETAReportList(etaReportList)
+			if err != nil {
+				logger.Error("同步ETA研报列表失败:%v", err)
+			}
+		}
+		logger.Info(contants.TaskFormat, "同步ETA研报库结束")
+	}()
+	//HT报告
+	go func() {
+		defer wg.Done()
+		id, err := report.GetHTLatestReportId()
+		var htReportList []eta.HTReport
+		htReportList, err = eta.GetHTReports(id)
+		if err != nil {
+			logger.Error("获取ETA研报列表失败:%v", err)
+		}
+		if len(htReportList) > 0 {
+			var list []byte
+			list, err = json.Marshal(htReportList)
+			if err == nil {
+				taskDetail.Content = string(list)
+			}
+			err = report.SyncHTReportList(htReportList)
+			if err != nil {
+				logger.Error("同步ETA研报列表失败:%v", err)
+			}
+		}
+		logger.Info(contants.TaskFormat, "同步ETA研报库结束")
+	}()
+	wg.Wait()
+	return nil
+}
+
+type ReportTask struct {
+}
+
+func init() {
+	reportTask := base.NewTask(taskName, cron, new(ReportTask), base.DEV)
+	base.RegisterTask(&reportTask)
+}

+ 50 - 0
task/report/report_update_task.go

@@ -0,0 +1,50 @@
+package report
+
+import (
+	logger "eta/eta_mini_ht_api/common/component/log"
+	"eta/eta_mini_ht_api/common/contants"
+	"eta/eta_mini_ht_api/task/base"
+)
+
+var (
+	updateTaskName base.TaskType = "ReportUpdateSyncTask"
+	updateCron                   = "0/10 * * * * *"
+)
+
+// Execute Task ETA取研报的数据
+func (re *ReportUpdateTask) Execute(taskDetail *base.TaskDetail) error {
+	logger.Info(contants.TaskFormat, "更新研报库开始")
+	//var wg sync.WaitGroup
+	//wg.Add(1)
+	//go func() {
+	//	defer wg.Done()
+	//	id, err := report.GetETALatestReportId()
+	//	var etaReportList []eta.ETAReport
+	//	etaReportList, err = eta.GetUpdateETAReports(id)
+	//	if err != nil {
+	//		logger.Error("获取ETA研报列表失败:%v", err)
+	//	}
+	//	if len(etaReportList) > 0 {
+	//		var list []byte
+	//		list, err = json.Marshal(etaReportList)
+	//		if err == nil {
+	//			taskDetail.Content = string(list)
+	//		}
+	//		err = report.SyncETAReportList(etaReportList)
+	//		if err != nil {
+	//			logger.Error("同步ETA研报列表失败:%v", err)
+	//		}
+	//	}
+	//	logger.Info(contants.TaskFormat, "同步ETA研报库结束")
+	//}()
+	//wg.Wait()
+	return nil
+}
+
+type ReportUpdateTask struct {
+}
+
+func init() {
+	reportTask := base.NewTask(updateTaskName, updateCron, new(ReportUpdateTask), base.DEV)
+	base.RegisterTask(&reportTask)
+}

+ 1 - 1
task/task_starter.go

@@ -4,8 +4,8 @@ import (
 	"eta/eta_mini_ht_api/task/base"
 	_ "eta/eta_mini_ht_api/task/eta/author"
 	_ "eta/eta_mini_ht_api/task/eta/media"
-	_ "eta/eta_mini_ht_api/task/eta/report"
 	_ "eta/eta_mini_ht_api/task/message"
+	_ "eta/eta_mini_ht_api/task/report"
 	_ "eta/eta_mini_ht_api/task/sms"
 	"github.com/beego/beego/v2/server/web"
 	"github.com/beego/beego/v2/task"