mail.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package pcsg
  2. import (
  3. "encoding/json"
  4. "eta/eta_email_analysis/global"
  5. "eta/eta_email_analysis/models/eta"
  6. "eta/eta_email_analysis/models/report"
  7. "eta/eta_email_analysis/services/oss"
  8. "eta/eta_email_analysis/utils"
  9. "eta/eta_email_analysis/utils/mail"
  10. "fmt"
  11. "github.com/google/uuid"
  12. "html"
  13. "io/fs"
  14. "log"
  15. "os"
  16. "path"
  17. "strings"
  18. "sync"
  19. "time"
  20. )
  21. // lockListenEmail guards mail listening so that only one round (full or incremental) runs at a time; ListenMail locks it and afterByListen unlocks it.
  22. var lockListenEmail sync.Mutex
  23. func ListenMail() {
  24. var err error
  25. defer func() {
  26. if err != nil {
  27. global.FILE_LOG.Errorf("监听邮件失败:%s", err.Error())
  28. }
  29. }()
  30. //Fix()
  31. //return
  32. lockListenEmail.Lock()
  33. // 目录创建
  34. _ = ensureDirExists(fmt.Sprintf("%s%s", global.CONFIG.Serve.StaticDir, `file`))
  35. mailMessageChan := make(chan mail.MailMessage, 5) // 创建一个通道,用于接收邮件消息
  36. mailMessageDoneChan := make(chan bool, 1) // 创建一个通道,用于接收邮件消息
  37. //fmt.Println(len(mailMessageChan))
  38. // 邮件监听后的处理函数
  39. go afterByListen(mailMessageChan, mailMessageDoneChan)
  40. fmt.Println("开始监听邮件")
  41. emailMessageUID := global.CONFIG.Email.StartReadIndex
  42. if emailMessageUID <= 0 {
  43. emailMessageUID, err = report.GetMaxOutsideReportByEmailMessageId()
  44. // 已经存在了,那么就返回
  45. if err != nil {
  46. global.FILE_LOG.Errorf("获取已入库的最大邮件id失败:%s", err.Error())
  47. }
  48. }
  49. mailAddress := fmt.Sprintf("%s:%d", global.CONFIG.Email.Host, global.CONFIG.Email.Port)
  50. mail.ListenMail(mailAddress, global.CONFIG.Email.Folder, global.CONFIG.Email.UserName, global.CONFIG.Email.Password, global.CONFIG.Email.ReadBatchSize, emailMessageUID, mailMessageChan, mailMessageDoneChan)
  51. return
  52. }
  53. func afterByListen(mailMessageChan chan mail.MailMessage, mailMessageDoneChan chan bool) {
  54. defer func() {
  55. log.Println("监听读取结束")
  56. lockListenEmail.Unlock()
  57. }()
  58. for {
  59. select {
  60. case emailMessage := <-mailMessageChan:
  61. //fmt.Println("读取成功")
  62. //fmt.Println(emailMessage.Title)
  63. handleMailMessage(emailMessage)
  64. case <-time.After(10 * time.Second):
  65. //fmt.Println("监听超时了")
  66. break
  67. case <-mailMessageDoneChan:
  68. //fmt.Println("读取完成一轮了")
  69. for len(mailMessageChan) > 0 {
  70. emailMessage := <-mailMessageChan
  71. handleMailMessage(emailMessage)
  72. }
  73. //fmt.Println("结束了")
  74. return
  75. }
  76. }
  77. }
  78. func handleMailMessage(emailMessage mail.MailMessage) (err error) {
  79. defer func() {
  80. if err != nil {
  81. global.FILE_LOG.Errorf("邮件处理失败,邮件标题:%s,错误原因:%v", emailMessage.Title, err)
  82. }
  83. for _, v := range emailMessage.Resources {
  84. os.Remove(v)
  85. }
  86. for _, v := range emailMessage.Attachment {
  87. os.Remove(v)
  88. }
  89. }()
  90. //rootPath := `C:\Users\123\go\src\eta\eta_email_analysis\static\`
  91. ossClient := oss.NewOssClient()
  92. //fmt.Println(emailMessage.Title)
  93. outsideReportAttachmentList := make([]*report.OutsideReportAttachment, 0)
  94. emailMessageUID := int(emailMessage.Uid)
  95. outReport, err := report.GetOutsideReportByEmailMessageId(emailMessageUID)
  96. // 已经存在了,那么就返回
  97. if err == nil {
  98. global.FILE_LOG.Debugf("已存在,就不处理了,报告标题:%s;;邮件下标:%d", outReport.Title, emailMessage.Uid)
  99. return
  100. }
  101. fmt.Println("开始处理邮件,标题:", emailMessage.Title, ";邮件下标:", emailMessage.Uid)
  102. // sql报错,那么就返回
  103. if err != nil && !utils.IsErrNoRow(err) {
  104. return
  105. }
  106. // 已经存在了,那么就返回
  107. if outReport.OutsideReportID > 0 {
  108. return
  109. }
  110. for k, v := range emailMessage.Resources {
  111. randStr := utils.GetRandStringNoSpecialChar(28)
  112. ext := path.Ext(v)
  113. fileName := randStr + ext
  114. resourceUrl, tmpErr := ossClient.UploadFile(fileName, v, "")
  115. //os.Remove(v)
  116. if tmpErr != nil {
  117. global.FILE_LOG.Error(emailMessage.Title, "- 文件上传文件失败:", tmpErr)
  118. continue
  119. }
  120. tmpK := strings.Replace(k, "<", "cid:", -1)
  121. tmpK = strings.Replace(tmpK, ">", "", -1)
  122. emailMessage.Content = strings.Replace(emailMessage.Content, fmt.Sprint(tmpK, `"`), fmt.Sprint(resourceUrl, `"`), -1)
  123. }
  124. for name, v := range emailMessage.Attachment {
  125. var fileSize int64
  126. fileInfo, tmpErr := os.Stat(v)
  127. if tmpErr != nil {
  128. continue
  129. }
  130. fileSize = fileInfo.Size()
  131. randStr := utils.GetRandStringNoSpecialChar(28)
  132. ext := path.Ext(v)
  133. fileName := randStr + ext
  134. resourceUrl, tmpErr := ossClient.UploadFile(fileName, v, "")
  135. //defer os.Remove(v)
  136. if tmpErr != nil {
  137. global.FILE_LOG.Error(emailMessage.Title, " - 文件上传文件失败:", tmpErr)
  138. continue
  139. }
  140. outsideReportAttachmentList = append(outsideReportAttachmentList, &report.OutsideReportAttachment{
  141. OutsideReportAttachmentID: 0,
  142. OutsideReportID: 0,
  143. Title: name,
  144. URL: resourceUrl,
  145. CreateTime: time.Now(),
  146. FileSize: fileSize,
  147. })
  148. }
  149. //fileName := fmt.Sprintf("%s%s.%s", rootPath, emailMessage.Title, "html")
  150. htmlEscapeBody := html.EscapeString(emailMessage.Content)
  151. //fmt.Println(htmlEscapeBody)
  152. //err = utils.SaveToFile([]byte(emailMessage.Content), fileName)
  153. //if err != nil {
  154. // fmt.Println(fileName, "生成失败;err:", err)
  155. //}
  156. var sysUserId int
  157. sysUserName := emailMessage.FromAddress
  158. // 查找用户
  159. if emailMessage.From != `` {
  160. tmpEmailStrList := strings.Split(sysUserName, "<")
  161. if len(tmpEmailStrList) >= 2 {
  162. sysUserName = tmpEmailStrList[1]
  163. tmpEmailStrList = strings.Split(sysUserName, ">")
  164. if len(tmpEmailStrList) >= 2 {
  165. sysUserName = tmpEmailStrList[0]
  166. }
  167. }
  168. sysUser, tmpErr := eta.GetSysUserByEmail(sysUserName)
  169. if tmpErr != nil && !utils.IsErrNoRow(tmpErr) {
  170. err = tmpErr
  171. return
  172. }
  173. if tmpErr == nil {
  174. sysUserId = sysUser.AdminId
  175. sysUserName = sysUser.RealName
  176. }
  177. }
  178. ruleJson, err := eta.GetBusinessConfByKey("MailCheckRule")
  179. if err != nil {
  180. global.FILE_LOG.Error("获取规则配置失败:%v", err)
  181. return
  182. }
  183. var ruleList []report.MailRule
  184. err = json.Unmarshal([]byte(ruleJson.ConfVal), &ruleList)
  185. if err != nil {
  186. global.FILE_LOG.Error("解析规则配置失败:%v", err)
  187. return
  188. }
  189. ruleMap := make(map[string]report.MailRule)
  190. for _, v := range ruleList {
  191. ruleMap[v.Rule] = v
  192. }
  193. reportUpdateTime := emailMessage.Date
  194. if reportUpdateTime.IsZero() {
  195. reportUpdateTime = time.Now()
  196. }
  197. reportInfo := &report.OutsideReport{
  198. OutsideReportID: 0,
  199. Source: 3,
  200. Title: emailMessage.Title,
  201. Abstract: "",
  202. ClassifyID: 0,
  203. ClassifyName: "",
  204. Content: htmlEscapeBody,
  205. SysUserID: sysUserId,
  206. SysUserName: sysUserName,
  207. EmailMessageUID: emailMessageUID,
  208. ReportUpdateTime: reportUpdateTime,
  209. ModifyTime: time.Now(),
  210. CreateTime: time.Now(),
  211. ReportCode: uuid.New().String(),
  212. }
  213. reportInfo.Title = strings.Replace(reportInfo.Title, "[PCI ANALYTICS CLOUD PLATFORM(00) Fwd] - ", "", -1)
  214. for _, v := range ruleList {
  215. if strings.Contains(emailMessage.Title, v.Rule) {
  216. reportInfo.Title = v.Title
  217. reportInfo.Abstract = v.Abstract
  218. reportInfo.ClassifyID = v.ClassifyId
  219. classify, e := report.GetClassifyByClassifyId(v.ClassifyId)
  220. if e != nil {
  221. err = e
  222. global.FILE_LOG.Error("获取分类失败:", err)
  223. return
  224. }
  225. reportInfo.ClassifyName = classify.ClassifyName
  226. if strings.Contains(v.Author, "@") {
  227. sysUser, tmpErr := eta.GetSysUserByEmail(v.Author)
  228. if tmpErr != nil && !utils.IsErrNoRow(tmpErr) {
  229. err = tmpErr
  230. return
  231. }
  232. if sysUser.AdminId > 0 {
  233. reportInfo.SysUserID = sysUser.AdminId
  234. reportInfo.SysUserName = sysUser.RealName
  235. } else {
  236. reportInfo.SysUserName = v.Author
  237. }
  238. } else {
  239. reportInfo.SysUserName = v.Author
  240. }
  241. }
  242. }
  243. err = report.CreateOutsideReport(reportInfo, outsideReportAttachmentList)
  244. return
  245. }
  // ensureDirExists makes sure dirPath refers to a directory.
  //
  // It returns nil when the directory is already there, creates it (including
  // missing parents, with fs.ModePerm) when the path does not exist, and returns
  // an error when the path exists but is not a directory or when os.Stat fails
  // for any other reason.
  func ensureDirExists(dirPath string) error {
  	stat, statErr := os.Stat(dirPath)
  	switch {
  	case statErr == nil && stat.IsDir():
  		// Already present, nothing to do.
  		return nil
  	case statErr == nil:
  		return fmt.Errorf("path '%s' exists but is not a directory", dirPath)
  	case os.IsNotExist(statErr):
  		return os.MkdirAll(dirPath, fs.ModePerm)
  	default:
  		return statErr
  	}
  }