Browse Source

fix:修改中国煤炭网邮箱监听

zqbao 3 months ago
parent
commit
6f3ed3a026
5 changed files with 28 additions and 138 deletions
  1. 0 28
      models/db_sqlite.go
  2. 0 36
      models/email/email_listen_log.go
  3. 11 45
      services/email/mail.go
  4. 3 5
      utils/config.go
  5. 14 24
      utils/mail/imap.go

+ 0 - 28
models/db_sqlite.go

@@ -1,28 +0,0 @@
-package models
-
-import (
-	"eta/eta_crawler/models/email"
-	"eta/eta_crawler/utils"
-	"fmt"
-
-	"github.com/beego/beego/v2/client/orm"
-	_ "github.com/mattn/go-sqlite3"
-)
-
-func init() {
-	// 配置数据库连接
-	err := orm.RegisterDataBase("default", "sqlite3", utils.CoalEmailLogDir)
-	if err != nil {
-		fmt.Println("数据库连接失败: ", err)
-		return
-	}
-
-	// 自动创建表
-	orm.RegisterModel(
-		new(email.EmailListenLog),
-	)
-	err = orm.RunSyncdb("default", false, true)
-	if err != nil {
-		fmt.Println("数据库同步失败: ", err)
-	}
-}

+ 0 - 36
models/email/email_listen_log.go

@@ -1,36 +0,0 @@
-package email
-
-import (
-	"time"
-
-	"github.com/beego/beego/v2/client/orm"
-)
-
-type EmailListenLog struct {
-	Id              uint   `orm:"pk;auto"`
-	Title           string `orm:"size(100)"` // 报告标题
-	Author          string `orm:"size(100)"` // 创建人姓名
-	Email           string `orm:"size(200)"` // 创建人邮箱
-	FileName        string `orm:"size(200)"` // 报告文件名
-	EmailMessageUid uint32
-	CreateTime      time.Time // 创建时间
-}
-
-func (e *EmailListenLog) Add() (err error) {
-	db := orm.NewOrm()
-	_, err = db.Insert(e)
-	return
-}
-
-func BatchAddEmailListenLog(emailListenLog []*EmailListenLog) (err error) {
-	db := orm.NewOrm()
-	_, err = db.InsertMulti(100, emailListenLog)
-	return
-}
-
-func GetMaxEmailUIdByEmailMessageId() (maxEmailMessageUid int, err error) {
-	db := orm.NewOrm()
-	sql := "select max(email_message_uid) from email_listen_log"
-	err = db.Raw(sql).QueryRow(&maxEmailMessageUid)
-	return
-}

+ 11 - 45
services/email/mail.go

@@ -2,7 +2,6 @@ package email
 
 import (
 	"context"
-	"eta/eta_crawler/models/email"
 	"eta/eta_crawler/utils"
 	"eta/eta_crawler/utils/mail"
 	"fmt"
@@ -10,69 +9,36 @@ import (
 	"os"
 	"strconv"
 	"sync"
-	"time"
 )
 
 // 同步用户锁,防止重复同步,不管是全量还是增量,都是同一时间只能一个同步
 var lockListenEmail sync.Mutex
 
 func ListenMail(cont context.Context) (err error) {
-	ctx, cancel := context.WithTimeout(cont, time.Minute*1)
-	defer cancel()
-
-	if !lockWithTimeout(ctx) {
-		utils.FileLog.Info("煤炭中国获取锁超时")
-		return
-	}
-	defer lockListenEmail.Unlock()
-
-	listenMail()
-	return
-}
-
-func lockWithTimeout(ctx context.Context) (ok bool) {
-	lockDone := make(chan struct{})
-	go func() {
-		lockListenEmail.Lock()
-		close(lockDone)
-	}()
-	select {
-	case <-lockDone:
-		ok = true
-		return
-	case <-ctx.Done():
-		return
-	}
-}
-
-func listenMail() (err error) {
 	defer func() {
+		lockListenEmail.Unlock()
 		if err != nil {
 			utils.FileLog.Error("监听邮件失败:%s", err.Error())
 		}
 	}()
+	lockListenEmail.Lock()
 	// 目录创建
 	_ = ensureDirExists(fmt.Sprintf("%s%s", utils.CoalFilePath, `file`))
 
 	fmt.Println("开始监听邮件")
 	utils.FileLog.Info("中国煤炭网开始监听邮件")
-	var emailMessageUID int
-	if utils.CoalEmailStarIndex != "" {
-		emailMessageUID, err = strconv.Atoi(utils.CoalEmailStarIndex)
+	var lastNday int // number of days to look back, read from coal_email_n_day; stays 0 when that key is unset
+	if utils.CoalEmailNDay != "" {
+		lastNday, err = strconv.Atoi(utils.CoalEmailNDay) // keep the parse error: the check below previously tested an err that was never assigned
 		if err != nil {
-			emailMessageUID = -1
-			utils.FileLog.Warning("读取邮件 MtjhEmailStarIndex 配置失败:%s, 默认改为:%d", err.Error(), emailMessageUID)
+			lastNday, err = 1, nil // unparsable value: fall back to 1 day and clear err so the deferred logger does not report a listen failure
 		}
 	}
-	if emailMessageUID <= 0 {
-		// 获取最大的邮件id
-		emailMessageUID, err = email.GetMaxEmailUIdByEmailMessageId()
-		// 已经存在了,那么就返回
-		if err != nil {
-			utils.FileLog.Error("获取已入库的最大邮件id失败:%s", err.Error())
-		}
+	if lastNday < 0 {
+		lastNday = 1
 	}
-	utils.FileLog.Info("中国煤炭监听配置为:CoalEmailAddress:%s, CoalEmailFolder:%s, emailMessageUID:%d", utils.CoalEmailAddress, utils.CoalEmailFolder, emailMessageUID)
+
+	utils.FileLog.Info("中国煤炭监听配置为:CoalEmailAddress:%s, CoalEmailFolder:%s, lastNday:%d", utils.CoalEmailAddress, utils.CoalEmailFolder, lastNday)
 	var readBatch int
 	if utils.CoalEmailReadBatch != "" {
 		readBatch, err = strconv.Atoi(utils.CoalEmailReadBatch)
@@ -81,7 +47,7 @@ func listenMail() (err error) {
 			utils.FileLog.Warning("读取邮件 MtjhEmailReadBatch 配置失败:%s, 默认改为:%d", err.Error(), readBatch)
 		}
 	}
-	mail.ListenMail(utils.CoalEmailAddress, utils.CoalEmailFolder, utils.CoalEmailUseName, utils.CoalEmailPassword, readBatch, emailMessageUID)
+	mail.ListenMail(utils.CoalEmailAddress, utils.CoalEmailFolder, utils.CoalEmailUseName, utils.CoalEmailPassword, readBatch, lastNday)
 	return
 }
 

+ 3 - 5
utils/config.go

@@ -55,7 +55,7 @@ var (
 // 中国煤炭网
 var (
 	CoalFilePath           string // excel文件地址
-	CoalOpen               string // 是否配置煤炭江湖数据源,1已配置
+	CoalOpen               string // 是否配置中国煤炭网数据源,1已配置
 	CoalMailAttachmentOpen string // 获取邮件附件功能,1已配置
 	CoalMailAttachmentTime string // 获取邮件附件功能时间
 	CoalEmailAddress       string // 中国煤炭网监听邮箱服务器地址
@@ -63,9 +63,8 @@ var (
 	CoalEmailPassword      string // 中国煤炭网监听邮箱密码
 	CoalEmailFolder        string // 中国煤炭网监听邮箱文件夹
 	CoalEmailReadBatch     string // 中国煤炭网监听邮箱读取批次
-	CoalEmailStarIndex     string // 中国煤炭网监听邮箱索引
 	CoalEmailFileExt       string // 中国煤炭网监听文件后缀
-	CoalEmailLogDir        string // 中国煤炭网监听记录路径
+	CoalEmailNDay          string // 中国煤炭网监听取最近N天数据
 )
 
 func init() {
@@ -160,9 +159,8 @@ func init() {
 		CoalEmailPassword = config["coal_email_password"]
 		CoalEmailFolder = config["coal_email_folder"]
 		CoalEmailReadBatch = config["coal_email_read_batch"]
-		CoalEmailStarIndex = config["coal_email_star_index"]
 		CoalEmailFileExt = config["coal_email_file_ext"]
-		CoalEmailLogDir = config["coal_email_log_dir"]
+		CoalEmailNDay = config["coal_email_n_day"]
 	}
 
 }

+ 14 - 24
utils/mail/imap.go

@@ -1,7 +1,6 @@
 package mail
 
 import (
-	"eta/eta_crawler/models/email"
 	"eta/eta_crawler/utils"
 	"fmt"
 	"io"
@@ -30,7 +29,7 @@ type MailMessage struct {
 	Attachment  map[string][]byte `description:"附件资源"`
 }
 
-func ListenMail(mailAddress, folder, userName, password string, readBatchSize, fromEmailIndex int) (err error) { // 收件箱
+func ListenMail(mailAddress, folder, userName, password string, readBatchSize, lastNday int) (err error) { // 收件箱
 	defer func() {
 		// 处理结束
 		if err != nil {
@@ -100,6 +99,9 @@ func ListenMail(mailAddress, folder, userName, password string, readBatchSize, f
 	seqSet := new(imap.SeqSet)
 	to := mbox.Messages // 此文件下的邮件总数
 
+	now := time.Now()
+	startTime := time.Date(now.Year(), now.Month(), now.Day()-lastNday, 0, 0, 0, 0, time.Local)
+
 	var isStopFor bool
 	step := uint32(1)
 	for i := to; i >= 1; {
@@ -128,17 +130,20 @@ func ListenMail(mailAddress, folder, userName, password string, readBatchSize, f
 		}
 		// 获取邮件内容 End
 		for msg := range messages {
-			utils.FileLog.Info("正在读取邮件uid: %d", msg.Uid)
 			// 如果需要终止,那么就不处理了
 			if isStopFor {
 				continue
 			}
-
-			// 判断当前邮件id是否小于等于已经监听到的最小id,如果是,那么就不处理了
-			if msg.Uid <= uint32(fromEmailIndex) {
-				isStopFor = true
-				break
+			// 判断当前邮件收件时间是否小于设定的时间,如果是,那么就不处理了
+			envelope := msg.Envelope
+			if envelope != nil {
+				if envelope.Date.Before(startTime) {
+					continue
+				}
+			} else {
+				continue
 			}
+			utils.FileLog.Info("正在读取邮件uid: %d", msg.Uid)
 
 			emailMessage, tmpErr := readEveryMsg(msg)
 			if tmpErr != nil {
@@ -223,13 +228,10 @@ func readEveryMsg(msg *imap.Message) (emailMessage MailMessage, err error) {
 	// 邮件标题
 	subject, err := mr.Header.Subject()
 	if err != nil {
-		utils.FileLog.Warning("邮件主题 Subject ERR:", err)
+		utils.FileLog.Warning("邮件主题 Subject ERR:%v", err)
 	}
 	emailMessage.Title = subject
 
-	// 过滤
-
-	logList := make([]*email.EmailListenLog, 0)
 	for {
 		p, tmpErr := mr.NextPart()
 		if tmpErr == io.EOF {
@@ -285,14 +287,6 @@ func readEveryMsg(msg *imap.Message) (emailMessage MailMessage, err error) {
 			}
 			fmt.Printf("保存到文件: %s \n", filePath)
 			utils.FileLog.Info("保存到文件: %s ", filePath)
-			logList = append(logList, &email.EmailListenLog{
-				Title:           emailMessage.Title,
-				Author:          emailMessage.From,
-				Email:           emailMessage.FromAddress,
-				EmailMessageUid: emailMessage.Uid,
-				FileName:        filename,
-				CreateTime:      time.Now(),
-			})
 			// 这是附件资源
 			if contentDisposition := p.Header.Get("Content-Disposition"); contentDisposition != "" {
 				if strings.HasPrefix(contentDisposition, "attachment") {
@@ -311,10 +305,6 @@ func readEveryMsg(msg *imap.Message) (emailMessage MailMessage, err error) {
 	if emailMessage.Content == `` {
 		emailMessage.Content = textStr
 	}
-	err = email.BatchAddEmailListenLog(logList)
-	if err != nil {
-		utils.FileLog.Error("邮件日志保存失败:%v", err)
-	}
 
 	return
 }