kobe6258 4 сар өмнө
parent
commit
23f70060be

+ 4 - 2
qhtx-eta-integrator/qhtx-integrator-common/src/main/java/com/qhtx/eta/common/constant/ETAConstants.java

@@ -21,7 +21,7 @@ public class ETAConstants {
     public static final String THREAD_POOL_ETA_DATA = "eta_data";
     public static final String THREAD_POOL_ETA_CLASSIFY = "eta_classify";
     public static final String THREAD_POOL_INDEX_FRAME = "index_frame";
-
+    public static final String THREAD_POOL_INDEX_DATA = "index_data";
     /**
      * date
      */
@@ -40,9 +40,11 @@ public class ETAConstants {
     public static final String DW_INDEX_FRAME_PARENT_NAME = "弘则研究";
 
     //海通DW
-    public static final String REDIS_INDEX_FRAME_TABLE_CODE_KEY="t_damp_dm_index_frame_table_code_incr";
+    public static final String REDIS_INDEX_FRAME_TABLE_CODE_KEY = "t_damp_dm_index_frame_table_code_incr";
     public static final String INDEX_FRAME_TABLE_CODE_PREFIX = "DWZBKJ";
     public static final String INDEX_FRAME_CREATE_UPDATE_USER_NAME = "SYSINI";
 
     public static final int INDEX_FRAME_CREATE_UPDATE_USER_ID = 0;
+
+    public static final int ETA_PUSH_DATA_LIMIT = 1000;
 }

+ 5 - 0
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/config/ThreadPoolConfig.java

@@ -35,4 +35,9 @@ public class ThreadPoolConfig {
     public ThreadPoolExecutor etaIndexFrameThreadPool() {
         return new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1), new ETAThreadFactory(ETAConstants.THREAD_POOL_INDEX_FRAME), new ThreadPoolExecutor.CallerRunsPolicy());
     }
+
+    @Bean(name = "IndexDataThreadPool")
+    public ThreadPoolExecutor etaIndexDataThreadPool() {
+        return new ThreadPoolExecutor(10, 20, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(40), new ETAThreadFactory(ETAConstants.THREAD_POOL_INDEX_DATA), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
 }

+ 44 - 7
qhtx-eta-integrator/qhtx-integrator-domain/src/main/java/com/qhtx/eta/domain/service/impl/DWIndexFrameServiceImpl.java

@@ -13,12 +13,10 @@ import com.qhtx.eta.domain.entity.DWIndexFrameDTO;
 import com.qhtx.eta.domain.service.DWIndexFrameService;
 import com.qhtx.eta.domain.utils.RedisUtils;
 import com.qhtx.eta.infra.datasource.DataSourceContextHolder;
-import com.qhtx.eta.infra.entity.dw.DWIndex;
-import com.qhtx.eta.infra.entity.dw.DWIndexFrame;
-import com.qhtx.eta.infra.entity.dw.ETAClassifyIndexFrameMapping;
-import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
+import com.qhtx.eta.infra.entity.dw.*;
 import com.qhtx.eta.infra.service.EtaApiClassifyService;
 import com.qhtx.eta.infra.service.IndexFrameService;
+import com.qhtx.eta.infra.service.TDampDwIndexDataService;
 import com.qhtx.eta.infra.service.TEtaIndexPushTaskService;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -35,6 +33,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
 
 import com.qhtx.eta.common.eunms.DataSourceType;
 
@@ -57,6 +58,12 @@ public class DWIndexFrameServiceImpl implements DWIndexFrameService {
     @Resource
     private TEtaIndexPushTaskService etaIndexPushTaskService;
 
+    @Resource
+    private TDampDwIndexDataService tDampDwIndexDataService;
+
+    @Resource(name = "IndexDataThreadPool")
+    private ThreadPoolExecutor indexDataThreadPool;
+
     @Override
     public void syncIndexFrame() {
 
@@ -134,9 +141,39 @@ public class DWIndexFrameServiceImpl implements DWIndexFrameService {
     @Transactional(rollbackFor = Exception.class)
     public DWIndexDTO pushIndexToEta(String indexCode) {
         DWIndex dwIndex = indexFrameService.getIndexByIndexCode(indexCode);
-        DWIndexDTO dwIndexDTO = DWIndexDTOConverter.INSTANCE.convertToDTO(dwIndex);
-        TEtaIndexPushTask tEtaIndexPushTask = DWIndexDTOConverter.INSTANCE.convertToTEtaIndexPushTask(dwIndexDTO);
-        etaIndexPushTaskService.insert(tEtaIndexPushTask);
+        int total = tDampDwIndexDataService.countByIndexCode(indexCode);
+        List<TDampDwIndexData> allResults = new ArrayList<>();
+        //分块取值
+        if (total > ETAConstants.ETA_PUSH_DATA_LIMIT) {
+            int chunk = (total + ETAConstants.ETA_PUSH_DATA_LIMIT - 1) / ETAConstants.ETA_PUSH_DATA_LIMIT;
+            List<CompletableFuture<List<TDampDwIndexData>>> futures = new ArrayList<>();
+            for (int i = 0; i < chunk; i++) {
+                int pars = i;
+                System.out.println(pars);
+                CompletableFuture<List<TDampDwIndexData>> future = CompletableFuture.supplyAsync(() -> {
+                    try {
+                        return tDampDwIndexDataService.queryAllByLimit(indexCode, pars * ETAConstants.ETA_PUSH_DATA_LIMIT, ETAConstants.ETA_PUSH_DATA_LIMIT);
+                    } catch (Exception e) {
+                        log.error("数据推送异常", e);
+                        throw e;
+                    }
+                }, indexDataThreadPool);
+                futures.add(future);
+            }
+            try {
+                // 等待所有任务完成并合并结果
+                allResults = futures.stream().map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList());
+            } catch (Exception e) {
+                log.error("数据推送异常", e);
+            }
+        } else {
+            return DWIndexDTOConverter.INSTANCE.convertToDTO(dwIndex);
+        }
+        System.out.println("allResults size:" + allResults.size());
+//        List<TDampDwIndexData> dataList = tDampDwIndexDataService.queryAllByLimit(indexCode, 0, 500);
+//        DWIndexDTO dwIndexDTO = DWIndexDTOConverter.INSTANCE.convertToDTO(dwIndex);
+//        TEtaIndexPushTask tEtaIndexPushTask = DWIndexDTOConverter.INSTANCE.convertToTEtaIndexPushTask(dwIndexDTO);
+//        etaIndexPushTaskService.insert(tEtaIndexPushTask);
         return DWIndexDTOConverter.INSTANCE.convertToDTO(dwIndex);
     }
 

+ 3 - 4
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/entity/dw/TEtaIndexPushTask.java

@@ -12,16 +12,15 @@ import java.io.Serializable;
  * @since 2024-09-30 13:54:17
  */
 @Data
-public class TEtaIndexPushTask implements Serializable {
-    private static final long serialVersionUID = 476162774263994345L;
+public class TEtaIndexPushTask {
 
     private Integer id;
 
     private String indexCode;
 
-    private Date createTime;
+    private Date createTime = new Date();
 
-    private Date updateTime;
+    private Date updateTime ;
 
     private Date startDate;
 

+ 2 - 4
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/mapper/TDampDwIndexDataDao.java

@@ -18,10 +18,9 @@ public interface TDampDwIndexDataDao {
      *
      * @return 实例对象
      */
-    TDampDwIndexData queryById( );
-
-
+    List<TDampDwIndexData> queryAllByLimit(@Param("indexCode") String indexCode,@Param("id") int id,@Param("limit") int limit);
 
+    int countByIndexCode(String indexCode);
 
 
     /**
@@ -33,6 +32,5 @@ public interface TDampDwIndexDataDao {
     int insert(TDampDwIndexData tDampDwIndexData);
 
 
-
 }
 

+ 3 - 9
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/mapper/TEtaIndexPushTaskDao.java

@@ -1,6 +1,7 @@
 package com.qhtx.eta.infra.mapper;
 
 import com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask;
+import org.apache.ibatis.annotations.Param;
 
 /**
  * (TEtaIndexPushTask)表数据库访问层
@@ -17,7 +18,7 @@ public interface TEtaIndexPushTaskDao {
      * @param tEtaIndexPushTask 实例对象
      * @return 影响行数
      */
-    int insert(TEtaIndexPushTask tEtaIndexPushTask);
+    void insert(@Param("entity") TEtaIndexPushTask tEtaIndexPushTask);
 
 
     /**
@@ -26,15 +27,8 @@ public interface TEtaIndexPushTaskDao {
      * @param tEtaIndexPushTask 实例对象
      * @return 影响行数
      */
-    int update(TEtaIndexPushTask tEtaIndexPushTask);
+    void update(TEtaIndexPushTask tEtaIndexPushTask);
 
-    /**
-     * 通过主键删除数据
-     *
-     * @param id 主键
-     * @return 影响行数
-     */
-    int deleteById(Integer id);
 
 }
 

+ 4 - 9
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/TDampDwIndexDataService.java

@@ -3,6 +3,8 @@ package com.qhtx.eta.infra.service;
 
 import com.qhtx.eta.infra.entity.dw.TDampDwIndexData;
 
+import java.util.List;
+
 /**
  * 指标数据表(TDampDwIndexData)表服务接口
  *
@@ -11,15 +13,8 @@ import com.qhtx.eta.infra.entity.dw.TDampDwIndexData;
  */
 public interface TDampDwIndexDataService {
 
-    /**
-     * 通过ID查询单条数据
-     *
-     * @return 实例对象
-     */
-    TDampDwIndexData queryById( );
-
-
-
+    List<TDampDwIndexData> queryAllByLimit(String indexCode, int id, int limit);
 
+    int countByIndexCode(String indexCode);
 
 }

+ 14 - 6
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TDampDwIndexDataServiceImpl.java

@@ -1,11 +1,14 @@
 package com.qhtx.eta.infra.service.impl;
 
+import com.qhtx.eta.common.eunms.DataSourceType;
+import com.qhtx.eta.infra.annotation.UseDataSource;
 import com.qhtx.eta.infra.entity.dw.TDampDwIndexData;
 import com.qhtx.eta.infra.mapper.TDampDwIndexDataDao;
 import com.qhtx.eta.infra.service.TDampDwIndexDataService;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import java.util.List;
 
 /**
  * 指标数据表(TDampDwIndexData)表服务实现类
@@ -14,20 +17,25 @@ import javax.annotation.Resource;
  * @since 2024-09-29 17:37:18
  */
 @Service("tDampDwIndexDataService")
+@UseDataSource(dataSourceType = DataSourceType.DW)
 public class TDampDwIndexDataServiceImpl implements TDampDwIndexDataService {
     @Resource
     private TDampDwIndexDataDao tDampDwIndexDataDao;
 
     /**
-     * 通过ID查询单条数据
-     *
-     * @param  主键
-     * @return 实例对象
+     * @param indexCode
+     * @param id
+     * @param limit
+     * @return
      */
     @Override
-    public TDampDwIndexData queryById( ) {
-        return this.tDampDwIndexDataDao.queryById();
+    public List<TDampDwIndexData> queryAllByLimit(String indexCode, int id, int limit) {
+        return tDampDwIndexDataDao.queryAllByLimit(indexCode, id, limit);
     }
 
+    @Override
+    public int countByIndexCode(String indexCode) {
+        return tDampDwIndexDataDao.countByIndexCode(indexCode);
+    }
 
 }

+ 1 - 1
qhtx-eta-integrator/qhtx-integrator-infra/src/main/java/com/qhtx/eta/infra/service/impl/TEtaIndexPushTaskServiceImpl.java

@@ -27,7 +27,7 @@ public class TEtaIndexPushTaskServiceImpl implements TEtaIndexPushTaskService {
      */
     @Override
     public TEtaIndexPushTask insert(TEtaIndexPushTask tEtaIndexPushTask) {
-        this.tEtaIndexPushTaskDao.insert(tEtaIndexPushTask);
+        tEtaIndexPushTaskDao.insert(tEtaIndexPushTask);
         return tEtaIndexPushTask;
     }
 

+ 12 - 5
qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/TDampDwIndexDataDao.xml

@@ -21,13 +21,20 @@
         <result property="preDataDate" column="PRE_DATA_DATE" jdbcType="TIMESTAMP"/>
     </resultMap>
 
-    <!--查询单个-->
-    <select id="queryById" resultMap="TDampDwIndexDataMap">
-        select IDINDEX_IDINDEX_CODEDATA_DATEPUBLISH_TIMEDATA_VALUEDATA_VALUE_CHARREMARKIS_DELETECREATE_TIMEUPDATE_TIMECREATE_USER_IDCREATE_USER_NAMEUPDATE_USER_IDUPDATE_USER_NAMEPRE_DATA_DATE
-        from T_DAMP_DW_INDEX_DATA
+    <resultMap type="com.qhtx.eta.infra.entity.dw.TDampDwIndexData" id="DataMap">
+        <result property="dataDate" column="DATA_DATE" jdbcType="TIMESTAMP"/>
+        <result property="dataValue" column="DATA_VALUE" jdbcType="INTEGER"/>
+    </resultMap>
 
+    <select id="countByIndexCode" resultType="int">
+        select count(1) from T_DAMP_DW_INDEX_DATA where index_code = #{indexCode}
     </select>
 
-
+    <select id="queryAllByLimit" resultMap="DataMap">
+        select * from ( select * from (select rownum NO, DATA_DATE, DATA_VALUE from T_DAMP_DW_INDEX_DATA where
+        index_code = #{indexCode} ORDER BY DATA_DATE ASC) where NO <![CDATA[>]]> #{id}) where ROWNUM
+         <![CDATA[<=]]>
+        #{limit}
+    </select>
 </mapper>
 

+ 4 - 42
qhtx-eta-integrator/qhtx-integrator-infra/src/main/resources/mapper/TEtaIndexPushTaskDao.xml

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace=".dao.TEtaIndexPushTaskDao">
+<mapper namespace="com.qhtx.eta.infra.mapper.TEtaIndexPushTaskDao">
 
     <resultMap type="com.qhtx.eta.infra.entity.dw.TEtaIndexPushTask" id="TEtaIndexPushTaskMap">
         <result property="id" column="ID" jdbcType="INTEGER"/>
@@ -16,49 +16,11 @@
         <result property="dataCount" column="DATA_COUNT" jdbcType="INTEGER"/>
     </resultMap>
 
-    <!--查询单个-->
-    <select id="queryById" resultMap="TEtaIndexPushTaskMap">
-        select ID,
-               INDEX_CODE,
-               CREATE_TIME,
-               UPDATE_TIME,
-               START_DATE,
-               END_DATE,
-               INDEX_NAME,
-               UNIT,
-               SOURCE_NAME,
-               LAST_PUSH_TIMEDATA_COUNT
-        from T_ETA_INDEX_PUSH_TASK
-        where ID = #{id}
-    </select>
-
-    <!--新增所有列-->
     <insert id="insert" keyProperty="id" useGeneratedKeys="false">
-        insert into T_ETA_INDEX_PUSH_TASK (INDEX_CODE, CREATE_TIME, UPDATE_TIME, START_DATE, END_DATE, INDEX_NAME, UNIT,
+        insert into T_ETA_INDEX_PUSH_TASK (INDEX_CODE, CREATE_TIME, START_DATE, END_DATE, INDEX_NAME, UNIT,
                                            SOURCE_NAME, LAST_PUSH_TIMEDATA_COUNT)
-        values (#{entity.indexCode}#{entity.createTime}#{entity.updateTime}#{entity.startDate}#{entity.endDate}#{entity.indexName}#{entity.unit}#{entity.sourceName}#{entity.lastPushTime}#{entity.dataCount})
-    </insert>
-
-    <insert id="insertBatch" keyProperty="id" useGeneratedKeys="true">
-        insert into
-        T_ETA_INDEX_PUSH_TASK (INDEX_CODE,CREATE_TIME,UPDATE_TIME,START_DATE,END_DATE,INDEX_NAME,UNIT,SOURCE_NAME,LAST_PUSH_TIMEDATA_COUNT)
-        values
-        <foreach collection="entities" item="entity" separator=",">
-            (#{entity.indexCode}#{entity.createTime}#{entity.updateTime}#{entity.startDate}#{entity.endDate}#{entity.indexName}#{entity.unit}#{entity.sourceName}#{entity.lastPushTime}#{entity.dataCount})
-        </foreach>
-    </insert>
-
-    <insert id="insertOrUpdateBatch" keyProperty="id" useGeneratedKeys="true">
-        insert into
-        T_ETA_INDEX_PUSH_TASK(INDEX_CODE,CREATE_TIME,UPDATE_TIME,START_DATE,END_DATE,INDEX_NAME,UNIT,SOURCE_NAME,LAST_PUSH_TIMEDATA_COUNT)
-        values
-        <foreach collection="entities" item="entity" separator=",">
-            (#{entity.indexCode}#{entity.createTime}#{entity.updateTime}#{entity.startDate}#{entity.endDate}#{entity.indexName}#{entity.unit}#{entity.sourceName}#{entity.lastPushTime}#{entity.dataCount})
-        </foreach>
-        on duplicate key update
-        INDEX_CODE = values(INDEX_CODE)CREATE_TIME = values(CREATE_TIME)UPDATE_TIME = values(UPDATE_TIME)START_DATE =
-        values(START_DATE)END_DATE = values(END_DATE)INDEX_NAME = values(INDEX_NAME)UNIT = values(UNIT)SOURCE_NAME =
-        values(SOURCE_NAME)LAST_PUSH_TIME = values(LAST_PUSH_TIME)DATA_COUNT = values(DATA_COUNT)
+        values (#{entity.indexCode}, #{entity.createTime}, #{entity.startDate}, #{entity.endDate}, #{entity.indexName},
+                #{entity.unit}, #{entity.sourceName}, #{entity.lastPushTime}, #{entity.dataCount})
     </insert>
 
     <!--通过主键修改数据-->