task.go 9.3 KB


  1. package scheduler
  2. import (
  3. "context"
  4. "eta/eta_mini_crm/models"
  5. "eta/eta_mini_crm/utils"
  6. "fmt"
  7. "time"
  8. "github.com/beego/beego/v2/task"
  9. )
  10. func InitJob() {
  11. fmt.Println("消息推送任务开启。。。")
  12. // 每天凌晨12点10分检测, 发送消息
  13. tk1 := task.NewTask("SendReminderMsg", "0 10 0 * * *", SendReminderMsg)
  14. task.AddTask("发送消息提醒", tk1)
  15. // 每天凌晨12点检测, 修改用户状态
  16. tk2 := task.NewTask("ModifyUserStatus", "5 0 0 * * *", ModifyUserStatus)
  17. task.AddTask("定时修改用户状态", tk2)
  18. // 每隔1分钟,同步报告状态
  19. tk3 := task.NewTask("SyncReportPushStatus", "0 0/1 * * * *", SyncReportPushStatus)
  20. task.AddTask("定时同步报告推送状态", tk3)
  21. task.StartTask()
  22. }
  23. func SyncReportPushStatus(ctx context.Context) (err error) {
  24. defer func() {
  25. if err != nil {
  26. utils.FileLog.Info("同步研报推送状态出错,Err: %s", err)
  27. }
  28. if err := recover(); err != nil {
  29. utils.FileLog.Warn("同步研报推送状态出错,定时任务出错,Err: %s", err)
  30. }
  31. }()
  32. // 获取前三分钟修改的数据
  33. conditon := " AND modify_time >= NOW() - INTERVAL 3 MINUTE"
  34. reportList, err := models.GetReportByCondition(conditon)
  35. var reportIds []int
  36. for _, v := range reportList {
  37. reportIds = append(reportIds, v.Id)
  38. }
  39. reportPushList, err := models.GetReportPushStatusByReportIds(reportIds)
  40. if err != nil {
  41. return
  42. }
  43. reportPushMap := make(map[int]*models.ReportPushStatus)
  44. for _, v := range reportPushList {
  45. reportPushMap[v.ReportId] = v
  46. }
  47. var insertReportPushList []*models.ReportPushStatus
  48. for _, v := range reportList {
  49. var updateCols []string
  50. if reportPush, ok := reportPushMap[v.Id]; ok {
  51. if reportPush.Title != v.Title {
  52. reportPush.Title = v.Title
  53. updateCols = append(updateCols, "title")
  54. }
  55. if reportPush.Abstract != v.Abstract {
  56. reportPush.Abstract = v.Abstract
  57. updateCols = append(updateCols, "abstract")
  58. }
  59. if reportPush.Stage != v.Stage {
  60. reportPush.Stage = v.Stage
  61. updateCols = append(updateCols, "stage")
  62. }
  63. if reportPush.ClassifyIdFirst != v.ClassifyIdFirst {
  64. reportPush.ClassifyIdFirst = v.ClassifyIdFirst
  65. updateCols = append(updateCols, "classify_id_first")
  66. }
  67. if reportPush.ClassifyNameFirst != v.ClassifyNameFirst {
  68. reportPush.ClassifyNameFirst = v.ClassifyNameFirst
  69. updateCols = append(updateCols, "classify_name_first")
  70. }
  71. if reportPush.ClassifyIdSecond != v.ClassifyIdSecond {
  72. reportPush.ClassifyIdSecond = v.ClassifyIdSecond
  73. updateCols = append(updateCols, "classify_id_second")
  74. }
  75. if reportPush.ClassifyNameSecond != v.ClassifyNameSecond {
  76. reportPush.ClassifyNameSecond = v.ClassifyNameSecond
  77. updateCols = append(updateCols, "classify_name_second")
  78. }
  79. if reportPush.ClassifyIdThird != v.ClassifyIdThird {
  80. reportPush.ClassifyIdThird = v.ClassifyIdThird
  81. updateCols = append(updateCols, "classify_id_third")
  82. }
  83. if reportPush.ClassifyNameThird != v.ClassifyNameThird {
  84. reportPush.ClassifyNameThird = v.ClassifyNameThird
  85. updateCols = append(updateCols, "classify_name_third")
  86. }
  87. if reportPush.Author != v.Author {
  88. reportPush.Author = v.Author
  89. updateCols = append(updateCols, "author")
  90. }
  91. if reportPush.PublishTime != v.PublishTime {
  92. reportPush.PublishTime = v.PublishTime
  93. updateCols = append(updateCols, "publish_time")
  94. }
  95. if len(updateCols) > 0 {
  96. reportPush.Update(updateCols)
  97. }
  98. } else {
  99. insertReportPushList = append(insertReportPushList, &models.ReportPushStatus{
  100. ReportId: v.Id,
  101. State: 0,
  102. Title: v.Title,
  103. Abstract: v.Abstract,
  104. Stage: v.Stage,
  105. ClassifyIdFirst: v.ClassifyIdFirst,
  106. ClassifyNameFirst: v.ClassifyNameFirst,
  107. ClassifyIdSecond: v.ClassifyIdSecond,
  108. ClassifyNameSecond: v.ClassifyNameSecond,
  109. ClassifyIdThird: v.ClassifyIdThird,
  110. ClassifyNameThird: v.ClassifyNameThird,
  111. Author: v.Author,
  112. ReportType: 1,
  113. PublishTime: v.PublishTime,
  114. CreateTime: time.Now(),
  115. ModifyTime: time.Now(),
  116. })
  117. }
  118. }
  119. err = models.BatchAddReportPushStatus(insertReportPushList)
  120. if err != nil {
  121. return
  122. }
  123. return
  124. }
  125. func ModifyUserStatus(ctx context.Context) (err error) {
  126. defer func() {
  127. if err != nil {
  128. utils.ApiLog.Info("修改用户状态,定时任务出错,Err: %s", err)
  129. }
  130. if err := recover(); err != nil {
  131. utils.ApiLog.Warn("修改用户状态,定时任务出错,Err: %s", err)
  132. }
  133. }()
  134. curTime := time.Now()
  135. var pars []interface{}
  136. condition := ` AND valid_end_time<?`
  137. pars = append(pars, curTime)
  138. condition += ` AND status=? `
  139. pars = append(pars, 2)
  140. userIds, err := models.GetUserIdListByCondition(condition, pars)
  141. if err != nil {
  142. return
  143. }
  144. err = models.UpdateUserStatus(condition, pars)
  145. if err != nil {
  146. return
  147. }
  148. userRecordList := make([]*models.UserChangeRecord, 0)
  149. for _, v := range userIds {
  150. record := &models.UserChangeRecord{}
  151. record.UserId = v
  152. record.SysUserId = 0
  153. record.Content = "有效期到期禁用用户"
  154. record.CreateTime = time.Now()
  155. userRecordList = append(userRecordList, record)
  156. }
  157. if len(userRecordList) > 0 {
  158. err = models.UserChangeRecordMultiInsert(userRecordList)
  159. if err != nil {
  160. return
  161. }
  162. }
  163. return
  164. }
  165. func SendReminderMsg(ctx context.Context) (err error) {
  166. defer func() {
  167. if err != nil {
  168. utils.ApiLog.Info("发送提醒消息,定时任务出错,Err: %s", err)
  169. }
  170. if err := recover(); err != nil {
  171. utils.ApiLog.Warn("发送提醒消息,定时任务出错,Err: %s", err)
  172. }
  173. }()
  174. var userPars7 []interface{}
  175. var userPars15 []interface{}
  176. var userPars30 []interface{}
  177. var userPars60 []interface{}
  178. var msgPars7 []interface{}
  179. var msgPars15 []interface{}
  180. var msgPars30 []interface{}
  181. var msgPars60 []interface{}
  182. userTime7 := time.Now().AddDate(0, 0, 7)
  183. userTime15 := time.Now().AddDate(0, 0, 15)
  184. userTime30 := time.Now().AddDate(0, 0, 30)
  185. userTime60 := time.Now().AddDate(0, 0, 60)
  186. msgTime7 := time.Now().AddDate(0, 0, -7)
  187. msgTime15 := time.Now().AddDate(0, 0, -15)
  188. msgTime30 := time.Now().AddDate(0, 0, -30)
  189. msgTime60 := time.Now().AddDate(0, 0, -60)
  190. userCondition1 := ` AND valid_end_time<? `
  191. userCondition2 := ` AND valid_end_time<? AND valid_end_time>? `
  192. msgCondition := ` AND create_time>? `
  193. userPars7 = append(userPars7, userTime7)
  194. userPars15 = append(userPars15, userTime15, userTime7)
  195. userPars30 = append(userPars30, userTime30, userTime15)
  196. userPars60 = append(userPars60, userTime60, userTime30)
  197. msgPars7 = append(msgPars7, msgTime7)
  198. msgPars15 = append(msgPars15, msgTime15)
  199. msgPars30 = append(msgPars30, msgTime30)
  200. msgPars60 = append(msgPars60, msgTime60)
  201. err = SendMsgToSysUser(userCondition1, msgCondition, userPars7, msgPars7, 7)
  202. if err != nil {
  203. utils.ApiLog.Warn("发送提醒消息,定时任务出错,Err: %s", err.Error())
  204. }
  205. err = SendMsgToSysUser(userCondition2, msgCondition, userPars15, msgPars15, 15)
  206. if err != nil {
  207. utils.ApiLog.Warn("发送提醒消息,定时任务出错,Err: %s", err.Error())
  208. }
  209. err = SendMsgToSysUser(userCondition2, msgCondition, userPars30, msgPars30, 30)
  210. if err != nil {
  211. utils.ApiLog.Warn("发送提醒消息,定时任务出错,Err: %s", err.Error())
  212. }
  213. err = SendMsgToSysUser(userCondition2, msgCondition, userPars60, msgPars60, 60)
  214. if err != nil {
  215. utils.ApiLog.Warn("发送提醒消息,定时任务出错,Err: %s", err.Error())
  216. }
  217. return
  218. }
  219. func SendMsgToSysUser(userCondition, msgCondition string, userPars, msgPars []interface{}, messagetType int) (err error) {
  220. total, err := models.GetUserCount(userCondition, userPars)
  221. if err != nil {
  222. utils.ApiLog.Warn("获取用户列表失败 task err: %s", err.Error())
  223. }
  224. if total == 0 {
  225. return
  226. }
  227. // 获得待发送用户的列表
  228. userList, err := models.GetUserList(userCondition, userPars, 0, total)
  229. if err != nil {
  230. return
  231. }
  232. // 获得待通知系统用户的列表
  233. sysUserIds, err := models.GetSysUserIdList()
  234. if err != nil {
  235. return
  236. }
  237. messageList, err := models.GetSysMessageReportByCondition(msgCondition, msgPars)
  238. if err != nil {
  239. return
  240. }
  241. layout := "【%s】到期%d天提醒,请及时跟进"
  242. messageMap := make(map[int]map[int]struct{})
  243. for _, v := range messageList {
  244. if mv, ok := messageMap[v.ReceiveSysUserId]; ok {
  245. mv[v.UserId] = struct{}{}
  246. } else {
  247. messageMap[v.ReceiveSysUserId] = make(map[int]struct{})
  248. messageMap[v.ReceiveSysUserId][v.UserId] = struct{}{}
  249. }
  250. }
  251. sendMsg := make([]*models.SysMessageReport, 0)
  252. for _, v := range sysUserIds {
  253. if userMap, ok := messageMap[v]; ok {
  254. for _, u := range userList {
  255. if _, ok := userMap[u.UserId]; !ok {
  256. sendMsg = append(sendMsg, &models.SysMessageReport{
  257. UserId: u.UserId,
  258. ReceiveSysUserId: v,
  259. MessageType: messagetType,
  260. IsRead: false,
  261. CreateTime: time.Now(),
  262. ModifyTime: time.Now(),
  263. Content: fmt.Sprintf(layout, u.RealName, messagetType),
  264. })
  265. }
  266. }
  267. } else {
  268. for _, u := range userList {
  269. sendMsg = append(sendMsg, &models.SysMessageReport{
  270. UserId: u.UserId,
  271. ReceiveSysUserId: v,
  272. MessageType: messagetType,
  273. IsRead: false,
  274. CreateTime: time.Now(),
  275. ModifyTime: time.Now(),
  276. Content: fmt.Sprintf(layout, u.RealName, messagetType),
  277. })
  278. }
  279. }
  280. }
  281. if len(sendMsg) > 0 {
  282. err = models.InsertMultiSysMessageReport(sendMsg)
  283. }
  284. return
  285. }