|
@@ -1,31 +1,72 @@
|
|
|
package com.qhtx.eta.domain.facade.impl;
|
|
|
|
|
|
+import com.google.common.collect.Lists;
|
|
|
import com.qhtx.eta.common.annotation.Facade;
|
|
|
+import com.qhtx.eta.common.constant.ETAConstants;
|
|
|
import com.qhtx.eta.common.eunms.DataSourceType;
|
|
|
import com.qhtx.eta.common.eunms.ErrorEnum;
|
|
|
import com.qhtx.eta.common.exception.ETAException;
|
|
|
import com.qhtx.eta.domain.annotation.DomainTransDataSource;
|
|
|
+import com.qhtx.eta.domain.api.ApiServiceHolder;
|
|
|
import com.qhtx.eta.domain.entity.DWIndexDTO;
|
|
|
+import com.qhtx.eta.domain.entity.DWIndexDataDTO;
|
|
|
+import com.qhtx.eta.domain.enums.ETAInterfaceEnum;
|
|
|
import com.qhtx.eta.domain.facade.ETAFacadeService;
|
|
|
import com.qhtx.eta.domain.service.DWIndexFrameService;
|
|
|
+import com.qhtx.eta.domain.service.ETAPushDataService;
|
|
|
+import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
|
|
|
+import com.qhtx.eta.infra.service.TEtaIndexPushTaskService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
+import java.text.ParseException;
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.util.Comparator;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
+/**
|
|
|
+ * @author mac
|
|
|
+ */
|
|
|
@Service
|
|
|
@Facade
|
|
|
+@Slf4j
|
|
|
public class ETAFacadeServiceImpl implements ETAFacadeService {
|
|
|
|
|
|
@Resource
|
|
|
private DWIndexFrameService dwIndexFrameService;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private ETAPushDataService etaPushDataService;
|
|
|
+ @Resource
|
|
|
+ private ApiServiceHolder apiServiceHolder;
|
|
|
|
|
|
@Override
|
|
|
@DomainTransDataSource(dataSourceType = DataSourceType.DW)
|
|
|
public void pushIndex(String indexCode) {
|
|
|
DWIndexDTO dwIndexDTO = dwIndexFrameService.pushIndexToEta(indexCode);
|
|
|
- if (dwIndexDTO == null) {
|
|
|
- throw new ETAException(ErrorEnum.DW_INDEX_NOT_FOUND);
|
|
|
+ if (dwIndexDTO.getDataList().size() > ETAConstants.ETA_PUSH_DATA_LIMIT) {
|
|
|
+ log.info("数据量超过限制,分批推送");
|
|
|
+ List<DWIndexDataDTO> sortList = dwIndexDTO.getDataList().stream().sorted(Comparator.comparing(DWIndexDataDTO::getDate)).collect(Collectors.toList());
|
|
|
+ Lists.partition(sortList, ETAConstants.ETA_PUSH_DATA_LIMIT).forEach(dataList -> {
|
|
|
+ Date latestDataDate = dataList.stream().map(item -> item.getDate()).max(Comparator.comparing(Date::getTime)).get();
|
|
|
+ //防止数据漏更新,先推送,再更新本地任务记录,下游做好冗余和幂等
|
|
|
+ DWIndexDTO dataDTO = new DWIndexDTO();
|
|
|
+ dataDTO.setIndexCode(dwIndexDTO.getIndexCode());
|
|
|
+ dataDTO.setFrequency(dwIndexDTO.getFrequency());
|
|
|
+ dataDTO.setIndexName(dwIndexDTO.getIndexName());
|
|
|
+ dataDTO.setRemark(dwIndexDTO.getRemark());
|
|
|
+ dataDTO.setSourceName(dwIndexDTO.getSourceName());
|
|
|
+ dataDTO.setDataList(dataList);
|
|
|
+ apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dataDTO);
|
|
|
+ TEtaIndexPushTask tEtaIndexPushTask = etaPushDataService.getETAPushTaskByIndexCode(dataDTO.getIndexCode());
|
|
|
+ tEtaIndexPushTask.setLatestDataDate(new SimpleDateFormat(ETAConstants.DATE_PATTERN).format(latestDataDate));
|
|
|
+ tEtaIndexPushTask.setDataCount(dataList.size());
|
|
|
+ dataDTO.setDataList(dataList);
|
|
|
+ });
|
|
|
}
|
|
|
+ apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dwIndexDTO);
|
|
|
}
|
|
|
}
|