Переглянути джерело

Merge branch 'debug' of http://8.136.199.33:3000/eta_mini/eta_mini_ht_api into debug

hongze 7 місяців тому
батько
коміт
6b21c62bf4

+ 31 - 10
common/component/log/log_plugin.go

@@ -8,6 +8,7 @@ import (
 	"log"
 	"os"
 	"path"
+	"strings"
 	"sync"
 )
 
@@ -23,7 +24,7 @@ var (
 
 type logger struct {
 	*logs.BeeLogger
-	filter string
+	filter Filter
 }
 
 type CustomLogger struct {
@@ -58,17 +59,17 @@ func Debug(msg string, v ...interface{}) {
 	loggerHandler.Debug(msg, v...)
 }
 func (c *CustomLogger) Debug(msg string, v ...interface{}) {
-	for _, appender := range c.logs {
-		if appender.GetLevel() >= logs.LevelInfo {
-			appender.Debug(msg, v...)
+	for _, logger := range c.logs {
+		if logger.GetLevel() >= logs.LevelInfo && logger.filter.ShouldLog(msg) {
+			logger.Debug(msg, v...)
 		}
 
 	}
 }
 func (c *CustomLogger) Info(msg string, v ...interface{}) {
-	for _, appender := range c.logs {
-		if appender.GetLevel() >= logs.LevelInfo {
-			appender.Info(msg, v...)
+	for _, logger := range c.logs {
+		if logger.GetLevel() >= logs.LevelInfo && logger.filter.ShouldLog(msg) {
+			logger.Info(msg, v...)
 		}
 
 	}
@@ -76,7 +77,7 @@ func (c *CustomLogger) Info(msg string, v ...interface{}) {
 
 func (c *CustomLogger) Error(msg string, v ...interface{}) {
 	for _, logger := range c.logs {
-		if logger.GetLevel() >= logs.LevelError {
+		if logger.GetLevel() >= logs.LevelError && logger.filter.ShouldLog(msg) {
 			logger.Error(msg, v...)
 		}
 	}
@@ -84,7 +85,7 @@ func (c *CustomLogger) Error(msg string, v ...interface{}) {
 
 func (c *CustomLogger) Warn(msg string, v ...interface{}) {
 	for _, logger := range c.logs {
-		if logger.GetLevel() >= logs.LevelWarning {
+		if logger.GetLevel() >= logs.LevelWarning && logger.filter.ShouldLog(msg) {
 			logger.Warn(msg, v...)
 		}
 	}
@@ -163,7 +164,7 @@ func initLogger(logCfg logConfig) {
 			beeLogger.EnableFuncCallDepth(true)
 		}
 		loggerHandler.logs = append(loggerHandler.logs, &logger{BeeLogger: beeLogger,
-			filter: appender.Filter})
+			filter: NewLevelFilter(appender.Filter)})
 	}
 }
 
@@ -187,6 +188,26 @@ func convertAppenderToLog(a *appender) (*logProps, error) {
 	}, nil
 }
 
+// Filter 接口定义
+type Filter interface {
+	ShouldLog(message string) bool
+}
+
+// LevelFilter 实现 Filter 接口,根据日志级别过滤日志
+type ContentFilter struct {
+	filterStr string
+}
+
+// NewLevelFilter 创建一个新的 LevelFilter 实例
+func NewLevelFilter(filterStr string) *ContentFilter {
+	return &ContentFilter{filterStr: filterStr}
+}
+
+// ShouldLog 根据日志级别决定是否记录日志
+func (lf *ContentFilter) ShouldLog(message string) bool {
+	return strings.Contains(message, lf.filterStr)
+}
+
 // logConfig 日志配置
 type logConfig struct {
 	FilePath  string     `json:"filepath" description:"日志路径"`

+ 0 - 27
common/component/log/log_plugin_test.go

@@ -1,27 +0,0 @@
-package logger
-
-import (
-	"errors"
-	"testing"
-)
-
-func TestError(t *testing.T) {
-	type args struct {
-		msg string
-		v   []interface{}
-	}
-	tests := []struct {
-		name string
-		args args
-	}{
-		{
-			name: "错误",
-			args: args{msg: "错误", v: []interface{}{errors.New("1212")}},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			Error(tt.args.msg, tt.args.v...)
-		})
-	}
-}

+ 12 - 0
conf/log/log_config.json

@@ -29,6 +29,18 @@
       "level": "info",
       "color": true
     },
+    {
+      "filter": "apiRequest",
+      "type": "file",
+      "filename": "api/api.log",
+      "maxlines": 1000000,
+      "maxsize": 102400,
+      "daily": true,
+      "maxdays": 30,
+      "rotate": true,
+      "level": "info",
+      "color": true
+    },
     {
       "filter": "coin,orm....*",
       "type": "file",

+ 41 - 0
controllers/base_controller.go

@@ -7,6 +7,7 @@ import (
 	"eta/eta_mini_ht_api/common/exception"
 	"eta/eta_mini_ht_api/common/http"
 	"github.com/beego/beego/v2/server/web"
+	"net/url"
 )
 
 type WrapData struct {
@@ -87,3 +88,43 @@ func (b *BaseController) SuccessResult(msg string, data interface{}, wrapData *W
 func (b *BaseController) FailedResult(msg string, wrapData *WrapData) {
 	wrapData.Msg = msg
 }
+
+func (b *BaseController) Prepare() {
+	var requestBody string
+	uri := b.Ctx.Input.URI()
+	method := b.Ctx.Input.Method()
+	if method == "GET" {
+		requestBody = b.Ctx.Request.RequestURI
+	} else {
+		requestBody, _ = url.QueryUnescape(string(b.Ctx.Input.RequestBody))
+	}
+	ip := b.Ctx.Input.IP()
+	b.Ctx.Input.URL()
+	logger.Info("apiRequest:[uri:%s, requestBody:%s, ip:%s]", uri, requestBody, ip)
+}
+func (b *BaseController) Finish() {
+	runMode := web.BConfig.RunMode
+	if b.Data["json"] == nil {
+		logger.Warn("apiRequest:[异常提醒:%v 接口:URI:%v;无返回值]", runMode, b.Ctx.Input.URI())
+		return
+	}
+	baseRes := b.Data["json"].(BaseResponse)
+	if !baseRes.Success {
+		logger.Info("apiRequest:[异常提醒:%v接口:URI:%v;ErrMsg:&v;Msg:%v]", b.Ctx.Input.URI(), baseRes.ErrMsg, baseRes.Msg)
+	} else {
+		if baseRes.Data == nil {
+			logger.Warn("apiRequest:[异常提醒:%v 接口:URI:%v;无返回值]", runMode, b.Ctx.Input.URI())
+			return
+		} else {
+			logger.Info("apiRequest:[uri:%s, resData:%s, ip:%s]", b.Ctx.Input.URI(), baseRes.Data)
+		}
+	}
+
+}
+
+type RequestInfo struct {
+	Uri       string `json:"uri"`
+	IpAddress string `json:"ip_address"`
+	Method    string `json:"method"`
+	Params    string `json:"params"`
+}

+ 36 - 18
domian/report/report_service.go

@@ -5,11 +5,16 @@ import (
 	"eta/eta_mini_ht_api/common/component/es"
 	logger "eta/eta_mini_ht_api/common/component/log"
 	"eta/eta_mini_ht_api/common/utils/page"
+	stringUtils "eta/eta_mini_ht_api/common/utils/string"
+	analystService "eta/eta_mini_ht_api/domian/financial_analyst"
+	userService "eta/eta_mini_ht_api/domian/user"
 	"eta/eta_mini_ht_api/models"
 	"eta/eta_mini_ht_api/models/eta"
 	etaDao "eta/eta_mini_ht_api/models/eta"
 	reportDao "eta/eta_mini_ht_api/models/report"
+	"github.com/google/uuid"
 	"strconv"
+	"strings"
 	"time"
 )
 
@@ -257,25 +262,38 @@ func SyncETAReportList(list []eta.ETAReport) (err error) {
 		return
 	}
 	//生产meta信息
-	//for _, report := range reports {
-	//userIds := userService.GetPostUser(report.Author, report.PublishedTime)
-	//usersStr := stringUtils.IntToStringSlice(userIds)
-	//Meta :=
-	//toStr := strings.Join(usersStr, ",")
-	////user.SyncMeta(etaReportList[i])
-	//userService.MetaInfoDTO{
-	//	From:       "ETA",
-	//	Meta:       string(list),
-	//	MetaType:   "USER_NOTICE",
-	//	SourceType: "REPORT",
-	//	To:         toStr,
-	//	Uid:        userIds[0],
-	//}
-	//}
-	return
-}
+	logger.Info("生成推送META信息")
+	for _, report := range reports {
+		userIds := userService.GetPostUser(report.Author, report.PublishedTime)
+		author, _ := analystService.GetAnalystByName(report.Author)
+		if len(userIds) > 0 {
+			usersStr := stringUtils.IntToStringSlice(userIds)
+			Meta := userService.MetaData{
+				AuthorName: report.Author,
+				AuthorId:   author.Id,
+				SourceId:   report.ID,
+			}
+			metaStr, _ := json.Marshal(Meta)
+			toStr := strings.Join(usersStr, ",")
+			UUID := uuid.New()
+			uuidStr := UUID.String()
+			metaContent := userService.MetaInfoDTO{
+				From:       "ETA",
+				Uid:        "report:" + uuidStr,
+				Meta:       string(metaStr),
+				MetaType:   "USER_NOTICE",
+				SourceType: "REPORT",
+				To:         toStr,
+			}
+			err = userService.CreateMetaInfo(metaContent)
+			if err != nil {
+				logger.Error("创建Meta信息失败:%v", err)
+				return err
+			}
+		}
 
-type reportMeta struct {
+	}
+	return
 }
 
 func GetListOrderByConditionWeekly(week bool, column string, limit int, order models.Order) (dtoList []ReportDTO, err error) {

+ 48 - 6
domian/user/meta_info.go

@@ -1,10 +1,52 @@
 package user
 
+import (
+	userDao "eta/eta_mini_ht_api/models/user"
+)
+
 type MetaInfoDTO struct {
-	Uid        int
-	Meta       string
-	From       string
-	To         string
-	SourceType string
-	MetaType   string
+	Uid        string `json:"Uid,omitempty"`
+	Meta       string `json:"Meta,omitempty"`
+	From       string `json:"From,omitempty"`
+	To         string `json:"To,omitempty"`
+	SourceType string `json:"SourceType,omitempty"`
+	MetaType   string `json:"MetaType,omitempty"`
+}
+
+type MetaData struct {
+	SourceId   int    `json:"reportId"`
+	AuthorId   int    `json:"AuthorId"`
+	AuthorName string `json:"authorName"`
+}
+
+func CreateMetaInfo(dto MetaInfoDTO) (err error) {
+	return userDao.CreateMetaInfo(convertToMetaInfo(dto))
+}
+func convertToMetaInfo(dto MetaInfoDTO) userDao.MetaInfo {
+	return userDao.MetaInfo{
+		Uid:        dto.Uid,
+		Meta:       dto.Meta,
+		From:       dto.From,
+		To:         dto.To,
+		SourceType: userDao.SourceType(dto.SourceType),
+		MetaType:   userDao.MetaType(dto.MetaType),
+	}
+}
+func convertToMetaDTO(dto userDao.MetaInfo) MetaInfoDTO {
+	return MetaInfoDTO{
+		Uid:        dto.Uid,
+		Meta:       dto.Meta,
+		From:       dto.From,
+		To:         dto.To,
+		SourceType: string(dto.SourceType),
+		MetaType:   string(dto.MetaType),
+	}
+}
+
+func GetInitMetaInfos() (list []MetaInfoDTO) {
+	metas := userDao.GetInitMetaInfos()
+	for _, meta := range metas {
+		list = append(list, convertToMetaDTO(meta))
+	}
+	return
 }

+ 63 - 0
domian/user/user_message_service.go

@@ -0,0 +1,63 @@
+package user
+
+import (
+	"encoding/json"
+	logger "eta/eta_mini_ht_api/common/component/log"
+	userDao "eta/eta_mini_ht_api/models/user"
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+type MessageDTO struct {
+	From    int    `gorm:"column:from"`
+	To      int    `gorm:"column:to"`
+	Message string `gorm:"column:message"`
+	Type    string `gorm:"column:type;type:enum('REPORT','VIDEO','AUDIO')"`
+	Status  string `gorm:"column:status;type:enum('UNREAD','READ')"`
+}
+
+const (
+	ReportMessageTemplate = "您关注的研究员%v更新了一篇报告"
+	VideoMessageTemplate  = "您关注的研究员%v更新了一个视频"
+	AudioMessageTemplate  = "您关注的研究员%v更新了一个音频"
+)
+
+type MessageInfo struct {
+	analyst string
+}
+
+func CreateMessage(meta MetaInfoDTO) (err error) {
+	messageType := userDao.SourceType(meta.SourceType)
+	var messageList []userDao.UserMessage
+	users := strings.Split(meta.To, ",")
+	if err != nil {
+		logger.Error("unmarshal meta info error")
+		return
+	}
+	var message string
+	var content MetaData
+	err = json.Unmarshal([]byte(meta.Meta), &content)
+	switch userDao.SourceType(meta.SourceType) {
+	case userDao.ReportSourceType:
+		message = fmt.Sprintf(ReportMessageTemplate, content.AuthorName)
+		messageType = userDao.ReportSourceType
+	case userDao.VideoSourceType:
+		message = fmt.Sprintf(VideoMessageTemplate, content.AuthorName)
+	case userDao.AudioSourceType:
+		message = fmt.Sprintf(AudioMessageTemplate, content.AuthorName)
+	}
+	for _, user := range users {
+		id, _ := strconv.Atoi(user)
+		userMessage := userDao.UserMessage{
+			From:     content.AuthorId,
+			To:       id,
+			Message:  message,
+			SourceId: content.SourceId,
+			Type:     messageType,
+			Status:   userDao.UnReadStatus,
+		}
+		messageList = append(messageList, userMessage)
+	}
+	return userDao.BatchInsertMessage(messageList)
+}

+ 4 - 2
models/report/report.go

@@ -39,13 +39,15 @@ type Report struct {
 	UpdatedTime   time.Time    `gorm:"column:updated_time;comment:'修改时间'" json:"updated_time"`
 }
 
-func BatchInsertReport(list *[]Report) error {
+func BatchInsertReport(list *[]Report) (err error) {
 	db := models.Main()
 	//手动事务
 	tx := db.Begin()
-	err := db.CreateInBatches(list, MaxBatchNum).Error
+	err = db.CreateInBatches(list, MaxBatchNum).Error
 	if err != nil {
 		logger.Error("批量插入研报失败:%v", err)
+		tx.Rollback()
+		return
 	}
 	tx.Commit()
 	return nil

+ 30 - 6
models/user/meta_info.go

@@ -1,6 +1,9 @@
 package user
 
 import (
+	logger "eta/eta_mini_ht_api/common/component/log"
+	"eta/eta_mini_ht_api/models"
+	"gorm.io/gorm"
 	"time"
 )
 
@@ -9,11 +12,11 @@ type SourceType string
 type MetaType string
 
 const (
-	UserNoticeType  MetaType = "USER_NOTICE"
-	InitMetaType    MetaType = "INIT"
-	PendingMetaType MetaType = "PENDING"
-	FinishMetaType  MetaType = "FINISH"
-	FailedMetaType  MetaType = "FAILED"
+	UserNoticeType    MetaType   = "USER_NOTICE"
+	InitStatusType    StatusType = "INIT"
+	PendingStatusType StatusType = "PENDING"
+	FinishStatusType  StatusType = "FINISH"
+	FailedStatusType  StatusType = "FAILED"
 
 	ReportSourceType SourceType = "REPORT"
 	VideoSourceType  SourceType = "VIDEO"
@@ -23,7 +26,7 @@ const (
 // MetaInfo 表示 meta_infos 表的模型
 type MetaInfo struct {
 	Id          int        `gorm:"primaryKey;autoIncrement;column:id"`
-	Uid         int        `gorm:"column:uid"`
+	Uid         string     `gorm:"column:uid"`
 	Meta        string     `gorm:"column:meta"`
 	From        string     `gorm:"column:from"`
 	To          string     `gorm:"column:to"`
@@ -33,3 +36,24 @@ type MetaInfo struct {
 	CreatedTime time.Time
 	UpdatedTime time.Time
 }
+
+func (mt *MetaInfo) BeforeCreate(_ *gorm.DB) (err error) {
+	mt.CreatedTime = time.Now()
+	mt.Status = InitStatusType
+	return nil
+}
+
+func CreateMetaInfo(metaInfo MetaInfo) (err error) {
+	db := models.Main()
+	return db.Create(&metaInfo).Error
+}
+
+func GetInitMetaInfos() (list []MetaInfo) {
+	db := models.Main()
+	err := db.Where("status = ?", InitStatusType).Order("id asc").Limit(100).Find(&list).Error
+	if err != nil {
+		logger.Error("获取meta数据失败:%v", err)
+		return []MetaInfo{}
+	}
+	return list
+}

+ 1 - 1
models/user/user_analyst_follow_list.go

@@ -30,7 +30,7 @@ type UserAnalystFollowList struct {
 
 func GetPostUser(authorName string, PublishTime string) (ids []int) {
 	db := models.Main()
-	err := db.Select("user_id").Where("financial_analyst_name = ? and followed = ? and financial_analyst_name=? and followed_time <=?", authorName, Following, PublishTime).Order("followed_time desc").Find(&ids).Error
+	err := db.Model(&UserAnalystFollowList{}).Select("user_id").Where("financial_analyst_name = ? and followed = ? and followed_time <=?", authorName, Following, PublishTime).Order("followed_time desc").Find(&ids).Error
 	if err != nil {
 		return []int{}
 	}

+ 53 - 0
models/user/user_message.go

@@ -0,0 +1,53 @@
+package user
+
+import (
+	logger "eta/eta_mini_ht_api/common/component/log"
+	"eta/eta_mini_ht_api/models"
+	"gorm.io/gorm"
+	"time"
+)
+
+type MessageType string
+
+const (
+	UnReadStatus StatusType = "UNREAD"
+	ReadStatus   StatusType = "READ"
+	MaxBatchNum             = 1000
+)
+
+// UserMessage 表示 user_message 表的模型
+type UserMessage struct {
+	Id          int        `gorm:"primaryKey;autoIncrement;column:id"`
+	From        int        `gorm:"column:from"`
+	To          int        `gorm:"column:to"`
+	SourceId    int        `gorm:"column:source_id"`
+	Message     string     `gorm:"column:message"`
+	Type        SourceType `gorm:"column:type;type:enum('REPORT','VIDEO','AUDIO')"`
+	Status      StatusType `gorm:"column:status;type:enum('UNREAD','READ')"`
+	CreatedTime time.Time  `gorm:"column:created_time;type:timestamps;comment:创建时间"`
+	UpdatedTime time.Time  `gorm:"column:updated_time"`
+}
+
+func (u *UserMessage) BeforeCreate(_ *gorm.DB) (err error) {
+	u.CreatedTime = time.Now()
+	return
+}
+
+func CreateMessage(message UserMessage) (err error) {
+	db := models.Main()
+	return db.Create(&message).Error
+}
+
+func BatchInsertMessage(messages []UserMessage) (err error) {
+	db := models.Main()
+	//手动事务
+	tx := db.Begin()
+	err = db.CreateInBatches(messages, MaxBatchNum).Error
+	if err != nil {
+		logger.Error("批量插入消息失败:%v", err)
+		tx.Rollback()
+		return
+	}
+	tx.Commit()
+	return nil
+}

+ 2 - 2
task/eta/report/eta_report_task.go

@@ -12,7 +12,7 @@ import (
 
 var (
 	taskName base.TaskType = "ETAReportSyncTask"
-	cron                   = "0/5 * * * * *"
+	cron                   = "0/10 * * * * *"
 )
 
 var (
@@ -50,6 +50,6 @@ type ReportTask struct {
 }
 
 func init() {
-	reportTask := base.NewTask(taskName, cron, new(ReportTask), base.DEV)
+	reportTask := base.NewTask(taskName, cron, new(ReportTask), base.PROD)
 	base.RegisterTask(&reportTask)
 }

+ 14 - 36
task/message/notice_task.go

@@ -3,58 +3,36 @@ package message
 import (
 	logger "eta/eta_mini_ht_api/common/component/log"
 	"eta/eta_mini_ht_api/common/contants"
-	"eta/eta_mini_ht_api/domian/financial_analyst"
-	"eta/eta_mini_ht_api/models/eta"
+	userService "eta/eta_mini_ht_api/domian/user"
 	"eta/eta_mini_ht_api/task/base"
-	"fmt"
-	"github.com/google/uuid"
 	"sync"
-	"time"
 )
 
 var (
 	taskName base.TaskType = "NoticeTask"
 	cron                   = "0/5 * * * * *"
-	duration               = 5 * time.Second
 )
 
 // Execute Task ETA取研报的数据
-func (au *AuthorTask) Execute(taskDetail *base.TaskDetail) error {
+func (au *NoticeTask) Execute(taskDetail *base.TaskDetail) error {
 	logger.Info(contants.TaskFormat, "监听更新通知开始")
-	//报告和媒体
+	metaInfoList := userService.GetInitMetaInfos()
 	var wg sync.WaitGroup
-	wg.Add(2)
-	UUID := uuid.New()
-	uuidStr := UUID.String()
-	timeBefore := time.Now().Add(-duration)
-	fmt.Printf("监听更新通知开始:%v", timeBefore)
-	//监听报告
-	go func(uuid string) {
-		defer wg.Done()
-		//list := reportService.GetNewReportByPublishTime(timeBefore)
-		//	users := userService.GetNoticeUsersByReports(list)
-	}(uuidStr)
-	//监听媒体
-	go func(uuid string) {
-		defer wg.Done()
-	}(uuidStr)
-	wg.Wait()
-
+	wg.Add(len(metaInfoList))
+	for _, metaInfo := range metaInfoList {
+		go func(metaInfo userService.MetaInfoDTO) {
+			defer wg.Done()
+			userService.CreateMessage(metaInfo)
+		}(metaInfo)
+	}
+	//报告和媒体
 	return nil
 }
 
-type AuthorTask struct {
+type NoticeTask struct {
 }
 
-func convert(author eta.ReportAuthor) financial_analyst.FinancialAnalystDTO {
-	return financial_analyst.FinancialAnalystDTO{
-		Deleted: author.IsDelete,
-		ETAId:   author.Id,
-		Name:    author.ReportAuthor,
-		Status:  author.Enable,
-	}
-}
 func init() {
-	authorTask := base.NewTask(taskName, cron, new(AuthorTask), base.PROD)
-	base.RegisterTask(&authorTask)
+	reportTask := base.NewTask(taskName, cron, new(NoticeTask), base.PROD)
+	base.RegisterTask(&reportTask)
 }

+ 1 - 0
task/task_starter.go

@@ -5,6 +5,7 @@ import (
 	_ "eta/eta_mini_ht_api/task/eta/author"
 	_ "eta/eta_mini_ht_api/task/eta/media"
 	_ "eta/eta_mini_ht_api/task/eta/report"
+	_ "eta/eta_mini_ht_api/task/message"
 	_ "eta/eta_mini_ht_api/task/sms"
 	"github.com/beego/beego/v2/server/web"
 	"github.com/beego/beego/v2/task"