eta_task.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package base
  2. import (
  3. "context"
  4. "encoding/json"
  5. taskService "eta/eta_mini_ht_api/domian/task"
  6. "time"
  7. )
  8. const (
  9. FORBIDDEN = "forbidden"
  10. DEV = "dev"
  11. TEST = "test"
  12. PROD = "prod"
  13. )
  14. type TaskType string
  15. type ETATask struct {
  16. RunMode string
  17. Name TaskType
  18. TaskFn Task
  19. Cron string
  20. }
  21. type TaskDetail struct {
  22. TaskId int
  23. StartTime string
  24. EndTime string
  25. Content string
  26. }
  27. type Task interface {
  28. Execute(taskDetail *TaskDetail) error
  29. }
  30. func NewTask(name TaskType, cron string, taskFunc Task, mode string) ETATask {
  31. return ETATask{
  32. Name: name,
  33. TaskFn: taskFunc,
  34. Cron: cron,
  35. RunMode: mode,
  36. }
  37. }
  38. var tasks = make(map[TaskType]*ETATask)
  39. func GetTasks() map[TaskType]*ETATask {
  40. return tasks
  41. }
  42. func RegisterTask(etaTask *ETATask) {
  43. if etaTask == nil {
  44. panic("不能注册空任务")
  45. }
  46. if _, ok := tasks[etaTask.Name]; ok {
  47. panic("请勿重复创建任务:" + etaTask.Name)
  48. }
  49. tasks[etaTask.Name] = etaTask
  50. }
  51. func (eta *ETATask) Run(_ context.Context) (err error) {
  52. details := TaskDetail{
  53. StartTime: time.Now().Format(time.DateTime),
  54. }
  55. runTask := taskService.TaskDTO{
  56. TaskType: string(eta.Name),
  57. }
  58. //启动任务
  59. taskId, err := taskService.InitAndStartTask(runTask)
  60. details.TaskId = taskId
  61. err = eta.TaskFn.Execute(&details)
  62. details.EndTime = time.Now().Format(time.DateTime)
  63. detailStr, err := json.Marshal(details)
  64. //任务状态
  65. if err != nil {
  66. taskService.UpdateAndFailedTask(taskId, string(detailStr))
  67. return
  68. }
  69. taskService.UpdateAndSuccessTask(taskId, string(detailStr))
  70. return
  71. }