mail.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package pcsg
  2. import (
  3. "eta/eta_email_analysis/global"
  4. "eta/eta_email_analysis/models/eta"
  5. "eta/eta_email_analysis/models/report"
  6. "eta/eta_email_analysis/services/oss"
  7. "eta/eta_email_analysis/utils"
  8. "eta/eta_email_analysis/utils/mail"
  9. "fmt"
  10. "github.com/google/uuid"
  11. "html"
  12. "io/fs"
  13. "os"
  14. "path"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  19. // 同步用户锁,防止重复同步,不管是全量还是增量,都是同一时间只能一个同步
  20. var lockListenEmail sync.Mutex
  21. func ListenMail() (err error) {
  22. lockListenEmail.Lock()
  23. // 目录创建
  24. _ = ensureDirExists(fmt.Sprintf("%s%s", global.CONFIG.Serve.StaticDir, `file`))
  25. mailMessageChan := make(chan mail.MailMessage, 5) // 创建一个通道,用于接收邮件消息
  26. mailMessageDoneChan := make(chan bool, 1) // 创建一个通道,用于接收邮件消息
  27. fmt.Println(len(mailMessageChan))
  28. // 邮件监听后的处理函数
  29. go afterByListen(mailMessageChan, mailMessageDoneChan)
  30. fmt.Println("开始监听邮件")
  31. emailMessageUID, err := report.GetMaxOutsideReportByEmailMessageId()
  32. // 已经存在了,那么就返回
  33. if err != nil {
  34. global.FILE_LOG.Errorf("获取已入库的最大邮件id失败:%s", err.Error())
  35. }
  36. mailAddress := fmt.Sprintf("%s:%d", global.CONFIG.Email.Host, global.CONFIG.Email.Port)
  37. mail.ListenMail(mailAddress, global.CONFIG.Email.Folder, global.CONFIG.Email.UserName, global.CONFIG.Email.Password, global.CONFIG.Email.ReadBatchSize, emailMessageUID, mailMessageChan, mailMessageDoneChan)
  38. return
  39. }
  40. func afterByListen(mailMessageChan chan mail.MailMessage, mailMessageDoneChan chan bool) {
  41. defer func() {
  42. fmt.Println("监听读取结束")
  43. lockListenEmail.Unlock()
  44. }()
  45. for {
  46. select {
  47. case emailMessage := <-mailMessageChan:
  48. //fmt.Println("读取成功")
  49. //fmt.Println(emailMessage.Title)
  50. handleMailMessage(emailMessage)
  51. case <-time.After(10 * time.Second):
  52. //fmt.Println("监听超时了")
  53. break
  54. case <-mailMessageDoneChan:
  55. fmt.Println("读取完成一轮了")
  56. for len(mailMessageChan) > 0 {
  57. emailMessage := <-mailMessageChan
  58. handleMailMessage(emailMessage)
  59. }
  60. fmt.Println("结束了")
  61. return
  62. }
  63. }
  64. }
  65. func handleMailMessage(emailMessage mail.MailMessage) (err error) {
  66. defer func() {
  67. if err != nil {
  68. global.FILE_LOG.Errorf("邮件处理失败,邮件标题:%s,错误原因:%v", emailMessage.Title, err)
  69. }
  70. for _, v := range emailMessage.Resources {
  71. os.Remove(v)
  72. }
  73. for _, v := range emailMessage.Attachment {
  74. os.Remove(v)
  75. }
  76. }()
  77. //rootPath := `C:\Users\123\go\src\eta\eta_email_analysis\static\`
  78. ossClient := oss.NewOssClient()
  79. //fmt.Println(emailMessage.Title)
  80. outsideReportAttachmentList := make([]*report.OutsideReportAttachment, 0)
  81. emailMessageUID := int(emailMessage.Uid)
  82. outReport, err := report.GetOutsideReportByEmailMessageId(emailMessageUID)
  83. // 已经存在了,那么就返回
  84. if err == nil {
  85. global.FILE_LOG.Debugf("已存在,就不处理了,报告标题:%s", outReport.Title)
  86. return
  87. }
  88. fmt.Println("开始处理邮件,标题:", emailMessage.Title, ";邮件下标:", emailMessage.Uid)
  89. // sql报错,那么就返回
  90. if err != nil && !utils.IsErrNoRow(err) {
  91. return
  92. }
  93. // 已经存在了,那么就返回
  94. if outReport.OutsideReportID > 0 {
  95. return
  96. }
  97. for k, v := range emailMessage.Resources {
  98. randStr := utils.GetRandStringNoSpecialChar(28)
  99. ext := path.Ext(v)
  100. fileName := randStr + ext
  101. resourceUrl, tmpErr := ossClient.UploadFile(fileName, v, "")
  102. //os.Remove(v)
  103. if tmpErr != nil {
  104. global.FILE_LOG.Error(emailMessage.Title, "- 文件上传文件失败:", tmpErr)
  105. continue
  106. }
  107. tmpK := strings.Replace(k, "<", "cid:", -1)
  108. tmpK = strings.Replace(tmpK, ">", "", -1)
  109. emailMessage.Content = strings.Replace(emailMessage.Content, fmt.Sprint(tmpK, `"`), fmt.Sprint(resourceUrl, `"`), -1)
  110. }
  111. for name, v := range emailMessage.Attachment {
  112. var fileSize int64
  113. fileInfo, tmpErr := os.Stat(v)
  114. if tmpErr != nil {
  115. continue
  116. }
  117. fileSize = fileInfo.Size()
  118. randStr := utils.GetRandStringNoSpecialChar(28)
  119. ext := path.Ext(v)
  120. fileName := randStr + ext
  121. resourceUrl, tmpErr := ossClient.UploadFile(fileName, v, "")
  122. //defer os.Remove(v)
  123. if tmpErr != nil {
  124. global.FILE_LOG.Error(emailMessage.Title, " - 文件上传文件失败:", tmpErr)
  125. continue
  126. }
  127. outsideReportAttachmentList = append(outsideReportAttachmentList, &report.OutsideReportAttachment{
  128. OutsideReportAttachmentID: 0,
  129. OutsideReportID: 0,
  130. Title: name,
  131. URL: resourceUrl,
  132. CreateTime: time.Now(),
  133. FileSize: fileSize,
  134. })
  135. }
  136. //fileName := fmt.Sprintf("%s%s.%s", rootPath, emailMessage.Title, "html")
  137. htmlEscapeBody := html.EscapeString(emailMessage.Content)
  138. //fmt.Println(htmlEscapeBody)
  139. //err = utils.SaveToFile([]byte(emailMessage.Content), fileName)
  140. //if err != nil {
  141. // fmt.Println(fileName, "生成失败;err:", err)
  142. //}
  143. var sysUserId int
  144. sysUserName := emailMessage.FromEmail
  145. // 查找用户
  146. if emailMessage.From != `` {
  147. tmpEmailStrList := strings.Split(sysUserName, "<")
  148. if len(tmpEmailStrList) >= 2 {
  149. sysUserName = tmpEmailStrList[1]
  150. tmpEmailStrList = strings.Split(sysUserName, ">")
  151. if len(tmpEmailStrList) >= 2 {
  152. sysUserName = tmpEmailStrList[0]
  153. }
  154. }
  155. sysUser, tmpErr := eta.GetSysUserByEmail(sysUserName)
  156. if tmpErr != nil && !utils.IsErrNoRow(tmpErr) {
  157. err = tmpErr
  158. return
  159. }
  160. if tmpErr == nil {
  161. sysUserId = sysUser.AdminId
  162. sysUserName = sysUser.RealName
  163. }
  164. }
  165. reportUpdateTime := emailMessage.Date
  166. if reportUpdateTime.IsZero() {
  167. reportUpdateTime = time.Now()
  168. }
  169. reportInfo := &report.OutsideReport{
  170. OutsideReportID: 0,
  171. Source: 3,
  172. Title: emailMessage.Title,
  173. Abstract: "",
  174. ClassifyID: 0,
  175. ClassifyName: "",
  176. Content: htmlEscapeBody,
  177. SysUserID: sysUserId,
  178. SysUserName: sysUserName,
  179. EmailMessageUID: emailMessageUID,
  180. ReportUpdateTime: reportUpdateTime,
  181. ModifyTime: time.Now(),
  182. CreateTime: time.Now(),
  183. ReportCode: uuid.New().String(),
  184. }
  185. err = report.CreateOutsideReport(reportInfo, outsideReportAttachmentList)
  186. return
  187. }
  // ensureDirExists makes sure dirPath refers to a directory.
  //
  // It returns nil when the directory is already there, creates it (including
  // missing parents, with fs.ModePerm) when the path does not exist, and returns
  // an error when the path exists but is not a directory or when it cannot be
  // inspected.
  func ensureDirExists(dirPath string) error {
  	stat, statErr := os.Stat(dirPath)
  	switch {
  	case statErr == nil && stat.IsDir():
  		// Directory already present.
  		return nil
  	case statErr == nil:
  		return fmt.Errorf("path '%s' exists but is not a directory", dirPath)
  	case os.IsNotExist(statErr):
  		return os.MkdirAll(dirPath, fs.ModePerm)
  	default:
  		// Any other stat failure (for example a permission problem).
  		return statErr
  	}
  }