package pcsg

import (
	"fmt"
	"html"
	"io/fs"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/google/uuid"

	"eta/eta_email_analysis/global"
	"eta/eta_email_analysis/models/eta"
	"eta/eta_email_analysis/models/report"
	"eta/eta_email_analysis/services/oss"
	"eta/eta_email_analysis/utils"
	"eta/eta_email_analysis/utils/mail"
)

// lockListenEmail serializes mail synchronization: whether a run is full or
// incremental, only one may be in progress at any time. It is locked by
// ListenMail and released by afterByListen when that goroutine exits.
var lockListenEmail sync.Mutex

// ListenMail starts one mail-listening round.
//
// It takes lockListenEmail, makes sure the local "file" directory under the
// configured static dir exists, starts afterByListen as the consumer
// goroutine, looks up the largest e-mail message id already stored, and then
// hands control to mail.ListenMail with the configured mailbox settings.
//
// The returned error is the result of the "largest stored message id" lookup;
// a failure there is logged and listening still proceeds with whatever id the
// lookup returned.
//
// NOTE(review): the lock is only released when afterByListen receives on the
// done channel. This presumably relies on mail.ListenMail always signalling
// completion — confirm, otherwise the next call blocks forever.
func ListenMail() (err error) {
	lockListenEmail.Lock()

	// Best effort: a failure is logged but does not abort the round.
	fileDir := fmt.Sprintf("%s%s", global.CONFIG.Serve.StaticDir, `file`)
	if dirErr := ensureDirExists(fileDir); dirErr != nil {
		global.FILE_LOG.Errorf("创建目录失败,目录:%s,错误原因:%v", fileDir, dirErr)
	}

	mailMessageChan := make(chan mail.MailMessage, 5) // carries fetched mail messages to the consumer
	mailMessageDoneChan := make(chan bool, 1)         // signals that a read round has finished

	// Consumer that persists every message received on the channel.
	go afterByListen(mailMessageChan, mailMessageDoneChan)

	fmt.Println("开始监听邮件")
	emailMessageUID, err := report.GetMaxOutsideReportByEmailMessageId()
	if err != nil {
		global.FILE_LOG.Errorf("获取已入库的最大邮件id失败:%s", err.Error())
	}

	mailAddress := fmt.Sprintf("%s:%d", global.CONFIG.Email.Host, global.CONFIG.Email.Port)
	mail.ListenMail(
		mailAddress,
		global.CONFIG.Email.Folder,
		global.CONFIG.Email.UserName,
		global.CONFIG.Email.Password,
		global.CONFIG.Email.ReadBatchSize,
		emailMessageUID,
		mailMessageChan,
		mailMessageDoneChan,
	)

	return err
}

// afterByListen consumes messages from mailMessageChan and passes each one to
// handleMailMessage until a value arrives on mailMessageDoneChan. On that
// signal it drains whatever is still buffered in mailMessageChan and returns.
// lockListenEmail is released when the function exits.
//
// The select blocks until a message or the done signal arrives. An earlier
// version also had a 10-second time.After case whose body was a bare `break`;
// that only left the select, so it was a no-op that allocated a timer on
// every loop iteration and has been dropped.
func afterByListen(mailMessageChan chan mail.MailMessage, mailMessageDoneChan chan bool) {
	defer func() {
		fmt.Println("监听读取结束")
		lockListenEmail.Unlock()
	}()

	for {
		select {
		case emailMessage := <-mailMessageChan:
			// Failures are logged inside handleMailMessage.
			_ = handleMailMessage(emailMessage)
		case <-mailMessageDoneChan:
			fmt.Println("读取完成一轮了")
			// Flush messages that were queued before the done signal.
			for len(mailMessageChan) > 0 {
				_ = handleMailMessage(<-mailMessageChan)
			}
			fmt.Println("结束了")
			return
		}
	}
}

// handleMailMessage stores one e-mail as an outside report.
//
// Steps:
//  1. Skip the message if a report with the same e-mail message id exists.
//  2. Upload inline resources and rewrite their cid references in the body
//     to the uploaded URLs.
//  3. Upload attachments and collect their metadata.
//  4. Resolve the sender to a system user when possible.
//  5. Create the report together with its attachments.
//
// Upload or stat failures of individual files are logged and that file is
// skipped; they do not fail the whole message. A non-nil return value is also
// written to the file log. All local files referenced by the message are
// removed when the function returns, whatever the outcome.
func handleMailMessage(emailMessage mail.MailMessage) (err error) {
	defer func() {
		if err != nil {
			global.FILE_LOG.Errorf("邮件处理失败,邮件标题:%s,错误原因:%v", emailMessage.Title, err)
		}
		removeTempFiles(emailMessage)
	}()

	emailMessageUID := int(emailMessage.Uid)
	outReport, err := report.GetOutsideReportByEmailMessageId(emailMessageUID)
	if err == nil {
		// Already stored: nothing to do.
		global.FILE_LOG.Debugf("已存在,就不处理了,报告标题:%s", outReport.Title)
		return nil
	}

	fmt.Println("开始处理邮件,标题:", emailMessage.Title, ";邮件下标:", emailMessage.Uid)

	// Any lookup error other than "no row" is a real failure.
	if !utils.IsErrNoRow(err) {
		return err
	}
	// "No row" is the expected case for a new message; do not carry it further.
	err = nil

	ossClient := oss.NewOssClient()

	// Inline resources: upload each file and point the body at the new URL.
	for contentID, localPath := range emailMessage.Resources {
		randStr := utils.GetRandStringNoSpecialChar(28)
		fileName := randStr + path.Ext(localPath)
		resourceUrl, tmpErr := ossClient.UploadFile(fileName, localPath, "")
		if tmpErr != nil {
			global.FILE_LOG.Error(emailMessage.Title, "- 文件上传文件失败:", tmpErr)
			continue
		}

		// The map key has the form "<id>"; the body references it as "cid:id".
		cidRef := strings.ReplaceAll(contentID, "<", "cid:")
		cidRef = strings.ReplaceAll(cidRef, ">", "")
		// The trailing quote keeps the match anchored to the end of the reference.
		emailMessage.Content = strings.ReplaceAll(emailMessage.Content, fmt.Sprint(cidRef, `"`), fmt.Sprint(resourceUrl, `"`))
	}

	// Attachments: upload each file and remember its title, URL and size.
	outsideReportAttachmentList := make([]*report.OutsideReportAttachment, 0, len(emailMessage.Attachment))
	for name, localPath := range emailMessage.Attachment {
		fileInfo, tmpErr := os.Stat(localPath)
		if tmpErr != nil {
			global.FILE_LOG.Error(emailMessage.Title, " - 读取附件信息失败:", tmpErr)
			continue
		}

		randStr := utils.GetRandStringNoSpecialChar(28)
		fileName := randStr + path.Ext(localPath)
		resourceUrl, tmpErr := ossClient.UploadFile(fileName, localPath, "")
		if tmpErr != nil {
			global.FILE_LOG.Error(emailMessage.Title, " - 文件上传文件失败:", tmpErr)
			continue
		}

		outsideReportAttachmentList = append(outsideReportAttachmentList, &report.OutsideReportAttachment{
			OutsideReportAttachmentID: 0,
			OutsideReportID:           0,
			Title:                     name,
			URL:                       resourceUrl,
			CreateTime:                time.Now(),
			FileSize:                  fileInfo.Size(),
		})
	}

	// The body is stored HTML-escaped.
	htmlEscapeBody := html.EscapeString(emailMessage.Content)

	// Resolve the sender. When no matching system user exists the bare
	// address is kept as the user name and the user id stays 0.
	var sysUserID int
	sysUserName := emailMessage.FromEmail
	if emailMessage.From != `` {
		sysUserName = extractEmailAddress(sysUserName)
		sysUser, tmpErr := eta.GetSysUserByEmail(sysUserName)
		if tmpErr != nil && !utils.IsErrNoRow(tmpErr) {
			return tmpErr
		}
		if tmpErr == nil {
			sysUserID = sysUser.AdminId
			sysUserName = sysUser.RealName
		}
	}

	// Fall back to the current time when the message carries no date.
	reportUpdateTime := emailMessage.Date
	if reportUpdateTime.IsZero() {
		reportUpdateTime = time.Now()
	}

	reportInfo := &report.OutsideReport{
		OutsideReportID:  0,
		Source:           3,
		Title:            emailMessage.Title,
		Abstract:         "",
		ClassifyID:       0,
		ClassifyName:     "",
		Content:          htmlEscapeBody,
		SysUserID:        sysUserID,
		SysUserName:      sysUserName,
		EmailMessageUID:  emailMessageUID,
		ReportUpdateTime: reportUpdateTime,
		ModifyTime:       time.Now(),
		CreateTime:       time.Now(),
		ReportCode:       uuid.New().String(),
	}
	err = report.CreateOutsideReport(reportInfo, outsideReportAttachmentList)

	return err
}

// removeTempFiles deletes the local files behind the message's inline
// resources and attachments. Removal is best effort: errors are ignored
// because a file may already be gone.
func removeTempFiles(emailMessage mail.MailMessage) {
	for _, localPath := range emailMessage.Resources {
		_ = os.Remove(localPath)
	}
	for _, localPath := range emailMessage.Attachment {
		_ = os.Remove(localPath)
	}
}

// extractEmailAddress returns the text between the first "<" and the next
// ">" (or the next "<", whichever comes first) of a sender string such as
// `Name <user@example.com>`. A string without "<" is returned unchanged.
func extractEmailAddress(from string) string {
	parts := strings.Split(from, "<")
	if len(parts) < 2 {
		return from
	}

	address := parts[1]
	if closing := strings.Split(address, ">"); len(closing) >= 2 {
		address = closing[0]
	}
	return address
}

// ensureDirExists makes sure dirPath is a directory, creating it (and any
// missing parents) with fs.ModePerm when it does not exist. It returns an
// error when the path exists but is not a directory, or when the stat or the
// creation fails.
func ensureDirExists(dirPath string) error {
	info, err := os.Stat(dirPath)
	switch {
	case err == nil && info.IsDir():
		// Directory already present.
		return nil
	case err == nil:
		return fmt.Errorf("path '%s' exists but is not a directory", dirPath)
	case os.IsNotExist(err):
		return os.MkdirAll(dirPath, fs.ModePerm)
	default:
		return err
	}
}