eta_task.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package base
  2. import (
  3. "context"
  4. "encoding/json"
  5. taskService "eta/eta_mini_ht_api/domian/task"
  6. "time"
  7. )
  // Environment names understood by this package.
  //
  // NOTE(review): presumably these are the accepted values for
  // ETATask.RunMode; nothing in this file compares against them, so confirm
  // with the callers.
  const (
  	// DEV is the identifier of the development environment.
  	DEV = "dev"
  	// TEST is the identifier of the test environment.
  	TEST = "test"
  	// PROD is the identifier of the production environment.
  	PROD = "prod"
  )
  // TaskType is the name under which a task is registered. It is the key of
  // the package-level task registry and is converted to a plain string when a
  // run is reported to the task service.
  type TaskType string
  14. type ETATask struct {
  15. RunMode string
  16. Name TaskType
  17. TaskFn Task
  18. Cron string
  19. }
  // TaskDetail is the per-run record handed to Task.Execute. ETATask.Run
  // serialises it to JSON once the task has returned, so the field names are
  // part of the stored format.
  type TaskDetail struct {
  	// TaskId is the id returned by the task service when the run is started.
  	TaskId int
  	// StartTime is set by Run before the task executes, formatted with
  	// time.DateTime.
  	StartTime string
  	// EndTime is set by Run after the task returns, formatted with
  	// time.DateTime.
  	EndTime string
  	// Content is never written by this file.
  	// NOTE(review): presumably filled in by Task implementations; confirm.
  	Content string
  }
  26. type Task interface {
  27. Execute(taskDetail *TaskDetail) error
  28. }
  29. func NewTask(name TaskType, cron string, taskFunc Task, mode string) ETATask {
  30. return ETATask{
  31. Name: name,
  32. TaskFn: taskFunc,
  33. Cron: cron,
  34. RunMode: mode,
  35. }
  36. }
  37. var tasks = make(map[TaskType]*ETATask)
  38. func GetTasks() map[TaskType]*ETATask {
  39. return tasks
  40. }
  41. func RegisterTask(etaTask *ETATask) {
  42. if etaTask == nil {
  43. panic("不能注册空任务")
  44. }
  45. if _, ok := tasks[etaTask.Name]; ok {
  46. panic("请勿重复创建任务:" + etaTask.Name)
  47. }
  48. tasks[etaTask.Name] = etaTask
  49. }
  50. func (eta *ETATask) Run(_ context.Context) (err error) {
  51. details := TaskDetail{
  52. StartTime: time.Now().Format(time.DateTime),
  53. }
  54. runTask := taskService.TaskDTO{
  55. TaskType: string(eta.Name),
  56. }
  57. //启动任务
  58. taskId, err := taskService.InitAndStartTask(runTask)
  59. details.TaskId = taskId
  60. err = eta.TaskFn.Execute(&details)
  61. details.EndTime = time.Now().Format(time.DateTime)
  62. detailStr, err := json.Marshal(details)
  63. //任务状态
  64. if err != nil {
  65. taskService.UpdateAndFailedTask(taskId, string(detailStr))
  66. return
  67. }
  68. taskService.UpdateAndSuccessTask(taskId, string(detailStr))
  69. return
  70. }