Quellcode durchsuchen

ETA同步指标目录

kobe6258 vor 3 Monaten
Ursprung
Commit
b2c7b8a158

+ 20 - 4
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/job/ETASyncJob.java

@@ -2,10 +2,12 @@ package com.qhtx.eta.domain.job;
 
 import com.alibaba.nacos.api.utils.StringUtils;
 import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
 import com.qhtx.eta.common.eunms.DataSourceType;
 import com.qhtx.eta.common.eunms.ErrorEnum;
 import com.qhtx.eta.common.exception.ETAException;
 import com.qhtx.eta.common.utils.MD5Utils;
+import com.qhtx.eta.domain.config.RedisConfig;
 import com.qhtx.eta.domain.config.ScheduleConfig;
 import com.qhtx.eta.domain.config.ScheduleTaskConfig;
 import com.qhtx.eta.domain.convert.ClassifyMappingDTOConverter;
@@ -26,6 +28,7 @@ import com.qhtx.eta.common.constant.ETAConstants;
 import com.qhtx.eta.domain.service.impl.TDampLinkService;
 import com.qhtx.eta.domain.task.FixedDelayTaskRegistrar;
 import com.qhtx.eta.domain.task.SchedulingRunnable;
+import com.qhtx.eta.domain.utils.RedisUtils;
 import com.qhtx.eta.infra.datasource.DataSourceContextHolder;
 import com.qhtx.eta.infra.entity.dw.*;
 import com.qhtx.eta.infra.enums.Frequency;
@@ -44,6 +47,7 @@ import org.springframework.util.CollectionUtils;
 
 import javax.annotation.Resource;
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.time.DayOfWeek;
 import java.time.LocalDate;
@@ -100,6 +104,8 @@ public class ETASyncJob implements CommandLineRunner {
     @Resource
     private RedissonClient redissonClient;
 
+    @Resource
+    private RedisUtils redisUtils;
     private static final Map<Integer, ETATaskEnum> taskMappingMap = new ConcurrentHashMap<>(16);
 
     public ETATaskEnum getTaskType(int configId) {
@@ -233,7 +239,7 @@ public class ETASyncJob implements CommandLineRunner {
             log.info("================================ 开始同步接口指标分类列表 ================================ ");
             log.info("================================ 接口指标分类列表数量:{}条================================ ", classifyList.size());
             try {
-                // etaClassifyService.syncClassify(classifyList);
+                etaClassifyService.syncClassify(classifyList);
                 List<ClassifyMappingDTO> processingList = etaClassifyService.getProcessingList();
                 if (!CollectionUtils.isEmpty(processingList)) {
                     log.info("processingList size:{}", processingList.size());
@@ -335,9 +341,18 @@ public class ETASyncJob implements CommandLineRunner {
                                 return dwIndexData;
                             }).collect(Collectors.toList());
                             if (CollectionUtils.isEmpty(dwLinkDetailList)) {
-                                log.warn("未找到对应指标数据:{},不同步指标", dwIndexWithLinkCodeDTO.getLinkCode());
                                 continue;
                             }
+                            if (!dwIndexWithLinkCodeDTO.getLinkCode().contains("ETA")) {
+                                log.warn("当前对接码非ETA数据,不做处理{}", dwIndexWithLinkCodeDTO.getLinkCode());
+                                continue;
+                            }
+                            int uniqueId = Integer.parseInt(dwIndexWithLinkCodeDTO.getLinkCode().substring(dwIndexWithLinkCodeDTO.getLinkCode().indexOf("ETA") + 3));
+                            if (redisUtils.getBit(redisUtils.indexSyncKey(), uniqueId)) {
+                                log.warn("对应指标数据:{},已同步,不在进行同步处理", dwIndexWithLinkCodeDTO.getLinkCode());
+                                continue;
+                            }
+                            log.warn("对应指标数据:{},同步指标", dwIndexWithLinkCodeDTO.getLinkCode());
                             DataSourceContextHolder.setDataSourceType(DataSourceType.DW);
                             transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                                 @Override
@@ -347,13 +362,14 @@ public class ETASyncJob implements CommandLineRunner {
                                     dwIndexFrameService.insertIndex(dwIndex);
                                     //更新指标对接码
                                     dwIndexFrameService.insertIndexLink(tDampDwIndexLink);
-                                    dwIndexFrameService.batchInsertIndexData(dwLinkDetailList);
                                     //添加指标数据
+                                    dwIndexFrameService.batchInsertIndexData(dwLinkDetailList);
                                 }
                             });
+
+                            redisUtils.setBit(redisUtils.indexSyncKey(), uniqueId, true);
                         }
                     }
-
                 } catch (Exception e) {
                     log.error("同步指标数据数据失败", e);
                 }

+ 20 - 3
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/impl/TDampLinkService.java

@@ -9,7 +9,10 @@ import com.qhtx.eta.domain.entity.LinkDataDTO;
 import com.qhtx.eta.domain.entity.LinkFieldDTO;
 import com.qhtx.eta.domain.entity.LinkInfoDTO;
 import com.qhtx.eta.domain.entity.QuotaInfoDTO;
+import com.qhtx.eta.domain.utils.RedisUtils;
+import com.qhtx.eta.infra.entity.dw.TDampDwIndexLink;
 import com.qhtx.eta.infra.entity.dw.TDampDwLinkInfo;
+import com.qhtx.eta.infra.service.TDampDwIndexLinkService;
 import com.qhtx.eta.infra.service.TDampDwLinkDataService;
 import com.qhtx.eta.infra.service.TDampDwLinkInfoService;
 import lombok.Data;
@@ -38,10 +41,14 @@ public class TDampLinkService {
 
     @Resource
     private TDampDwLinkDataService tdampDwLinkDataService;
+
+    @Resource
+    private TDampDwIndexLinkService tDampDwIndexLinkService;
     // ETA 维表和对接码的映射关系
     private static Map<Integer, LinkFieldDTO> linkFieldMap = new ConcurrentHashMap<>();
 
-
+    @Resource
+    private RedisUtils redisUtils;
     @Resource(name = "IndexSyncThreadPool")
     private ThreadPoolExecutor indexSyncThreadPool;
 
@@ -58,6 +65,13 @@ public class TDampLinkService {
         for (LinkFieldDTO field : linkInfoDTO.getLinkFields()) {
             linkFieldMap.put(field.getFiledType(), field);
         }
+        //设置已经同步的指标
+        List<TDampDwIndexLink> indexLinks = tDampDwIndexLinkService.queryAll();
+        indexLinks.forEach(indexLink -> {
+            int uniqueId = Integer.parseInt(indexLink.getLinkCode().substring(indexLink.getLinkCode().indexOf("ETA") + 3));
+            redisUtils.setBit(redisUtils.indexSyncKey(), uniqueId, true);
+            log.info("对接码:{}对应指标数据已同步,不在进行同步处理,指标编码{}", indexLink.getLinkCode(), indexLink.getIndexCode());
+        });
     }
 
     public List<LinkDataDTO> getLinkDataList() {
@@ -78,11 +92,14 @@ public class TDampLinkService {
                 }, indexSyncThreadPool);
                 futures.add(future);
             }
-            return futures.stream().map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList());
+            List<LinkDataDTO> list = futures.stream().map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList());
+            return list.stream().filter(linkDataDTO -> !redisUtils.getBit(redisUtils.indexSyncKey(), Integer.parseInt(linkDataDTO.getLinkCode().substring(linkDataDTO.getLinkCode().indexOf("ETA") + 3)))).collect(Collectors.toList());
         } else {
-            return LinkDataDTOConverter.INSTANCE.convertEntityToDTOList(tdampDwLinkDataService.queryAll());
+            List<LinkDataDTO> list = LinkDataDTOConverter.INSTANCE.convertEntityToDTOList(tdampDwLinkDataService.queryAll());
+            return list.stream().filter(linkDataDTO -> !redisUtils.getBit(redisUtils.indexSyncKey(), Integer.parseInt(linkDataDTO.getLinkCode().substring(linkDataDTO.getLinkCode().indexOf("ETA") + 3)))).collect(Collectors.toList());
         }
     }
+
     public LinkFieldDTO getLinkField(int filedType) {
         return linkFieldMap.get(filedType);
     }

+ 12 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/utils/RedisUtils.java

@@ -18,6 +18,10 @@ public class RedisUtils {
 
     private static final String CACHE_KEY_SEPARATOR = ".";
 
+    public String indexSyncKey() {
+        return "eta:index:sync";
+    }
+
     /**
      * 构建缓存key
      */
@@ -92,6 +96,14 @@ public class RedisUtils {
         return redisTemplate.opsForZSet().incrementScore(key, obj, score);
     }
 
+    public void setBit(String key, int offset, boolean flag) {
+        redisTemplate.opsForValue().setBit(key, offset, flag);
+    }
+
+    public boolean getBit(String key, int offset) {
+        return redisTemplate.opsForValue().getBit(key, offset);
+    }
+
     public long add(String key, Long num) {
         return redisTemplate.opsForValue().increment(key, num);
     }

+ 1 - 0
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/mapper/TDampDwIndexLinkDao.java

@@ -29,5 +29,6 @@ public interface TDampDwIndexLinkDao {
     int insert(@Param("entity") TDampDwIndexLink tDampDwIndexLink);
 
 
+    List<TDampDwIndexLink> queryAll();
 }
 

+ 3 - 0
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/TDampDwIndexLinkService.java

@@ -2,6 +2,8 @@ package com.qhtx.eta.infra.service;
 
 import com.qhtx.eta.infra.entity.dw.TDampDwIndexLink;
 
+import java.util.List;
+
 /**
  * DW层-指标和对接码关系表(TDampDwIndexLink)表服务接口
  *
@@ -19,4 +21,5 @@ public interface TDampDwIndexLinkService {
     TDampDwIndexLink insert(TDampDwIndexLink tDampDwIndexLink);
 
 
+    List<TDampDwIndexLink> queryAll();
 }

+ 5 - 2
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TDampDwIndexDataServiceImpl.java

@@ -81,10 +81,13 @@ public class TDampDwIndexDataServiceImpl implements TDampDwIndexDataService {
     @Override
     public void batchInsertIndexData(List<TDampDwIndexData> dwLinkDataList) {
         try (SqlSession sqlSession = sqlSessionTemplate.getSqlSessionFactory().openSession(ExecutorType.BATCH, false)) {
-            // tDampDwIndexDataDao.insertList(dwLinkDataList);
-           // TDampDwIndexDataDao tDampDwIndexDataDao = sqlSession.getMapper(TDampDwIndexDataDao.class);
+            int index = 1;
             for (TDampDwIndexData dwLinkData : dwLinkDataList) {
                 tDampDwIndexDataDao.insert(dwLinkData);
+                if (index % ETAConstants.INDEX_SYNC_LIMIT == 0) {
+                    sqlSession.flushStatements();
+                }
+                index++;
             }
             sqlSession.commit(!TransactionSynchronizationManager.isSynchronizationActive());
         } catch (Exception e) {

+ 9 - 0
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TDampDwIndexLinkServiceImpl.java

@@ -1,11 +1,14 @@
 package com.qhtx.eta.infra.service.impl;
 
+import com.qhtx.eta.common.eunms.DataSourceType;
+import com.qhtx.eta.infra.annotation.UseDataSource;
 import com.qhtx.eta.infra.entity.dw.TDampDwIndexLink;
 import com.qhtx.eta.infra.mapper.TDampDwIndexLinkDao;
 import com.qhtx.eta.infra.service.TDampDwIndexLinkService;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import java.util.List;
 
 /**
  * DW层-指标和对接码关系表(TDampDwIndexLink)表服务实现类
@@ -14,6 +17,7 @@ import javax.annotation.Resource;
  * @since 2024-10-16 11:03:42
  */
 @Service("tDampDwIndexLinkService")
+@UseDataSource(dataSourceType = DataSourceType.DW)
 public class TDampDwIndexLinkServiceImpl implements TDampDwIndexLinkService {
     @Resource
     private TDampDwIndexLinkDao tDampDwIndexLinkDao;
@@ -30,4 +34,9 @@ public class TDampDwIndexLinkServiceImpl implements TDampDwIndexLinkService {
         return tDampDwIndexLink;
     }
 
+    @Override
+    public List<TDampDwIndexLink> queryAll() {
+        return tDampDwIndexLinkDao.queryAll();
+    }
+
 }

+ 4 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/TDampDwIndexLinkDao.xml

@@ -42,6 +42,9 @@
                 #{entity.updateUserName,jdbcType=VARCHAR}, #{entity.htUniqueCode,jdbcType=VARCHAR},
                 #{entity.indexCnName,jdbcType=VARCHAR})
     </insert>
-
+<select id="queryAll" resultMap="TDampDwIndexLinkMap">
+    select INDEX_CODE, LINK_CODE
+    from T_DAMP_DW_INDEX_LINK
+</select>
 </mapper>
 

+ 0 - 1
qhtx-eta-integrator/qhtx-integrator-starter/src/main/java/com/qhtx/eta/init/ETAEnvironmentInit.java

@@ -22,7 +22,6 @@ public class ETAEnvironmentInit implements CommandLineRunner {
     private ETAHttpClient httpClientHandler;
     @Resource
     private TDampLinkService TDampLinkService;
-
     @Override
     public void run(String... args) {
         log.info("============================ 初始化系统API接口 ============================");