package binlog import ( "errors" "eta/eta_bridge/global" "eta/eta_bridge/models/index" "eta/eta_bridge/utils" "fmt" "github.com/go-mysql-org/go-mysql/canal" "github.com/go-mysql-org/go-mysql/mysql" _ "github.com/go-sql-driver/mysql" "math/rand" "net/url" "strings" "time" ) var mysqlHost, mysqlUser, mysqlPwd, mysqlDb string func init() { for _, sqlConfig := range global.CONFIG.Mysql.List { if sqlConfig.AliasName == `index` { // 找出用户 tmpList := strings.Split(sqlConfig.Dsn, ":") mysqlUser = tmpList[0] // 找出密码 tmpList = strings.Split(strings.Join(tmpList[1:], ":"), "@") lenTmp := len(tmpList) mysqlPwd = strings.Join(tmpList[:lenTmp-1], "@") // 找出地址 tmpList = strings.Split(tmpList[lenTmp-1], "tcp(") tmpList = strings.Split(tmpList[1], ")") mysqlHost = tmpList[0] // 找出数据库名称 u, err := url.Parse(tmpList[1]) if err != nil { panic(err) // 如果解析失败,处理错误 } // 获取Path部分的内容 mysqlDb = u.Path[1:] } } if mysqlHost == "" { panic("mysqlHost is empty") } //fmt.Println("HOST:", mysqlHost) //fmt.Println("user:", mysqlUser) //fmt.Println("password:", mysqlPwd) } func ListenMysql() { var err error defer func() { if err != nil { fmt.Println("数据库监听服务异常,err:", err) } }() if mysqlHost == "" { err = errors.New("mysqlHost is empty") return } //includeTableRegex := []string{ // "test_hz_data.edb_info", //} includeTableRegex := []string{ mysqlDb + ".edb_info$", mysqlDb + ".edb_classify$", mysqlDb + ".base_from_mysteel_chemical_index$", mysqlDb + ".base_from_smm_index$", mysqlDb + ".edb_data*", } cfg := &canal.Config{ // 一个32位无符号整数,用于标识当前 Canal 实例在 MySQL 主从复制体系中的身份。这里使用了一个随机数生成器确保每次启动时分配的 ServerID 是唯一的(在1001到1099之间)。在实际生产环境中,你需要手动指定一个全局唯一的 ServerID。 ServerID: uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001, // 指定 Canal 要连接的数据库类型,默认为 "mysql",表明这是一个 MySQL 数据库。 Flavor: "mysql", // 设置 MySQL 服务器地址(主机名或 IP 地址)和端口,例如 "127.0.0.1:3306"。 Addr: mysqlHost, User: mysqlUser, Password: mysqlPwd, // 如果设置为 true,Canal 将以原始二进制格式获取 binlog,否则将以解析后的 SQL 语句形式提供。 //RawModeEnabled: false, // 是否启用半同步复制。当设置为 true 时,MySQL 主库在事务提交后会等待至少一个从库确认已接收并写入 binlog 才返回成功,提高了数据一致性。 SemiSyncEnabled: false, // 是否将 MySQL 中的 decimal 类型字段解析为 Go 的 Decimal 类型,而不是 float 或者 string。如果业务中有精确小数计算的需求,应开启此选项以避免精度丢失问题。 UseDecimal: true, // 用于控制初始数据导出的相关配置,在 Canal 启动时是否需要全量同步表数据。 //Dump: dumpConf, // 正则表达式字符串,用于定义 Canal 应该监听哪些表的 binlog 事件。只有名称匹配该正则表达式的表才会被 Canal 同步处理。 IncludeTableRegex: includeTableRegex, } // 校验mysql binlog format,目前仅支持row格式 { binlogFormat, tmpErr := index.GetBinlogFormat() if tmpErr != nil { err = tmpErr return } if binlogFormat.Value != "ROW" { panic("mysql binlog format is not ROW") return } } var fileName string var position uint32 fileName = global.Rc.GetStr(utils.CACHE_MYSQL_MASTER_FILENAME) position64, tmpErr := global.Rc.GetUInt64(utils.CACHE_MYSQL_MASTER_POSITION) if tmpErr != nil && tmpErr.Error() != utils.RedisNoKeyErr { panic("mysql binlog position is not found,err:" + tmpErr.Error()) return } position = uint32(position64) // 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。 if fileName == `` || position == 0 { item, tmpErr := index.GetShowMaster() if tmpErr != nil { err = tmpErr return } fileName = item.File position = item.Position } c, err := canal.NewCanal(cfg) if err != nil { fmt.Println("err:", err) return } binlogHandler := &MyEventHandler{} binlogHandler.SetBinlogFileName(fileName, position) c.SetEventHandler(binlogHandler) //c.Run() pos := mysql.Position{ Name: fileName, Pos: position, } err = c.RunFrom(pos) }