Browse Source

fix:coal邮箱监听

zqbao 3 months ago
parent
commit
953d84134c

+ 2 - 1
go.mod

@@ -12,12 +12,14 @@ require (
 	github.com/emersion/go-message v0.18.1
 	github.com/go-sql-driver/mysql v1.8.0
 	github.com/h2non/filetype v1.1.3
+	github.com/mattn/go-sqlite3 v2.0.3+incompatible
 	github.com/mozillazg/go-pinyin v0.20.0
 	github.com/patrickmn/go-cache v2.1.0+incompatible
 	github.com/rdlucklib/rdluck_tools v1.0.3
 	github.com/shopspring/decimal v1.3.1
 	github.com/tealeg/xlsx v1.0.5
 	github.com/xuri/excelize/v2 v2.8.1
+	golang.org/x/text v0.14.0
 	gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
 )
 
@@ -51,7 +53,6 @@ require (
 	golang.org/x/crypto v0.19.0 // indirect
 	golang.org/x/net v0.21.0 // indirect
 	golang.org/x/sys v0.17.0 // indirect
-	golang.org/x/text v0.14.0 // indirect
 	google.golang.org/protobuf v1.30.0 // indirect
 	gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect

+ 28 - 0
models/db_sqlite.go

@@ -0,0 +1,28 @@
+package models
+
+import (
+	"eta/eta_crawler/models/email"
+	"eta/eta_crawler/utils"
+	"fmt"
+
+	"github.com/beego/beego/v2/client/orm"
+	_ "github.com/mattn/go-sqlite3"
+)
+
+// init registers the SQLite database used for the mail-listener log under the
+// "default" ORM alias and makes sure the EmailListenLog table exists.
+// Failures are only printed; the process keeps starting.
+func init() {
+	// The data source is the path configured in utils.CoalEmailLogDir.
+	if regErr := orm.RegisterDataBase("default", "sqlite3", utils.CoalEmailLogDir); regErr != nil {
+		fmt.Println("数据库连接失败: ", regErr)
+		return
+	}
+
+	// Register the model, then sync the schema without forcing a rebuild
+	// (force=false) and with verbose output (verbose=true).
+	orm.RegisterModel(new(email.EmailListenLog))
+	if syncErr := orm.RunSyncdb("default", false, true); syncErr != nil {
+		fmt.Println("数据库同步失败: ", syncErr)
+	}
+}

+ 36 - 0
models/email/email_listen_log.go

@@ -0,0 +1,36 @@
+package email
+
+import (
+	"time"
+
+	"github.com/beego/beego/v2/client/orm"
+)
+
+// EmailListenLog is the ORM model recording one attachment saved by the mail
+// listener. EmailMessageUid holds the Uid of the mail message the attachment
+// came from.
+type EmailListenLog struct {
+	Id              uint   `orm:"pk;auto"`
+	Title           string `orm:"size(100)"` // report title
+	Author          string `orm:"size(100)"` // creator (sender) name
+	Email           string `orm:"size(200)"` // creator (sender) email address
+	FileName        string `orm:"size(200)"` // report file name
+	EmailMessageUid uint32
+	CreateTime      time.Time // creation time
+}
+
+// Add inserts this log entry through a new ORM handle and returns the
+// insert error, if any.
+func (e *EmailListenLog) Add() (err error) {
+	_, err = orm.NewOrm().Insert(e)
+	return err
+}
+
+// BatchAddEmailListenLog inserts the given log entries in batches of 100.
+// An empty or nil slice is a no-op and returns nil, so callers may pass the
+// (possibly empty) list collected for a mail without checking its length.
+func BatchAddEmailListenLog(emailListenLog []*EmailListenLog) (err error) {
+	// Nothing to insert; InsertMulti does not accept an empty slice and would
+	// report an argument error for a perfectly normal "no attachments" case.
+	if len(emailListenLog) == 0 {
+		return nil
+	}
+	_, err = orm.NewOrm().InsertMulti(100, emailListenLog)
+	return err
+}
+
+// GetMaxEmailUIdByEmailMessageId returns the largest mail message UID stored
+// in email_listen_log, or 0 when the table has no rows.
+//
+// The value is read from the email_message_uid column (the UID of the mail),
+// not from the auto-increment primary key, because the caller uses it as the
+// UID from which to resume reading the mailbox.
+func GetMaxEmailUIdByEmailMessageId() (maxEmailMessageUid int, err error) {
+	// coalesce turns the NULL produced by max() on an empty table into 0.
+	sql := "select coalesce(max(email_message_uid), 0) from email_listen_log"
+	err = orm.NewOrm().Raw(sql).QueryRow(&maxEmailMessageUid)
+	return maxEmailMessageUid, err
+}

+ 88 - 0
services/commodity_coal_watch.go

@@ -0,0 +1,88 @@
+package services
+
+import (
+	"context"
+	"eta/eta_crawler/utils"
+	"fmt"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/patrickmn/go-cache"
+)
+
+// CoalWatchTask is the scheduled-task entry point: it runs one scan of the
+// watched directory via ReadWatchIndexFile and always reports success.
+func CoalWatchTask(cont context.Context) (err error) {
+	ReadWatchIndexFile()
+	return nil
+}
+
+// coalWatchCache remembers the last processed modify time of each watched
+// file. It lives at package level so it survives between scheduled runs;
+// a cache created inside ReadWatchIndexFile would be empty on every call and
+// every file would be re-imported on every tick.
+var coalWatchCache = cache.New(365*24*time.Hour, 365*24*time.Hour)
+
+// ReadWatchIndexFile walks utils.CoalFilePath and hands every file that is new
+// or whose last-write time changed since the previous scan to the importer
+// matching its name. Files whose import fails are not recorded, so they are
+// retried on the next scan. Errors are printed/logged, never returned.
+func ReadWatchIndexFile() {
+	fmt.Println("ReadWatchIndexFile start")
+	err := filepath.Walk(utils.CoalFilePath, func(path string, info fs.FileInfo, walkErr error) error {
+		if walkErr != nil {
+			return walkErr
+		}
+		if info.IsDir() {
+			return nil
+		}
+
+		fileInfo, statErr := os.Stat(path)
+		if statErr != nil {
+			// The file may have disappeared mid-walk; skip it instead of
+			// dereferencing a nil FileInfo.
+			fmt.Println("os.Stat:", statErr.Error())
+			return nil
+		}
+		winFileAttr, isWinAttr := fileInfo.Sys().(*syscall.Win32FileAttributeData)
+		if !isWinAttr {
+			fmt.Println("ReadWatchIndexFile: unexpected file attribute type for " + path)
+			return nil
+		}
+		modifyTimeStr := utils.SecondToTime(winFileAttr.LastWriteTime.Nanoseconds() / 1e9).Format(utils.FormatDateTime)
+
+		// A file needs processing unless the cached modify time matches.
+		changed := true
+		if cached, found := coalWatchCache.Get(path); found {
+			if cachedStr, isStr := cached.(string); isStr && cachedStr == modifyTimeStr {
+				changed = false
+			}
+		}
+		if changed {
+			if handleErr := handleCoalWatchFile(path); handleErr != nil {
+				fmt.Println("ReadWatchIndexFile handle file Err:" + path + ": " + handleErr.Error())
+				utils.FileLog.Error("ReadWatchIndexFile handle file Err:%s: %s", path, handleErr.Error())
+				// Leave the cache untouched so the file is retried next time.
+				return nil
+			}
+		}
+		// Set overwrites any existing entry and refreshes its expiry.
+		coalWatchCache.Set(path, modifyTimeStr, 24*time.Hour)
+		return nil
+	})
+	if err != nil {
+		fmt.Println("ReadWatchIndexFile Err:" + err.Error())
+	}
+}
+
+// handleCoalWatchFile dispatches path to the importer selected by a keyword in
+// the path and returns that importer's error. Paths matching no keyword are
+// ignored and yield nil.
+func handleCoalWatchFile(path string) error {
+	switch {
+	case strings.Contains(path, "442家晋陕蒙煤矿周度产量数据"):
+		return Jsm(path)
+	case strings.Contains(path, "内陆17省动力煤终端用户供耗存"):
+		return Inland(path)
+	case strings.Contains(path, "沿海八省动力煤终端用户供耗存数据更新"):
+		return Coastal(path)
+	case strings.Contains(path, "442家晋陕蒙历史数据"):
+		return JsmHistory(path)
+	case strings.Contains(path, "CⅢ-8-16 25省市库存和日耗情况"):
+		// This workbook feeds two importers; both always run, with the same
+		// pause between them as before, and neither error is discarded.
+		coastalErr := CoastalHistory(path)
+		time.Sleep(time.Second * 10)
+		inlandErr := InlandHistory(path)
+		if coastalErr != nil && inlandErr != nil {
+			return fmt.Errorf("CoastalHistory: %v; InlandHistory: %v", coastalErr, inlandErr)
+		}
+		if coastalErr != nil {
+			return coastalErr
+		}
+		return inlandErr
+	case strings.Contains(path, "分企业煤炭产量旬度数据"):
+		return Firm(path)
+	}
+	return nil
+}

+ 8 - 38
services/email/mail.go

@@ -2,15 +2,14 @@ package email
 
 import (
 	"context"
+	"eta/eta_crawler/models/email"
 	"eta/eta_crawler/utils"
 	"eta/eta_crawler/utils/mail"
 	"fmt"
 	"io/fs"
-	"log"
 	"os"
 	"strconv"
 	"sync"
-	"time"
 )
 
 // 同步用户锁,防止重复同步,不管是全量还是增量,都是同一时间只能一个同步
@@ -37,19 +36,15 @@ func ListenMail(cont context.Context) (err error) {
 	//return
 	lockListenEmail.Lock()
 	// 目录创建
-	_ = ensureDirExists(fmt.Sprintf("%s%s", utils.MtjhFilePath, `file`))
+	_ = ensureDirExists(fmt.Sprintf("%s%s", utils.CoalFilePath, `file`))
 
 	mailMessageChan := make(chan mail.MailMessage, 5) // 创建一个通道,用于接收邮件消息
 	mailMessageDoneChan := make(chan bool, 1)         // 创建一个通道,用于接收邮件消息
 
-	// 邮件监听后的处理函数
-	go afterByListen(mailMessageChan, mailMessageDoneChan)
-
 	fmt.Println("开始监听邮件")
-
 	var emailMessageUID int
-	if utils.MtjhEmailStarIndex != "" {
-		emailMessageUID, err = strconv.Atoi(utils.MtjhEmailStarIndex)
+	if utils.CoalEmailStarIndex != "" {
+		emailMessageUID, err = strconv.Atoi(utils.CoalEmailStarIndex)
 		if err != nil {
 			emailMessageUID = -1
 			utils.FileLog.Warning("读取邮件 MtjhEmailStarIndex 配置失败:%s, 默认改为:%d", err.Error(), emailMessageUID)
@@ -57,46 +52,21 @@ func ListenMail(cont context.Context) (err error) {
 	}
 	if emailMessageUID <= 0 {
 		// 获取最大的邮件id
-		// emailMessageUID, err = report.GetMaxOutsideReportByEmailMessageId()
+		emailMessageUID, err = email.GetMaxEmailUIdByEmailMessageId()
 		// 已经存在了,那么就返回
 		if err != nil {
 			utils.FileLog.Error("获取已入库的最大邮件id失败:%s", err.Error())
 		}
 	}
 	var readBatch int
-	if utils.MtjhEmailReadBatch != "" {
-		readBatch, err = strconv.Atoi(utils.MtjhEmailReadBatch)
+	if utils.CoalEmailReadBatch != "" {
+		readBatch, err = strconv.Atoi(utils.CoalEmailReadBatch)
 		if err != nil {
 			readBatch = 10
 			utils.FileLog.Warning("读取邮件 MtjhEmailReadBatch 配置失败:%s, 默认改为:%d", err.Error(), readBatch)
 		}
 	}
-	mail.ListenMail(utils.MtjhEmailAddress, utils.MtjhEmailFolder, utils.MtjhEmailUseName, utils.MtjhEmailPassword, readBatch, emailMessageUID, mailMessageChan, mailMessageDoneChan)
-	return
-}
-
-func afterByListen(mailMessageChan chan mail.MailMessage, mailMessageDoneChan chan bool) {
-	defer func() {
-		log.Println("监听读取结束")
-		lockListenEmail.Unlock()
-	}()
-	for {
-		select {
-		case emailMessage := <-mailMessageChan:
-			handleMailMessage(emailMessage)
-		case <-time.After(10 * time.Second):
-			return
-		case <-mailMessageDoneChan:
-			for len(mailMessageChan) > 0 {
-				emailMessage := <-mailMessageChan
-				handleMailMessage(emailMessage)
-			}
-			return
-		}
-	}
-}
-
-func handleMailMessage(emailMessage mail.MailMessage) (err error) {
+	mail.ListenMail(utils.CoalEmailAddress, utils.CoalEmailFolder, utils.CoalEmailUseName, utils.CoalEmailPassword, readBatch, emailMessageUID, mailMessageChan, mailMessageDoneChan)
 	return
 }
 

+ 0 - 67
services/mtjh_watch.go

@@ -1,67 +0,0 @@
-package services
-
-import (
-	"context"
-	"eta/eta_crawler/utils"
-	"fmt"
-	"io/fs"
-	"os"
-	"path/filepath"
-	"strings"
-	"syscall"
-	"time"
-
-	"github.com/patrickmn/go-cache"
-)
-
-func MtjhWatchTask(cont context.Context) (err error) {
-	mtjhWatch()
-	return
-}
-
-func mtjhWatch() {
-	fmt.Println("mtjhWatch start")
-	var err error
-	defer func() {
-		if err != nil {
-			fmt.Println("mtjhWatch Err:" + err.Error())
-			utils.FileLog.Info("mtjhWatch Err:" + err.Error())
-		}
-	}()
-	var cacheClient *cache.Cache
-	if cacheClient == nil {
-		cacheClient = cache.New(365*24*time.Hour, 365*24*time.Hour)
-	}
-	err = filepath.Walk(utils.MtjhFilePath, func(path string, info fs.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-		if !info.IsDir() {
-			fileInfo, err := os.Stat(path)
-			if err != nil {
-				fmt.Println("os.Stat:", err.Error())
-			}
-			winFileAttr := fileInfo.Sys().(*syscall.Win32FileAttributeData)
-			modifyTimeStr := utils.SecondToTime(winFileAttr.LastWriteTime.Nanoseconds() / 1e9).Format(utils.FormatDateTime)
-
-			existModifyTime, ok := cacheClient.Get(path)
-			if ok {
-				existModifyTimeStr := existModifyTime.(string)
-				if existModifyTimeStr != modifyTimeStr {
-					if strings.Contains(path, "煤炭江湖") {
-						err = Mtjh(path)
-						utils.FileLog.Warning("Mtjh:" + err.Error())
-					}
-				}
-			} else {
-				if strings.Contains(path, "煤炭江湖") {
-					err = Mtjh(path)
-					utils.FileLog.Warning("Mtjh:" + err.Error())
-				}
-			}
-			cacheClient.Delete(path)
-			cacheClient.Set(path, modifyTimeStr, 24*time.Hour)
-		}
-		return nil
-	})
-}

+ 7 - 6
services/task.go

@@ -85,12 +85,13 @@ func Task() {
 	//task.AddTask("统计局数据爬取-季度", refreshNationalQuarter) // 每月15号1:25执行
 	//task.AddTask("统计局数据爬取-年度A", refreshNationalYearA)  // 每月20日1:45执行
 	//task.AddTask("统计局数据爬取-年度B", refreshNationalYearB)  // 每月25日1:45执行
-	// if utils.MtjhOpen == "1" {
-	// 	mtjh := task.NewTask("refreshMtjh", "0 */2 * * * *", MtjhWatchTask)
-	// 	task.AddTask("启动煤炭江湖监听excel脚本", mtjh)
-	// }
-	if utils.MtjhMailAttachmentOpen == "1" {
-		coalMailTask := task.NewTask("MailAttachment", utils.MtjhMailAttachmentTime, email.ListenMail)
+	if utils.CoalOpen == "1" {
+		mtjh := task.NewTask("refreshMtjh", "0 */2 * * * *", CoalWatchTask)
+		task.AddTask("启动中国煤炭网监听excel脚本", mtjh)
+	}
+
+	if utils.CoalMailAttachmentOpen == "1" {
+		coalMailTask := task.NewTask("MailAttachment", utils.CoalMailAttachmentTime, email.ListenMail)
 		task.AddTask("启动获取邮件附件脚本", coalMailTask)
 	}
 

+ 26 - 22
utils/config.go

@@ -52,18 +52,20 @@ var (
 	OLD_EXCEL_PATH_JR string
 )
 
-// 煤炭江湖
+// 中国煤炭网
 var (
-	MtjhFilePath           string // excel文件地址
-	MtjhOpen               string // 是否配置煤炭江湖数据源,1已配置
-	MtjhMailAttachmentOpen string // 获取邮件附件功能,1已配置
-	MtjhMailAttachmentTime string // 获取邮件附件功能时间
-	MtjhEmailAddress       string // 煤炭江湖监听邮箱服务器地址
-	MtjhEmailUseName       string // 煤炭江湖监听邮箱用户名
-	MtjhEmailPassword      string // 煤炭江湖监听邮箱密码
-	MtjhEmailFolder        string // 煤炭江湖监听邮箱文件夹
-	MtjhEmailReadBatch     string // 煤炭江湖监听邮箱读取批次
-	MtjhEmailStarIndex     string // 煤炭江湖监听邮箱索引
+	CoalFilePath           string // excel文件地址
+	CoalOpen               string // whether the China Coal website (中国煤炭网) data source is configured; "1" means configured
+	CoalMailAttachmentOpen string // 获取邮件附件功能,1已配置
+	CoalMailAttachmentTime string // 获取邮件附件功能时间
+	CoalEmailAddress       string // 中国煤炭网监听邮箱服务器地址
+	CoalEmailUseName       string // 中国煤炭网监听邮箱用户名
+	CoalEmailPassword      string // 中国煤炭网监听邮箱密码
+	CoalEmailFolder        string // 中国煤炭网监听邮箱文件夹
+	CoalEmailReadBatch     string // 中国煤炭网监听邮箱读取批次
+	CoalEmailStarIndex     string // 中国煤炭网监听邮箱索引
+	CoalEmailFileExt       string // 中国煤炭网监听文件后缀
+	CoalEmailLogDir        string // 中国煤炭网监听记录路径
 )
 
 func init() {
@@ -147,18 +149,20 @@ func init() {
 		LY_OPEN = config["ly_open"]
 
 	}
-	//煤炭江湖文件夹配置
+	// 中国煤炭网
 	{
-		MtjhFilePath = config["mtjh_file_path"]
-		MtjhOpen = config["mtjh_open"]
-		MtjhMailAttachmentOpen = config["mtjh_mail_attachment_open"]
-		MtjhMailAttachmentTime = config["mtjh_mail_attachment_time"]
-		MtjhEmailAddress = config["mtjh_email_address"]
-		MtjhEmailUseName = config["mtjh_email_use_name"]
-		MtjhEmailPassword = config["mtjh_email_password"]
-		MtjhEmailFolder = config["mtjh_email_folder"]
-		MtjhEmailReadBatch = config["mtjh_email_read_batch"]
-		MtjhEmailStarIndex = config["mtjh_email_star_index"]
+		CoalFilePath = config["coal_file_path"]
+		CoalOpen = config["coal_open"]
+		CoalMailAttachmentOpen = config["coal_mail_attachment_open"]
+		CoalMailAttachmentTime = config["coal_mail_attachment_time"]
+		CoalEmailAddress = config["coal_email_address"]
+		CoalEmailUseName = config["coal_email_use_name"]
+		CoalEmailPassword = config["coal_email_password"]
+		CoalEmailFolder = config["coal_email_folder"]
+		CoalEmailReadBatch = config["coal_email_read_batch"]
+		CoalEmailStarIndex = config["coal_email_star_index"]
+		CoalEmailFileExt = config["coal_email_file_ext"]
+		CoalEmailLogDir = config["coal_email_log_dir"]
 	}
 
 }

+ 37 - 5
utils/mail/imap.go

@@ -2,6 +2,7 @@ package mail
 
 import (
 	"errors"
+	"eta/eta_crawler/models/email"
 	"eta/eta_crawler/utils"
 	"fmt"
 	"io"
@@ -241,6 +242,7 @@ func readEveryMsg(msg *imap.Message) (emailMessage MailMessage, ok bool, err err
 
 	// 过滤
 
+	logList := make([]*email.EmailListenLog, 0)
 	for {
 		p, tmpErr := mr.NextPart()
 		if tmpErr == io.EOF {
@@ -276,7 +278,7 @@ func readEveryMsg(msg *imap.Message) (emailMessage MailMessage, ok bool, err err
 
 				// 确定文件后缀
 				fileSuffix := determineFileSuffix(bodyBytes)
-				fileName := fmt.Sprintf("%s%s.%s", utils.MtjhFilePath, cid[1:len(cid)-1], fileSuffix)
+				fileName := fmt.Sprintf("%s%s.%s", utils.CoalFilePath, cid[1:len(cid)-1], fileSuffix)
 
 				err = SaveToFile(bodyBytes, fileName)
 				if err != nil {
@@ -288,15 +290,28 @@ func readEveryMsg(msg *imap.Message) (emailMessage MailMessage, ok bool, err err
 		case *mail.AttachmentHeader:
 			// 这是一个附件
 			filename, _ := h.Filename()
+			fmt.Printf("读取到到附件: %s \n", filename)
+			utils.FileLog.Info("读取到附件: %s ", filename)
 			//log.Printf("得到附件: %v,content-type:%s \n", filename, p.Header.Get("Content-Type"))
-			saveName := fmt.Sprint(msg.SeqNum, utils.MD5(filename), time.Now().Format(utils.FormatDateTimeUnSpace), time.Now().Nanosecond(), path.Ext(filename))
-			filePath := fmt.Sprintf("%s%s%s%s", utils.MtjhFilePath, `file`, string(os.PathSeparator), saveName)
+			if !IsMatchExt(filename) {
+				continue
+			}
+			filePath := fmt.Sprintf("%s%s%s%s", utils.CoalFilePath, `file`, string(os.PathSeparator), filename)
 			err = SaveToFile(bodyBytes, filePath)
 			if err != nil {
 				err = errors.New(fmt.Sprintf("保存文件时出现错误:%v \n", err))
 				return
 			}
-
+			fmt.Printf("保存到文件: %s \n", filePath)
+			utils.FileLog.Info("保存到文件: %s ", filePath)
+			logList = append(logList, &email.EmailListenLog{
+				Title:           emailMessage.Title,
+				Author:          emailMessage.From,
+				Email:           emailMessage.FromAddress,
+				EmailMessageUid: emailMessage.Uid,
+				FileName:        filename,
+				CreateTime:      time.Now(),
+			})
 			// 这是附件资源
 			if contentDisposition := p.Header.Get("Content-Disposition"); contentDisposition != "" {
 				if strings.HasPrefix(contentDisposition, "attachment") {
@@ -306,6 +321,7 @@ func readEveryMsg(msg *imap.Message) (emailMessage MailMessage, ok bool, err err
 				// 这是内嵌资源
 				emailMessage.Resources[cid] = filePath
 			}
+
 			//else {
 			//	mailMessage.Attachment[filename] = filePath
 			//}
@@ -318,7 +334,10 @@ func readEveryMsg(msg *imap.Message) (emailMessage MailMessage, ok bool, err err
 	if emailMessage.Content == `` {
 		emailMessage.Content = textStr
 	}
-
+	err = email.BatchAddEmailListenLog(logList)
+	if err != nil {
+		utils.FileLog.Error("邮件日志保存失败:%v \n", err)
+	}
 	//log.Println("一封邮件读取完毕")
 	//log.Printf("------------------------- \n\n")
 
@@ -367,3 +386,16 @@ func ContainsWholeWord(s string, word string) bool {
 	re := regexp.MustCompile(pattern)
 	return re.MatchString(s)
 }
+
+// IsMatchExt reports whether the extension of filename is one of the
+// extensions configured in utils.CoalEmailFileExt, a "|"-separated list such
+// as ".xlsx|.xls".
+//
+// The comparison ignores case on both sides, tolerates surrounding spaces in
+// the configured entries, and accepts entries written without the leading
+// dot ("xlsx"). An empty entry still matches only files with no extension.
+func IsMatchExt(filename string) (ok bool) {
+	// The file's extension does not change between entries; compute it once.
+	fileExt := strings.ToLower(path.Ext(filename))
+	for _, ext := range strings.Split(utils.CoalEmailFileExt, "|") {
+		ext = strings.ToLower(strings.TrimSpace(ext))
+		if ext != "" && !strings.HasPrefix(ext, ".") {
+			ext = "." + ext
+		}
+		if ext == fileExt {
+			return true
+		}
+	}
+	return false
+}