|
@@ -37,6 +37,15 @@ type PdfResponse struct {
|
|
|
MobilePdfUrl string `json:"MobilePdfUrl"`
|
|
|
MobileJpgUrl string `json:"MobileJpgUrl"`
|
|
|
}
|
|
|
+type BaseResponse struct {
|
|
|
+ Ret int
|
|
|
+ Msg string
|
|
|
+ ErrMsg string
|
|
|
+ ErrCode string
|
|
|
+ Data json.RawMessage
|
|
|
+ Success bool
|
|
|
+ IsSendEmail bool
|
|
|
+}
|
|
|
type ReportPDFService interface {
|
|
|
//RejectCallback 任务拒绝策略
|
|
|
RejectCallback(p interface{})
|
|
@@ -47,10 +56,10 @@ type ReportPDFService interface {
|
|
|
|
|
|
ReleasePool()
|
|
|
|
|
|
- TryTicker() *time.Ticker
|
|
|
+ TryTimer() *time.Timer
|
|
|
}
|
|
|
type ReportPDFHandler struct {
|
|
|
- Ticker *time.Ticker
|
|
|
+ Timer *time.Timer
|
|
|
WorkerPool *ants.Pool
|
|
|
HttpClient *http.Client
|
|
|
}
|
|
@@ -63,8 +72,8 @@ func (d *ReportPDFHandler) RegisterWorkPool(pool *ants.Pool) {
|
|
|
d.WorkerPool = pool
|
|
|
}
|
|
|
|
|
|
-func (d *ReportPDFHandler) TryTicker() *time.Ticker {
|
|
|
- return d.Ticker
|
|
|
+func (d *ReportPDFHandler) TryTimer() *time.Timer {
|
|
|
+ return d.Timer
|
|
|
}
|
|
|
func (d *ReportPDFHandler) SubmitTask(task *report.ReportPdfTask) {
|
|
|
_ = d.WorkerPool.Submit(func() {
|
|
@@ -96,26 +105,31 @@ func (d *ReportPDFHandler) processTask(task *report.ReportPdfTask) {
|
|
|
ob.DetailPdfUrlMobile = pdfMinioURL.MobilePdfUrl
|
|
|
if err = ob.Update([]string{"DetailPdfUrl", "DetailImgUrl", "DetailImgUrlMobile", "DetailPdfUrlMobile"}); err != nil {
|
|
|
utils.FileLog.Info("更新研报失败, Err: \n" + err.Error())
|
|
|
- return
|
|
|
}
|
|
|
} else if task.ReportType == 2 {
|
|
|
err = models.ModifyEnglishReportPdfAndJPGUrl(task.ReportId, pdfMinioURL.PdfUrl, pdfMinioURL.JpgUrl, pdfMinioURL.MobilePdfUrl, pdfMinioURL.MobileJpgUrl)
|
|
|
if err != nil {
|
|
|
utils.FileLog.Info("更新研报失败, Err: \n" + err.Error())
|
|
|
- return
|
|
|
}
|
|
|
} else if task.ReportType == 1 {
|
|
|
err = models.ModifyReportPdfAndJpgUrl(task.ReportId, pdfMinioURL.PdfUrl, pdfMinioURL.JpgUrl, pdfMinioURL.MobilePdfUrl, pdfMinioURL.MobileJpgUrl)
|
|
|
if err != nil {
|
|
|
utils.FileLog.Info("更新研报失败, Err: \n" + err.Error())
|
|
|
- return
|
|
|
}
|
|
|
}
|
|
|
+ task.LastFinishTime = time.Now()
|
|
|
+ if err != nil {
|
|
|
+ task.Message = err.Error()
|
|
|
+ _ = report.FailTask(task)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ _ = report.DoneTask(task)
|
|
|
utils.FileLog.Info("任务 %d 成功更新 pdf &jpeg URL: %s", task.ReportId, pdfMinioURL)
|
|
|
}
|
|
|
|
|
|
// fetchPDFMinioURL 发起 HTTP 请求获取 MinIO 地址(根据实际接口修改)
|
|
|
func fetchPDFMinioURL(task *report.ReportPdfTask) (pdfResponse PdfResponse, err error) {
|
|
|
+
|
|
|
postData, _ := json.Marshal(PdfRequest{
|
|
|
ReportUrl: task.ReportURL,
|
|
|
ReportId: task.ReportId,
|
|
@@ -123,14 +137,14 @@ func fetchPDFMinioURL(task *report.ReportPdfTask) (pdfResponse PdfResponse, err
|
|
|
})
|
|
|
body := io.NopCloser(strings.NewReader(string(postData)))
|
|
|
client := &http.Client{}
|
|
|
- req, err := http.NewRequest("POST", utils.SendWxTemplateMsgUrl, body)
|
|
|
+
|
|
|
+ req, err := http.NewRequest("POST", task.PostUrl, body)
|
|
|
if err != nil {
|
|
|
utils.FileLog.Error("PDF generate http.NewRequest Err:"+err.Error(), 1)
|
|
|
return
|
|
|
}
|
|
|
contentType := "application/json;charset=utf-8"
|
|
|
req.Header.Set("Content-Type", contentType)
|
|
|
- req.Header.Set("Authorization", utils.SendTemplateMsgAuthorization)
|
|
|
resp, err := client.Do(req)
|
|
|
if err != nil {
|
|
|
fmt.Println("PDF generate http client.Do Err:" + err.Error())
|
|
@@ -141,8 +155,16 @@ func fetchPDFMinioURL(task *report.ReportPdfTask) (pdfResponse PdfResponse, err
|
|
|
if err != nil {
|
|
|
return
|
|
|
}
|
|
|
- result := new(PdfResponse)
|
|
|
+ result := new(BaseResponse)
|
|
|
err = json.Unmarshal(b, &result)
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("解析应答失败" + err.Error())
|
|
|
+ }
|
|
|
+ if result.Success {
|
|
|
+ err = json.Unmarshal(result.Data, &pdfResponse)
|
|
|
+ } else {
|
|
|
+ err = fmt.Errorf("PDF generate http client.Do Err:" + result.Msg)
|
|
|
+ }
|
|
|
return
|
|
|
}
|
|
|
|
|
@@ -150,7 +172,7 @@ func GetTaskHandlerInstance() (reportHandler ReportPDFService, err error) {
|
|
|
if reportHandler == nil {
|
|
|
reportOnce.Do(func() {
|
|
|
reportHandler = &ReportPDFHandler{
|
|
|
- Ticker: time.NewTicker(taskInterval),
|
|
|
+ Timer: time.NewTimer(taskInterval),
|
|
|
HttpClient: &http.Client{
|
|
|
Timeout: httpTimeout,
|
|
|
},
|
|
@@ -173,10 +195,14 @@ func StartGenerateReportPDF() {
|
|
|
utils.FileLog.Warn("初始化协程池失败: %v", err)
|
|
|
}
|
|
|
defer reportHandler.ReleasePool()
|
|
|
-
|
|
|
+ conf, err := models.GetBusinessConfByKey("RemoteGeneratePdfUrl")
|
|
|
+ if err != nil {
|
|
|
+ utils.FileLog.Error("获取RemoteGeneratePdfUrl失败: %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
for {
|
|
|
select {
|
|
|
- case <-reportHandler.TryTicker().C:
|
|
|
+ case <-reportHandler.TryTimer().C:
|
|
|
// 从数据库中获取待处理的任务
|
|
|
tasks, getErr := report.GetPendingAndFailedTasks()
|
|
|
if getErr != nil {
|
|
@@ -190,8 +216,11 @@ func StartGenerateReportPDF() {
|
|
|
|
|
|
// 提交任务到协程池
|
|
|
for _, task := range tasks {
|
|
|
+ task.PostUrl = conf.ConfVal
|
|
|
reportHandler.SubmitTask(task)
|
|
|
}
|
|
|
+ // 重置定时器,等待下一次执行
|
|
|
+ reportHandler.TryTimer().Reset(taskInterval)
|
|
|
}
|
|
|
}
|
|
|
}
|