浏览代码

自有数据接口

kobe6258 3 月之前
父节点
当前提交
997018ad5c
共有 29 个文件被更改,包括 238 次插入130 次删除
  1. 1 0
      qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/constant/ETAConstants.java
  2. 3 0
      qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/eunms/ErrorEnum.java
  3. 15 8
      qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/exception/ETAException.java
  4. 1 1
      qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/interceptor/RetryInterceptor.java
  5. 5 0
      qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/utils/HttpClient.java
  6. 20 7
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/ETAHttpClientImpl.java
  7. 2 2
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/request/ETAPushData.java
  8. 2 1
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/request/ETAPushIndexRequest.java
  9. 5 2
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/response/eta/ETAPushDataResponse.java
  10. 15 7
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/proxy/ApiParameterProxy.java
  11. 13 2
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/proxy/impl/ETAPushParameterProxy.java
  12. 3 0
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/convert/DWIndexDTOConverter.java
  13. 20 3
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/convert/DWIndexDataDTOConverter.java
  14. 1 0
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/entity/DWIndexDTO.java
  15. 2 0
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/facade/ETAFacadeService.java
  16. 55 24
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/facade/impl/ETAFacadeServiceImpl.java
  17. 3 1
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/DWIndexFrameService.java
  18. 2 0
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/ETAPushDataService.java
  19. 23 21
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/impl/DWIndexFrameServiceImpl.java
  20. 5 0
      qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/impl/ETAPushDataServiceImpl.java
  21. 1 0
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/entity/dw/DWIndex.java
  22. 2 1
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/entity/dw/TDampDwIndexData.java
  23. 0 4
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/entity/dw/TEtaIndexPushTask.java
  24. 1 1
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/mapper/TEtaIndexPushTaskDao.java
  25. 1 1
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/TEtaIndexPushTaskService.java
  26. 18 8
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TEtaIndexPushTaskServiceImpl.java
  27. 2 1
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/DWIndexDao.xml
  28. 1 1
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/TDampDwIndexDataDao.xml
  29. 16 34
      qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/TEtaIndexPushTaskDao.xml

+ 1 - 0
qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/constant/ETAConstants.java

@@ -47,4 +47,5 @@ public class ETAConstants {
     public static final int INDEX_FRAME_CREATE_UPDATE_USER_ID = 0;
 
     public static final int ETA_PUSH_DATA_LIMIT = 1000;
+    public static final int ETA_PUSH_DATA_CHUNK = 100;
 }

+ 3 - 0
qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/eunms/ErrorEnum.java

@@ -39,6 +39,9 @@ public enum ErrorEnum {
     DW_INDEX_TABLE_CODE_GET_ERROR("20010", "获取指标框架table_code失败"),
     DW_HZ_ROOT_PATH_NOT_EXIST("20011", "弘则研究指标根目录不存在"),
     DW_INDEX_NOT_FOUND("20012", "DW指标信息不存在"),
+    INDEX_AllREADY_ADD("20013", "指标已添加,请勿重复添加"),
+    INDEX_UPDATE_ERROR("20013", "更新指标出错,指标异常"),
+    PUSH_TO_ETA_ERROR("20014", "ETA数据推送失败"),
     ;
     private String code;
     private String msg;

+ 15 - 8
qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/exception/ETAException.java

@@ -22,7 +22,7 @@ public class ETAException extends RuntimeException {
     /**
      * 可选的附加信息,如受影响的实体ID、操作详情等。
      */
-    private Object additionalInfo;
+    private String additionalInfo;
 
     public ETAException(ErrorEnum errorEnum) {
         super(errorEnum.getMsg());
@@ -30,12 +30,26 @@ public class ETAException extends RuntimeException {
         this.errorMessage = errorEnum.getMsg();
     }
 
+    public ETAException(ErrorEnum errorEnum, String additionalInfo) {
+        super(errorEnum.getMsg());
+        this.errorCode = errorEnum.getCode();
+        this.errorMessage = errorEnum.getMsg();
+        this.additionalInfo = additionalInfo;
+    }
+
     public ETAException(String errorCode, String errorMessage) {
         super(errorMessage);
         this.errorCode = errorCode;
         this.errorMessage = errorMessage;
     }
 
+    public ETAException(String errorCode, String errorMessage, String additionalInfo) {
+        super(errorMessage);
+        this.errorCode = errorCode;
+        this.errorMessage = errorMessage;
+        this.additionalInfo = additionalInfo;
+    }
+
     public ETAException(Throwable cause) {
         super(cause);
         this.errorCode = ErrorEnum.SYSTEM_ERROR.getCode();
@@ -48,13 +62,6 @@ public class ETAException extends RuntimeException {
         this.errorMessage = errorMessage;
     }
 
-    public ETAException(String errorCode, String errorMessage, Object additionalInfo) {
-        super(errorMessage);
-        this.errorCode = errorCode;
-        this.errorMessage = errorMessage;
-        this.additionalInfo = additionalInfo;
-    }
-
     // Add other constructors as needed, depending on your requirements.
 
     public String getErrorCode() {

+ 1 - 1
qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/interceptor/RetryInterceptor.java

@@ -37,7 +37,7 @@ public class RetryInterceptor implements Interceptor {
             try {
                 response = chain.proceed(request);
                 if (!isSuccessful(response) && shouldRetry(response.code())) {
-                    TimeUnit.MILLISECONDS.sleep(retryInterval);
+                    TimeUnit.MILLISECONDS.sleep((long) retryInterval * retryCount.get());
                     log.warn("尝试重新请求第 " + retryCount + " 次,上次应答为:[{}:{}]", response.code(), response.message());
                     response.close();
                     continue;

+ 5 - 0
qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/utils/HttpClient.java

@@ -59,6 +59,11 @@ public class HttpClient {
         return path;
     }
 
+    public RequestBody createRequestBody(Map<String, Object> params) {
+            String jsonStr = JSONObject.toJSONString(params);
+            return RequestBody.create(jsonStr,MediaType.get("application/json"));
+    }
+
     public Headers buildHeader(Map<String, ?> params) {
         Headers.Builder headersBuilder = new Headers.Builder();
         for (Map.Entry<String, ?> entry : params.entrySet()) {

+ 20 - 7
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/ETAHttpClientImpl.java

@@ -14,6 +14,7 @@ import com.qhtx.eta.common.utils.HttpClient;
 import com.qhtx.eta.common.utils.SignatureUtil;
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.*;
+import okio.Buffer;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
 
@@ -47,18 +48,30 @@ public class ETAHttpClientImpl implements ETAHttpClient {
     }
 
 
-    private Request buildETARequest(ETAInterfaceEnum etaInterface, Map<String, Object> params) {
+    private Request buildETARequest(ETAInterfaceEnum etaInterface, Map<String, Object> params)  {
         String api = etaInterface.getApi();
         String url = properties.getUrl().endsWith("/") ? properties.getUrl().substring(0, properties.getUrl().length() - 1) : properties.getUrl();
         String apiUrl = String.format("%s%s", url, api);
+        //生成Header信息
+        Headers headers = buildETAHeader();
         if (Objects.requireNonNull(etaInterface.getMethod()) == HttpMethod.GET) {
             apiUrl = httpClient.buildGetUrl(apiUrl, params);
+            return new Request.Builder().url(apiUrl).headers(headers).build();
         } else {
-            throw new ETAException(ErrorEnum.HTTP_METHOD_NOT_ALLOWED);
+            RequestBody body = httpClient.createRequestBody(params);
+            Buffer buffer = new Buffer();
+            try {
+                body.writeTo(buffer);
+            }catch(Exception e){
+                log.error("请求eta接口异常",e);
+            }
+            System.out.println(buffer.readUtf8());
+            return new Request.Builder().url(apiUrl).headers(headers).post(body).build();
+//            throw new ETAException(ErrorEnum.HTTP_METHOD_NOT_ALLOWED);
         }
-        //生成Header信息
-        Headers headers = buildETAHeader();
-        return new Request.Builder().url(apiUrl).headers(headers).build();
+
+
+
     }
 
     private Headers buildETAHeader() {
@@ -111,10 +124,10 @@ public class ETAHttpClientImpl implements ETAHttpClient {
             } else {
                 ETAHttpResponse<?, ?> responseData = JSONObject.parseObject(responseBody.string(), etaInterfaceEnum.getResponse());
                 if (200 == responseData.getRet()) {
-                    List<?> quotaInfoDTOs = etaInterfaceEnum.getResponse().cast(responseData).convertDataToDTO();
+                    List<?> dtos = etaInterfaceEnum.getResponse().cast(responseData).convertDataToDTO();
                     responseInner.setSuccess(true);
                     responseInner.setNeedRetry(false);
-                    responseInner.setData(quotaInfoDTOs);
+                    responseInner.setData(dtos);
                 } else {
                     responseInner.setSuccess(false);
                     responseInner.setNeedRetry(false);

+ 2 - 2
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/response/eta/ETAPushData.java → qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/request/ETAPushData.java

@@ -1,4 +1,4 @@
-package com.qhtx.eta.domain.api.http.response.eta;
+package com.qhtx.eta.domain.api.http.request;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import lombok.Data;
@@ -11,6 +11,6 @@ public class ETAPushData implements Serializable {
     @JsonProperty("Date")
     private String date;
     @JsonProperty("Value")
-    private String value;
+    private Double value;
 
 }

+ 2 - 1
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/request/ETAPushIndexRequest.java

@@ -1,7 +1,6 @@
 package com.qhtx.eta.domain.api.http.request;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.qhtx.eta.domain.api.http.response.eta.ETAPushData;
 import lombok.Data;
 
 import java.io.Serializable;
@@ -21,6 +20,8 @@ public class ETAPushIndexRequest implements Serializable {
     private String frequency;
     @JsonProperty("SourceName")
     private String sourceName;
+    @JsonProperty("Remark")
+    private String remark;
     @JsonProperty("DataList")
     private List<ETAPushData> dataList;
 }

+ 5 - 2
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/http/response/eta/ETAPushDataResponse.java

@@ -1,12 +1,15 @@
 package com.qhtx.eta.domain.api.http.response.eta;
 
+import com.qhtx.eta.domain.api.http.request.ETAPushData;
+import com.qhtx.eta.domain.entity.DWIndexDTO;
+
 import java.util.List;
 
-public class ETAPushDataResponse extends ETAHttpResponse<ETAPushData, ETAPushIndexDTO> {
+public class ETAPushDataResponse extends ETAHttpResponse<ETAPushData, DWIndexDTO> {
     private static final long serialVersionUID = 3157981069870661780L;
 
     @Override
-    public List<ETAPushIndexDTO> convertDataToDTO() {
+    public List<DWIndexDTO> convertDataToDTO() {
         return null;
     }
 }

+ 15 - 7
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/proxy/ApiParameterProxy.java

@@ -1,15 +1,15 @@
 package com.qhtx.eta.domain.api.proxy;
 
 import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.qhtx.eta.domain.enums.ExcludedField;
 import com.qhtx.eta.common.eunms.ErrorEnum;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 
 import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 @Slf4j
 @Data
@@ -48,16 +48,24 @@ public abstract class ApiParameterProxy<T> {
     private Map<String, Object> afterExecute(Object data) {
         Class<?> clazz = data.getClass();
         Field[] fields = clazz.getDeclaredFields();
-        fields = Arrays.stream(fields)
-                .filter(field -> Arrays.stream(ExcludedField.values())
-                        .noneMatch(excludedField -> field.getName().equals(excludedField.getFieldName()))).toArray(Field[]::new);
+        fields = Arrays.stream(fields).filter(field -> Arrays.stream(ExcludedField.values()).noneMatch(excludedField -> field.getName().equals(excludedField.getFieldName()))).toArray(Field[]::new);
         Map<String, Object> params = new HashMap<>(fields.length);
         for (Field field : fields) {
             field.setAccessible(true);
             try {
                 Object obj = field.get(data);
                 if (obj != null) {
-                    params.put(field.getName(), obj);
+                    JsonProperty prop = field.getAnnotation(JsonProperty.class);
+                    String name = prop != null ? (StringUtils.isEmpty(prop.value()) ? field.getName() : prop.value()) : field.getName();
+                    if (obj instanceof List) {
+                        List<Map<String, Object>> paramsList = new ArrayList<>();
+                        for (Object o : (List<?>) obj) {
+                            paramsList.add(afterExecute(o));
+                        }
+                        params.put(name, paramsList);
+                    } else {
+                        params.put(name, obj);
+                    }
                 }
             } catch (IllegalAccessException e) {
                 log.error("{},类:{}-{}", ErrorEnum.BEAN_COVERT_TO_MAP_ERROR.getMsg(), data.getClass().getName(), JSONObject.toJSONString(data));

+ 13 - 2
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/api/proxy/impl/ETAPushParameterProxy.java

@@ -5,6 +5,9 @@ import com.qhtx.eta.common.exception.ETAException;
 import com.qhtx.eta.domain.api.http.request.ETADataHttpRequest;
 import com.qhtx.eta.domain.api.http.request.ETAPushIndexRequest;
 import com.qhtx.eta.domain.api.proxy.ApiParameterProxy;
+import com.qhtx.eta.domain.convert.DWIndexDataDTOConverter;
+import com.qhtx.eta.domain.entity.DWIndexDTO;
+import com.qhtx.eta.domain.entity.DWIndexDataDTO;
 
 /**
  * @author mac
@@ -12,8 +15,16 @@ import com.qhtx.eta.domain.api.proxy.ApiParameterProxy;
 public class ETAPushParameterProxy extends ApiParameterProxy<ETAPushIndexRequest> {
     @Override
     protected ETAPushIndexRequest acceptParam(Object data) {
-        if (data instanceof ETADataHttpRequest) {
-            return (ETAPushIndexRequest) data;
+        if (data instanceof DWIndexDTO) {
+            ETAPushIndexRequest etaDataHttpRequest = new ETAPushIndexRequest();
+            etaDataHttpRequest.setDataList(DWIndexDataDTOConverter.INSTANCE.convertToETAPushDataList(((DWIndexDTO) data).getDataList()));
+            etaDataHttpRequest.setFrequency(((DWIndexDTO) data).getFrequency());
+            etaDataHttpRequest.setSourceName(((DWIndexDTO) data).getSourceName());
+            etaDataHttpRequest.setRemark(((DWIndexDTO) data).getRemark());
+            etaDataHttpRequest.setIndexCode(((DWIndexDTO) data).getIndexCode());
+            etaDataHttpRequest.setIndexName(((DWIndexDTO) data).getIndexName());
+            etaDataHttpRequest.setUnit(((DWIndexDTO) data).getUnit());
+            return etaDataHttpRequest;
         }
         throw new ETAException(ErrorEnum.PARAM_TYPE_ERROR);
     }

+ 3 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/convert/DWIndexDTOConverter.java

@@ -4,6 +4,7 @@ import com.qhtx.eta.domain.entity.DWIndexDTO;
 import com.qhtx.eta.infra.entity.dw.DWIndex;
 import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
 import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
 import org.mapstruct.factory.Mappers;
 
 import java.util.List;
@@ -15,6 +16,8 @@ public interface DWIndexDTOConverter {
 
     DWIndexDTOConverter INSTANCE = Mappers.getMapper(DWIndexDTOConverter.class);
 
+    @Mapping(source = "indexCnName",target = "indexName")
+    @Mapping(source = "units",target = "unit")
     DWIndexDTO convertToDTO(DWIndex dwIndex);
 
     TEtaIndexPushTask convertToTEtaIndexPushTask(DWIndexDTO dwIndexDTO);

+ 20 - 3
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/convert/DWIndexDataDTOConverter.java

@@ -1,19 +1,36 @@
 package com.qhtx.eta.domain.convert;
 
-import com.qhtx.eta.domain.entity.DWIndexDTO;
+import com.qhtx.eta.domain.api.http.request.ETAPushData;
 import com.qhtx.eta.domain.entity.DWIndexDataDTO;
-import com.qhtx.eta.infra.entity.dw.DWIndex;
 import com.qhtx.eta.infra.entity.dw.TDampDwIndexData;
-import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
+import org.mapstruct.Named;
 import org.mapstruct.factory.Mappers;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
 @Mapper
 public interface DWIndexDataDTOConverter {
 
     DWIndexDataDTOConverter INSTANCE = Mappers.getMapper(DWIndexDataDTOConverter.class);
 
     @Mapping(source = "dataDate",target = "date")
+    @Mapping(source = "dataValue",target = "value")
     DWIndexDataDTO convertToDTO(TDampDwIndexData dwIndexData);
+
+
+
+    @Named("dateToString")
+    default String dateToString(Date date) {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+        return date != null ? sdf.format(date) : null;
+    }
+
+
+    @Mapping(source = "date",target = "date",qualifiedByName = "dateToString")
+    ETAPushData convertToETAPushData(DWIndexDataDTO dwIndexData);
+    List<ETAPushData> convertToETAPushDataList(List<DWIndexDataDTO> dwIndexData);
 }

+ 1 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/entity/DWIndexDTO.java

@@ -15,5 +15,6 @@ public class DWIndexDTO implements Serializable {
     private String frequency;
     private String sourceName;
     private String remark;
+    private String unit;
     private List<DWIndexDataDTO> dataList;
 }

+ 2 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/facade/ETAFacadeService.java

@@ -2,6 +2,8 @@ package com.qhtx.eta.domain.facade;
 
 import com.qhtx.eta.common.annotation.Facade;
 
+import java.util.concurrent.ExecutionException;
+
 
 public interface ETAFacadeService {
 

+ 55 - 24
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/facade/impl/ETAFacadeServiceImpl.java

@@ -15,16 +15,16 @@ import com.qhtx.eta.domain.facade.ETAFacadeService;
 import com.qhtx.eta.domain.service.DWIndexFrameService;
 import com.qhtx.eta.domain.service.ETAPushDataService;
 import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
-import com.qhtx.eta.infra.service.TEtaIndexPushTaskService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /**
@@ -46,27 +46,58 @@ public class ETAFacadeServiceImpl implements ETAFacadeService {
     @Override
     @DomainTransDataSource(dataSourceType = DataSourceType.DW)
     public void pushIndex(String indexCode) {
-        DWIndexDTO dwIndexDTO = dwIndexFrameService.pushIndexToEta(indexCode);
-        if (dwIndexDTO.getDataList().size() > ETAConstants.ETA_PUSH_DATA_LIMIT) {
-            log.info("数据量超过限制,分批推送");
-            List<DWIndexDataDTO> sortList = dwIndexDTO.getDataList().stream().sorted(Comparator.comparing(DWIndexDataDTO::getDate)).collect(Collectors.toList());
-            Lists.partition(sortList, ETAConstants.ETA_PUSH_DATA_LIMIT).forEach(dataList -> {
-                Date latestDataDate = dataList.stream().map(item -> item.getDate()).max(Comparator.comparing(Date::getTime)).get();
-                //防止数据漏更新,先推送,再更新本地任务记录,下游做好冗余和幂等
-                DWIndexDTO dataDTO = new DWIndexDTO();
-                dataDTO.setIndexCode(dwIndexDTO.getIndexCode());
-                dataDTO.setFrequency(dwIndexDTO.getFrequency());
-                dataDTO.setIndexName(dwIndexDTO.getIndexName());
-                dataDTO.setRemark(dwIndexDTO.getRemark());
-                dataDTO.setSourceName(dwIndexDTO.getSourceName());
-                dataDTO.setDataList(dataList);
-                apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dataDTO);
-                TEtaIndexPushTask tEtaIndexPushTask = etaPushDataService.getETAPushTaskByIndexCode(dataDTO.getIndexCode());
-                tEtaIndexPushTask.setLatestDataDate(new SimpleDateFormat(ETAConstants.DATE_PATTERN).format(latestDataDate));
-                tEtaIndexPushTask.setDataCount(dataList.size());
-                dataDTO.setDataList(dataList);
-            });
-        }
-        apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dwIndexDTO);
+        DWIndexDTO dwIndexDTO = dwIndexFrameService.addSyncIndex(indexCode);
+
+        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+            List<DWIndexDataDTO> dataList = dwIndexFrameService.getIndexData(indexCode);
+            dwIndexDTO.setDataList(dataList);
+            //api数据发送
+            if (dwIndexDTO.getDataList().size() > ETAConstants.ETA_PUSH_DATA_LIMIT) {
+                log.info("数据量超过限制,分批推送");
+                List<DWIndexDataDTO> sortList = dwIndexDTO.getDataList().stream().sorted(Comparator.comparing(DWIndexDataDTO::getDate)).collect(Collectors.toList());
+                Lists.partition(sortList, ETAConstants.ETA_PUSH_DATA_CHUNK).forEach(list -> {
+                    Date latestDataDate = list.stream().map(DWIndexDataDTO::getDate).max(Comparator.comparing(Date::getTime)).get();
+                    //防止数据漏更新,先推送,再更新本地任务记录,下游做好冗余和幂等
+                    DWIndexDTO dataDTO = new DWIndexDTO();
+                    dataDTO.setIndexCode(dwIndexDTO.getIndexCode());
+                    dataDTO.setFrequency(dwIndexDTO.getFrequency());
+                    dataDTO.setIndexName(dwIndexDTO.getIndexName());
+                    dataDTO.setRemark(dwIndexDTO.getRemark());
+                    dataDTO.setSourceName(dwIndexDTO.getSourceName());
+                    dataDTO.setUnit(dwIndexDTO.getUnit());
+                    dataDTO.setDataList(list);
+                    List<?> result = apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dataDTO);
+                    if (result == null) {
+                        log.error("调用ETA接口失败,停止更新");
+                        throw new ETAException(ErrorEnum.PUSH_TO_ETA_ERROR);
+                    }
+                    try {
+                        Thread.sleep(500);
+                    } catch (InterruptedException e) {
+                        log.warn("线程中断异常");
+                    }
+                    TEtaIndexPushTask tEtaIndexPushTask = etaPushDataService.getETAPushTaskByIndexCode(dataDTO.getIndexCode());
+                    if (tEtaIndexPushTask == null) {
+                        log.error("未找到对应指标任务记录:{}", dataDTO.getIndexCode());
+                        throw new ETAException(ErrorEnum.INDEX_UPDATE_ERROR);
+                    }
+                    tEtaIndexPushTask.setLatestDataDate(new SimpleDateFormat(ETAConstants.DATE_PATTERN).format(latestDataDate));
+                    int currentCount = tEtaIndexPushTask.getDataCount() == null ? 0 : tEtaIndexPushTask.getDataCount();
+                    tEtaIndexPushTask.setDataCount(currentCount + list.size());
+                    tEtaIndexPushTask.setUpdateTime(new Date());
+                    tEtaIndexPushTask.setLastPushTime(new Date());
+                    etaPushDataService.updateTaskByIndexCode(tEtaIndexPushTask);
+                });
+            } else {
+                apiServiceHolder.runApi(ETAInterfaceEnum.PUSH_ETA_INDEX, dwIndexDTO);
+            }
+        }).thenRun(() -> {
+            log.info("更新指标任务表完成");
+        });
+        future.exceptionally(exception -> {
+            log.error("更新指标任务表失败:", exception);
+            return null;
+        });
+
     }
 }

+ 3 - 1
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/DWIndexFrameService.java

@@ -1,6 +1,7 @@
 package com.qhtx.eta.domain.service;
 
 import com.qhtx.eta.domain.entity.DWIndexDTO;
+import com.qhtx.eta.domain.entity.DWIndexDataDTO;
 import com.qhtx.eta.domain.entity.DWIndexFrameDTO;
 import com.qhtx.eta.infra.entity.dw.DWIndex;
 
@@ -16,6 +17,7 @@ public interface DWIndexFrameService {
 
     DWIndex getIndexByIndexCode(String indexCode);
 
-    DWIndexDTO pushIndexToEta(String indexCode);
+    DWIndexDTO addSyncIndex(String indexCode);
 
+    List<DWIndexDataDTO> getIndexData(String indexCode);
 }

+ 2 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/ETAPushDataService.java

@@ -9,4 +9,6 @@ public interface ETAPushDataService {
     List<DWIndexDTO> getPushIndexTaskList();
 
     TEtaIndexPushTask getETAPushTaskByIndexCode(String indexCode);
+
+    void updateTaskByIndexCode(TEtaIndexPushTask tEtaIndexPushTask);
 }

+ 23 - 21
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/impl/DWIndexFrameServiceImpl.java

@@ -1,12 +1,9 @@
 package com.qhtx.eta.domain.service.impl;
 
-import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.druid.util.StringUtils;
 import com.qhtx.eta.common.constant.ETAConstants;
 import com.qhtx.eta.common.eunms.ErrorEnum;
 import com.qhtx.eta.common.exception.ETAException;
-import com.qhtx.eta.domain.annotation.DomainTransDataSource;
-import com.qhtx.eta.domain.api.http.response.eta.ETAPushData;
 import com.qhtx.eta.domain.convert.DWIndexDTOConverter;
 import com.qhtx.eta.domain.convert.DWIndexDataDTOConverter;
 import com.qhtx.eta.domain.convert.DWIndexFrameDTOConverter;
@@ -32,7 +29,6 @@ import org.springframework.transaction.support.TransactionTemplate;
 import org.springframework.util.CollectionUtils;
 
 import javax.annotation.Resource;
-import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -139,11 +135,26 @@ public class DWIndexFrameServiceImpl implements DWIndexFrameService {
     }
 
     @Override
-    @Transactional(rollbackFor = Exception.class)
-    public DWIndexDTO pushIndexToEta(String indexCode) {
+    public DWIndexDTO addSyncIndex(String indexCode) {
         DWIndex dwIndex = indexFrameService.getIndexByIndexCode(indexCode);
+        if (dwIndex == null) {
+            log.warn("指标信息不存在,indexCode:{}", indexCode);
+            throw new ETAException(ErrorEnum.DW_INDEX_NOT_FOUND);
+        }
+        if (StringUtils.isEmpty(dwIndex.getSourceName())) {
+            dwIndex.setSourceName("钢联");
+        }
+        DWIndexDTO dwIndexDTO = DWIndexDTOConverter.INSTANCE.convertToDTO(dwIndex);
+        dwIndexDTO.setRemark("");
+        TEtaIndexPushTask tEtaIndexPushTask = DWIndexDTOConverter.INSTANCE.convertToTEtaIndexPushTask(dwIndexDTO);
+        etaIndexPushTaskService.addTask(tEtaIndexPushTask);
+        return dwIndexDTO;
+    }
+
+    @Override
+    public List<DWIndexDataDTO> getIndexData(String indexCode) {
         int total = tDampDwIndexDataService.countByIndexCode(indexCode);
-        List<TDampDwIndexData> allResults = new ArrayList<>();
+        List<TDampDwIndexData> allResults;
         //分块取值
         if (total > ETAConstants.ETA_PUSH_DATA_LIMIT) {
             int chunk = (total + ETAConstants.ETA_PUSH_DATA_LIMIT - 1) / ETAConstants.ETA_PUSH_DATA_LIMIT;
@@ -154,27 +165,18 @@ public class DWIndexFrameServiceImpl implements DWIndexFrameService {
                     try {
                         return tDampDwIndexDataService.queryAllByLimit(indexCode, pars * ETAConstants.ETA_PUSH_DATA_LIMIT, ETAConstants.ETA_PUSH_DATA_LIMIT);
                     } catch (Exception e) {
-                        log.error("数据推送异常", e);
-                        throw e;
+                        log.error("获取指标数据失败", e);
+                        throw new ETAException(ErrorEnum.DATASOURCE_EXECUTE_ERROR);
                     }
                 }, indexDataThreadPool);
                 futures.add(future);
             }
-            try {
-                // 等待所有任务完成并合并结果
-                allResults = futures.stream().map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList());
-            } catch (Exception e) {
-                log.error("数据推送异常", e);
-            }
+            // 等待所有任务完成并合并结果
+            allResults = futures.stream().map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList());
         } else {
             allResults = tDampDwIndexDataService.queryAllByLimit(indexCode, 0, total);
         }
-        DWIndexDTO dwIndexDTO = DWIndexDTOConverter.INSTANCE.convertToDTO(dwIndex);
-        List<DWIndexDataDTO> dtoList = allResults.stream().map(item -> DWIndexDataDTOConverter.INSTANCE.convertToDTO(item)).collect(Collectors.toList());
-        dwIndexDTO.setDataList(dtoList);
-        TEtaIndexPushTask tEtaIndexPushTask = DWIndexDTOConverter.INSTANCE.convertToTEtaIndexPushTask(dwIndexDTO);
-        etaIndexPushTaskService.insert(tEtaIndexPushTask);
-        return DWIndexDTOConverter.INSTANCE.convertToDTO(dwIndex);
+        return allResults.stream().map(DWIndexDataDTOConverter.INSTANCE::convertToDTO).collect(Collectors.toList());
     }
 
     private List<DWIndexFrameDTO> parseList(FrameNode node) {

+ 5 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/impl/ETAPushDataServiceImpl.java

@@ -25,4 +25,9 @@ public class ETAPushDataServiceImpl implements ETAPushDataService {
     public TEtaIndexPushTask getETAPushTaskByIndexCode(String indexCode) {
         return tEtaIndexPushTaskService.getByIndexCode(indexCode);
     }
+
+    @Override
+    public void updateTaskByIndexCode(TEtaIndexPushTask tEtaIndexPushTask) {
+        tEtaIndexPushTaskService.update(tEtaIndexPushTask);
+    }
 }

+ 1 - 0
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/entity/dw/DWIndex.java

@@ -10,5 +10,6 @@ public class DWIndex {
     private String sourceName;
     private String indexCode;
     private String frequency;
+    private String units;
     private String places;
 }

+ 2 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/entity/dw/TDampDwIndexData.java

@@ -4,6 +4,7 @@ import lombok.Data;
 import lombok.Getter;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.util.Date;
 
 /**
@@ -37,7 +38,7 @@ public class TDampDwIndexData {
     /**
      * 数据值
      */
-    private Integer dataValue;
+    private BigDecimal dataValue;
     /**
      * 数据值(非数值类型)
      */

+ 0 - 4
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/entity/dw/TEtaIndexPushTask.java

@@ -22,10 +22,6 @@ public class TEtaIndexPushTask {
 
     private Date updateTime;
 
-    private Date startDate;
-
-    private Date endDate;
-
     private String indexName;
 
     private String unit;

+ 1 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/mapper/TEtaIndexPushTaskDao.java

@@ -29,7 +29,7 @@ public interface TEtaIndexPushTaskDao {
      * @param tEtaIndexPushTask 实例对象
      * @return 影响行数
      */
-    void update(TEtaIndexPushTask tEtaIndexPushTask);
+    void update(@Param("entity") TEtaIndexPushTask tEtaIndexPushTask);
 
 
     List<TEtaIndexPushTask> getAllTask();

+ 1 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/TEtaIndexPushTaskService.java

@@ -21,7 +21,7 @@ public interface TEtaIndexPushTaskService {
      * @param tEtaIndexPushTask 实例对象
      * @return 实例对象
      */
-    TEtaIndexPushTask insert(@Param("entity")TEtaIndexPushTask tEtaIndexPushTask);
+    TEtaIndexPushTask addTask(@Param("entity")TEtaIndexPushTask tEtaIndexPushTask);
 
     /**
      * 修改数据

+ 18 - 8
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TEtaIndexPushTaskServiceImpl.java

@@ -1,6 +1,10 @@
 package com.qhtx.eta.infra.service.impl;
 
 
+import com.qhtx.eta.common.eunms.DataSourceType;
+import com.qhtx.eta.common.eunms.ErrorEnum;
+import com.qhtx.eta.common.exception.ETAException;
+import com.qhtx.eta.infra.annotation.UseDataSource;
 import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
 import com.qhtx.eta.infra.enums.Frequency;
 import com.qhtx.eta.infra.mapper.TEtaIndexPushTaskDao;
@@ -22,6 +26,7 @@ import java.util.stream.Collectors;
  * @since 2024-09-30 13:43:27
  */
 @Service("tEtaIndexPushTaskService")
+@UseDataSource(dataSourceType = DataSourceType.DW)
 @Slf4j
 public class TEtaIndexPushTaskServiceImpl implements TEtaIndexPushTaskService {
     @Resource
@@ -34,7 +39,12 @@ public class TEtaIndexPushTaskServiceImpl implements TEtaIndexPushTaskService {
      * @return 实例对象
      */
     @Override
-    public TEtaIndexPushTask insert(TEtaIndexPushTask tEtaIndexPushTask) {
+    public TEtaIndexPushTask addTask(TEtaIndexPushTask tEtaIndexPushTask) {
+        TEtaIndexPushTask dbTask = tEtaIndexPushTaskDao.getByIndexCode(tEtaIndexPushTask.getIndexCode());
+        if (dbTask != null) {
+            log.info("指标{}已存在,不新增", tEtaIndexPushTask.getIndexCode());
+            throw new ETAException(ErrorEnum.INDEX_AllREADY_ADD);
+        }
         tEtaIndexPushTaskDao.insert(tEtaIndexPushTask);
         return tEtaIndexPushTask;
     }
@@ -81,19 +91,19 @@ public class TEtaIndexPushTaskServiceImpl implements TEtaIndexPushTaskService {
         LocalDate localDate = startDate.toInstant().atZone(ZoneId.of("Asia/Shanghai")).toLocalDate();
         switch (frequency) {
             case DAILY:
-                 return localDate.minusDays(numberOfPeriods).compareTo(today)<=0;
+                return localDate.minusDays(numberOfPeriods).compareTo(today) <= 0;
             case WEEKLY:
-                return localDate.minusWeeks(numberOfPeriods).compareTo(today)<=0;
+                return localDate.minusWeeks(numberOfPeriods).compareTo(today) <= 0;
             case MONTHLY:
-                return localDate.minusMonths(numberOfPeriods).compareTo(today)<=0;
+                return localDate.minusMonths(numberOfPeriods).compareTo(today) <= 0;
             case YEARLY:
-                return localDate.minusYears(numberOfPeriods).compareTo(today)<=0;
+                return localDate.minusYears(numberOfPeriods).compareTo(today) <= 0;
             case SEMI_ANNUAL:
-                return localDate.minusMonths(6L * numberOfPeriods).compareTo(today)<=0;
+                return localDate.minusMonths(6L * numberOfPeriods).compareTo(today) <= 0;
             case QUARTERLY:
-                return localDate.minusMonths(3L * numberOfPeriods).compareTo(today)<=0;
+                return localDate.minusMonths(3L * numberOfPeriods).compareTo(today) <= 0;
             case DECADELY:
-                return localDate.minusDays(10L * numberOfPeriods).compareTo(today)<=0;
+                return localDate.minusDays(10L * numberOfPeriods).compareTo(today) <= 0;
             default:
                 throw new IllegalArgumentException("Unsupported frequency: " + frequency);
         }

+ 2 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/DWIndexDao.xml

@@ -10,9 +10,10 @@
         <result property="indexCode" column="INDEX_CODE" jdbcType="VARCHAR"/>
         <result property="frequency" column="FREQUENCY" jdbcType="VARCHAR"/>
         <result property="places" column="PLACES" jdbcType="VARCHAR"/>
+        <result property="units" column="UNITS" jdbcType="VARCHAR"/>
     </resultMap>
 
     <select id="getByIndexCode" resultMap="DWIndexMap">
-        SELECT ID,INDEX_CN_NAME,INDEX_SHORT_NAME,SOURCE_NAME,INDEX_CODE,FREQUENCY,PLACES FROM T_DAMP_DW_INDEX WHERE INDEX_CODE = #{indexCode}
+        SELECT ID,INDEX_CN_NAME,INDEX_SHORT_NAME,SOURCE_NAME,INDEX_CODE,FREQUENCY,PLACES,UNITS FROM T_DAMP_DW_INDEX WHERE INDEX_CODE = #{indexCode}
     </select>
 </mapper>

+ 1 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/TDampDwIndexDataDao.xml

@@ -8,7 +8,7 @@
         <result property="indexCode" column="INDEX_CODE" jdbcType="VARCHAR"/>
         <result property="dataDate" column="DATA_DATE" jdbcType="TIMESTAMP"/>
         <result property="publishTime" column="PUBLISH_TIME" jdbcType="TIMESTAMP"/>
-        <result property="dataValue" column="DATA_VALUE" jdbcType="INTEGER"/>
+        <result property="dataValue" column="DATA_VALUE" jdbcType="DOUBLE"/>
         <result property="dataValueChar" column="DATA_VALUE_CHAR" jdbcType="VARCHAR"/>
         <result property="remark" column="REMARK" jdbcType="VARCHAR"/>
         <result property="isDelete" column="IS_DELETE" jdbcType="INTEGER"/>

+ 16 - 34
qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/TEtaIndexPushTaskDao.xml

@@ -7,9 +7,9 @@
         <result property="indexCode" column="INDEX_CODE" jdbcType="VARCHAR"/>
         <result property="createTime" column="CREATE_TIME" jdbcType="TIMESTAMP"/>
         <result property="updateTime" column="UPDATE_TIME" jdbcType="TIMESTAMP"/>
-        <result property="startDate" column="START_DATE" jdbcType="TIMESTAMP"/>
-        <result property="endDate" column="END_DATE" jdbcType="TIMESTAMP"/>
+        <result property="latestDataDate" column="LATEST_DATA_DATE" jdbcType="VARCHAR"/>
         <result property="indexName" column="INDEX_NAME" jdbcType="VARCHAR"/>
+        <result property="frequency" column="FREQUENCY" jdbcType="VARCHAR"/>
         <result property="unit" column="UNIT" jdbcType="VARCHAR"/>
         <result property="sourceName" column="SOURCE_NAME" jdbcType="VARCHAR"/>
         <result property="lastPushTime" column="LAST_PUSH_TIME" jdbcType="TIMESTAMP"/>
@@ -17,51 +17,33 @@
     </resultMap>
 
     <insert id="insert" keyProperty="id" useGeneratedKeys="false">
-        insert into T_ETA_INDEX_PUSH_TASK (INDEX_CODE, CREATE_TIME, START_DATE, END_DATE, INDEX_NAME, UNIT,
-                                           SOURCE_NAME, LAST_PUSH_TIMEDATA_COUNT)
-        values (#{entity.indexCode}, #{entity.createTime}, #{entity.startDate}, #{entity.endDate}, #{entity.indexName},
-                #{entity.unit}, #{entity.sourceName}, #{entity.lastPushTime}, #{entity.dataCount})
+        insert into T_ETA_INDEX_PUSH_TASK (INDEX_CODE, CREATE_TIME, INDEX_NAME, UNIT,
+                                           SOURCE_NAME, LAST_PUSH_TIME,DATA_COUNT,FREQUENCY,LATEST_DATA_DATE)
+        values (#{entity.indexCode}, #{entity.createTime}, #{entity.indexName},
+                #{entity.unit}, #{entity.sourceName,jdbcType=VARCHAR}, #{entity.lastPushTime, jdbcType=TIMESTAMP}, #{entity.dataCount, jdbcType=INTEGER},#{entity.frequency},#{entity.latestDataDate, jdbcType=TIMESTAMP})
     </insert>
 
     <!--通过主键修改数据-->
     <update id="update">
         update T_ETA_INDEX_PUSH_TASK
         <set>
-            <if test="indexCode != null and indexCode != ''">
-                INDEX_CODE = #{indexCode},
+            <if test="entity.updateTime != null">
+                UPDATE_TIME = #{entity.updateTime},
             </if>
-            <if test="createTime != null">
-                CREATE_TIME = #{createTime},
+            <if test="entity.lastPushTime != null">
+                LAST_PUSH_TIME = #{entity.lastPushTime},
             </if>
-            <if test="updateTime != null">
-                UPDATE_TIME = #{updateTime},
+            <if test="entity.latestDataDate != null">
+                LATEST_DATA_DATE = #{entity.latestDataDate},
             </if>
-            <if test="startDate != null">
-                START_DATE = #{startDate},
-            </if>
-            <if test="endDate != null">
-                END_DATE = #{endDate},
-            </if>
-            <if test="indexName != null and indexName != ''">
-                INDEX_NAME = #{indexName},
-            </if>
-            <if test="unit != null and unit != ''">
-                UNIT = #{unit},
-            </if>
-            <if test="sourceName != null and sourceName != ''">
-                SOURCE_NAME = #{sourceName},
-            </if>
-            <if test="lastPushTime != null">
-                LAST_PUSH_TIME = #{lastPushTime},
-            </if>
-            <if test="dataCount != null and dataCount != ''">
-                DATA_COUNT = #{dataCount},
+            <if test="entity.dataCount != null ">
+                DATA_COUNT = #{entity.dataCount},
             </if>
         </set>
-        where ID = #{id}
+        where INDEX_CODE = #{entity.indexCode}
     </update>
 
-    <select id="getByIndexCode" >
+    <select id="getByIndexCode" resultMap="TEtaIndexPushTaskMap">
             SELECT * from T_ETA_INDEX_PUSH_TASK where INDEX_CODE=#{indexCode}
     </select>
 </mapper>