소스 검색

自有数据接口

kobe6258 3 달 전
부모
커밋
cc249def1b
19개의 변경된 파일437개의 추가작업 그리고 84개의 파일을 삭제
  1. 1 1
      qhtx-eta-integrator/qhtx-integrator-application/src/main/java/com/qhtx/eta/controller/ETAController.java
  2. 2 1
      qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/eunms/ErrorEnum.java
  3. 6 0
      qhtx-eta-integrator/qhtx-integrator-domain/pom.xml
  4. 32 0
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/config/RedissonConfig.java
  5. 7 3
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/convert/DWIndexDataDTOConverter.java
  6. 4 1
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/entity/DWIndexDTO.java
  7. 1 1
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/facade/ETAFacadeService.java
  8. 156 50
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/facade/impl/ETAFacadeServiceImpl.java
  9. 109 15
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/job/ETASyncJob.java
  10. 2 0
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/DWIndexFrameService.java
  11. 5 0
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/ETAPushDataService.java
  12. 8 3
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/impl/DWIndexFrameServiceImpl.java
  13. 45 0
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/impl/ETAPushDataServiceImpl.java
  14. 5 1
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/mapper/TDampDwIndexDataDao.java
  15. 4 1
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/TDampDwIndexDataService.java
  16. 0 1
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/IndexFrameServiceImpl.java
  17. 30 1
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TDampDwIndexDataServiceImpl.java
  18. 12 5
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TEtaIndexPushTaskServiceImpl.java
  19. 8 0
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/TDampDwIndexDataDao.xml

+ 1 - 1
qhtx-eta-integrator/qhtx-integrator-application/src/main/java/com/qhtx/eta/controller/ETAController.java

@@ -20,7 +20,7 @@ public class ETAController {
 
     @PostMapping("/push")
     public Result<String> getIndex(@RequestBody ETAIndexPushRequest etaIndexPushRequest) {
-        etaFacadeService.pushIndex(etaIndexPushRequest.getIndexCode());
+        etaFacadeService.pushIndex(etaIndexPushRequest.getIndexCode(), true);
         return Result.success("同步成功");
     }
 }

+ 2 - 1
qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/eunms/ErrorEnum.java

@@ -9,7 +9,7 @@ public enum ErrorEnum {
      * SYSTEM INNER ERROR CODE
      */
     BEAN_COVERT_TO_MAP_ERROR("00001", "bean转换成map失败"),
-    PARSE_TIME_ERROR("00002","解析时间错误"),
+    PARSE_TIME_ERROR("00002", "解析时间错误"),
     /**
      * SYSTEM ERROR CODE
      */
@@ -42,6 +42,7 @@ public enum ErrorEnum {
     INDEX_AllREADY_ADD("20013", "指标已添加,请勿重复添加"),
     INDEX_UPDATE_ERROR("20013", "更新指标出错,指标异常"),
     PUSH_TO_ETA_ERROR("20014", "ETA数据推送失败"),
+    INDEX_DEAL_LOCK_FAIL("20015", "指标更新锁获取失败"),
     ;
     private String code;
     private String msg;

+ 6 - 0
qhtx-eta-integrator/qhtx-integrator-domain/pom.xml

@@ -55,5 +55,11 @@
             <version>2.4.2</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.redisson</groupId>
+            <artifactId>redisson-spring-boot-starter</artifactId>
+            <version>3.18.0</version>
+        </dependency>
     </dependencies>
 </project>

+ 32 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/config/RedissonConfig.java

@@ -0,0 +1,32 @@
+package com.qhtx.eta.domain.config;
+
+import com.alibaba.nacos.common.utils.StringUtils;
+import org.redisson.Redisson;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+import org.redisson.config.Config;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class RedissonConfig {
+
+    @Value("${redisson.address}")
+    private String address;
+
+    @Value("${redisson.password}")
+    private String password;
+
+    @Bean
+    public RedissonClient redissonClient() {
+        Config config = new Config();
+        if (StringUtils.isNotBlank(password)) {
+            config.useSingleServer().setAddress(address).setPassword(password);
+        } else {
+            config.useSingleServer().setAddress(address);
+        }
+        return Redisson.create(config);
+    }
+
+}

+ 7 - 3
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/convert/DWIndexDataDTOConverter.java

@@ -17,11 +17,14 @@ public interface DWIndexDataDTOConverter {
 
     DWIndexDataDTOConverter INSTANCE = Mappers.getMapper(DWIndexDataDTOConverter.class);
 
-    @Mapping(source = "dataDate",target = "date")
-    @Mapping(source = "dataValue",target = "value")
+    @Mapping(source = "dataDate", target = "date")
+    @Mapping(source = "dataValue", target = "value")
     DWIndexDataDTO convertToDTO(TDampDwIndexData dwIndexData);
 
 
+    @Mapping(source = "dataDate", target = "date")
+    @Mapping(source = "dataValue", target = "value")
+    List<DWIndexDataDTO> convertToDTOList(List<TDampDwIndexData> dwIndexData);
 
     @Named("dateToString")
     default String dateToString(Date date) {
@@ -30,7 +33,8 @@ public interface DWIndexDataDTOConverter {
     }
 
 
-    @Mapping(source = "date",target = "date",qualifiedByName = "dateToString")
+    @Mapping(source = "date", target = "date", qualifiedByName = "dateToString")
     ETAPushData convertToETAPushData(DWIndexDataDTO dwIndexData);
+
     List<ETAPushData> convertToETAPushDataList(List<DWIndexDataDTO> dwIndexData);
 }

+ 4 - 1
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/entity/DWIndexDTO.java

@@ -4,16 +4,19 @@ import lombok.Data;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Date;
 
 @Data
 public class DWIndexDTO implements Serializable {
 
     private static final long serialVersionUID = -2209989045090549430L;
-    
+
     private String indexCode;
     private String indexName;
     private String frequency;
     private String sourceName;
+    private String latestDataDate;
+    private Date lastPushTime;
     private String remark;
     private String unit;
     private List<DWIndexDataDTO> dataList;

+ 1 - 1
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/facade/ETAFacadeService.java

@@ -7,5 +7,5 @@ import java.util.concurrent.ExecutionException;
 
 public interface ETAFacadeService {
 
-    void pushIndex(String indexCode);
+    void pushIndex(String indexCode, boolean init);
 }

+ 156 - 50
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/facade/impl/ETAFacadeServiceImpl.java

@@ -16,12 +16,15 @@ import com.qhtx.eta.domain.service.DWIndexFrameService;
 import com.qhtx.eta.domain.service.ETAPushDataService;
 import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
 import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -41,62 +44,165 @@ public class ETAFacadeServiceImpl implements ETAFacadeService {
     @Resource
     private ApiServiceHolder apiServiceHolder;
 
+    @Resource
+    private RedissonClient redissonClient;
+
     @Override
     @DomainTransDataSource(dataSourceType = DataSourceType.DW)
-    public void pushIndex(String indexCode) {
-        DWIndexDTO dwIndexDTO = dwIndexFrameService.addSyncIndex(indexCode);
+    public void pushIndex(String indexCode, boolean init) {
+        DWIndexDTO dwIndexDTO;
+        RLock lock = redissonClient.getLock(indexCode);
+        if (init) {
+            dwIndexDTO = dwIndexFrameService.addSyncIndex(indexCode);
 
-        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
-            List<DWIndexDataDTO> dataList = dwIndexFrameService.getIndexData(indexCode);
-            dataList.sort(Comparator.comparingLong(o -> o.getDate().getTime()));
-//            dataList.stream().sorted(Comparator.comparing(DWIndexDataDTO::getDate)).collect(Collectors.toList());
-            dwIndexDTO.setDataList(dataList);
-            //api数据发送
-            if (dwIndexDTO.getDataList().size() > ETAConstants.ETA_PUSH_DATA_LIMIT) {
-                log.info("数据量超过限制,分批推送");
-                Lists.partition(dataList, ETAConstants.ETA_PUSH_DATA_CHUNK).forEach(list -> {
-                    Date latestDataDate = list.stream().map(DWIndexDataDTO::getDate).max(Comparator.comparing(Date::getTime)).get();
-                    //防止数据漏更新,先推送,再更新本地任务记录,下游做好冗余和幂等
-                    DWIndexDTO dataDTO = new DWIndexDTO();
-                    dataDTO.setIndexCode(dwIndexDTO.getIndexCode());
-                    dataDTO.setFrequency(dwIndexDTO.getFrequency());
-                    dataDTO.setIndexName(dwIndexDTO.getIndexName());
-                    dataDTO.setRemark(dwIndexDTO.getRemark());
-                    dataDTO.setSourceName(dwIndexDTO.getSourceName());
-                    dataDTO.setUnit(dwIndexDTO.getUnit());
-                    dataDTO.setDataList(list);
-                      List<?> result = apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dataDTO);
-                    if (result == null) {
-                        log.error("调用ETA接口失败,停止更新");
-                        throw new ETAException(ErrorEnum.PUSH_TO_ETA_ERROR);
-                    }
-                    try {
-                        Thread.sleep(500);
-                    } catch (InterruptedException e) {
-                        log.warn("线程中断异常");
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                if (lock.tryLock()) {
+                    List<DWIndexDataDTO> dataList = dwIndexFrameService.getIndexData(indexCode);
+                    dataList.sort(Comparator.comparingLong(o -> o.getDate().getTime()));
+                    dwIndexDTO.setDataList(dataList);
+                    //api数据发送
+                    if (dwIndexDTO.getDataList().size() > ETAConstants.ETA_PUSH_DATA_LIMIT) {
+                        log.info("数据量超过限制,分批推送");
+                        Lists.partition(dataList, ETAConstants.ETA_PUSH_DATA_CHUNK).forEach(list -> {
+                            Date latestDataDate = list.stream().map(DWIndexDataDTO::getDate).max(Comparator.comparing(Date::getTime)).get();
+                            //防止数据漏更新,先推送,再更新本地任务记录,下游做好冗余和幂等
+                            DWIndexDTO dataDTO = new DWIndexDTO();
+                            dataDTO.setIndexCode(dwIndexDTO.getIndexCode());
+                            dataDTO.setFrequency(dwIndexDTO.getFrequency());
+                            dataDTO.setIndexName(dwIndexDTO.getIndexName());
+                            dataDTO.setRemark(dwIndexDTO.getRemark());
+                            dataDTO.setSourceName(dwIndexDTO.getSourceName());
+                            dataDTO.setUnit(dwIndexDTO.getUnit());
+                            dataDTO.setDataList(list);
+                            List<?> result = apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dataDTO);
+                            if (result == null) {
+                                log.error("调用ETA接口失败,停止更新");
+                                throw new ETAException(ErrorEnum.PUSH_TO_ETA_ERROR);
+                            }
+                            try {
+                                Thread.sleep(1000);
+                            } catch (InterruptedException e) {
+                                log.warn("线程中断异常");
+                            }
+                            TEtaIndexPushTask tEtaIndexPushTask = etaPushDataService.getETAPushTaskByIndexCode(dataDTO.getIndexCode());
+                            if (tEtaIndexPushTask == null) {
+                                log.error("未找到对应指标任务记录:{}", dataDTO.getIndexCode());
+                                throw new ETAException(ErrorEnum.INDEX_UPDATE_ERROR);
+                            }
+                            tEtaIndexPushTask.setLatestDataDate(new SimpleDateFormat(ETAConstants.DATE_PATTERN).format(latestDataDate));
+                            int currentCount = tEtaIndexPushTask.getDataCount() == null ? 0 : tEtaIndexPushTask.getDataCount();
+                            tEtaIndexPushTask.setDataCount(currentCount + list.size());
+                            tEtaIndexPushTask.setUpdateTime(new Date());
+                            tEtaIndexPushTask.setLastPushTime(new Date());
+                            etaPushDataService.updateTaskByIndexCode(tEtaIndexPushTask);
+                        });
+                    } else {
+                        apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dwIndexDTO);
+                        TEtaIndexPushTask tEtaIndexPushTask = etaPushDataService.getETAPushTaskByIndexCode(dwIndexDTO.getIndexCode());
+                        if (tEtaIndexPushTask == null) {
+                            log.error("未找到对应指标任务记录:{}", dwIndexDTO.getIndexCode());
+                            throw new ETAException(ErrorEnum.INDEX_UPDATE_ERROR);
+                        }
+                        Date latestDataDate = dwIndexDTO.getDataList().stream().map(DWIndexDataDTO::getDate).max(Comparator.comparing(Date::getTime)).get();
+                        tEtaIndexPushTask.setLatestDataDate(new SimpleDateFormat(ETAConstants.DATE_PATTERN).format(latestDataDate));
+                        int currentCount = tEtaIndexPushTask.getDataCount() == null ? 0 : tEtaIndexPushTask.getDataCount();
+                        tEtaIndexPushTask.setDataCount(currentCount + dwIndexDTO.getDataList().size());
+                        tEtaIndexPushTask.setUpdateTime(new Date());
+                        tEtaIndexPushTask.setLastPushTime(new Date());
+                        etaPushDataService.updateTaskByIndexCode(tEtaIndexPushTask);
                     }
-                    TEtaIndexPushTask tEtaIndexPushTask = etaPushDataService.getETAPushTaskByIndexCode(dataDTO.getIndexCode());
-                    if (tEtaIndexPushTask == null) {
-                        log.error("未找到对应指标任务记录:{}", dataDTO.getIndexCode());
-                        throw new ETAException(ErrorEnum.INDEX_UPDATE_ERROR);
-                    }
-                    tEtaIndexPushTask.setLatestDataDate(new SimpleDateFormat(ETAConstants.DATE_PATTERN).format(latestDataDate));
-                    int currentCount = tEtaIndexPushTask.getDataCount() == null ? 0 : tEtaIndexPushTask.getDataCount();
-                    tEtaIndexPushTask.setDataCount(currentCount + list.size());
-                    tEtaIndexPushTask.setUpdateTime(new Date());
-                    tEtaIndexPushTask.setLastPushTime(new Date());
-                    etaPushDataService.updateTaskByIndexCode(tEtaIndexPushTask);
-                });
+                } else {
+                    log.info("当前任务正在被处理,{}", indexCode);
+                }
+            }).thenRun(() -> {
+                log.info("同步指标任务表完成");
+                lock.unlock();
+            });
+            future.exceptionally(exception -> {
+                lock.unlock();
+                log.error("同步指标任务表失败:", exception);
+                return null;
+            });
+        } else {
+            dwIndexDTO = dwIndexFrameService.getIndex(indexCode);
+
+            boolean isLock;
+            isLock = lock.tryLock();
+            if (isLock) {
+                try {
+                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                        List<DWIndexDataDTO> dataList = dwIndexFrameService.getIndexData(indexCode);
+                        dataList.sort(Comparator.comparingLong(o -> o.getDate().getTime()));
+                        dwIndexDTO.setDataList(dataList);
+                        //api数据发送
+                        if (dwIndexDTO.getDataList().size() > ETAConstants.ETA_PUSH_DATA_LIMIT) {
+                            log.info("数据量超过限制,分批推送");
+                            Lists.partition(dataList, ETAConstants.ETA_PUSH_DATA_CHUNK).forEach(list -> {
+                                Date latestDataDate = list.stream().map(DWIndexDataDTO::getDate).max(Comparator.comparing(Date::getTime)).get();
+                                //防止数据漏更新,先推送,再更新本地任务记录,下游做好冗余和幂等
+                                DWIndexDTO dataDTO = new DWIndexDTO();
+                                dataDTO.setIndexCode(dwIndexDTO.getIndexCode());
+                                dataDTO.setFrequency(dwIndexDTO.getFrequency());
+                                dataDTO.setIndexName(dwIndexDTO.getIndexName());
+                                dataDTO.setRemark(dwIndexDTO.getRemark());
+                                dataDTO.setSourceName(dwIndexDTO.getSourceName());
+                                dataDTO.setUnit(dwIndexDTO.getUnit());
+                                dataDTO.setDataList(list);
+                                List<?> result = apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dataDTO);
+                                if (result == null) {
+                                    log.error("调用ETA接口失败,停止更新");
+                                    throw new ETAException(ErrorEnum.PUSH_TO_ETA_ERROR);
+                                }
+                                try {
+                                    Thread.sleep(500);
+                                } catch (InterruptedException e) {
+                                    log.warn("线程中断异常");
+                                }
+                                TEtaIndexPushTask tEtaIndexPushTask = etaPushDataService.getETAPushTaskByIndexCode(dataDTO.getIndexCode());
+                                if (tEtaIndexPushTask == null) {
+                                    log.error("未找到对应指标任务记录:{}", dataDTO.getIndexCode());
+                                    throw new ETAException(ErrorEnum.INDEX_UPDATE_ERROR);
+                                }
+                                tEtaIndexPushTask.setLatestDataDate(new SimpleDateFormat(ETAConstants.DATE_PATTERN).format(latestDataDate));
+                                int currentCount = tEtaIndexPushTask.getDataCount() == null ? 0 : tEtaIndexPushTask.getDataCount();
+                                tEtaIndexPushTask.setDataCount(currentCount + list.size());
+                                tEtaIndexPushTask.setUpdateTime(new Date());
+                                tEtaIndexPushTask.setLastPushTime(new Date());
+                                etaPushDataService.updateTaskByIndexCode(tEtaIndexPushTask);
+                            });
+                        } else {
+                            apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dwIndexDTO);
+                            TEtaIndexPushTask tEtaIndexPushTask = etaPushDataService.getETAPushTaskByIndexCode(dwIndexDTO.getIndexCode());
+                            if (tEtaIndexPushTask == null) {
+                                log.error("未找到对应指标任务记录:{}", dwIndexDTO.getIndexCode());
+                                throw new ETAException(ErrorEnum.INDEX_UPDATE_ERROR);
+                            }
+                            Date latestDataDate = dwIndexDTO.getDataList().stream().map(DWIndexDataDTO::getDate).max(Comparator.comparing(Date::getTime)).get();
+                            tEtaIndexPushTask.setLatestDataDate(new SimpleDateFormat(ETAConstants.DATE_PATTERN).format(latestDataDate));
+                            int currentCount = tEtaIndexPushTask.getDataCount() == null ? 0 : tEtaIndexPushTask.getDataCount();
+                            tEtaIndexPushTask.setDataCount(currentCount + dwIndexDTO.getDataList().size());
+                            tEtaIndexPushTask.setUpdateTime(new Date());
+                            tEtaIndexPushTask.setLastPushTime(new Date());
+                            etaPushDataService.updateTaskByIndexCode(tEtaIndexPushTask);
+                        }
+                    }).thenRun(() -> {
+                        log.info("同步指标任务表完成");
+                    });
+                    future.exceptionally(exception -> {
+                        log.error("同步指标任务表失败:", exception);
+                        return null;
+                    });
+                    future.get();
+                } catch (Exception e) {
+                    throw new ETAException(ErrorEnum.INDEX_DEAL_LOCK_FAIL);
+                } finally {
+                    lock.unlock();
+                }
             } else {
-                apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dwIndexDTO);
+                log.error("同步指标任务表失败,当前正在被其他任务处理中请稍后再试");
             }
-        }).thenRun(() -> {
-            log.info("同步指标任务表完成");
-        });
-        future.exceptionally(exception -> {
-            log.error("同步指标任务表失败:", exception);
-            return null;
-        });
+        }
+
 
     }
 }

+ 109 - 15
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/job/ETASyncJob.java

@@ -1,6 +1,11 @@
 package com.qhtx.eta.domain.job;
 
+import com.alibaba.nacos.api.utils.StringUtils;
+import com.google.common.collect.Lists;
 import com.qhtx.eta.common.eunms.DataSourceType;
+import com.qhtx.eta.common.eunms.ErrorEnum;
+import com.qhtx.eta.common.exception.ETAException;
+import com.qhtx.eta.domain.annotation.DomainTransDataSource;
 import com.qhtx.eta.domain.config.ScheduleConfig;
 import com.qhtx.eta.domain.config.ScheduleTaskConfig;
 import com.qhtx.eta.domain.convert.ClassifyMappingDTOConverter;
@@ -9,6 +14,7 @@ import com.qhtx.eta.domain.enums.ETADataStatus;
 import com.qhtx.eta.domain.enums.ETAInterfaceEnum;
 import com.qhtx.eta.domain.enums.ETATaskEnum;
 import com.qhtx.eta.domain.enums.ExcludedQuotaSource;
+import com.qhtx.eta.domain.facade.ETAFacadeService;
 import com.qhtx.eta.domain.service.DWIndexFrameService;
 import com.qhtx.eta.domain.service.ETAClassifyService;
 import com.qhtx.eta.domain.service.ETAPushDataService;
@@ -22,17 +28,19 @@ import com.qhtx.eta.infra.annotation.UseDataSource;
 import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
 import com.qhtx.eta.infra.enums.Frequency;
 import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
 import javax.annotation.Resource;
+import java.text.SimpleDateFormat;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
 
@@ -67,11 +75,15 @@ public class ETASyncJob implements CommandLineRunner {
     @Resource
     private DWIndexFrameService dwIndexFrameService;
 
-
+    @Resource
+    private ETAFacadeService facadeService;
+    @Resource(name = "IndexDataThreadPool")
+    private ThreadPoolExecutor indexDataThreadPool;
     @Resource
     private ETAPushDataService etaPushDataService;
-//    @Resource(name = "ETAIndexFrameThreadPool")
-//    private ThreadPoolExecutor etaIndexFrameThreadPool;
+
+    @Resource
+    private RedissonClient redissonClient;
 
     private static final Map<Integer, ETATaskEnum> taskMappingMap = new ConcurrentHashMap<>(16);
 
@@ -85,16 +97,98 @@ public class ETASyncJob implements CommandLineRunner {
     public void syncETAPushData() {
         //获取需要更新的数据
         List<DWIndexDTO> pushTaskList = etaPushDataService.getPushIndexTaskList();
-//        if (!CollectionUtils.isEmpty(pushTaskList)) {
-//            log.info("更新任务列表:{}", pushTaskList.size());
-//            pushTaskList.forEach(item -> {
-//                //获取指标数据
-//                List<QuotaDataUpdateBO> dataList = etaPushDataService.getQuotaInfoListForUpdate(item);
-//                if (!CollectionUtils.isEmpty(dataList)) {
-//                }
-//            });
-//        }
-
+        if (!CollectionUtils.isEmpty(pushTaskList)) {
+            log.info("更新任务列表:{}", pushTaskList.size());
+            pushTaskList.forEach(item -> {
+                boolean isLock;
+                RLock rLock = redissonClient.getLock(item.getIndexCode());
+                isLock = rLock.tryLock();
+                if (isLock) {
+                    try {
+                        DWIndexDTO dwIndexDTO = dwIndexFrameService.getIndex(item.getIndexCode());
+                        //如果数据没有直接初始化
+                        if (StringUtils.isBlank(item.getLatestDataDate())) {
+                            log.info("初始化指标:{}", item.getIndexCode());
+                            facadeService.pushIndex(item.getIndexCode(), false);
+                            return;
+                        }
+                        //提交给线程池去完成,每个指标顺序执行不并发。
+                        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                            //获取指标数据
+                            List<DWIndexDataDTO> dataList = etaPushDataService.getUpdateDataList(item.getIndexCode(), item.getLatestDataDate());
+                            dataList.sort(Comparator.comparingLong(o -> o.getDate().getTime()));
+                            if (!CollectionUtils.isEmpty(dataList)) {
+                                dwIndexDTO.setDataList(dataList);
+                                //api数据发送
+                                if (dataList.size() > ETAConstants.ETA_PUSH_DATA_LIMIT) {
+                                    log.info("数据量超过限制,分批更新推送");
+                                    Lists.partition(dataList, ETAConstants.ETA_PUSH_DATA_CHUNK).forEach(list -> {
+                                        Date latestDataDate = list.stream().map(DWIndexDataDTO::getDate).max(Comparator.comparing(Date::getTime)).get();
+                                        DWIndexDTO dataDTO = new DWIndexDTO();
+                                        dataDTO.setIndexCode(dwIndexDTO.getIndexCode());
+                                        dataDTO.setFrequency(dwIndexDTO.getFrequency());
+                                        dataDTO.setIndexName(dwIndexDTO.getIndexName());
+                                        dataDTO.setRemark(dwIndexDTO.getRemark());
+                                        dataDTO.setSourceName(dwIndexDTO.getSourceName());
+                                        dataDTO.setUnit(dwIndexDTO.getUnit());
+                                        dataDTO.setDataList(list);
+                                        List<?> result = apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dataDTO);
+                                        if (result == null) {
+                                            log.error("调用ETA接口失败,停止更新");
+                                            throw new ETAException(ErrorEnum.PUSH_TO_ETA_ERROR);
+                                        }
+                                        try {
+                                            Thread.sleep(1000);
+                                        } catch (InterruptedException e) {
+                                            log.warn("线程中断异常");
+                                        }
+                                        TEtaIndexPushTask tEtaIndexPushTask = etaPushDataService.getETAPushTaskByIndexCode(dataDTO.getIndexCode());
+                                        if (tEtaIndexPushTask == null) {
+                                            log.error("未找到对应指标任务记录:{}", dataDTO.getIndexCode());
+                                            throw new ETAException(ErrorEnum.INDEX_UPDATE_ERROR);
+                                        }
+                                        tEtaIndexPushTask.setLatestDataDate(new SimpleDateFormat(ETAConstants.DATE_PATTERN).format(latestDataDate));
+                                        int currentCount = tEtaIndexPushTask.getDataCount() == null ? 0 : tEtaIndexPushTask.getDataCount();
+                                        tEtaIndexPushTask.setDataCount(currentCount + list.size());
+                                        tEtaIndexPushTask.setUpdateTime(new Date());
+                                        tEtaIndexPushTask.setLastPushTime(new Date());
+                                        etaPushDataService.updateTaskByIndexCode(tEtaIndexPushTask);
+                                    });
+                                } else {
+                                    apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dwIndexDTO);
+                                    TEtaIndexPushTask tEtaIndexPushTask = etaPushDataService.getETAPushTaskByIndexCode(dwIndexDTO.getIndexCode());
+                                    if (tEtaIndexPushTask == null) {
+                                        log.error("未找到对应指标任务记录:{}", dwIndexDTO.getIndexCode());
+                                        throw new ETAException(ErrorEnum.INDEX_UPDATE_ERROR);
+                                    }
+                                    Date latestDataDate = dwIndexDTO.getDataList().stream().map(DWIndexDataDTO::getDate).max(Comparator.comparing(Date::getTime)).get();
+                                    tEtaIndexPushTask.setLatestDataDate(new SimpleDateFormat(ETAConstants.DATE_PATTERN).format(latestDataDate));
+                                    int currentCount = tEtaIndexPushTask.getDataCount() == null ? 0 : tEtaIndexPushTask.getDataCount();
+                                    tEtaIndexPushTask.setDataCount(currentCount + dwIndexDTO.getDataList().size());
+                                    tEtaIndexPushTask.setUpdateTime(new Date());
+                                    tEtaIndexPushTask.setLastPushTime(new Date());
+                                    etaPushDataService.updateTaskByIndexCode(tEtaIndexPushTask);
+                                }
+                            }
+                        }).thenRun(() -> {
+                            log.info("更新指标任务表完成{}", item.getIndexCode());
+                        });
+                        future.exceptionally(exception -> {
+                            log.error("同步指标任务表失败:", exception);
+                            return null;
+                        });
+                        future.get();
+                    } catch (Exception e) {
+                        log.info("更新数据失败", e);
+                    } finally {
+                        rLock.unlock();
+                        log.info("释放锁{}", item.getIndexCode());
+                    }
+                } else {
+                    log.info("未获取到锁,跳过更新{}", item.getIndexCode());
+                }
+            });
+        }
     }
 
     /**

+ 2 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/DWIndexFrameService.java

@@ -19,5 +19,7 @@ public interface DWIndexFrameService {
 
     DWIndexDTO addSyncIndex(String indexCode);
 
+    DWIndexDTO getIndex(String indexCode);
+
     List<DWIndexDataDTO> getIndexData(String indexCode);
 }

+ 5 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/ETAPushDataService.java

@@ -1,6 +1,7 @@
 package com.qhtx.eta.domain.service;
 
 import com.qhtx.eta.domain.entity.DWIndexDTO;
+import com.qhtx.eta.domain.entity.DWIndexDataDTO;
 import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
 
 import java.util.List;
@@ -8,7 +9,11 @@ import java.util.List;
 public interface ETAPushDataService {
     List<DWIndexDTO> getPushIndexTaskList();
 
+    List<DWIndexDataDTO> getUpdateDataList(String indexCode, String beginDate);
+
     TEtaIndexPushTask getETAPushTaskByIndexCode(String indexCode);
 
     void updateTaskByIndexCode(TEtaIndexPushTask tEtaIndexPushTask);
+
+
 }

+ 8 - 3
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/impl/DWIndexFrameServiceImpl.java

@@ -130,12 +130,19 @@ public class DWIndexFrameServiceImpl implements DWIndexFrameService {
 
     @Override
     public DWIndex getIndexByIndexCode(String indexCode) {
-
         return indexFrameService.getIndexByIndexCode(indexCode);
     }
 
     @Override
     public DWIndexDTO addSyncIndex(String indexCode) {
+        DWIndexDTO dwIndexDTO = getIndex(indexCode);
+        TEtaIndexPushTask tEtaIndexPushTask = DWIndexDTOConverter.INSTANCE.convertToTEtaIndexPushTask(dwIndexDTO);
+        etaIndexPushTaskService.addTask(tEtaIndexPushTask);
+        return dwIndexDTO;
+    }
+
+    @Override
+    public DWIndexDTO getIndex(String indexCode) {
         DWIndex dwIndex = indexFrameService.getIndexByIndexCode(indexCode);
         if (dwIndex == null) {
             log.warn("指标信息不存在,indexCode:{}", indexCode);
@@ -146,8 +153,6 @@ public class DWIndexFrameServiceImpl implements DWIndexFrameService {
         }
         DWIndexDTO dwIndexDTO = DWIndexDTOConverter.INSTANCE.convertToDTO(dwIndex);
         dwIndexDTO.setRemark("");
-        TEtaIndexPushTask tEtaIndexPushTask = DWIndexDTOConverter.INSTANCE.convertToTEtaIndexPushTask(dwIndexDTO);
-        etaIndexPushTaskService.addTask(tEtaIndexPushTask);
         return dwIndexDTO;
     }
 

+ 45 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/impl/ETAPushDataServiceImpl.java

@@ -1,27 +1,72 @@
 package com.qhtx.eta.domain.service.impl;
 
+import com.qhtx.eta.common.constant.ETAConstants;
+import com.qhtx.eta.common.eunms.ErrorEnum;
+import com.qhtx.eta.common.exception.ETAException;
 import com.qhtx.eta.domain.convert.DWIndexDTOConverter;
 import com.qhtx.eta.domain.convert.DWIndexDataDTOConverter;
 import com.qhtx.eta.domain.entity.DWIndexDTO;
+import com.qhtx.eta.domain.entity.DWIndexDataDTO;
 import com.qhtx.eta.domain.service.ETAPushDataService;
+import com.qhtx.eta.infra.entity.dw.TDampDwIndexData;
 import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
+import com.qhtx.eta.infra.service.TDampDwIndexDataService;
 import com.qhtx.eta.infra.service.TEtaIndexPushTaskService;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
 
 @Service
+@Slf4j
 public class ETAPushDataServiceImpl implements ETAPushDataService {
 
     @Resource
     private TEtaIndexPushTaskService tEtaIndexPushTaskService;
 
+    @Resource
+    private TDampDwIndexDataService tDampDwIndexDataService;
+    @Resource(name = "IndexDataThreadPool")
+    private ThreadPoolExecutor indexDataThreadPool;
+
     @Override
     public List<DWIndexDTO> getPushIndexTaskList() {
         return DWIndexDTOConverter.INSTANCE.convertToDTOList(tEtaIndexPushTaskService.getAllTask());
     }
 
+    @Override
+    public List<DWIndexDataDTO> getUpdateDataList(String indexCode, String beginDate) {
+        int total = tDampDwIndexDataService.countByIndexCodeWithDate(indexCode, beginDate);
+        List<TDampDwIndexData> allResults;
+        //分块取值
+        if (total > ETAConstants.ETA_PUSH_DATA_LIMIT) {
+            int chunk = (total + ETAConstants.ETA_PUSH_DATA_LIMIT - 1) / ETAConstants.ETA_PUSH_DATA_LIMIT;
+            List<CompletableFuture<List<TDampDwIndexData>>> futures = new ArrayList<>();
+            for (int i = 0; i < chunk; i++) {
+                int pars = i;
+                CompletableFuture<List<TDampDwIndexData>> future = CompletableFuture.supplyAsync(() -> {
+                    try {
+                        return tDampDwIndexDataService.queryByDate(indexCode, beginDate, pars * ETAConstants.ETA_PUSH_DATA_LIMIT, ETAConstants.ETA_PUSH_DATA_LIMIT);
+                    } catch (Exception e) {
+                        log.error("获取指标数据失败", e);
+                        throw new ETAException(ErrorEnum.DATASOURCE_EXECUTE_ERROR);
+                    }
+                }, indexDataThreadPool);
+                futures.add(future);
+            }
+            // 等待所有任务完成并合并结果
+            allResults = futures.stream().map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList());
+        } else {
+            allResults = tDampDwIndexDataService.queryByDate(indexCode, beginDate, 0, total);
+        }
+        return allResults.stream().map(DWIndexDataDTOConverter.INSTANCE::convertToDTO).collect(Collectors.toList());
+    }
+
     @Override
     public TEtaIndexPushTask getETAPushTaskByIndexCode(String indexCode) {
         return tEtaIndexPushTaskService.getByIndexCode(indexCode);

+ 5 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/mapper/TDampDwIndexDataDao.java

@@ -3,6 +3,7 @@ package com.qhtx.eta.infra.mapper;
 import com.qhtx.eta.infra.entity.dw.TDampDwIndexData;
 import org.apache.ibatis.annotations.Param;
 
+import java.util.Date;
 import java.util.List;
 
 /**
@@ -18,7 +19,7 @@ public interface TDampDwIndexDataDao {
      *
      * @return 实例对象
      */
-    List<TDampDwIndexData> queryAllByLimit(@Param("indexCode") String indexCode,@Param("offset") int offset,@Param("limit") int limit);
+    List<TDampDwIndexData> queryAllByLimit(@Param("indexCode") String indexCode, @Param("offset") int offset, @Param("limit") int limit);
 
     int countByIndexCode(String indexCode);
 
@@ -32,5 +33,8 @@ public interface TDampDwIndexDataDao {
     int insert(TDampDwIndexData tDampDwIndexData);
 
 
+    List<TDampDwIndexData> queryByDate(@Param("indexCode") String indexCode, @Param("begin") Date beginDate, @Param("offset") int offset, @Param("limit") int limit);
+
+    int countByIndexCodeWithDate(@Param("indexCode") String indexCode, @Param("begin") Date begin);
 }
 

+ 4 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/TDampDwIndexDataService.java

@@ -13,8 +13,11 @@ import java.util.List;
  */
 public interface TDampDwIndexDataService {
 
-    List<TDampDwIndexData> queryAllByLimit(String indexCode, int id, int limit);
+    List<TDampDwIndexData> queryAllByLimit(String indexCode, int offset, int limit);
 
     int countByIndexCode(String indexCode);
 
+    List<TDampDwIndexData> queryByDate(String indexCode, String beginDate, int offset, int limit);
+
+    int countByIndexCodeWithDate(String indexCode, String beginDate);
 }

+ 0 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/IndexFrameServiceImpl.java

@@ -40,7 +40,6 @@ public class IndexFrameServiceImpl implements IndexFrameService {
     }
 
     @Override
-    @Transactional(rollbackFor = Exception.class)
     public DWIndex getIndexByIndexCode(String indexCode) {
         return dwIndexDao.getByIndexCode(indexCode);
     }

+ 30 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TDampDwIndexDataServiceImpl.java

@@ -1,13 +1,19 @@
 package com.qhtx.eta.infra.service.impl;
 
 import com.qhtx.eta.common.eunms.DataSourceType;
+import com.qhtx.eta.common.eunms.ErrorEnum;
+import com.qhtx.eta.common.exception.ETAException;
 import com.qhtx.eta.infra.annotation.UseDataSource;
 import com.qhtx.eta.infra.entity.dw.TDampDwIndexData;
 import com.qhtx.eta.infra.mapper.TDampDwIndexDataDao;
 import com.qhtx.eta.infra.service.TDampDwIndexDataService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.time.DateUtils;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import java.text.ParseException;
+import java.util.Date;
 import java.util.List;
 
 /**
@@ -18,13 +24,14 @@ import java.util.List;
  */
 @Service("tDampDwIndexDataService")
 @UseDataSource(dataSourceType = DataSourceType.DW)
+@Slf4j
 public class TDampDwIndexDataServiceImpl implements TDampDwIndexDataService {
     @Resource
     private TDampDwIndexDataDao tDampDwIndexDataDao;
 
     /**
      * @param indexCode
-     * @param id
+     * @param offset
      * @param limit
      * @return
      */
@@ -38,4 +45,26 @@ public class TDampDwIndexDataServiceImpl implements TDampDwIndexDataService {
         return tDampDwIndexDataDao.countByIndexCode(indexCode);
     }
 
+    @Override
+    public List<TDampDwIndexData> queryByDate(String indexCode, String beginDate, int offset, int limit) {
+        try {
+            Date begin = DateUtils.parseDate(beginDate, "yyyy-MM-dd");
+            return tDampDwIndexDataDao.queryByDate(indexCode, begin, offset, limit);
+        } catch (ParseException e) {
+            log.error("日期解析失败", e);
+            throw new ETAException(ErrorEnum.PARSE_TIME_ERROR, "停止更新当前指标" + indexCode);
+        }
+
+    }
+
+    @Override
+    public int countByIndexCodeWithDate(String indexCode, String beginDate) {
+        try {
+            Date begin = DateUtils.parseDate(beginDate, "yyyy-MM-dd");
+            return tDampDwIndexDataDao.countByIndexCodeWithDate(indexCode, begin);
+        } catch (ParseException e) {
+            log.error("日期解析失败", e);
+            throw new ETAException(ErrorEnum.PARSE_TIME_ERROR, "停止更新当前指标" + indexCode);
+        }
+    }
 }

+ 12 - 5
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TEtaIndexPushTaskServiceImpl.java

@@ -70,10 +70,7 @@ public class TEtaIndexPushTaskServiceImpl implements TEtaIndexPushTaskService {
                 return false;
             }
             Date lastPushTime = item.getLastPushTime();
-            if (needPush(lastPushTime, frequency)) {
-                return true;
-            }
-            return false;
+            return needPush(lastPushTime);
         }).collect(Collectors.toList());
     }
 
@@ -82,7 +79,17 @@ public class TEtaIndexPushTaskServiceImpl implements TEtaIndexPushTaskService {
         return tEtaIndexPushTaskDao.getByIndexCode(indexCode);
     }
 
-    private boolean needPush(Date lastPushTime, Frequency frequency) {
+    private boolean needPush(Date lastPushTime) {
+        if (lastPushTime == null) {
+            return true;
+        }
+        int numberOfPeriods = -1;
+        LocalDate today = LocalDate.now();
+        LocalDate localDate = lastPushTime.toInstant().atZone(ZoneId.of("Asia/Shanghai")).toLocalDate();
+        return localDate.minusDays(numberOfPeriods).compareTo(today) <= 0;
+    }
+
+    private boolean needPushByFrequency(Date lastPushTime, Frequency frequency) {
         if (lastPushTime == null) {
             return true;
         }

+ 8 - 0
qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/TDampDwIndexDataDao.xml

@@ -36,5 +36,13 @@
          <![CDATA[<=]]>
         #{limit}
     </select>
+    <select id="countByIndexCodeWithDate" resultType="int">
+        select count(1) from T_DAMP_DW_INDEX_DATA where index_code = #{indexCode} and is_delete=0 and DATA_DATE <![CDATA[>=]]> #{begin}
+    </select>
+    <select id="queryByDate" resultMap="DataMap">
+        select * from ( select * from (select rownum NO, DATA_DATE, DATA_VALUE from T_DAMP_DW_INDEX_DATA where
+            index_code = #{indexCode}  and is_delete =0 and DATA_DATE <![CDATA[>]]> #{begin} ORDER BY NO ASC) where NO <![CDATA[>]]> #{offset}) where ROWNUM
+         <![CDATA[<=]]>  #{limit}
+    </select>
 </mapper>