浏览代码

fix:新增binlog记录以及数据推送接口

Roc 1 年之前
父节点
当前提交
689edb6aa5

+ 1 - 0
config/config.go

@@ -47,6 +47,7 @@ type Mysql struct {
 	Stdout              bool        `mapstructure:"stdout" json:"stdout" yaml:"stdout" description:"日志是否输出在控制台"`
 	DefaultDsnAliasName string      `mapstructure:"default-dsn-alias-name" json:"default-dsn-alias-name" yaml:"default-dsn-alias-name" description:"默认的数据库连接别名"`
 	IsListenMysqlBinlog bool        `mapstructure:"is-listen-mysql-binlog" json:"is-listen-mysql-binlog" yaml:"is-listen-mysql-binlog" description:"是否监听mysql binlog"`
+	UpdateLogSaveDay    int         `mapstructure:"update-log-save-day" json:"update-log-save-day" yaml:"update-log-save-day" description:"更新日志保存天数" default:"10"`
 	List                []MysqlConn `mapstructure:"list" json:"list" yaml:"list" description:"数据库链接配置列表"`
 }
 

+ 19 - 5
global/global.go

@@ -55,12 +55,26 @@ func init() {
 
 	v.OnConfigChange(func(e fsnotify.Event) {
 		fmt.Println("配置文件变更:", e.Name)
-		if err := v.Unmarshal(&CONFIG); err != nil {
-			fmt.Println("配置重赋值失败,Err:", err)
-		}
-		fmt.Println(CONFIG)
+		handleConfig(v, "配置重赋值")
 	})
+	handleConfig(v, "配置初始化")
+
+	return
+}
+
+// handleConfig
+// @Description: 配置文件处理
+// @author: Roc
+// @datetime 2024-03-13 10:27:40
+// @param v *viper.Viper
+// @param handleType string
+func handleConfig(v *viper.Viper, handleType string) {
 	if err := v.Unmarshal(&CONFIG); err != nil {
-		fmt.Println("配置初始化赋值失败,Err:", err)
+		fmt.Println(handleType+"赋值失败,Err:", err)
+	}
+	if CONFIG.Mysql.UpdateLogSaveDay == 0 {
+		CONFIG.Mysql.UpdateLogSaveDay = 31
 	}
+	//fmt.Println(CONFIG)
+	//fmt.Println(CONFIG.Mysql.UpdateLogSaveDay)
 }

+ 9 - 0
init_serve/task.go

@@ -3,6 +3,7 @@ package init_serve
 import (
 	"eta/eta_bridge/global"
 	"eta/eta_bridge/services"
+	"eta/eta_bridge/services/index_data"
 	"github.com/robfig/cron/v3"
 )
 
@@ -24,5 +25,13 @@ func InitTask() {
 	// 开始定时任务
 	c := cron.New(cron.WithSeconds())
 
+	// 定时清除指标更新日志
+	if global.CONFIG.Mysql.IsListenMysqlBinlog {
+		_, err := c.AddFunc("0 0 0 * * *", index_data.DeleteBeforeTenDayLog)
+		if err != nil {
+			global.LOG.Error("DeleteBeforeTenDayLog err" + err.Error())
+		}
+	}
+
 	c.Start()
 }

+ 9 - 0
models/index/edb_update_log.go

@@ -2,6 +2,7 @@ package index
 
 import (
 	"eta/eta_bridge/global"
+	"fmt"
 	"time"
 )
 
@@ -24,3 +25,11 @@ func (m *EdbUpdateLog) Create() (err error) {
 	err = global.MYSQL["index"].Create(m).Error
 	return
 }
+
+// DeleteBeforeTenDayLog 清除n前的日志数据
+func (m *EdbUpdateLog) DeleteBeforeTenDayLog(beforeDate string) (err error) {
+	sql := fmt.Sprintf(`DELETE FROM %s WHERE create_time < ?`, m.TableName())
+	err = global.MYSQL["index"].Exec(sql, beforeDate).Error
+
+	return
+}

+ 5 - 5
services/binlog/binlog.go

@@ -17,8 +17,6 @@ import (
 var mysqlHost, mysqlUser, mysqlPwd, mysqlDb string
 
 func init() {
-	//eta()
-	//local()
 
 	for _, sqlConfig := range global.CONFIG.Mysql.List {
 		if sqlConfig.AliasName == `index` {
@@ -46,9 +44,9 @@ func init() {
 		}
 	}
 
-	fmt.Println("HOST:", mysqlHost)
-	fmt.Println("user:", mysqlUser)
-	fmt.Println("password:", mysqlPwd)
+	//fmt.Println("HOST:", mysqlHost)
+	//fmt.Println("user:", mysqlUser)
+	//fmt.Println("password:", mysqlPwd)
 
 }
 
@@ -66,6 +64,8 @@ func ListenMysql() {
 	includeTableRegex := []string{
 		mysqlDb + ".edb_info$",
 		mysqlDb + ".edb_classify$",
+		mysqlDb + ".base_from_mysteel_chemical_index$",
+		mysqlDb + ".base_from_smm_index$",
 		mysqlDb + ".edb_data*",
 	}
 	cfg := &canal.Config{

+ 4 - 4
services/binlog/handler.go

@@ -39,8 +39,8 @@ func (h *MyEventHandler) OnRow(e *canal.RowsEvent) (err error) {
 	fmt.Println("fileName:", h.fileName, ";position:", h.position)
 
 	// 每次操作完成后都将当前位置记录到缓存
-	global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 24*time.Hour)
-	global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 24*time.Hour)
+	global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 31*24*time.Hour)
+	global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 31*24*time.Hour)
 
 	return nil
 }
@@ -50,8 +50,8 @@ func (h *MyEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Po
 	h.position = p.Pos
 
 	// 旋转binlog日志的时候,需要将当前位置记录到缓存
-	global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 24*time.Hour)
-	global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 24*time.Hour)
+	global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 31*24*time.Hour)
+	global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 31*24*time.Hour)
 
 	return nil
 }

+ 23 - 0
services/index_data/edb_update_log.go

@@ -0,0 +1,23 @@
+package index_data
+
+import (
+	"eta/eta_bridge/global"
+	"eta/eta_bridge/models/index"
+	"eta/eta_bridge/utils"
+	"time"
+)
+
+// DeleteBeforeTenDayLog
+// @Description: 定时删除10天前的指标更新日志
+// @author: Roc
+// @datetime 2024-03-07 20:01:04
+func DeleteBeforeTenDayLog() {
+	item := index.EdbUpdateLog{}
+	beforeDay := time.Now().AddDate(0, 0, -global.CONFIG.Mysql.UpdateLogSaveDay).Format(utils.FormatDate)
+	//beforeDay := time.Now().AddDate(0, 0, -6).Format(utils.FormatDate)
+	err := item.DeleteBeforeTenDayLog(beforeDay)
+	if err != nil {
+		global.FILE_LOG.Error("定时删除10天前的指标更新日志失败:", err)
+	}
+	return
+}