Browse Source

根据binlog记录更新指标数据

xyxie 1 week ago
parent
commit
a45fd6acda

+ 1 - 1
models/chart_edb_mapping.go

@@ -322,7 +322,7 @@ func GetChartMappingList(chartInfoId int) (list []*ChartEdbMapping, err error) {
 	sql := ` SELECT a.*
              FROM chart_edb_mapping AS a
 			 WHERE chart_info_id=? 
-             ORDER BY chart_edb_mapping_id ASC `
+             ORDER BY edb_info_id ASC `
 	_, err = o.Raw(sql, chartInfoId).QueryRows(&list)
 	return
 }

+ 1 - 0
models/db.go

@@ -60,6 +60,7 @@ func initChart() {
 		new(ChartEdbMapping),
 		new(ChartTheme),
 		new(ChartThemeType),
+		new(ForumChartEdbMapping),
 	)
 }
 

+ 67 - 0
models/edb_update_log.go

@@ -0,0 +1,67 @@
+package models
+
+import (
+	"eta/eta_forum_task/utils"
+	"time"
+
+	"github.com/beego/beego/v2/client/orm"
+)
+
+type EdbUpdateLog struct {
+	Id          uint64    `orm:"pk;column(id)"`
+	OpDbName    string    
+	OpTableName string    
+	OpType      string 
+	OldData     string   
+	NewData     string    
+	IsHandle    uint8     
+	ModifyTime  time.Time
+	CreateTime  time.Time 
+}
+
+func (EdbUpdateLog) TableName() string {
+	return "edb_update_log"
+}
+
+// 查询未处理的日志列表总数
+func GetEdbUpdateLogListCount(dateTime string) (int, error) {
+	
+	o := orm.NewOrmUsingDB("data")
+	
+	sql := `SELECT COUNT(1) FROM edb_update_log WHERE is_handle = 0 and op_table_name != "edb_info" and create_time > ?`
+
+	var count int64
+	err := o.Raw(sql, dateTime).QueryRow(&count)
+	if err != nil {
+		return 0, err
+	}
+	return int(count), nil
+}
+// 分页获取待处理的日志列表
+func GetEdbUpdateLogList(dateTime string, offset int, pageSize int) ([]*EdbUpdateLog, error) {
+
+	o := orm.NewOrmUsingDB("data")
+	
+	sql := `SELECT * FROM edb_update_log WHERE is_handle = 0 and op_table_name != "edb_info" and op_table_name != "edb_data_insert_config" and create_time > ? ORDER BY id ASC LIMIT ?, ?`
+	
+	var logs []*EdbUpdateLog
+	_, err := o.Raw(sql, dateTime, offset, pageSize).QueryRows(&logs)
+	if err != nil {
+		return nil, err
+	}
+	return logs, nil
+}
+
+
+// 批量更新日志状态
+func UpdateEdbUpdateLogStatus(logIds []int) error {
+	o := orm.NewOrmUsingDB("data")
+	
+	sql := `UPDATE edb_update_log SET is_handle = 1, modify_time = ? WHERE id IN (`+utils.GetOrmInReplace(len(logIds))+`)`
+	_, err := o.Raw(sql, time.Now(), logIds).Exec()
+	if err != nil {
+		return err
+	}
+	return nil
+}
+

+ 47 - 0
models/forum_chart_edb_mapping.go

@@ -0,0 +1,47 @@
+package models
+
+import (
+	"time"
+
+	"github.com/beego/beego/v2/client/orm"
+)
+
+type ForumChartEdbMapping struct {
+	ID int `orm:"column(id);pk;auto" description:"主键"`
+	ChartInfoId int 
+	EdbInfoIds string 
+	CreateTime time.Time 
+	ModifyTime time.Time 
+}
+
+// 根据chartInfoId获取edbInfoIds
+func GetEdbInfoIdsByChartInfoId(chartInfoId int) (edbInfoIds string, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT edb_info_ids FROM forum_chart_edb_mapping WHERE chart_info_id = ?`
+	err = o.Raw(sql, chartInfoId).QueryRow(&edbInfoIds)
+	return
+}
+
+// 新增
+func AddForumChartEdbMapping(chartInfoId int, edbInfoIds string) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `INSERT INTO forum_chart_edb_mapping (chart_info_id, edb_info_ids, create_time, modify_time) VALUES (?, ?, ?, ?)`
+	_, err = o.Raw(sql, chartInfoId, edbInfoIds, time.Now(), time.Now()).Exec()
+	return
+}
+
+// 更新
+func UpdateForumChartEdbMapping(chartInfoId int, edbInfoIds string) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `UPDATE forum_chart_edb_mapping SET edb_info_ids = ?, modify_time = ? WHERE chart_info_id = ?`
+	_, err = o.Raw(sql, edbInfoIds, time.Now(), chartInfoId).Exec()
+	return
+}
+
+// 删除
+func DeleteForumChartEdbMapping(chartInfoId int) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `DELETE FROM forum_chart_edb_mapping WHERE chart_info_id = ?`
+	_, err = o.Raw(sql, chartInfoId).Exec()
+	return
+}

+ 70 - 37
services/chart_info.go

@@ -25,7 +25,7 @@ func UpdateChart(chartInfoId int) (err error, errMsg string) {
 		err = fmt.Errorf("系统处理中,请稍后重试!")
 		return
 	}
-	utils.Rc.SetNX(cacheKey, 1, 30*time.Second)
+	utils.Rc.SetNX(cacheKey, 1, 10*time.Minute)
 	defer func() {
 		if err != nil {
 			go alarm_msg.SendAlarmMsg(fmt.Sprintf("更新图表至社区失败:Err:%v,ErrMsg:%s", err, errMsg), 3)
@@ -57,8 +57,10 @@ func UpdateChart(chartInfoId int) (err error, errMsg string) {
 		return
 	}
 	edbIds := make([]int, 0)
+	edbInfoStr := ""
 	for _, v := range chartMappingList {
 		edbIds = append(edbIds, v.EdbInfoId)
+		edbInfoStr += strconv.Itoa(v.EdbInfoId) + ","
 	}
 	chartSeriesList := make([]*models.ChartSeries, 0)
 	chartSeriesEdbList := make([]*models.ChartSeriesEdbMapping, 0)
@@ -83,7 +85,18 @@ func UpdateChart(chartInfoId int) (err error, errMsg string) {
 		edbInfoDataList []*eta_forum.AddEdbDataReq
 	)
 	//查询指标详情
-	edbInfoList, edbMappingList, edbInfoDataList, err = GetEdbListByEdbInfoId(edbIds)
+	isGetEdbData := false
+	// 查询投研资源库里的图表和指标绑定关系
+	oldEdbInfoStr, err := models.GetEdbInfoIdsByChartInfoId(chartInfoId)
+	if err != nil {
+		errMsg = "获取投研资源库里的图表和指标绑定关系失败"
+		err = fmt.Errorf("获取投研资源库里的图表和指标绑定关系失败,Err:" + err.Error())
+		return
+	}
+	if oldEdbInfoStr != edbInfoStr { // 图表更换过指标需要重新获取指标数据
+		isGetEdbData = true
+	}
+	edbInfoList, edbMappingList, edbInfoDataList, err = GetEdbListByEdbInfoId(edbIds, isGetEdbData)
 	if err != nil {
 		errMsg = "获取指标详情失败"
 		err = fmt.Errorf("获取指标详情失败,Err:" + err.Error())
@@ -129,11 +142,17 @@ func UpdateChart(chartInfoId int) (err error, errMsg string) {
 		err = fmt.Errorf(respItem.ErrMsg)
 		return
 	}
-
+	// 更新投研资源库里的图表和指标绑定关系
+	err = models.UpdateForumChartEdbMapping(chartInfoId, edbInfoStr)
+	if err != nil {
+		errMsg = "更新投研资源库里的图表和指标绑定关系失败"
+		err = fmt.Errorf("更新投研资源库里的图表和指标绑定关系失败,Err:" + err.Error())
+		return
+	}
 	return
 }
 
-func GetEdbListByEdbInfoId(edbInfoIds []int) (edbInfoList []*models.EdbInfo, edbMappingList []*models.EdbInfoCalculateMapping, edbInfoDataList []*eta_forum.AddEdbDataReq, err error) {
+func GetEdbListByEdbInfoId(edbInfoIds []int, isGetEdbData bool) (edbInfoList []*models.EdbInfo, edbMappingList []*models.EdbInfoCalculateMapping, edbInfoDataList []*eta_forum.AddEdbDataReq, err error) {
 	//查询指标信息
 	//查询指标映射
 	//查询所有指标数据
@@ -162,29 +181,30 @@ func GetEdbListByEdbInfoId(edbInfoIds []int) (edbInfoList []*models.EdbInfo, edb
 		err = fmt.Errorf("traceEdbInfoByEdbInfoId GetEdbInfoByIdList err: %s", err.Error())
 		return
 	}
+	if isGetEdbData {
+		for _, v := range edbInfoList {
+			if _, ok := chartEdbInfoIdMap[v.EdbInfoId]; !ok {
+				continue
+			}
+			var dataList []*models.EdbDataBase
+			if v.Source == utils.DATA_SOURCE_BUSINESS && utils.UseMongo {
+				dataList, err = models.GetEdbDataBaseMongoByEdbInfoId(v.EdbInfoId, v.Source, v.SubSource)
+			} else if v.Source == utils.DATA_SOURCE_THS && v.SubSource == utils.DATA_SUB_SOURCE_HIGH_FREQUENCY && utils.UseMongo {
+				dataList, err = models.GetThsHfEdbDataBaseMongoByEdbInfoId(v.EdbInfoId, v.Source, v.SubSource)
+			} else {
+				dataList, err = models.GetEdbDataBaseByEdbInfoId(v.EdbInfoId, v.Source, v.SubSource)
+			}
+			if err != nil {
+				err = fmt.Errorf("查询指标数据失败 Err: %s", err.Error())
+				return
+			}
 
-	for _, v := range edbInfoList {
-		if _, ok := chartEdbInfoIdMap[v.EdbInfoId]; !ok {
-			continue
-		}
-		var dataList []*models.EdbDataBase
-		if v.Source == utils.DATA_SOURCE_BUSINESS && utils.UseMongo {
-			dataList, err = models.GetEdbDataBaseMongoByEdbInfoId(v.EdbInfoId, v.Source, v.SubSource)
-		} else if v.Source == utils.DATA_SOURCE_THS && v.SubSource == utils.DATA_SUB_SOURCE_HIGH_FREQUENCY && utils.UseMongo {
-			dataList, err = models.GetThsHfEdbDataBaseMongoByEdbInfoId(v.EdbInfoId, v.Source, v.SubSource)
-		} else {
-			dataList, err = models.GetEdbDataBaseByEdbInfoId(v.EdbInfoId, v.Source, v.SubSource)
-		}
-		if err != nil {
-			err = fmt.Errorf("查询指标数据失败 Err: %s", err.Error())
-			return
+			tmp := new(eta_forum.AddEdbDataReq)
+			tmp.EdbCode = v.EdbCode
+			tmp.EdbType = v.EdbType
+			tmp.DataList = dataList
+			edbInfoDataList = append(edbInfoDataList, tmp)
 		}
-
-		tmp := new(eta_forum.AddEdbDataReq)
-		tmp.EdbCode = v.EdbCode
-		tmp.EdbType = v.EdbType
-		tmp.DataList = dataList
-		edbInfoDataList = append(edbInfoDataList, tmp)
 	}
 	return
 }
@@ -257,12 +277,13 @@ func traceEdbInfoByEdbInfoId(edbInfoId int, hasFindMap map[int]struct{}, edbInfo
 // 批量上传图表分类信息
 func ChartInfoSaveBatch() (err error) {
 	var tmpErr []error
-	deleteCache := true
 	cacheKey := "eta_forum_task:ChartInfoSaveBatchAdmin"
+	if !utils.Rc.SetNX(cacheKey, 1, 30*time.Minute) {
+		err = fmt.Errorf("系统处理中,请稍后重试!")
+		return
+	}
 	defer func() {
-		if deleteCache {
-			_ = utils.Rc.Delete(cacheKey)
-		}
+		utils.Rc.Delete(cacheKey)
 		stack := ""
 		if err != nil {
 			stack = fmt.Sprintln(stack + err.Error())
@@ -277,12 +298,6 @@ func ChartInfoSaveBatch() (err error) {
 			go alarm_msg.SendAlarmMsg("批量上传资源库图表信息失败"+"<br/>"+stack, 3)
 		}
 	}()
-	if !utils.Rc.SetNX(cacheKey, 1, 30*time.Second) {
-		deleteCache = false
-		err = fmt.Errorf("系统处理中,请稍后重试!")
-		return
-	}
-
 	// 已上架的图表都更新,批量更新图表信息
 	condition := " and source=1 and forum_chart_info_id > 0 and resource_status =1"
 	// 查询需要更新的图表信息总数
@@ -304,6 +319,9 @@ func ChartInfoSaveBatch() (err error) {
 				err = fmt.Errorf("查询需要更新的图表信息失败: %v", e)
 				return
 			}
+			if len(chartInfos) == 0 {
+				break
+			}
 			// 循环更新图表数据
 			for _, chartInfo := range chartInfos {
 				var er error
@@ -368,6 +386,9 @@ func ChartInfoSaveBatch() (err error) {
 			err = fmt.Errorf("查询需要更新的图表信息失败: %v", e)
 			return
 		}
+		if len(chartInfos) == 0 {
+			break
+		}
 		// 循环更新图表数据
 		for _, chartInfo := range chartInfos {
 			var er error
@@ -387,6 +408,8 @@ func ChartInfoSaveBatch() (err error) {
 	}
 	utils.FileLog.Info("上传图表数据完成, 上传图表数据总数:", success)
 
+	// 更新指标数据
+	utils.Rc.LPush(utils.CACHE_KEY_EDB_DATA_UPDATE_LOG, []byte("1"))
 	return
 }
 
@@ -398,7 +421,7 @@ func UploadChart(chartInfoId int, description string, uploaderInfo *models.Admin
 		err = fmt.Errorf("系统处理中,请稍后重试!")
 		return
 	}
-	utils.Rc.SetNX(cacheKey, 1, 30*time.Second)
+	utils.Rc.SetNX(cacheKey, 1, 10*time.Minute)
 	defer func() {
 		if err != nil {
 			go alarm_msg.SendAlarmMsg(fmt.Sprintf("上传图表至社区失败:Err:%v,ErrMsg:%s", err, errMsg), 3)
@@ -434,8 +457,10 @@ func UploadChart(chartInfoId int, description string, uploaderInfo *models.Admin
 		return
 	}
 	edbIds := make([]int, 0)
+	edbInfoStr := ""
 	for _, v := range chartMappingList {
 		edbIds = append(edbIds, v.EdbInfoId)
+		edbInfoStr += strconv.Itoa(v.EdbInfoId) + ","
 	}
 	chartSeriesList := make([]*models.ChartSeries, 0)
 	chartSeriesEdbList := make([]*models.ChartSeriesEdbMapping, 0)
@@ -461,7 +486,7 @@ func UploadChart(chartInfoId int, description string, uploaderInfo *models.Admin
 		edbInfoDataList []*eta_forum.AddEdbDataReq
 	)
 	//查询指标详情
-	edbInfoList, edbMappingList, edbInfoDataList, err = GetEdbListByEdbInfoId(edbIds)
+	edbInfoList, edbMappingList, edbInfoDataList, err = GetEdbListByEdbInfoId(edbIds, true)
 	if err != nil {
 		errMsg = "获取指标详情失败"
 		err = fmt.Errorf("获取指标详情失败,Err:" + err.Error())
@@ -518,6 +543,14 @@ func UploadChart(chartInfoId int, description string, uploaderInfo *models.Admin
 			err = fmt.Errorf("更新图表ID失败,Err:" + err.Error())
 			return
 		}
+
+		// 新增投研资源库里的图表和指标绑定关系
+		err = models.AddForumChartEdbMapping(chartInfoId, edbInfoStr)
+		if err != nil {
+			errMsg = "新增投研资源库里的图表和指标绑定关系失败"
+			err = fmt.Errorf("新增投研资源库里的图表和指标绑定关系失败,Err:" + err.Error())
+			return
+		}
 	}
 
 	return

+ 116 - 0
services/edb_data.go

@@ -0,0 +1,116 @@
+package services
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"eta/eta_forum_task/models"
+	"eta/eta_forum_task/services/eta_forum"
+	"eta/eta_forum_task/utils"
+)
+
+type EdbDataBinlogDataReq struct {
+	Item 	 string  `description:"指标数据列表"`
+	OpType   string  `description:"操作类型"`
+}
+
+type EdbDataBinlogReq struct {
+	List []*EdbDataBinlogDataReq `description:"指标数据列表"`
+}
+
+func HandleBinlogEdbUpdateLog() error {
+	// 设置缓存防止重复调用
+	cacheKey := utils.CACHE_KEY_EDB_DATA_UPDATE_LOG
+	if utils.Rc.Get(cacheKey) != nil {
+		utils.FileLog.Info("HandleBinlogEdbUpdateLog cacheKey exists")
+		return nil
+	}
+	utils.Rc.SetNX(cacheKey, "1", 10*time.Minute)
+	defer func() {
+		utils.Rc.Delete(cacheKey)
+	}()
+	//查询记录总数
+	dateTime := "2024-10-14 00:00:00"
+	total, err := models.GetEdbUpdateLogListCount(dateTime)
+	if err != nil {
+		return err
+	}
+
+	//分页获取待处理的日志列表	
+	if total == 0 {
+		return nil
+	}
+	offset := 0
+	pageSize := 10
+
+		// 循环更新1000个图表数据
+	for i := 0; offset < total; i++ {
+		// 查询需要更新的图表信息
+		logs, err := models.GetEdbUpdateLogList(dateTime, offset, pageSize)
+		if err != nil {
+			return err
+		}
+		if len(logs) == 0 {
+			break
+		}
+		//处理日志
+		list := make([]*EdbDataBinlogDataReq, 0)
+		logIds := make([]int, 0)
+		for _, log := range logs {
+			logIds = append(logIds, int(log.Id))
+			//根据op_table_name查询表结构
+			if log.OpType == "insert" || log.OpType == "update" {
+				tmp := &EdbDataBinlogDataReq{
+					Item: log.NewData,
+					OpType: log.OpType,
+				}
+				list = append(list, tmp)
+			}else if log.OpType == "delete" {
+				tmp := &EdbDataBinlogDataReq{
+					Item: log.OldData,
+					OpType: log.OpType,
+				}
+				list = append(list, tmp)
+			}
+		}
+		if len(list) == 0 {
+			continue
+		}
+		req := &EdbDataBinlogReq{
+			List: list,
+		}
+		//批量上传
+		reqJson, err := json.Marshal(req)
+		if err != nil {
+			return err
+		}
+		resp, err := eta_forum.EdbDataBatchSaveLib(string(reqJson))
+		if err != nil {
+			return err
+		}
+		if resp.Ret != 200 {
+			return fmt.Errorf("上传失败,Err:%s", resp.ErrMsg)
+		}
+		// 批量更新日志状态
+		err = models.UpdateEdbUpdateLogStatus(logIds)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// the service for log
+func AutoUpdateEdbDataToEtaForum() {
+	defer func() {
+		if err := recover(); err != nil {
+			fmt.Println("[AutoUpdateEdbDataToEtaForum]", err)
+		}
+	}()
+	for {
+		utils.Rc.Brpop(utils.CACHE_KEY_EDB_DATA_UPDATE_LOG, func(b []byte) {
+			HandleBinlogEdbUpdateLog()
+		})
+	}
+}

+ 9 - 0
services/eta_forum/eta_forum_hub_lib.go

@@ -41,6 +41,15 @@ func AdminBatchSaveLib(req string) (resp *models.BaseResponse, err error) {
 	return
 }
 
+// 指标数据批量上传
+func EdbDataBatchSaveLib(req string) (resp *models.BaseResponse, err error) {
+	_, resultByte, err := post(req, "/v1/edb_data/save_by_binlog")
+	err = json.Unmarshal(resultByte, &resp)
+	if err != nil {
+		return
+	}
+	return
+}
 func PostLib(urlStr, req string) (resp *models.BaseResponse, err error) {
 	_, resultByte, err := post(req, urlStr)
 	err = json.Unmarshal(resultByte, &resp)

+ 2 - 1
services/task.go

@@ -13,7 +13,7 @@ func Task() {
 		releaseTask()
 	}
 	// 定时更新图表数据到eta社区
-	etaForumChartUpdate := task.NewTask("publishSmartReport", "0 0 */1 * * *", EtaForumChartUpdate)
+	etaForumChartUpdate := task.NewTask("etaForumChartUpdate", "0 0 */1 * * *", EtaForumChartUpdate)
 	task.AddTask("定时更新图表数据到eta社区", etaForumChartUpdate)
 
 	// 每隔1小时,定时更新管理员数据到eta社区
@@ -22,6 +22,7 @@ func Task() {
 
 	task.StartTask()
 	fmt.Println("task end")
+	go AutoUpdateEdbDataToEtaForum()
 }
 
 func releaseTask() {

+ 5 - 1
utils/constants.go

@@ -262,4 +262,8 @@ const (
 	ChartClassifyResourceStatusUp = 1 // 图表分类上架状态
 	ChartClassifyResourceStatusDown = 2 // 图表分类下架状态
 	ChartClassifyResourceStatusDefault = 0 // 图表分类默认状态
-)
+)
+
+const (
+	CACHE_KEY_EDB_DATA_UPDATE_LOG = "eta_forum:edb_data_update_log" // 经济数据库数据更新日志
+)