package pcsg import ( "encoding/json" "eta/eta_email_analysis/global" "eta/eta_email_analysis/models/eta" "eta/eta_email_analysis/models/report" "eta/eta_email_analysis/services/oss" "eta/eta_email_analysis/utils" "eta/eta_email_analysis/utils/mail" "fmt" "github.com/google/uuid" "html" "io/fs" "log" "os" "path" "strings" "sync" "time" ) // 同步用户锁,防止重复同步,不管是全量还是增量,都是同一时间只能一个同步 var lockListenEmail sync.Mutex // 邮件改名规则 var ruleList []report.MailRule func ListenMail() { var err error defer func() { if err != nil { global.FILE_LOG.Errorf("监听邮件失败:%s", err.Error()) } }() //Fix() //return lockListenEmail.Lock() // 目录创建 _ = ensureDirExists(fmt.Sprintf("%s%s", global.CONFIG.Serve.StaticDir, `file`)) mailMessageChan := make(chan mail.MailMessage, 5) // 创建一个通道,用于接收邮件消息 mailMessageDoneChan := make(chan bool, 1) // 创建一个通道,用于接收邮件消息 //fmt.Println(len(mailMessageChan)) // 获取规则 ruleList = []report.MailRule{} ruleJson, e := eta.GetBusinessConfByKey("MailCheckRule") if e != nil { err = e global.FILE_LOG.Error("获取规则配置失败:%v", err) return } err = json.Unmarshal([]byte(ruleJson.ConfVal), &ruleList) if err != nil { global.FILE_LOG.Error("解析规则配置失败:%v", err) return } // 邮件监听后的处理函数 go afterByListen(mailMessageChan, mailMessageDoneChan) fmt.Println("开始监听邮件") emailMessageUID := global.CONFIG.Email.StartReadIndex if emailMessageUID <= 0 { emailMessageUID, err = report.GetMaxOutsideReportByEmailMessageId() // 已经存在了,那么就返回 if err != nil { global.FILE_LOG.Errorf("获取已入库的最大邮件id失败:%s", err.Error()) } } mailAddress := fmt.Sprintf("%s:%d", global.CONFIG.Email.Host, global.CONFIG.Email.Port) mail.ListenMail(mailAddress, global.CONFIG.Email.Folder, global.CONFIG.Email.UserName, global.CONFIG.Email.Password, global.CONFIG.Email.ReadBatchSize, emailMessageUID, mailMessageChan, mailMessageDoneChan) return } func afterByListen(mailMessageChan chan mail.MailMessage, mailMessageDoneChan chan bool) { defer func() { log.Println("监听读取结束") lockListenEmail.Unlock() }() for { select { case emailMessage := <-mailMessageChan: //fmt.Println("读取成功") //fmt.Println(emailMessage.Title) handleMailMessage(emailMessage) case <-time.After(10 * time.Second): //fmt.Println("监听超时了") break case <-mailMessageDoneChan: //fmt.Println("读取完成一轮了") for len(mailMessageChan) > 0 { emailMessage := <-mailMessageChan handleMailMessage(emailMessage) } //fmt.Println("结束了") return } } } func handleMailMessage(emailMessage mail.MailMessage) (err error) { defer func() { if err != nil { global.FILE_LOG.Errorf("邮件处理失败,邮件标题:%s,错误原因:%v", emailMessage.Title, err) } for _, v := range emailMessage.Resources { os.Remove(v) } for _, v := range emailMessage.Attachment { os.Remove(v) } }() //rootPath := `C:\Users\123\go\src\eta\eta_email_analysis\static\` ossClient := oss.NewOssClient() //fmt.Println(emailMessage.Title) outsideReportAttachmentList := make([]*report.OutsideReportAttachment, 0) emailMessageUID := int(emailMessage.Uid) outReport, err := report.GetOutsideReportByEmailMessageId(emailMessageUID) // 已经存在了,那么就返回 if err == nil { if outReport.ReportUpdateTime != emailMessage.Date { // 如果更新时间不一样,那么就更新收件时间 err = outReport.Update([]string{"ReportUpdateTime"}) if err != nil { global.FILE_LOG.Errorf("更新收件时间失败:%s", err.Error()) return } global.FILE_LOG.Debugf( "更新收件时间,名称:%s, 原时间:%s, 新收件时间: %s", outReport.Title, outReport.ReportUpdateTime, emailMessage.Date) } global.FILE_LOG.Debugf("已存在,就不处理了,报告标题:%s;;邮件下标:%d", outReport.Title, emailMessage.Uid) return } fmt.Println("开始处理邮件,标题:", emailMessage.Title, ";邮件下标:", emailMessage.Uid) // sql报错,那么就返回 if err != nil && !utils.IsErrNoRow(err) { return } // 已经存在了,那么就返回 if outReport.OutsideReportID > 0 { return } title := emailMessage.Title var abstract string var classifyId int var classifyName string var sysUserId int var sysUserName string sysUserName = emailMessage.FromAddress // 查找用户 if emailMessage.From != `` { tmpEmailStrList := strings.Split(sysUserName, "<") if len(tmpEmailStrList) >= 2 { sysUserName = tmpEmailStrList[1] tmpEmailStrList = strings.Split(sysUserName, ">") if len(tmpEmailStrList) >= 2 { sysUserName = tmpEmailStrList[0] } } sysUser, tmpErr := eta.GetSysUserByEmail(sysUserName) if tmpErr != nil && !utils.IsErrNoRow(tmpErr) { err = tmpErr return } if tmpErr == nil { sysUserId = sysUser.AdminId sysUserName = sysUser.RealName } } title = strings.Replace(title, "[PCI ANALYTICS CLOUD PLATFORM(00) Fwd] - ", "", -1) for _, v := range ruleList { tmpTitle := strings.ToLower(emailMessage.Title) rule := strings.ToLower(v.Rule) if strings.Contains(tmpTitle, rule) { title = v.Title abstract = v.Abstract classifyId = v.ClassifyId classify, e := report.GetClassifyByClassifyId(v.ClassifyId) if e != nil { err = e global.FILE_LOG.Error("获取分类失败:", err) return } classifyName = classify.ClassifyName // 特殊规则 if strings.Contains(title, "海外分公司市场信息月报") || strings.Contains(title, "区域市场信息月报"){ if emailMessage.From == "report.pcanalyst00@petrochina-usa.com" { sysUserName = "PCI" break } else { v.Author = emailMessage.From } } if strings.Contains(v.Author, "@") { sysUser, tmpErr := eta.GetSysUserByEmail(v.Author) if tmpErr != nil && !utils.IsErrNoRow(tmpErr) { err = tmpErr return } if sysUser.AdminId > 0 { sysUserId = sysUser.AdminId sysUserName = sysUser.RealName } else { sysUserName = v.Author } } else { sysUserName = v.Author } break } } for k, v := range emailMessage.Resources { randStr := utils.GetRandStringNoSpecialChar(28) ext := path.Ext(v) fileName := randStr + ext resourceUrl, tmpErr := ossClient.UploadFile(fileName, v, "") //os.Remove(v) if tmpErr != nil { global.FILE_LOG.Error(emailMessage.Title, "- 文件上传文件失败:", tmpErr) continue } tmpK := strings.Replace(k, "<", "cid:", -1) tmpK = strings.Replace(tmpK, ">", "", -1) emailMessage.Content = strings.Replace(emailMessage.Content, fmt.Sprint(tmpK, `"`), fmt.Sprint(resourceUrl, `"`), -1) } for name, v := range emailMessage.Attachment { var fileSize int64 fileInfo, tmpErr := os.Stat(v) if tmpErr != nil { continue } fileSize = fileInfo.Size() randStr := utils.GetRandStringNoSpecialChar(28) ext := path.Ext(v) fileName := randStr + ext resourceUrl, tmpErr := ossClient.UploadFile(fileName, v, "") //defer os.Remove(v) if tmpErr != nil { global.FILE_LOG.Error(emailMessage.Title, " - 文件上传文件失败:", tmpErr) continue } outsideReportAttachmentList = append(outsideReportAttachmentList, &report.OutsideReportAttachment{ OutsideReportAttachmentID: 0, OutsideReportID: 0, Title: name, URL: resourceUrl, CreateTime: time.Now(), FileSize: fileSize, }) } //fileName := fmt.Sprintf("%s%s.%s", rootPath, emailMessage.Title, "html") htmlEscapeBody := html.EscapeString(emailMessage.Content) //fmt.Println(htmlEscapeBody) //err = utils.SaveToFile([]byte(emailMessage.Content), fileName) //if err != nil { // fmt.Println(fileName, "生成失败;err:", err) //} reportUpdateTime := emailMessage.Date if reportUpdateTime.IsZero() { reportUpdateTime = time.Now() } reportInfo := &report.OutsideReport{ OutsideReportID: 0, Source: 3, Title: title, Abstract: abstract, ClassifyID: classifyId, ClassifyName: classifyName, Content: htmlEscapeBody, SysUserID: sysUserId, SysUserName: sysUserName, EmailMessageUID: emailMessageUID, ReportUpdateTime: reportUpdateTime, ModifyTime: time.Now(), CreateTime: time.Now(), ReportCode: uuid.New().String(), } err = report.CreateOutsideReport(reportInfo, outsideReportAttachmentList) return } func ensureDirExists(dirPath string) error { info, err := os.Stat(dirPath) if err == nil { if info.IsDir() { return nil // 目录已存在 } return fmt.Errorf("path '%s' exists but is not a directory", dirPath) } if os.IsNotExist(err) { return os.MkdirAll(dirPath, fs.ModePerm) } return err }