|
@@ -2,12 +2,10 @@ package com.qhtx.eta.domain.job;
|
|
|
|
|
|
import com.alibaba.nacos.api.utils.StringUtils;
|
|
|
import com.google.common.collect.Lists;
|
|
|
-import com.google.common.hash.Hashing;
|
|
|
import com.qhtx.eta.common.eunms.DataSourceType;
|
|
|
import com.qhtx.eta.common.eunms.ErrorEnum;
|
|
|
import com.qhtx.eta.common.exception.ETAException;
|
|
|
import com.qhtx.eta.common.utils.MD5Utils;
|
|
|
-import com.qhtx.eta.domain.config.RedisConfig;
|
|
|
import com.qhtx.eta.domain.config.ScheduleConfig;
|
|
|
import com.qhtx.eta.domain.config.ScheduleTaskConfig;
|
|
|
import com.qhtx.eta.domain.convert.ClassifyMappingDTOConverter;
|
|
@@ -18,10 +16,7 @@ import com.qhtx.eta.domain.enums.ETAInterfaceEnum;
|
|
|
import com.qhtx.eta.domain.enums.ETATaskEnum;
|
|
|
import com.qhtx.eta.domain.enums.ExcludedQuotaSource;
|
|
|
import com.qhtx.eta.domain.facade.ETAFacadeService;
|
|
|
-import com.qhtx.eta.domain.service.DWIndexFrameService;
|
|
|
-import com.qhtx.eta.domain.service.ETAClassifyService;
|
|
|
-import com.qhtx.eta.domain.service.ETAPushDataService;
|
|
|
-import com.qhtx.eta.domain.service.ETAQuotaService;
|
|
|
+import com.qhtx.eta.domain.service.*;
|
|
|
import com.qhtx.eta.domain.api.ApiServiceHolder;
|
|
|
import com.qhtx.eta.domain.api.http.request.ETADataHttpRequest;
|
|
|
import com.qhtx.eta.common.constant.ETAConstants;
|
|
@@ -30,6 +25,7 @@ import com.qhtx.eta.domain.task.FixedDelayTaskRegistrar;
|
|
|
import com.qhtx.eta.domain.task.SchedulingRunnable;
|
|
|
import com.qhtx.eta.domain.utils.RedisUtils;
|
|
|
import com.qhtx.eta.infra.datasource.DataSourceContextHolder;
|
|
|
+import com.qhtx.eta.infra.entity.TEtaIndexMapping;
|
|
|
import com.qhtx.eta.infra.entity.dw.*;
|
|
|
import com.qhtx.eta.infra.enums.Frequency;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
@@ -46,8 +42,6 @@ import org.springframework.transaction.support.TransactionTemplate;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
-import java.math.BigDecimal;
|
|
|
-import java.nio.charset.StandardCharsets;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.time.DayOfWeek;
|
|
|
import java.time.LocalDate;
|
|
@@ -101,6 +95,8 @@ public class ETASyncJob implements CommandLineRunner {
|
|
|
@Resource
|
|
|
private ETAPushDataService etaPushDataService;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private ETAIndexService etaIndexService;
|
|
|
@Resource
|
|
|
private TransactionTemplate transactionTemplate;
|
|
|
@Resource
|
|
@@ -221,7 +217,7 @@ public class ETASyncJob implements CommandLineRunner {
|
|
|
public void syncETAInfo() {
|
|
|
List<?> dataList = apiServiceHolder.runApi(ETAInterfaceEnum.GET_EDB_LIST);
|
|
|
if (dataList != null) {
|
|
|
- List<QuotaInfoDTO> etaQuotaList = dataList.stream().filter(item -> item instanceof QuotaInfoDTO && Arrays.stream(ExcludedQuotaSource.values()).noneMatch(excludedSource -> ((QuotaInfoDTO) item).getEdbSource().equals(excludedSource.getSourceId()))).map(item -> (QuotaInfoDTO) item).collect(Collectors.toList());
|
|
|
+ List<QuotaInfoDTO> etaQuotaList = dataList.stream().filter(item -> item instanceof QuotaInfoDTO && ((QuotaInfoDTO) item).getEdbInfoType().equals(0) && Arrays.stream(ExcludedQuotaSource.values()).noneMatch(excludedSource -> ((QuotaInfoDTO) item).getEdbSource().equals(excludedSource.getSourceId()))).map(item -> (QuotaInfoDTO) item).collect(Collectors.toList());
|
|
|
log.info("================================ 开始同步接口指标列表 ================================ ");
|
|
|
log.info("================================ 指标接口列表数量:{}条================================ ", etaQuotaList.size());
|
|
|
etaQuotaService.syncQuota(etaQuotaList);
|
|
@@ -260,6 +256,7 @@ public class ETASyncJob implements CommandLineRunner {
|
|
|
List<QuotaInfoDTO> etaApiQuotaInfoList = QuotaInfoFutures.get();
|
|
|
List<LinkDataDTO> linkDataList = linkDataFutures.get();
|
|
|
List<ClassifyMappingDTO> classiyfList = classifyFutures.get();
|
|
|
+ log.info("对接码数量:{}", linkDataList.size());
|
|
|
if (CollectionUtils.isEmpty(classifyList)) {
|
|
|
log.info("指标目录未同步,停止同步指标信息");
|
|
|
return;
|
|
@@ -273,7 +270,6 @@ public class ETASyncJob implements CommandLineRunner {
|
|
|
QuotaInfoDTO dimensionData = quotaInfoDTO;
|
|
|
dimensionDataMap.put(elementVal, dimensionData);
|
|
|
}
|
|
|
-
|
|
|
// 创建哈希表存储 frameId 和 classify
|
|
|
Map<Integer, ClassifyMappingDTO> classifyMap = new HashMap<>();
|
|
|
for (ClassifyMappingDTO classifyMappingDTO : classiyfList) {
|
|
@@ -282,6 +278,7 @@ public class ETASyncJob implements CommandLineRunner {
|
|
|
classifyMap.put(classifyId, classify);
|
|
|
}
|
|
|
for (LinkDataDTO linkDataDTO : linkDataList) {
|
|
|
+ log.info("开始处理指标信息:{}", linkDataDTO.getLinkCode());
|
|
|
String dateFrom = "1970-01-02";
|
|
|
long startDate = new SimpleDateFormat(ETAConstants.DATE_PATTERN).parse(dateFrom).getTime();
|
|
|
long createTime = new Date().getTime();
|
|
@@ -325,15 +322,17 @@ public class ETASyncJob implements CommandLineRunner {
|
|
|
dwIndexWithLinkCodeDTO.setFrameId(classify.getIndexFrameId());
|
|
|
dwIndexWithLinkCodeList.add(dwIndexWithLinkCodeDTO);
|
|
|
}
|
|
|
+ log.info("开始处理指标信息,当前指标数量为{}", dwIndexWithLinkCodeList.size());
|
|
|
//开始处理指标
|
|
|
for (DWIndexWithLinkCodeDTO dwIndexWithLinkCodeDTO : dwIndexWithLinkCodeList) {
|
|
|
TDampDwIndex dwIndex = DWIndexWithLinkCodeDTOConverter.INSTANCE.convertDTOToEntity(dwIndexWithLinkCodeDTO);
|
|
|
TDampDwIndexLink tDampDwIndexLink = DWIndexWithLinkCodeDTOConverter.INSTANCE.convertDTOToLinkEntity(dwIndexWithLinkCodeDTO);
|
|
|
+ TEtaIndexMapping etaIndexMapping = DWIndexWithLinkCodeDTOConverter.INSTANCE.convertDTOToMappingEntity(dwIndexWithLinkCodeDTO);
|
|
|
//获取对接码数据
|
|
|
List<TDampDwIndexData> dwLinkDetailList = dwIndexFrameService.getlinkDataList(dwIndexWithLinkCodeDTO.getLinkCode()).stream().map(linkData -> {
|
|
|
TDampDwIndexData dwIndexData = new TDampDwIndexData();
|
|
|
dwIndexData.setId(UUID.randomUUID().toString().replace("-", ""));
|
|
|
- dwIndexData.setDataDate(convertDateByFrequency(linkData.getDataDate(), Frequency.getFrequencyByDesc(String.format("%s度",dwIndex.getFrequency()))));
|
|
|
+ dwIndexData.setDataDate(convertDateByFrequency(linkData.getDataDate(), Frequency.getFrequencyByDesc(String.format("%s度", dwIndex.getFrequency()))));
|
|
|
dwIndexData.setDataValue(linkData.getDataValue());
|
|
|
dwIndexData.setDataValueChar(linkData.getDataValueChar());
|
|
|
dwIndexData.setIndexId(dwIndex.getId());
|
|
@@ -358,24 +357,28 @@ public class ETASyncJob implements CommandLineRunner {
|
|
|
int uniqueId = Integer.parseInt(dwIndexWithLinkCodeDTO.getLinkCode().substring(dwIndexWithLinkCodeDTO.getLinkCode().indexOf("ETA") + 3));
|
|
|
if (redisUtils.getBit(redisUtils.indexSyncKey(), uniqueId)) {
|
|
|
log.warn("对应指标数据:{},已同步,不在进行同步处理", dwIndexWithLinkCodeDTO.getLinkCode());
|
|
|
- continue;
|
|
|
+ //执行更新指标信息的操作
|
|
|
+ dwIndexFrameService.updateIndexInfo(dwIndexWithLinkCodeDTO);
|
|
|
+ } else {
|
|
|
+ log.warn("对应指标数据:{},同步指标", dwIndexWithLinkCodeDTO.getLinkCode());
|
|
|
+ DataSourceContextHolder.setDataSourceType(DataSourceType.DW);
|
|
|
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
|
|
+ @Override
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
+ protected void doInTransactionWithoutResult(@NotNull TransactionStatus status) {
|
|
|
+
|
|
|
+ //将指标写入映射表
|
|
|
+ etaIndexService.insert(etaIndexMapping);
|
|
|
+ //现插入指标
|
|
|
+ dwIndexFrameService.insertIndex(dwIndex);
|
|
|
+ //更新指标对接码
|
|
|
+ dwIndexFrameService.insertIndexLink(tDampDwIndexLink);
|
|
|
+ //添加指标数据
|
|
|
+ dwIndexFrameService.batchInsertIndexData(dwLinkDetailList);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ redisUtils.setBit(redisUtils.indexSyncKey(), uniqueId, true);
|
|
|
}
|
|
|
- log.warn("对应指标数据:{},同步指标", dwIndexWithLinkCodeDTO.getLinkCode());
|
|
|
- DataSourceContextHolder.setDataSourceType(DataSourceType.DW);
|
|
|
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
|
|
- @Override
|
|
|
- @Transactional(rollbackFor = Exception.class)
|
|
|
- protected void doInTransactionWithoutResult(@NotNull TransactionStatus status) {
|
|
|
- //现插入指标
|
|
|
- dwIndexFrameService.insertIndex(dwIndex);
|
|
|
- //更新指标对接码
|
|
|
- dwIndexFrameService.insertIndexLink(tDampDwIndexLink);
|
|
|
- //添加指标数据
|
|
|
- dwIndexFrameService.batchInsertIndexData(dwLinkDetailList);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- redisUtils.setBit(redisUtils.indexSyncKey(), uniqueId, true);
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|