Преглед изворни кода

fix:binlog监听调整配置获取

Roc пре 1 година
родитељ
комит
19471ec6d6
6 измењених фајлова са 85 додато и 61 уклоњено
  1. 11 2
      config/config.go
  2. 2 2
      global/global.go
  3. 1 1
      init_serve/mysql.go
  4. 1 1
      init_serve/task.go
  5. 69 54
      services/binlog/binlog.go
  6. 1 1
      services/index_data/edb_update_log.go

+ 11 - 2
config/config.go

@@ -46,9 +46,18 @@ type Mysql struct {
 	//LogMode             bool        `mapstructure:"log-mode" json:"log-mode" yaml:"log-mode" description:"是否开启日志"`
 	Stdout              bool        `mapstructure:"stdout" json:"stdout" yaml:"stdout" description:"日志是否输出在控制台"`
 	DefaultDsnAliasName string      `mapstructure:"default-dsn-alias-name" json:"default-dsn-alias-name" yaml:"default-dsn-alias-name" description:"默认的数据库连接别名"`
-	IsListenMysqlBinlog bool        `mapstructure:"is-listen-mysql-binlog" json:"is-listen-mysql-binlog" yaml:"is-listen-mysql-binlog" description:"是否监听mysql binlog"`
-	UpdateLogSaveDay    int         `mapstructure:"update-log-save-day" json:"update-log-save-day" yaml:"update-log-save-day" description:"更新日志保存天数" default:"10"`
 	List                []MysqlConn `mapstructure:"list" json:"list" yaml:"list" description:"数据库链接配置列表"`
+	Binlog              Binlog      `mapstructure:"binlog" json:"binlog" yaml:"binlog" description:"mysql binlog配置"`
+}
+
+type Binlog struct {
+	IsListen   bool   `mapstructure:"is-listen" json:"is-listen" yaml:"is-listen" description:"是否监听mysql binlog"`
+	ServerID   uint32 `mapstructure:"server-id" json:"server-id" yaml:"server-id" description:"mysql binlog server id" default:"10000"`
+	LogSaveDay int    `mapstructure:"log-save-day" json:"log-save-day" yaml:"log-save-day" description:"更新日志保存天数" default:"10"`
+	Host       string `mapstructure:"host" json:"host" yaml:"host" description:"mysql binlog地址"`
+	User       string `mapstructure:"user" json:"user" yaml:"user" description:"mysql binlog用户名"`
+	Password   string `mapstructure:"password" json:"password" yaml:"password" description:"mysql binlog密码"`
+	Db         string `mapstructure:"db" json:"db" yaml:"db" description:"mysql binlog数据库"`
 }
 
 // MysqlConn mysql数据链接配置

+ 2 - 2
global/global.go

@@ -72,8 +72,8 @@ func handleConfig(v *viper.Viper, handleType string) {
 	if err := v.Unmarshal(&CONFIG); err != nil {
 		fmt.Println(handleType+"赋值失败,Err:", err)
 	}
-	if CONFIG.Mysql.UpdateLogSaveDay == 0 {
-		CONFIG.Mysql.UpdateLogSaveDay = 31
+	if CONFIG.Mysql.Binlog.LogSaveDay == 0 {
+		CONFIG.Mysql.Binlog.LogSaveDay = 31
 	}
 	//fmt.Println(CONFIG)
 	//fmt.Println(CONFIG.Mysql.UpdateLogSaveDay)

+ 1 - 1
init_serve/mysql.go

@@ -73,7 +73,7 @@ func Mysql() {
 	global.MYSQL = mysqlMap
 
 	// 开启mysql binlog监听
-	if mysqlConf.IsListenMysqlBinlog {
+	if mysqlConf.Binlog.IsListen {
 		go binlog.ListenMysql()
 	}
 

+ 1 - 1
init_serve/task.go

@@ -26,7 +26,7 @@ func InitTask() {
 	c := cron.New(cron.WithSeconds())
 
 	// 定时清除指标更新日志
-	if global.CONFIG.Mysql.IsListenMysqlBinlog {
+	if global.CONFIG.Mysql.Binlog.IsListen {
 		_, err := c.AddFunc("0 0 0 * * *", index_data.DeleteBeforeTenDayLog)
 		if err != nil {
 			global.LOG.Error("DeleteBeforeTenDayLog err" + err.Error())

+ 69 - 54
services/binlog/binlog.go

@@ -1,7 +1,6 @@
 package binlog
 
 import (
-	"errors"
 	"eta/eta_bridge/global"
 	"eta/eta_bridge/models/index"
 	"eta/eta_bridge/utils"
@@ -10,50 +9,48 @@ import (
 	"github.com/go-mysql-org/go-mysql/mysql"
 	_ "github.com/go-sql-driver/mysql"
 	"math/rand"
-	"net/url"
-	"strings"
 	"time"
 )
 
-var mysqlHost, mysqlUser, mysqlPwd, mysqlDb string
-
-func init() {
-
-	for _, sqlConfig := range global.CONFIG.Mysql.List {
-		if sqlConfig.AliasName == `index` {
-			// 找出用户
-			tmpList := strings.Split(sqlConfig.Dsn, ":")
-			mysqlUser = tmpList[0]
-			// 找出密码
-			tmpList = strings.Split(strings.Join(tmpList[1:], ":"), "@")
-			lenTmp := len(tmpList)
-			mysqlPwd = strings.Join(tmpList[:lenTmp-1], "@")
-			// 找出地址
-			tmpList = strings.Split(tmpList[lenTmp-1], "tcp(")
-			tmpList = strings.Split(tmpList[1], ")")
-			mysqlHost = tmpList[0]
-
-			// 找出数据库名称
-			u, err := url.Parse(tmpList[1])
-			if err != nil {
-				panic(err) // 如果解析失败,处理错误
-			}
-
-			// 获取Path部分的内容
-			mysqlDb = u.Path[1:]
-
-		}
-	}
-
-	if mysqlHost == "" {
-		panic("mysqlHost is empty")
-	}
-
-	//fmt.Println("HOST:", mysqlHost)
-	//fmt.Println("user:", mysqlUser)
-	//fmt.Println("password:", mysqlPwd)
-
-}
+//var mysqlHost, mysqlUser, mysqlPwd, mysqlDb string
+
+//func init() {
+//
+//	for _, sqlConfig := range global.CONFIG.Mysql.List {
+//		if sqlConfig.AliasName == `index` {
+//			// 找出用户
+//			tmpList := strings.Split(sqlConfig.Dsn, ":")
+//			mysqlUser = tmpList[0]
+//			// 找出密码
+//			tmpList = strings.Split(strings.Join(tmpList[1:], ":"), "@")
+//			lenTmp := len(tmpList)
+//			mysqlPwd = strings.Join(tmpList[:lenTmp-1], "@")
+//			// 找出地址
+//			tmpList = strings.Split(tmpList[lenTmp-1], "tcp(")
+//			tmpList = strings.Split(tmpList[1], ")")
+//			mysqlHost = tmpList[0]
+//
+//			// 找出数据库名称
+//			u, err := url.Parse(tmpList[1])
+//			if err != nil {
+//				panic(err) // 如果解析失败,处理错误
+//			}
+//
+//			// 获取Path部分的内容
+//			mysqlDb = u.Path[1:]
+//
+//		}
+//	}
+//
+//	if mysqlHost == "" {
+//		panic("mysqlHost is empty")
+//	}
+//
+//	//fmt.Println("HOST:", mysqlHost)
+//	//fmt.Println("user:", mysqlUser)
+//	//fmt.Println("password:", mysqlPwd)
+//
+//}
 
 func ListenMysql() {
 	var err error
@@ -62,30 +59,47 @@ func ListenMysql() {
 			fmt.Println("数据库监听服务异常,err:", err)
 		}
 	}()
-	if mysqlHost == "" {
-		err = errors.New("mysqlHost is empty")
-		return
+	if global.CONFIG.Mysql.Binlog.Host == "" {
+		panic("mysqlHost is empty")
+		//err = errors.New("mysqlHost is empty")
+		//return
+	}
+
+	if global.CONFIG.Mysql.Binlog.User == "" {
+		panic("user is empty")
+	}
+	if global.CONFIG.Mysql.Binlog.Password == "" {
+		panic("password is empty")
+	}
+	if global.CONFIG.Mysql.Binlog.Db == "" {
+		panic("db is empty")
 	}
 	//includeTableRegex := []string{
 	//	"test_hz_data.edb_info",
 	//}
 
 	includeTableRegex := []string{
-		mysqlDb + ".edb_info$",
-		mysqlDb + ".edb_classify$",
-		mysqlDb + ".base_from_mysteel_chemical_index$",
-		mysqlDb + ".base_from_smm_index$",
-		mysqlDb + ".edb_data*",
+		global.CONFIG.Mysql.Binlog.Db + ".edb_info$",
+		global.CONFIG.Mysql.Binlog.Db + ".edb_classify$",
+		global.CONFIG.Mysql.Binlog.Db + ".base_from_mysteel_chemical_index$",
+		global.CONFIG.Mysql.Binlog.Db + ".base_from_smm_index$",
+		global.CONFIG.Mysql.Binlog.Db + ".edb_data*",
+	}
+
+	// 主从复制的身份id配置,必须全局唯一,如果没有配置的话,那么会随机生成一个
+	serverId := global.CONFIG.Mysql.Binlog.ServerID
+	if serverId == 0 {
+		serverId = uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001
 	}
 	cfg := &canal.Config{
 		// 一个32位无符号整数,用于标识当前 Canal 实例在 MySQL 主从复制体系中的身份。这里使用了一个随机数生成器确保每次启动时分配的 ServerID 是唯一的(在1001到1099之间)。在实际生产环境中,你需要手动指定一个全局唯一的 ServerID。
-		ServerID: uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001,
+		ServerID: serverId,
 		// 指定 Canal 要连接的数据库类型,默认为 "mysql",表明这是一个 MySQL 数据库。
 		Flavor: "mysql",
 		// 设置 MySQL 服务器地址(主机名或 IP 地址)和端口,例如 "127.0.0.1:3306"。
-		Addr:     mysqlHost,
-		User:     mysqlUser,
-		Password: mysqlPwd,
+		Addr:     global.CONFIG.Mysql.Binlog.Host,
+		User:     global.CONFIG.Mysql.Binlog.User,
+		Password: global.CONFIG.Mysql.Binlog.Password,
 		// 如果设置为 true,Canal 将以原始二进制格式获取 binlog,否则将以解析后的 SQL 语句形式提供。
 		//RawModeEnabled:  false,
 		// 是否启用半同步复制。当设置为 true 时,MySQL 主库在事务提交后会等待至少一个从库确认已接收并写入 binlog 才返回成功,提高了数据一致性。
@@ -96,6 +110,7 @@ func ListenMysql() {
 		//Dump:              dumpConf,
 		// 正则表达式字符串,用于定义 Canal 应该监听哪些表的 binlog 事件。只有名称匹配该正则表达式的表才会被 Canal 同步处理。
 		IncludeTableRegex: includeTableRegex,
+		Logger:            global.FILE_LOG,
 	}
 
 	// 校验mysql binlog format,目前仅支持row格式

+ 1 - 1
services/index_data/edb_update_log.go

@@ -13,7 +13,7 @@ import (
 // @datetime 2024-03-07 20:01:04
 func DeleteBeforeTenDayLog() {
 	item := index.EdbUpdateLog{}
-	beforeDay := time.Now().AddDate(0, 0, -global.CONFIG.Mysql.UpdateLogSaveDay).Format(utils.FormatDate)
+	beforeDay := time.Now().AddDate(0, 0, -global.CONFIG.Mysql.Binlog.LogSaveDay).Format(utils.FormatDate)
 	//beforeDay := time.Now().AddDate(0, 0, -6).Format(utils.FormatDate)
 	err := item.DeleteBeforeTenDayLog(beforeDay)
 	if err != nil {