kobe6258 8 miesięcy temu
rodzic
commit
cabe3222e1

+ 31 - 10
common/component/log/log_plugin.go

@@ -8,6 +8,7 @@ import (
 	"log"
 	"os"
 	"path"
+	"strings"
 	"sync"
 )
 
@@ -23,7 +24,7 @@ var (
 
 type logger struct {
 	*logs.BeeLogger
-	filter string
+	filter Filter
 }
 
 type CustomLogger struct {
@@ -58,17 +59,17 @@ func Debug(msg string, v ...interface{}) {
 	loggerHandler.Debug(msg, v...)
 }
 func (c *CustomLogger) Debug(msg string, v ...interface{}) {
-	for _, appender := range c.logs {
-		if appender.GetLevel() >= logs.LevelInfo {
-			appender.Debug(msg, v...)
+	for _, logger := range c.logs {
+		if logger.GetLevel() >= logs.LevelInfo && logger.filter.ShouldLog(msg) {
+			logger.Debug(msg, v...)
 		}
 
 	}
 }
 func (c *CustomLogger) Info(msg string, v ...interface{}) {
-	for _, appender := range c.logs {
-		if appender.GetLevel() >= logs.LevelInfo {
-			appender.Info(msg, v...)
+	for _, logger := range c.logs {
+		if logger.GetLevel() >= logs.LevelInfo && logger.filter.ShouldLog(msg) {
+			logger.Info(msg, v...)
 		}
 
 	}
@@ -76,7 +77,7 @@ func (c *CustomLogger) Info(msg string, v ...interface{}) {
 
 func (c *CustomLogger) Error(msg string, v ...interface{}) {
 	for _, logger := range c.logs {
-		if logger.GetLevel() >= logs.LevelError {
+		if logger.GetLevel() >= logs.LevelError && logger.filter.ShouldLog(msg) {
 			logger.Error(msg, v...)
 		}
 	}
@@ -84,7 +85,7 @@ func (c *CustomLogger) Error(msg string, v ...interface{}) {
 
 func (c *CustomLogger) Warn(msg string, v ...interface{}) {
 	for _, logger := range c.logs {
-		if logger.GetLevel() >= logs.LevelWarning {
+		if logger.GetLevel() >= logs.LevelWarning && logger.filter.ShouldLog(msg) {
 			logger.Warn(msg, v...)
 		}
 	}
@@ -163,7 +164,7 @@ func initLogger(logCfg logConfig) {
 			beeLogger.EnableFuncCallDepth(true)
 		}
 		loggerHandler.logs = append(loggerHandler.logs, &logger{BeeLogger: beeLogger,
-			filter: appender.Filter})
+			filter: NewLevelFilter(appender.Filter)})
 	}
 }
 
@@ -187,6 +188,26 @@ func convertAppenderToLog(a *appender) (*logProps, error) {
 	}, nil
 }
 
+// Filter 接口定义
+type Filter interface {
+	ShouldLog(message string) bool
+}
+
+// LevelFilter 实现 Filter 接口,根据日志级别过滤日志
+type ContentFilter struct {
+	filterStr string
+}
+
+// NewLevelFilter 创建一个新的 LevelFilter 实例
+func NewLevelFilter(filterStr string) *ContentFilter {
+	return &ContentFilter{filterStr: filterStr}
+}
+
+// ShouldLog 根据日志级别决定是否记录日志
+func (lf *ContentFilter) ShouldLog(message string) bool {
+	return strings.Contains(message, lf.filterStr)
+}
+
 // logConfig 日志配置
 type logConfig struct {
 	FilePath  string     `json:"filepath" description:"日志路径"`

+ 0 - 27
common/component/log/log_plugin_test.go

@@ -1,27 +0,0 @@
-package logger
-
-import (
-	"errors"
-	"testing"
-)
-
-func TestError(t *testing.T) {
-	type args struct {
-		msg string
-		v   []interface{}
-	}
-	tests := []struct {
-		name string
-		args args
-	}{
-		{
-			name: "错误",
-			args: args{msg: "错误", v: []interface{}{errors.New("1212")}},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			Error(tt.args.msg, tt.args.v...)
-		})
-	}
-}

+ 13 - 1
conf/log/log_config.json

@@ -1,5 +1,5 @@
 {
-  "filepath": "../etalogs",
+  "filepath": "./etalogs",
   "appenders": [
     {
       "type": "console",
@@ -29,6 +29,18 @@
       "level": "info",
       "color": true
     },
+    {
+      "filter": "apiRequest",
+      "type": "file",
+      "filename": "api/api.log",
+      "maxlines": 1000000,
+      "maxsize": 102400,
+      "daily": true,
+      "maxdays": 30,
+      "rotate": true,
+      "level": "info",
+      "color": true
+    },
     {
       "filter": "coin,orm....*",
       "type": "file",

+ 41 - 0
controllers/base_controller.go

@@ -7,6 +7,7 @@ import (
 	"eta/eta_mini_ht_api/common/exception"
 	"eta/eta_mini_ht_api/common/http"
 	"github.com/beego/beego/v2/server/web"
+	"net/url"
 )
 
 type WrapData struct {
@@ -87,3 +88,43 @@ func (b *BaseController) SuccessResult(msg string, data interface{}, wrapData *W
 func (b *BaseController) FailedResult(msg string, wrapData *WrapData) {
 	wrapData.Msg = msg
 }
+
+func (b *BaseController) Prepare() {
+	var requestBody string
+	uri := b.Ctx.Input.URI()
+	method := b.Ctx.Input.Method()
+	if method == "GET" {
+		requestBody = b.Ctx.Request.RequestURI
+	} else {
+		requestBody, _ = url.QueryUnescape(string(b.Ctx.Input.RequestBody))
+	}
+	ip := b.Ctx.Input.IP()
+	b.Ctx.Input.URL()
+	logger.Info("apiRequest:[uri:%s, requestBody:%s, ip:%s]", uri, requestBody, ip)
+}
+func (b *BaseController) Finish() {
+	runMode := web.BConfig.RunMode
+	if b.Data["json"] == nil {
+		logger.Warn("apiRequest:[异常提醒:%v 接口:URI:%v;无返回值]", runMode, b.Ctx.Input.URI())
+		return
+	}
+	baseRes := b.Data["json"].(BaseResponse)
+	if !baseRes.Success {
+		logger.Info("apiRequest:[异常提醒:%v接口:URI:%v;ErrMsg:&v;Msg:%v]", b.Ctx.Input.URI(), baseRes.ErrMsg, baseRes.Msg)
+	} else {
+		if baseRes.Data == nil {
+			logger.Warn("apiRequest:[异常提醒:%v 接口:URI:%v;无返回值]", runMode, b.Ctx.Input.URI())
+			return
+		} else {
+			logger.Info("apiRequest:[uri:%s, resData:%s, ip:%s]", b.Ctx.Input.URI(), baseRes.Data)
+		}
+	}
+
+}
+
+type RequestInfo struct {
+	Uri       string `json:"uri"`
+	IpAddress string `json:"ip_address"`
+	Method    string `json:"method"`
+	Params    string `json:"params"`
+}

+ 8 - 15
domian/report/report_service.go

@@ -2,12 +2,12 @@ package report
 
 import (
 	"encoding/json"
-	stringUtils "eta/eta_mini_ht_api/common/utils/string"
-	userService "eta/eta_mini_ht_api/domian/user"
-
 	"eta/eta_mini_ht_api/common/component/es"
 	logger "eta/eta_mini_ht_api/common/component/log"
 	"eta/eta_mini_ht_api/common/utils/page"
+	stringUtils "eta/eta_mini_ht_api/common/utils/string"
+	analystService "eta/eta_mini_ht_api/domian/financial_analyst"
+	userService "eta/eta_mini_ht_api/domian/user"
 	"eta/eta_mini_ht_api/models"
 	"eta/eta_mini_ht_api/models/eta"
 	etaDao "eta/eta_mini_ht_api/models/eta"
@@ -265,13 +265,13 @@ func SyncETAReportList(list []eta.ETAReport) (err error) {
 	logger.Info("生成推送META信息")
 	for _, report := range reports {
 		userIds := userService.GetPostUser(report.Author, report.PublishedTime)
+		author, _ := analystService.GetAnalystByName(report.Author)
 		if len(userIds) > 0 {
 			usersStr := stringUtils.IntToStringSlice(userIds)
-			Meta := reportMeta{
-				AuthorName:    report.Author,
-				PublishedTime: report.PublishedTime,
-				ReportId:      report.ID,
-				Title:         report.Title,
+			Meta := userService.MetaData{
+				AuthorName: report.Author,
+				AuthorId:   author.Id,
+				SourceId:   report.ID,
 			}
 			metaStr, _ := json.Marshal(Meta)
 			toStr := strings.Join(usersStr, ",")
@@ -296,13 +296,6 @@ func SyncETAReportList(list []eta.ETAReport) (err error) {
 	return
 }
 
-type reportMeta struct {
-	ReportId      int    `json:"reportId"`
-	AuthorName    string `json:"authorName"`
-	Title         string `json:"title"`
-	PublishedTime string `json:"publishedTime"`
-}
-
 func GetListOrderByConditionWeekly(week bool, column string, limit int, order models.Order) (dtoList []ReportDTO, err error) {
 	reports, err := reportDao.GetListOrderByCondition(week, column, limit, order)
 	if err != nil {

+ 23 - 14
domian/user/meta_info.go

@@ -1,30 +1,38 @@
 package user
 
-import "eta/eta_mini_ht_api/models/user"
+import (
+	userDao "eta/eta_mini_ht_api/models/user"
+)
 
 type MetaInfoDTO struct {
-	Uid        string
-	Meta       string
-	From       string
-	To         string
-	SourceType string
-	MetaType   string
+	Uid        string `json:"Uid,omitempty"`
+	Meta       string `json:"Meta,omitempty"`
+	From       string `json:"From,omitempty"`
+	To         string `json:"To,omitempty"`
+	SourceType string `json:"SourceType,omitempty"`
+	MetaType   string `json:"MetaType,omitempty"`
+}
+
+type MetaData struct {
+	SourceId   int    `json:"reportId"`
+	AuthorId   int    `json:"AuthorId"`
+	AuthorName string `json:"authorName"`
 }
 
 func CreateMetaInfo(dto MetaInfoDTO) (err error) {
-	return user.CreateMetaInfo(convertToMetaInfo(dto))
+	return userDao.CreateMetaInfo(convertToMetaInfo(dto))
 }
-func convertToMetaInfo(dto MetaInfoDTO) user.MetaInfo {
-	return user.MetaInfo{
+func convertToMetaInfo(dto MetaInfoDTO) userDao.MetaInfo {
+	return userDao.MetaInfo{
 		Uid:        dto.Uid,
 		Meta:       dto.Meta,
 		From:       dto.From,
 		To:         dto.To,
-		SourceType: user.SourceType(dto.SourceType),
-		MetaType:   user.MetaType(dto.MetaType),
+		SourceType: userDao.SourceType(dto.SourceType),
+		MetaType:   userDao.MetaType(dto.MetaType),
 	}
 }
-func convertToMetaDTO(dto user.MetaInfo) MetaInfoDTO {
+func convertToMetaDTO(dto userDao.MetaInfo) MetaInfoDTO {
 	return MetaInfoDTO{
 		Uid:        dto.Uid,
 		Meta:       dto.Meta,
@@ -38,6 +46,7 @@ func convertToMetaDTO(dto user.MetaInfo) MetaInfoDTO {
 func GetInitMetaInfos() (list []MetaInfoDTO) {
 	metas := userDao.GetInitMetaInfos()
 	for _, meta := range metas {
-		list = append(list, convertToUserDTO())
+		list = append(list, convertToMetaDTO(meta))
 	}
+	return
 }

+ 63 - 0
domian/user/user_message_service.go

@@ -0,0 +1,63 @@
+package user
+
+import (
+	"encoding/json"
+	logger "eta/eta_mini_ht_api/common/component/log"
+	userDao "eta/eta_mini_ht_api/models/user"
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+type MessageDTO struct {
+	From    int    `gorm:"column:from"`
+	To      int    `gorm:"column:to"`
+	Message string `gorm:"column:message"`
+	Type    string `gorm:"column:type;type:enum('REPORT','VIDEO','AUDIO')"`
+	Status  string `gorm:"column:status;type:enum('UNREAD','READ')"`
+}
+
+const (
+	ReportMessageTemplate = "您关注的研究员%v更新了一篇报告"
+	VideoMessageTemplate  = "您关注的研究员%v更新了一个视频"
+	AudioMessageTemplate  = "您关注的研究员%v更新了一个音频"
+)
+
+type MessageInfo struct {
+	analyst string
+}
+
+func CreateMessage(meta MetaInfoDTO) (err error) {
+	messageType := userDao.SourceType(meta.SourceType)
+	var messageList []userDao.UserMessage
+	users := strings.Split(meta.To, ",")
+	if err != nil {
+		logger.Error("unmarshal meta info error")
+		return
+	}
+	var message string
+	var content MetaData
+	err = json.Unmarshal([]byte(meta.Meta), &content)
+	switch userDao.SourceType(meta.SourceType) {
+	case userDao.ReportSourceType:
+		message = fmt.Sprintf(ReportMessageTemplate, content.AuthorName)
+		messageType = userDao.ReportSourceType
+	case userDao.VideoSourceType:
+		message = fmt.Sprintf(VideoMessageTemplate, content.AuthorName)
+	case userDao.AudioSourceType:
+		message = fmt.Sprintf(AudioMessageTemplate, content.AuthorName)
+	}
+	for _, user := range users {
+		id, _ := strconv.Atoi(user)
+		userMessage := userDao.UserMessage{
+			From:     content.AuthorId,
+			To:       id,
+			Message:  message,
+			SourceId: content.SourceId,
+			Type:     messageType,
+			Status:   userDao.UnReadStatus,
+		}
+		messageList = append(messageList, userMessage)
+	}
+	return userDao.BatchInsertMessage(messageList)
+}

+ 4 - 2
models/report/report.go

@@ -39,13 +39,15 @@ type Report struct {
 	UpdatedTime   time.Time    `gorm:"column:updated_time;comment:'修改时间'" json:"updated_time"`
 }
 
-func BatchInsertReport(list *[]Report) error {
+func BatchInsertReport(list *[]Report) (err error) {
 	db := models.Main()
 	//手动事务
 	tx := db.Begin()
-	err := db.CreateInBatches(list, MaxBatchNum).Error
+	err = db.CreateInBatches(list, MaxBatchNum).Error
 	if err != nil {
 		logger.Error("批量插入研报失败:%v", err)
+		tx.Rollback()
+		return
 	}
 	tx.Commit()
 	return nil

+ 6 - 0
models/user/meta_info.go

@@ -1,6 +1,7 @@
 package user
 
 import (
+	logger "eta/eta_mini_ht_api/common/component/log"
 	"eta/eta_mini_ht_api/models"
 	"gorm.io/gorm"
 	"time"
@@ -50,4 +51,9 @@ func CreateMetaInfo(metaInfo MetaInfo) (err error) {
 func GetInitMetaInfos() (list []MetaInfo) {
 	db := models.Main()
 	err := db.Where("status = ?", InitStatusType).Order("id asc").Limit(100).Find(&list).Error
+	if err != nil {
+		logger.Error("获取meta数据失败:%v", err)
+		return []MetaInfo{}
+	}
+	return list
 }

+ 53 - 0
models/user/user_message.go

@@ -0,0 +1,53 @@
+package user
+
+import (
+	logger "eta/eta_mini_ht_api/common/component/log"
+	"eta/eta_mini_ht_api/models"
+	"gorm.io/gorm"
+	"time"
+)
+
+type MessageType string
+
+const (
+	UnReadStatus StatusType = "UNREAD"
+	ReadStatus   StatusType = "READ"
+	MaxBatchNum             = 1000
+)
+
+// UserMessage 表示 user_message 表的模型
+type UserMessage struct {
+	Id          int        `gorm:"primaryKey;autoIncrement;column:id"`
+	From        int        `gorm:"column:from"`
+	To          int        `gorm:"column:to"`
+	SourceId    int        `gorm:"column:source_id"`
+	Message     string     `gorm:"column:message"`
+	Type        SourceType `gorm:"column:type;type:enum('REPORT','VIDEO','AUDIO')"`
+	Status      StatusType `gorm:"column:status;type:enum('UNREAD','READ')"`
+	CreatedTime time.Time  `gorm:"column:created_time;type:timestamps;comment:创建时间"`
+	UpdatedTime time.Time  `gorm:"column:updated_time"`
+}
+
+func (u *UserMessage) BeforeCreate(_ *gorm.DB) (err error) {
+	u.CreatedTime = time.Now()
+	return
+}
+
+func CreateMessage(message UserMessage) (err error) {
+	db := models.Main()
+	return db.Create(&message).Error
+}
+
+func BatchInsertMessage(messages []UserMessage) (err error) {
+	db := models.Main()
+	//手动事务
+	tx := db.Begin()
+	err = db.CreateInBatches(messages, MaxBatchNum).Error
+	if err != nil {
+		logger.Error("批量插入消息失败:%v", err)
+		tx.Rollback()
+		return
+	}
+	tx.Commit()
+	return nil
+}

+ 2 - 2
task/eta/report/eta_report_task.go

@@ -12,7 +12,7 @@ import (
 
 var (
 	taskName base.TaskType = "ETAReportSyncTask"
-	cron                   = "0/5 * * * * *"
+	cron                   = "0/10 * * * * *"
 )
 
 var (
@@ -50,6 +50,6 @@ type ReportTask struct {
 }
 
 func init() {
-	reportTask := base.NewTask(taskName, cron, new(ReportTask), base.DEV)
+	reportTask := base.NewTask(taskName, cron, new(ReportTask), base.PROD)
 	base.RegisterTask(&reportTask)
 }

+ 15 - 4
task/message/notice_task.go

@@ -3,7 +3,9 @@ package message
 import (
 	logger "eta/eta_mini_ht_api/common/component/log"
 	"eta/eta_mini_ht_api/common/contants"
+	userService "eta/eta_mini_ht_api/domian/user"
 	"eta/eta_mini_ht_api/task/base"
+	"sync"
 )
 
 var (
@@ -12,16 +14,25 @@ var (
 )
 
 // Execute Task ETA取研报的数据
-func (au *AuthorTask) Execute(taskDetail *base.TaskDetail) error {
+func (au *NoticeTask) Execute(taskDetail *base.TaskDetail) error {
 	logger.Info(contants.TaskFormat, "监听更新通知开始")
-	//metaInfoList := userServcie.getInitMetaInfos()
+	metaInfoList := userService.GetInitMetaInfos()
+	var wg sync.WaitGroup
+	wg.Add(len(metaInfoList))
+	for _, metaInfo := range metaInfoList {
+		go func(metaInfo userService.MetaInfoDTO) {
+			defer wg.Done()
+			userService.CreateMessage(metaInfo)
+		}(metaInfo)
+	}
 	//报告和媒体
 	return nil
 }
 
-type AuthorTask struct {
+type NoticeTask struct {
 }
 
 func init() {
-
+	reportTask := base.NewTask(taskName, cron, new(NoticeTask), base.PROD)
+	base.RegisterTask(&reportTask)
 }

+ 1 - 0
task/task_starter.go

@@ -5,6 +5,7 @@ import (
 	_ "eta/eta_mini_ht_api/task/eta/author"
 	_ "eta/eta_mini_ht_api/task/eta/media"
 	_ "eta/eta_mini_ht_api/task/eta/report"
+	_ "eta/eta_mini_ht_api/task/message"
 	_ "eta/eta_mini_ht_api/task/sms"
 	"github.com/beego/beego/v2/server/web"
 	"github.com/beego/beego/v2/task"