|
@@ -0,0 +1,198 @@
|
|
|
|
+package com.qhtx.eta.domain.job;
|
|
|
|
+
|
|
|
|
+import com.qhtx.eta.common.utils.AssertUtils;
|
|
|
|
+import com.qhtx.eta.domain.config.ScheduleConfig;
|
|
|
|
+import com.qhtx.eta.domain.config.ScheduleTaskConfig;
|
|
|
|
+import com.qhtx.eta.domain.entity.QuotaDataUpdateBO;
|
|
|
|
+import com.qhtx.eta.domain.entity.QuotaInfoDTO;
|
|
|
|
+import com.qhtx.eta.domain.enums.ETADataStatus;
|
|
|
|
+import com.qhtx.eta.domain.enums.ETAInterfaceEnum;
|
|
|
|
+import com.qhtx.eta.domain.enums.ExcludedQuotaSource;
|
|
|
|
+import com.qhtx.eta.domain.service.ETAQuotaService;
|
|
|
|
+import com.qhtx.eta.domain.api.ApiServiceHolder;
|
|
|
|
+import com.qhtx.eta.domain.api.http.request.ETADataHttpRequest;
|
|
|
|
+import com.qhtx.eta.common.constant.ETAConstants;
|
|
|
|
+import com.qhtx.eta.domain.entity.QuotaDataDTO;
|
|
|
|
+import com.qhtx.eta.domain.task.FixedDelayTaskRegistrar;
|
|
|
|
+import com.qhtx.eta.domain.task.SchedulingRunnable;
|
|
|
|
+import com.qhtx.eta.infra.entity.EtaApiQuotaData;
|
|
|
|
+import com.qhtx.eta.infra.enums.Frequency;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
|
+import org.springframework.boot.CommandLineRunner;
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
|
+
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
|
+import java.time.LocalDate;
|
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
|
+import java.util.*;
|
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
+
|
|
|
|
+@Component
|
|
|
|
+@Slf4j
|
|
|
|
+public class ETADataSyncJob implements CommandLineRunner {
|
|
|
|
+
|
|
|
|
+ @Value("${eta.http.interval}")
|
|
|
|
+ private int interval;
|
|
|
|
+
|
|
|
|
+ @Value("${eta.http.numberOfPeriods}")
|
|
|
|
+ private int numberOfPeriods;
|
|
|
|
+ @Resource
|
|
|
|
+ private ApiServiceHolder apiServiceHolder;
|
|
|
|
+
|
|
|
|
+ @Resource
|
|
|
|
+ private ETAQuotaService etaQuotaService;
|
|
|
|
+
|
|
|
|
+ @Resource(name = "ETADataThreadPool")
|
|
|
|
+ private ThreadPoolExecutor etaDataThreadPool;
|
|
|
|
+
|
|
|
|
+ @Resource
|
|
|
|
+ private FixedDelayTaskRegistrar fixedDelayTaskRegistrar;
|
|
|
|
+
|
|
|
|
+ @Resource
|
|
|
|
+ private ScheduleTaskConfig scheduleTaskConfig;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 同步指标信息
|
|
|
|
+ */
|
|
|
|
+ public void syncETAInfo() {
|
|
|
|
+ List<?> dataList = apiServiceHolder.runApi(ETAInterfaceEnum.GET_EDB_LIST);
|
|
|
|
+ if (dataList != null) {
|
|
|
|
+ List<QuotaInfoDTO> etaQuotaList = dataList.stream().filter(item -> item instanceof QuotaInfoDTO && Arrays.stream(ExcludedQuotaSource.values()).noneMatch(excludedSource -> ((QuotaInfoDTO) item).getEdbSource().equals(excludedSource.getSourceId()))).map(item -> (QuotaInfoDTO) item).collect(Collectors.toList());
|
|
|
|
+ log.info("================================ 开始同步接口指标列表 ================================ ");
|
|
|
|
+ log.info("================================ 指标接口列表数量:{}条================================ ", etaQuotaList.size());
|
|
|
|
+ etaQuotaService.syncQuota(etaQuotaList);
|
|
|
|
+ log.info("================================ 同步接口指标列表完成 ================================ ");
|
|
|
|
+ } else {
|
|
|
|
+ log.warn("调用接口失败,无数据同步");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 同步指标数据
|
|
|
|
+ */
|
|
|
|
+ public void syncETAData() {
|
|
|
|
+ log.info("{} 开始同步指标数据任务 {}", ETAConstants.LINE_SEPARATOR, ETAConstants.LINE_SEPARATOR);
|
|
|
|
+ Date minDate = null;
|
|
|
|
+ boolean dataUpdated = false;
|
|
|
|
+ QuotaInfoDTO quotaInfoDTO = new QuotaInfoDTO();
|
|
|
|
+ //查询条件暂时不设置
|
|
|
|
+// quotaInfoDTO.setNoUpdate(0);
|
|
|
|
+ List<QuotaDataUpdateBO> updateList = etaQuotaService.getQuotaInfoListForUpdate(quotaInfoDTO).stream().filter(item -> item.getStatus() != ETADataStatus.LATEST).collect(Collectors.toList());
|
|
|
|
+ if (CollectionUtils.isEmpty(updateList)) {
|
|
|
|
+ log.info("{} 当前无同步指标 {}", ETAConstants.LINE_SEPARATOR, ETAConstants.LINE_SEPARATOR);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ int count = 0;
|
|
|
|
+ for (QuotaDataUpdateBO quotaDataUpdateBO : updateList) {
|
|
|
|
+ ETADataHttpRequest etaDataRequest = new ETADataHttpRequest();
|
|
|
|
+ etaDataRequest.setUniqueCode(quotaDataUpdateBO.getUniqueCode());
|
|
|
|
+ if (quotaDataUpdateBO.getStatus() == ETADataStatus.UPDATE) {
|
|
|
|
+ String startDate = calculatePreviousDate(quotaDataUpdateBO.getCurrentDate(), Frequency.getFrequencyByDesc(quotaDataUpdateBO.getFrequency()), numberOfPeriods);
|
|
|
|
+ etaDataRequest.setStartDate(startDate);
|
|
|
|
+ }
|
|
|
|
+ log.info("================================ 开始同步接口指标明细数据 ================================ ");
|
|
|
|
+ List<?> dataList = apiServiceHolder.runApi(ETAInterfaceEnum.GET_EDB_DATA, etaDataRequest);
|
|
|
|
+ if (CollectionUtils.isEmpty(dataList)) {
|
|
|
|
+ log.info("{} 没有获取到新的指标数据 {}", ETAConstants.LINE_SEPARATOR, ETAConstants.LINE_SEPARATOR);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ List<QuotaDataDTO> quotaDataList = dataList.stream().filter(item -> item instanceof QuotaDataDTO).map(item -> {
|
|
|
|
+ QuotaDataDTO data = (QuotaDataDTO) item;
|
|
|
|
+ data.setUniqueCode(etaDataRequest.getUniqueCode());
|
|
|
|
+ return data;
|
|
|
|
+ }).collect(Collectors.toList());
|
|
|
|
+// log.info("================================ 开始同步接口指标明细数据 ================================ ");
|
|
|
|
+// if (CollectionUtils.isEmpty(quotaDataList)) {
|
|
|
|
+// log.info("{} 没有获取到新的指标数据 {}", ETAConstants.LINE_SEPARATOR, ETAConstants.LINE_SEPARATOR);
|
|
|
|
+// continue;
|
|
|
|
+// }
|
|
|
|
+ log.info("================================ 指标接口数据[{}]数量:{}条================================ ", quotaDataList.get(0).getUniqueCode(), quotaDataList.size());
|
|
|
|
+ //对指标数据做一个时间排序
|
|
|
|
+ quotaDataList.sort(Comparator.comparing(QuotaDataDTO::getDataTime));
|
|
|
|
+ //取最小的日期
|
|
|
|
+ Date startDate = quotaDataList.get(0).getDataTime();
|
|
|
|
+ minDate = getMinDate(minDate, startDate);
|
|
|
|
+ count += quotaDataList.size();
|
|
|
|
+ Runnable task = () -> etaQuotaService.syncData(quotaDataList);
|
|
|
|
+ etaDataThreadPool.submit(task);
|
|
|
|
+ pause();
|
|
|
|
+ }
|
|
|
|
+ log.info("{} 同步指标数据任务完成 {}", ETAConstants.LINE_SEPARATOR, ETAConstants.LINE_SEPARATOR);
|
|
|
|
+ if (count > 0) {
|
|
|
|
+ long minUpdateTime = 0;
|
|
|
|
+ // 设置为前一天
|
|
|
|
+ if (minDate != null) {
|
|
|
|
+// Calendar calendar = Calendar.getInstance();
|
|
|
|
+// calendar.setTime(minDate);
|
|
|
|
+// calendar.add(Calendar.DAY_OF_MONTH, -1);
|
|
|
|
+ minUpdateTime = minDate.getTime();
|
|
|
|
+ }
|
|
|
|
+ etaQuotaService.LinkedCodeMessage(minUpdateTime);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private Date getMinDate(Date date1, Date date2) {
|
|
|
|
+ if (date2 == null) {
|
|
|
|
+ return date1;
|
|
|
|
+ }
|
|
|
|
+ if (date1 == null) {
|
|
|
|
+ return date2;
|
|
|
|
+ }
|
|
|
|
+ return date1.before(date2) ? date1 : date2;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private String calculatePreviousDate(String startDate, Frequency frequency, int numberOfPeriods) {
|
|
|
|
+ if (frequency == null) {
|
|
|
|
+ frequency = Frequency.DAILY;
|
|
|
|
+ }
|
|
|
|
+ LocalDate localDate = LocalDate.parse(startDate);
|
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern(ETAConstants.DATE_PATTERN);
|
|
|
|
+ switch (frequency) {
|
|
|
|
+ case DAILY:
|
|
|
|
+ return localDate.minusDays(numberOfPeriods).format(formatter);
|
|
|
|
+ case WEEKLY:
|
|
|
|
+ return localDate.minusWeeks(numberOfPeriods).format(formatter);
|
|
|
|
+ case MONTHLY:
|
|
|
|
+ return localDate.minusMonths(numberOfPeriods).format(formatter);
|
|
|
|
+ case YEARLY:
|
|
|
|
+ return localDate.minusYears(numberOfPeriods).format(formatter);
|
|
|
|
+ case SEMI_ANNUAL:
|
|
|
|
+ return localDate.minusMonths(6L * numberOfPeriods).format(formatter);
|
|
|
|
+ case QUARTERLY:
|
|
|
|
+ return localDate.minusMonths(3L * numberOfPeriods).format(formatter);
|
|
|
|
+ case DECADELY:
|
|
|
|
+ return localDate.minusDays(10L * numberOfPeriods).format(formatter);
|
|
|
|
+ default:
|
|
|
|
+ throw new IllegalArgumentException("Unsupported frequency: " + frequency);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void pause() {
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(interval);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
+ log.info("网络请求延时异常中断:[{}]", e.getLocalizedMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //加载系统中的定时任务,目前只有2个就冗余写一下,后续有新任务再抽代码改造
|
|
|
|
+ @Override
|
|
|
|
+ public void run(String... args) {
|
|
|
|
+ ScheduleConfig dataConfig = scheduleTaskConfig.getData();
|
|
|
|
+ ScheduleConfig infoConfig = scheduleTaskConfig.getInfo();
|
|
|
|
+ if (dataConfig.getEnabled()) {
|
|
|
|
+ //同步指标数据任务
|
|
|
|
+ fixedDelayTaskRegistrar.addFixedDelayTask(new SchedulingRunnable(this.getClass().getName(), "syncETAData", this), dataConfig.getInterval(), dataConfig.getInitialDelay());
|
|
|
|
+ }
|
|
|
|
+ if (infoConfig.getEnabled()) {
|
|
|
|
+ //同步主表基本信息任务
|
|
|
|
+ fixedDelayTaskRegistrar.addFixedDelayTask(new SchedulingRunnable(this.getClass().getName(), "syncETAInfo", this), infoConfig.getInterval(), infoConfig.getInitialDelay());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|