|
- package pcsg
- import (
- "encoding/json"
- "eta/eta_email_analysis/global"
- "eta/eta_email_analysis/models/eta"
- "eta/eta_email_analysis/models/report"
- "eta/eta_email_analysis/services/oss"
- "eta/eta_email_analysis/utils"
- "eta/eta_email_analysis/utils/mail"
- "fmt"
- "github.com/google/uuid"
- "html"
- "io/fs"
- "log"
- "os"
- "path"
- "strings"
- "sync"
- "time"
- )
- // 同步用户锁,防止重复同步,不管是全量还是增量,都是同一时间只能一个同步
- var lockListenEmail sync.Mutex
- // 邮件改名规则
- var ruleList []report.MailRule
- func ListenMail() {
- var err error
- defer func() {
- if err != nil {
- global.FILE_LOG.Errorf("监听邮件失败:%s", err.Error())
- }
- }()
- //Fix()
- //return
- lockListenEmail.Lock()
- // 目录创建
- _ = ensureDirExists(fmt.Sprintf("%s%s", global.CONFIG.Serve.StaticDir, `file`))
- mailMessageChan := make(chan mail.MailMessage, 5) // 创建一个通道,用于接收邮件消息
- mailMessageDoneChan := make(chan bool, 1) // 创建一个通道,用于接收邮件消息
- //fmt.Println(len(mailMessageChan))
- // 获取规则
- ruleList = []report.MailRule{}
- ruleJson, e := eta.GetBusinessConfByKey("MailCheckRule")
- if e != nil {
- err = e
- global.FILE_LOG.Error("获取规则配置失败:%v", err)
- return
- }
- err = json.Unmarshal([]byte(ruleJson.ConfVal), &ruleList)
- if err != nil {
- global.FILE_LOG.Error("解析规则配置失败:%v", err)
- return
- }
- // 邮件监听后的处理函数
- go afterByListen(mailMessageChan, mailMessageDoneChan)
- fmt.Println("开始监听邮件")
- emailMessageUID := global.CONFIG.Email.StartReadIndex
- if emailMessageUID <= 0 {
- emailMessageUID, err = report.GetMaxOutsideReportByEmailMessageId()
- // 已经存在了,那么就返回
- if err != nil {
- global.FILE_LOG.Errorf("获取已入库的最大邮件id失败:%s", err.Error())
- }
- }
- mailAddress := fmt.Sprintf("%s:%d", global.CONFIG.Email.Host, global.CONFIG.Email.Port)
- mail.ListenMail(mailAddress, global.CONFIG.Email.Folder, global.CONFIG.Email.UserName, global.CONFIG.Email.Password, global.CONFIG.Email.ReadBatchSize, emailMessageUID, mailMessageChan, mailMessageDoneChan)
- return
- }
- func afterByListen(mailMessageChan chan mail.MailMessage, mailMessageDoneChan chan bool) {
- defer func() {
- log.Println("监听读取结束")
- lockListenEmail.Unlock()
- }()
- for {
- select {
- case emailMessage := <-mailMessageChan:
- //fmt.Println("读取成功")
- //fmt.Println(emailMessage.Title)
- handleMailMessage(emailMessage)
- case <-time.After(10 * time.Second):
- //fmt.Println("监听超时了")
- break
- case <-mailMessageDoneChan:
- //fmt.Println("读取完成一轮了")
- for len(mailMessageChan) > 0 {
- emailMessage := <-mailMessageChan
- handleMailMessage(emailMessage)
- }
- //fmt.Println("结束了")
- return
- }
- }
- }
- func handleMailMessage(emailMessage mail.MailMessage) (err error) {
- defer func() {
- if err != nil {
- global.FILE_LOG.Errorf("邮件处理失败,邮件标题:%s,错误原因:%v", emailMessage.Title, err)
- }
- for _, v := range emailMessage.Resources {
- os.Remove(v)
- }
- for _, v := range emailMessage.Attachment {
- os.Remove(v)
- }
- }()
- //rootPath := `C:\Users\123\go\src\eta\eta_email_analysis\static\`
- ossClient := oss.NewOssClient()
- //fmt.Println(emailMessage.Title)
- outsideReportAttachmentList := make([]*report.OutsideReportAttachment, 0)
- emailMessageUID := int(emailMessage.Uid)
- outReport, err := report.GetOutsideReportByEmailMessageId(emailMessageUID)
- // 已经存在了,那么就返回
- if err == nil {
- if outReport.ReportUpdateTime != emailMessage.Date {
- // 如果更新时间不一样,那么就更新收件时间
- err = outReport.Update([]string{"ReportUpdateTime"})
- if err != nil {
- global.FILE_LOG.Errorf("更新收件时间失败:%s", err.Error())
- return
- }
- global.FILE_LOG.Debugf( "更新收件时间,名称:%s, 原时间:%s, 新收件时间: %s", outReport.Title, outReport.ReportUpdateTime, emailMessage.Date)
- }
- global.FILE_LOG.Debugf("已存在,就不处理了,报告标题:%s;;邮件下标:%d", outReport.Title, emailMessage.Uid)
- return
- }
- fmt.Println("开始处理邮件,标题:", emailMessage.Title, ";邮件下标:", emailMessage.Uid)
- // sql报错,那么就返回
- if err != nil && !utils.IsErrNoRow(err) {
- return
- }
- // 已经存在了,那么就返回
- if outReport.OutsideReportID > 0 {
- return
- }
- title := emailMessage.Title
- var abstract string
- var classifyId int
- var classifyName string
- var sysUserId int
- var sysUserName string
- sysUserName = emailMessage.FromAddress
- // 查找用户
- if emailMessage.From != `` {
- tmpEmailStrList := strings.Split(sysUserName, "<")
- if len(tmpEmailStrList) >= 2 {
- sysUserName = tmpEmailStrList[1]
- tmpEmailStrList = strings.Split(sysUserName, ">")
- if len(tmpEmailStrList) >= 2 {
- sysUserName = tmpEmailStrList[0]
- }
- }
- sysUser, tmpErr := eta.GetSysUserByEmail(sysUserName)
- if tmpErr != nil && !utils.IsErrNoRow(tmpErr) {
- err = tmpErr
- return
- }
- if tmpErr == nil {
- sysUserId = sysUser.AdminId
- sysUserName = sysUser.RealName
- }
- }
- title = strings.Replace(title, "[PCI ANALYTICS CLOUD PLATFORM(00) Fwd] - ", "", -1)
- for _, v := range ruleList {
- tmpTitle := strings.ToLower(emailMessage.Title)
- rule := strings.ToLower(v.Rule)
- if strings.Contains(tmpTitle, rule) {
- title = v.Title
- abstract = v.Abstract
- classifyId = v.ClassifyId
- classify, e := report.GetClassifyByClassifyId(v.ClassifyId)
- if e != nil {
- err = e
- global.FILE_LOG.Error("获取分类失败:", err)
- return
- }
- classifyName = classify.ClassifyName
- // 特殊规则
- if strings.Contains(title, "海外分公司市场信息月报") || strings.Contains(title, "区域市场信息月报"){
- if emailMessage.From == "report.pcanalyst00@petrochina-usa.com" {
- sysUserName = "PCI"
- break
- } else {
- v.Author = emailMessage.From
- }
- }
- if strings.Contains(v.Author, "@") {
- sysUser, tmpErr := eta.GetSysUserByEmail(v.Author)
- if tmpErr != nil && !utils.IsErrNoRow(tmpErr) {
- err = tmpErr
- return
- }
- if sysUser.AdminId > 0 {
- sysUserId = sysUser.AdminId
- sysUserName = sysUser.RealName
- } else {
- sysUserName = v.Author
- }
- } else {
- sysUserName = v.Author
- }
- break
- }
- }
- for k, v := range emailMessage.Resources {
- randStr := utils.GetRandStringNoSpecialChar(28)
- ext := path.Ext(v)
- fileName := randStr + ext
- resourceUrl, tmpErr := ossClient.UploadFile(fileName, v, "")
- //os.Remove(v)
- if tmpErr != nil {
- global.FILE_LOG.Error(emailMessage.Title, "- 文件上传文件失败:", tmpErr)
- continue
- }
- tmpK := strings.Replace(k, "<", "cid:", -1)
- tmpK = strings.Replace(tmpK, ">", "", -1)
- emailMessage.Content = strings.Replace(emailMessage.Content, fmt.Sprint(tmpK, `"`), fmt.Sprint(resourceUrl, `"`), -1)
- }
- for name, v := range emailMessage.Attachment {
- var fileSize int64
- fileInfo, tmpErr := os.Stat(v)
- if tmpErr != nil {
- continue
- }
- fileSize = fileInfo.Size()
- randStr := utils.GetRandStringNoSpecialChar(28)
- ext := path.Ext(v)
- fileName := randStr + ext
- resourceUrl, tmpErr := ossClient.UploadFile(fileName, v, "")
- //defer os.Remove(v)
- if tmpErr != nil {
- global.FILE_LOG.Error(emailMessage.Title, " - 文件上传文件失败:", tmpErr)
- continue
- }
- outsideReportAttachmentList = append(outsideReportAttachmentList, &report.OutsideReportAttachment{
- OutsideReportAttachmentID: 0,
- OutsideReportID: 0,
- Title: name,
- URL: resourceUrl,
- CreateTime: time.Now(),
- FileSize: fileSize,
- })
- }
- //fileName := fmt.Sprintf("%s%s.%s", rootPath, emailMessage.Title, "html")
- htmlEscapeBody := html.EscapeString(emailMessage.Content)
- //fmt.Println(htmlEscapeBody)
- //err = utils.SaveToFile([]byte(emailMessage.Content), fileName)
- //if err != nil {
- // fmt.Println(fileName, "生成失败;err:", err)
- //}
- reportUpdateTime := emailMessage.Date
- if reportUpdateTime.IsZero() {
- reportUpdateTime = time.Now()
- }
- reportInfo := &report.OutsideReport{
- OutsideReportID: 0,
- Source: 3,
- Title: title,
- Abstract: abstract,
- ClassifyID: classifyId,
- ClassifyName: classifyName,
- Content: htmlEscapeBody,
- SysUserID: sysUserId,
- SysUserName: sysUserName,
- EmailMessageUID: emailMessageUID,
- ReportUpdateTime: reportUpdateTime,
- ModifyTime: time.Now(),
- CreateTime: time.Now(),
- ReportCode: uuid.New().String(),
- }
- err = report.CreateOutsideReport(reportInfo, outsideReportAttachmentList)
- return
- }
// ensureDirExists makes sure dirPath is a directory.
//
// It returns nil when dirPath already is a directory, creates it (including
// missing parents) with fs.ModePerm when it does not exist, and returns an
// error when dirPath exists but is not a directory or when it cannot be
// inspected.
func ensureDirExists(dirPath string) error {
	info, statErr := os.Stat(dirPath)
	switch {
	case statErr == nil && info.IsDir():
		// Already present as a directory.
		return nil
	case statErr == nil:
		return fmt.Errorf("path '%s' exists but is not a directory", dirPath)
	case os.IsNotExist(statErr):
		return os.MkdirAll(dirPath, fs.ModePerm)
	default:
		return statErr
	}
}
|