mail.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package pcsg
  2. import (
  3. "encoding/json"
  4. "eta/eta_email_analysis/global"
  5. "eta/eta_email_analysis/models/eta"
  6. "eta/eta_email_analysis/models/report"
  7. "eta/eta_email_analysis/services/oss"
  8. "eta/eta_email_analysis/utils"
  9. "eta/eta_email_analysis/utils/mail"
  10. "fmt"
  11. "github.com/google/uuid"
  12. "html"
  13. "io/fs"
  14. "log"
  15. "os"
  16. "path"
  17. "strings"
  18. "sync"
  19. "time"
  20. )
  21. // 同步用户锁,防止重复同步,不管是全量还是增量,都是同一时间只能一个同步
  22. var lockListenEmail sync.Mutex
  23. // 邮件改名规则
  24. var ruleList []report.MailRule
  25. func ListenMail() {
  26. var err error
  27. defer func() {
  28. if err != nil {
  29. global.FILE_LOG.Errorf("监听邮件失败:%s", err.Error())
  30. }
  31. }()
  32. //Fix()
  33. //return
  34. lockListenEmail.Lock()
  35. // 目录创建
  36. _ = ensureDirExists(fmt.Sprintf("%s%s", global.CONFIG.Serve.StaticDir, `file`))
  37. mailMessageChan := make(chan mail.MailMessage, 5) // 创建一个通道,用于接收邮件消息
  38. mailMessageDoneChan := make(chan bool, 1) // 创建一个通道,用于接收邮件消息
  39. //fmt.Println(len(mailMessageChan))
  40. // 获取规则
  41. ruleList = []report.MailRule{}
  42. ruleJson, e := eta.GetBusinessConfByKey("MailCheckRule")
  43. if e != nil {
  44. err = e
  45. global.FILE_LOG.Error("获取规则配置失败:%v", err)
  46. return
  47. }
  48. err = json.Unmarshal([]byte(ruleJson.ConfVal), &ruleList)
  49. if err != nil {
  50. global.FILE_LOG.Error("解析规则配置失败:%v", err)
  51. return
  52. }
  53. // 邮件监听后的处理函数
  54. go afterByListen(mailMessageChan, mailMessageDoneChan)
  55. fmt.Println("开始监听邮件")
  56. emailMessageUID := global.CONFIG.Email.StartReadIndex
  57. if emailMessageUID <= 0 {
  58. emailMessageUID, err = report.GetMaxOutsideReportByEmailMessageId()
  59. // 查找失败了,那么就记录
  60. if err != nil {
  61. global.FILE_LOG.Errorf("获取已入库的最大邮件id失败:%s", err.Error())
  62. }
  63. }
  64. {
  65. idList, err := report.GetAllOutsideReportByEmailMessageId()
  66. // 已经存在了,那么就返回
  67. if err != nil {
  68. global.FILE_LOG.Errorf("获取已入库的所有邮件id失败:%s", err.Error())
  69. } else {
  70. idMap := make(map[int]bool)
  71. for _, v := range idList {
  72. idMap[v] = true
  73. }
  74. mail.IsHandleMessageIdMap = idMap
  75. }
  76. }
  77. mailAddress := fmt.Sprintf("%s:%d", global.CONFIG.Email.Host, global.CONFIG.Email.Port)
  78. mail.ListenMail(mailAddress, global.CONFIG.Email.Folder, global.CONFIG.Email.UserName, global.CONFIG.Email.Password, global.CONFIG.Email.ReadBatchSize, emailMessageUID, mailMessageChan, mailMessageDoneChan)
  79. return
  80. }
  81. func afterByListen(mailMessageChan chan mail.MailMessage, mailMessageDoneChan chan bool) {
  82. defer func() {
  83. log.Println("监听读取结束")
  84. lockListenEmail.Unlock()
  85. }()
  86. for {
  87. select {
  88. case emailMessage := <-mailMessageChan:
  89. //fmt.Println("读取成功")
  90. //fmt.Println(emailMessage.Title)
  91. handleMailMessage(emailMessage)
  92. case <-time.After(10 * time.Second):
  93. //fmt.Println("监听超时了")
  94. break
  95. case <-mailMessageDoneChan:
  96. //fmt.Println("读取完成一轮了")
  97. for len(mailMessageChan) > 0 {
  98. emailMessage := <-mailMessageChan
  99. handleMailMessage(emailMessage)
  100. }
  101. //fmt.Println("结束了")
  102. return
  103. }
  104. }
  105. }
  106. func handleMailMessage(emailMessage mail.MailMessage) (err error) {
  107. defer func() {
  108. if err != nil {
  109. global.FILE_LOG.Errorf("邮件处理失败,邮件标题:%s,错误原因:%v", emailMessage.Title, err)
  110. }
  111. for _, v := range emailMessage.Resources {
  112. os.Remove(v)
  113. }
  114. for _, v := range emailMessage.Attachment {
  115. os.Remove(v)
  116. }
  117. }()
  118. // 记录邮件信息
  119. {
  120. obj := new(report.OutsideEmailBaseInfo)
  121. outEmail, _ := obj.GetByEmailMessageIdAndFolder(emailMessage.Uid, emailMessage.Folder)
  122. if outEmail == nil || outEmail.EmailMessageUid <= 0 {
  123. obj = &report.OutsideEmailBaseInfo{
  124. Id: 0,
  125. Folder: emailMessage.Folder,
  126. EmailMessageUid: emailMessage.Uid,
  127. Title: emailMessage.Title,
  128. FromAddress: emailMessage.FromAddress,
  129. From: emailMessage.From,
  130. DeliveryTime: emailMessage.Date,
  131. CreateTime: time.Now(),
  132. }
  133. tmpErr := obj.Add(obj)
  134. if tmpErr != nil {
  135. global.FILE_LOG.Error("保存邮件基础信息失败:" + tmpErr.Error())
  136. }
  137. }
  138. }
  139. //rootPath := `C:\Users\123\go\src\eta\eta_email_analysis\static\`
  140. ossClient := oss.NewOssClient()
  141. //fmt.Println(emailMessage.Title)
  142. outsideReportAttachmentList := make([]*report.OutsideReportAttachment, 0)
  143. emailMessageUID := int(emailMessage.Uid)
  144. outReport, err := report.GetOutsideReportByEmailMessageId(emailMessageUID)
  145. // 已经存在了,那么就返回
  146. if err == nil {
  147. if outReport.ReportUpdateTime != emailMessage.Date {
  148. // 如果更新时间不一样,那么就更新收件时间
  149. err = outReport.Update([]string{"ReportUpdateTime"})
  150. if err != nil {
  151. global.FILE_LOG.Errorf("更新收件时间失败:%s", err.Error())
  152. return
  153. }
  154. global.FILE_LOG.Debugf("更新收件时间,名称:%s, 原时间:%s, 新收件时间: %s", outReport.Title, outReport.ReportUpdateTime, emailMessage.Date)
  155. }
  156. global.FILE_LOG.Debugf("已存在,就不处理了,报告标题:%s;;邮件下标:%d", outReport.Title, emailMessage.Uid)
  157. return
  158. }
  159. fmt.Println("开始处理邮件,标题:", emailMessage.Title, ";邮件下标:", emailMessage.Uid)
  160. // sql报错,那么就返回
  161. if err != nil && !utils.IsErrNoRow(err) {
  162. return
  163. }
  164. // 已经存在了,那么就返回
  165. if outReport.OutsideReportID > 0 {
  166. return
  167. }
  168. title := emailMessage.Title
  169. var abstract string
  170. var classifyId int
  171. var classifyName string
  172. var sysUserId int
  173. var sysUserName string
  174. sysUserName = emailMessage.FromAddress
  175. // 查找用户
  176. if emailMessage.From != `` {
  177. tmpEmailStrList := strings.Split(sysUserName, "<")
  178. if len(tmpEmailStrList) >= 2 {
  179. sysUserName = tmpEmailStrList[1]
  180. tmpEmailStrList = strings.Split(sysUserName, ">")
  181. if len(tmpEmailStrList) >= 2 {
  182. sysUserName = tmpEmailStrList[0]
  183. }
  184. }
  185. sysUser, tmpErr := eta.GetSysUserByEmail(sysUserName)
  186. if tmpErr != nil && !utils.IsErrNoRow(tmpErr) {
  187. err = tmpErr
  188. return
  189. }
  190. if tmpErr == nil {
  191. sysUserId = sysUser.AdminId
  192. sysUserName = sysUser.RealName
  193. }
  194. }
  195. title = strings.Replace(title, "[PCI ANALYTICS CLOUD PLATFORM(00) Fwd] - ", "", -1)
  196. for _, v := range ruleList {
  197. tmpTitle := strings.ToLower(emailMessage.Title)
  198. rule := strings.ToLower(v.Rule)
  199. if strings.Contains(tmpTitle, rule) {
  200. title = v.Title
  201. abstract = v.Abstract
  202. classifyId = v.ClassifyId
  203. classify, e := report.GetClassifyByClassifyId(v.ClassifyId)
  204. if e != nil {
  205. err = e
  206. global.FILE_LOG.Error("获取分类失败:", err)
  207. return
  208. }
  209. classifyName = classify.ClassifyName
  210. // 特殊规则
  211. //if strings.Contains(title, "海外分公司市场信息月报") || strings.Contains(title, "区域市场信息月报") {
  212. // if emailMessage.From == "report.pcanalyst00@petrochina-usa.com" {
  213. // sysUserName = "PCI"
  214. // break
  215. // } else {
  216. // v.Author = emailMessage.From
  217. // }
  218. //}
  219. if v.Author != `` {
  220. if strings.Contains(v.Author, "@") {
  221. sysUser, tmpErr := eta.GetSysUserByEmail(v.Author)
  222. if tmpErr != nil && !utils.IsErrNoRow(tmpErr) {
  223. err = tmpErr
  224. return
  225. }
  226. if sysUser.AdminId > 0 {
  227. sysUserId = sysUser.AdminId
  228. sysUserName = sysUser.RealName
  229. } else {
  230. sysUserName = v.Author
  231. }
  232. } else {
  233. sysUserName = v.Author
  234. }
  235. }
  236. break
  237. }
  238. }
  239. for k, v := range emailMessage.Resources {
  240. randStr := utils.GetRandStringNoSpecialChar(28)
  241. ext := path.Ext(v)
  242. fileName := randStr + ext
  243. resourceUrl, tmpErr := ossClient.UploadFile(fileName, v, "")
  244. //os.Remove(v)
  245. if tmpErr != nil {
  246. global.FILE_LOG.Error(emailMessage.Title, "- 文件上传文件失败:", tmpErr)
  247. continue
  248. }
  249. tmpK := strings.Replace(k, "<", "cid:", -1)
  250. tmpK = strings.Replace(tmpK, ">", "", -1)
  251. emailMessage.Content = strings.Replace(emailMessage.Content, fmt.Sprint(tmpK, `"`), fmt.Sprint(resourceUrl, `"`), -1)
  252. }
  253. for name, v := range emailMessage.Attachment {
  254. var fileSize int64
  255. fileInfo, tmpErr := os.Stat(v)
  256. if tmpErr != nil {
  257. continue
  258. }
  259. fileSize = fileInfo.Size()
  260. randStr := utils.GetRandStringNoSpecialChar(28)
  261. ext := path.Ext(v)
  262. fileName := randStr + ext
  263. resourceUrl, tmpErr := ossClient.UploadFile(fileName, v, "")
  264. //defer os.Remove(v)
  265. if tmpErr != nil {
  266. global.FILE_LOG.Error(emailMessage.Title, " - 文件上传文件失败:", tmpErr)
  267. continue
  268. }
  269. outsideReportAttachmentList = append(outsideReportAttachmentList, &report.OutsideReportAttachment{
  270. OutsideReportAttachmentID: 0,
  271. OutsideReportID: 0,
  272. Title: name,
  273. URL: resourceUrl,
  274. CreateTime: time.Now(),
  275. FileSize: fileSize,
  276. })
  277. }
  278. //fileName := fmt.Sprintf("%s%s.%s", rootPath, emailMessage.Title, "html")
  279. htmlEscapeBody := html.EscapeString(emailMessage.Content)
  280. //fmt.Println(htmlEscapeBody)
  281. //err = utils.SaveToFile([]byte(emailMessage.Content), fileName)
  282. //if err != nil {
  283. // fmt.Println(fileName, "生成失败;err:", err)
  284. //}
  285. reportUpdateTime := emailMessage.Date
  286. if reportUpdateTime.IsZero() {
  287. reportUpdateTime = time.Now()
  288. }
  289. reportInfo := &report.OutsideReport{
  290. OutsideReportID: 0,
  291. Source: 3,
  292. Title: title,
  293. Abstract: abstract,
  294. ClassifyID: classifyId,
  295. ClassifyName: classifyName,
  296. Content: htmlEscapeBody,
  297. SysUserID: sysUserId,
  298. SysUserName: sysUserName,
  299. EmailMessageUID: emailMessageUID,
  300. ReportUpdateTime: reportUpdateTime,
  301. ModifyTime: time.Now(),
  302. CreateTime: time.Now(),
  303. ReportCode: uuid.New().String(),
  304. }
  305. err = report.CreateOutsideReport(reportInfo, outsideReportAttachmentList)
  306. return
  307. }
  308. func ensureDirExists(dirPath string) error {
  309. info, err := os.Stat(dirPath)
  310. if err == nil {
  311. if info.IsDir() {
  312. return nil // 目录已存在
  313. }
  314. return fmt.Errorf("path '%s' exists but is not a directory", dirPath)
  315. }
  316. if os.IsNotExist(err) {
  317. return os.MkdirAll(dirPath, fs.ModePerm)
  318. }
  319. return err
  320. }