Browse Source

自有数据接口

kobe6258 3 months ago
parent
commit
b2ed9f61a1

+ 1 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/ETAHttpClientImpl.java

@@ -1,5 +1,6 @@
 package com.qhtx.eta.domain.api.http;
 
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.qhtx.eta.domain.api.http.response.eta.ETAHttpResponse;
 import com.qhtx.eta.domain.config.ETAConfigProperties;

+ 4 - 3
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/response/eta/ETAPushDataResponse.java

@@ -1,15 +1,16 @@
 package com.qhtx.eta.domain.api.http.response.eta;
 
-import com.qhtx.eta.domain.api.http.request.ETAPushData;
+import com.qhtx.eta.domain.convert.DWIndexDTOConverter;
+import com.qhtx.eta.domain.convert.DWIndexDataDTOConverter;
 import com.qhtx.eta.domain.entity.DWIndexDTO;
 
 import java.util.List;
 
-public class ETAPushDataResponse extends ETAHttpResponse<ETAPushData, DWIndexDTO> {
+public class ETAPushDataResponse extends ETAHttpResponse<ETAPushIndex, DWIndexDTO> {
     private static final long serialVersionUID = 3157981069870661780L;
 
     @Override
     public List<DWIndexDTO> convertDataToDTO() {
-        return null;
+        return DWIndexDTOConverter.INSTANCE.convertToResDTOList(this.getData());
     }
 }

+ 22 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/response/eta/ETAPushIndex.java

@@ -0,0 +1,22 @@
+package com.qhtx.eta.domain.api.http.response.eta;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class ETAPushIndex implements Serializable {
+    private static final long serialVersionUID = -7357872175938006661L;
+    @JsonProperty("IndexCode")
+    private String indexCode;
+    @JsonProperty("IndexName")
+    private String indexName;
+    @JsonProperty("Unit")
+    private String unit;
+    @JsonProperty("Frequency")
+    private String frequency;
+    @JsonProperty("SourceName")
+    private String sourceName;
+
+}

+ 12 - 2
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/convert/DWIndexDTOConverter.java

@@ -1,5 +1,6 @@
 package com.qhtx.eta.domain.convert;
 
+import com.qhtx.eta.domain.api.http.response.eta.ETAPushIndex;
 import com.qhtx.eta.domain.entity.DWIndexDTO;
 import com.qhtx.eta.infra.entity.dw.DWIndex;
 import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
@@ -16,11 +17,20 @@ public interface DWIndexDTOConverter {
 
     DWIndexDTOConverter INSTANCE = Mappers.getMapper(DWIndexDTOConverter.class);
 
-    @Mapping(source = "indexCnName",target = "indexName")
-    @Mapping(source = "units",target = "unit")
+    @Mapping(source = "indexCnName", target = "indexName")
+    @Mapping(source = "units", target = "unit")
     DWIndexDTO convertToDTO(DWIndex dwIndex);
 
     TEtaIndexPushTask convertToTEtaIndexPushTask(DWIndexDTO dwIndexDTO);
 
     List<TEtaIndexPushTask> convertToTEtaIndexPushTaskList(List<DWIndexDTO> dwIndexDTO);
+
+    DWIndexDTO convertToDTO(TEtaIndexPushTask tEtaIndexPushTask);
+
+    List<DWIndexDTO> convertToDTOList(List<TEtaIndexPushTask> tEtaIndexPushTaskList);
+
+
+    DWIndexDTO convertToResDTO(ETAPushIndex etaPushIndex);
+
+    List<DWIndexDTO> convertToResDTOList(List<ETAPushIndex> etaPushIndexList);
 }

+ 7 - 8
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/facade/impl/ETAFacadeServiceImpl.java

@@ -20,9 +20,7 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.text.SimpleDateFormat;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -50,12 +48,13 @@ public class ETAFacadeServiceImpl implements ETAFacadeService {
 
         CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
             List<DWIndexDataDTO> dataList = dwIndexFrameService.getIndexData(indexCode);
+            dataList.sort(Comparator.comparingLong(o -> o.getDate().getTime()));
+//            dataList.stream().sorted(Comparator.comparing(DWIndexDataDTO::getDate)).collect(Collectors.toList());
             dwIndexDTO.setDataList(dataList);
             //api数据发送
             if (dwIndexDTO.getDataList().size() > ETAConstants.ETA_PUSH_DATA_LIMIT) {
                 log.info("数据量超过限制,分批推送");
-                List<DWIndexDataDTO> sortList = dwIndexDTO.getDataList().stream().sorted(Comparator.comparing(DWIndexDataDTO::getDate)).collect(Collectors.toList());
-                Lists.partition(sortList, ETAConstants.ETA_PUSH_DATA_CHUNK).forEach(list -> {
+                Lists.partition(dataList, ETAConstants.ETA_PUSH_DATA_CHUNK).forEach(list -> {
                     Date latestDataDate = list.stream().map(DWIndexDataDTO::getDate).max(Comparator.comparing(Date::getTime)).get();
                     //防止数据漏更新,先推送,再更新本地任务记录,下游做好冗余和幂等
                     DWIndexDTO dataDTO = new DWIndexDTO();
@@ -66,7 +65,7 @@ public class ETAFacadeServiceImpl implements ETAFacadeService {
                     dataDTO.setSourceName(dwIndexDTO.getSourceName());
                     dataDTO.setUnit(dwIndexDTO.getUnit());
                     dataDTO.setDataList(list);
-                    List<?> result = apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dataDTO);
+                      List<?> result = apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dataDTO);
                     if (result == null) {
                         log.error("调用ETA接口失败,停止更新");
                         throw new ETAException(ErrorEnum.PUSH_TO_ETA_ERROR);
@@ -92,10 +91,10 @@ public class ETAFacadeServiceImpl implements ETAFacadeService {
                 apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dwIndexDTO);
             }
         }).thenRun(() -> {
-            log.info("更新指标任务表完成");
+            log.info("同步指标任务表完成");
         });
         future.exceptionally(exception -> {
-            log.error("更新指标任务表失败:", exception);
+            log.error("同步指标任务表失败:", exception);
             return null;
         });
 

+ 14 - 3
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/job/ETASyncJob.java

@@ -82,10 +82,21 @@ public class ETASyncJob implements CommandLineRunner {
     /**
      * 同步指标数据到自有数据源
      */
-    public void syncETAPushData(){
+    public void syncETAPushData() {
         //获取需要更新的数据
-      //  List<ETAPushIndexDTO> pushTaskList = etaPushDataService.getPushIndexTaskList();
+        List<DWIndexDTO> pushTaskList = etaPushDataService.getPushIndexTaskList();
+//        if (!CollectionUtils.isEmpty(pushTaskList)) {
+//            log.info("更新任务列表:{}", pushTaskList.size());
+//            pushTaskList.forEach(item -> {
+//                //获取指标数据
+//                List<QuotaDataUpdateBO> dataList = etaPushDataService.getQuotaInfoListForUpdate(item);
+//                if (!CollectionUtils.isEmpty(dataList)) {
+//                }
+//            });
+//        }
+
     }
+
     /**
      * 同步指标信息
      */
@@ -103,7 +114,7 @@ public class ETASyncJob implements CommandLineRunner {
     }
 
     /**
-     *   同步ETA的指标框架到一期
+     * 同步ETA的指标框架到一期
      */
     public void syncETAClassifyList() {
         List<?> respList = apiServiceHolder.runApi(ETAInterfaceEnum.GET_CLASSIFY_LIST);

+ 2 - 1
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/impl/ETAPushDataServiceImpl.java

@@ -1,6 +1,7 @@
 package com.qhtx.eta.domain.service.impl;
 
 import com.qhtx.eta.domain.convert.DWIndexDTOConverter;
+import com.qhtx.eta.domain.convert.DWIndexDataDTOConverter;
 import com.qhtx.eta.domain.entity.DWIndexDTO;
 import com.qhtx.eta.domain.service.ETAPushDataService;
 import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
@@ -18,7 +19,7 @@ public class ETAPushDataServiceImpl implements ETAPushDataService {
 
     @Override
     public List<DWIndexDTO> getPushIndexTaskList() {
-        return null;
+        return DWIndexDTOConverter.INSTANCE.convertToDTOList(tEtaIndexPushTaskService.getAllTask());
     }
 
     @Override

+ 1 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/mapper/TDampDwIndexDataDao.java

@@ -18,7 +18,7 @@ public interface TDampDwIndexDataDao {
      *
      * @return 实例对象
      */
-    List<TDampDwIndexData> queryAllByLimit(@Param("indexCode") String indexCode,@Param("id") int id,@Param("limit") int limit);
+    List<TDampDwIndexData> queryAllByLimit(@Param("indexCode") String indexCode,@Param("offset") int offset,@Param("limit") int limit);
 
     int countByIndexCode(String indexCode);
 

+ 2 - 2
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TDampDwIndexDataServiceImpl.java

@@ -29,8 +29,8 @@ public class TDampDwIndexDataServiceImpl implements TDampDwIndexDataService {
      * @return
      */
     @Override
-    public List<TDampDwIndexData> queryAllByLimit(String indexCode, int id, int limit) {
-        return tDampDwIndexDataDao.queryAllByLimit(indexCode, id, limit);
+    public List<TDampDwIndexData> queryAllByLimit(String indexCode, int offset, int limit) {
+        return tDampDwIndexDataDao.queryAllByLimit(indexCode, offset, limit);
     }
 
     @Override

+ 5 - 2
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TEtaIndexPushTaskServiceImpl.java

@@ -82,13 +82,16 @@ public class TEtaIndexPushTaskServiceImpl implements TEtaIndexPushTaskService {
         return tEtaIndexPushTaskDao.getByIndexCode(indexCode);
     }
 
-    private boolean needPush(Date startDate, Frequency frequency) {
+    private boolean needPush(Date lastPushTime, Frequency frequency) {
+        if (lastPushTime == null) {
+            return true;
+        }
         int numberOfPeriods = -1;
         if (frequency == null) {
             frequency = Frequency.DAILY;
         }
         LocalDate today = LocalDate.now();
-        LocalDate localDate = startDate.toInstant().atZone(ZoneId.of("Asia/Shanghai")).toLocalDate();
+        LocalDate localDate = lastPushTime.toInstant().atZone(ZoneId.of("Asia/Shanghai")).toLocalDate();
         switch (frequency) {
             case DAILY:
                 return localDate.minusDays(numberOfPeriods).compareTo(today) <= 0;

+ 2 - 2
qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/TDampDwIndexDataDao.xml

@@ -27,12 +27,12 @@
     </resultMap>
 
     <select id="countByIndexCode" resultType="int">
-        select count(1) from T_DAMP_DW_INDEX_DATA where index_code = #{indexCode}
+        select count(1) from T_DAMP_DW_INDEX_DATA where index_code = #{indexCode} and is_delete=0
     </select>
 
     <select id="queryAllByLimit" resultMap="DataMap">
         select * from ( select * from (select rownum NO, DATA_DATE, DATA_VALUE from T_DAMP_DW_INDEX_DATA where
-        index_code = #{indexCode} ORDER BY DATA_DATE ASC) where NO <![CDATA[>]]> #{id}) where ROWNUM
+        index_code = #{indexCode}  and is_delete =0 ORDER BY NO ASC) where NO <![CDATA[>]]> #{offset}) where ROWNUM
          <![CDATA[<=]]>
         #{limit}
     </select>

+ 5 - 0
qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/TEtaIndexPushTaskDao.xml

@@ -46,5 +46,10 @@
     <select id="getByIndexCode" resultMap="TEtaIndexPushTaskMap">
             SELECT * from T_ETA_INDEX_PUSH_TASK where INDEX_CODE=#{indexCode}
     </select>
+
+
+    <select id="getAllTask" resultMap="TEtaIndexPushTaskMap">
+        SELECT * from T_ETA_INDEX_PUSH_TASK
+    </select>
 </mapper>