123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314 |
- package binlog
- import (
- "errors"
- "eta_gn/eta_bridge/global"
- "eta_gn/eta_bridge/models/index"
- "eta_gn/eta_bridge/utils"
- "fmt"
- "github.com/go-mysql-org/go-mysql/canal"
- "github.com/go-mysql-org/go-mysql/mysql"
- _ "github.com/go-sql-driver/mysql"
- "math/rand"
- "strconv"
- "time"
- )
- //var mysqlHost, mysqlUser, mysqlPwd, mysqlDb string
- //func init() {
- //
- // for _, sqlConfig := range global.CONFIG.Mysql.List {
- // if sqlConfig.AliasName == `index` {
- // // 找出用户
- // tmpList := strings.Split(sqlConfig.Dsn, ":")
- // mysqlUser = tmpList[0]
- // // 找出密码
- // tmpList = strings.Split(strings.Join(tmpList[1:], ":"), "@")
- // lenTmp := len(tmpList)
- // mysqlPwd = strings.Join(tmpList[:lenTmp-1], "@")
- // // 找出地址
- // tmpList = strings.Split(tmpList[lenTmp-1], "tcp(")
- // tmpList = strings.Split(tmpList[1], ")")
- // mysqlHost = tmpList[0]
- //
- // // 找出数据库名称
- // u, err := url.Parse(tmpList[1])
- // if err != nil {
- // panic(err) // 如果解析失败,处理错误
- // }
- //
- // // 获取Path部分的内容
- // mysqlDb = u.Path[1:]
- //
- // }
- // }
- //
- // if mysqlHost == "" {
- // panic("mysqlHost is empty")
- // }
- //
- // //fmt.Println("HOST:", mysqlHost)
- // //fmt.Println("user:", mysqlUser)
- // //fmt.Println("password:", mysqlPwd)
- //
- //}
- func ListenMysql() {
- var err error
- defer func() {
- if err != nil {
- fmt.Println("数据库监听服务异常,err:", err)
- }
- }()
- if global.CONFIG.Mysql.Binlog.Host == "" {
- panic("mysqlHost is empty")
- //err = errors.New("mysqlHost is empty")
- //return
- }
- if global.CONFIG.Mysql.Binlog.User == "" {
- panic("user is empty")
- }
- if global.CONFIG.Mysql.Binlog.Password == "" {
- panic("password is empty")
- }
- if global.CONFIG.Mysql.Binlog.Db == "" {
- panic("db is empty")
- }
- //includeTableRegex := []string{
- // "test_hz_data.edb_info",
- //}
- includeTableRegex := []string{
- global.CONFIG.Mysql.Binlog.Db + ".edb_info$",
- global.CONFIG.Mysql.Binlog.Db + ".edb_classify$",
- global.CONFIG.Mysql.Binlog.Db + ".base_from_mysteel_chemical_index$",
- global.CONFIG.Mysql.Binlog.Db + ".base_from_smm_index$",
- global.CONFIG.Mysql.Binlog.Db + ".edb_data*",
- }
- // 主从复制的身份id配置,必须全局唯一,如果没有配置的话,那么会随机生成一个
- serverId := global.CONFIG.Mysql.Binlog.ServerID
- if serverId == 0 {
- serverId = uint32(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(1000)) + 1001
- }
- cfg := &canal.Config{
- // 一个32位无符号整数,用于标识当前 Canal 实例在 MySQL 主从复制体系中的身份。这里使用了一个随机数生成器确保每次启动时分配的 ServerID 是唯一的(在1001到1099之间)。在实际生产环境中,你需要手动指定一个全局唯一的 ServerID。
- ServerID: serverId,
- // 指定 Canal 要连接的数据库类型,默认为 "mysql",表明这是一个 MySQL 数据库。
- Flavor: "mysql",
- // 设置 MySQL 服务器地址(主机名或 IP 地址)和端口,例如 "127.0.0.1:3306"。
- Addr: global.CONFIG.Mysql.Binlog.Host,
- User: global.CONFIG.Mysql.Binlog.User,
- Password: global.CONFIG.Mysql.Binlog.Password,
- // 如果设置为 true,Canal 将以原始二进制格式获取 binlog,否则将以解析后的 SQL 语句形式提供。
- //RawModeEnabled: false,
- // 是否启用半同步复制。当设置为 true 时,MySQL 主库在事务提交后会等待至少一个从库确认已接收并写入 binlog 才返回成功,提高了数据一致性。
- SemiSyncEnabled: false,
- // 是否将 MySQL 中的 decimal 类型字段解析为 Go 的 Decimal 类型,而不是 float 或者 string。如果业务中有精确小数计算的需求,应开启此选项以避免精度丢失问题。
- UseDecimal: true,
- // 用于控制初始数据导出的相关配置,在 Canal 启动时是否需要全量同步表数据。
- //Dump: dumpConf,
- // 正则表达式字符串,用于定义 Canal 应该监听哪些表的 binlog 事件。只有名称匹配该正则表达式的表才会被 Canal 同步处理。
- IncludeTableRegex: includeTableRegex,
- }
- // 校验mysql binlog format,目前仅支持row格式
- {
- binlogFormat, tmpErr := index.GetBinlogFormat()
- if tmpErr != nil {
- err = tmpErr
- return
- }
- if binlogFormat.Value != "ROW" {
- panic("mysql binlog format is not ROW")
- return
- }
- }
- // 获取上一次启动时的binlog文件名称和位置
- fileName, position, err := getBinlogNamePosition()
- if err != nil {
- return
- }
- // 修改记录本次启动时的binlog文件名称和位置
- modifyBinlogNamePosition(fileName, position)
- // 定时修改binlog文件名称和位置
- go timingModifyBinlogNamePosition()
- c, err := canal.NewCanal(cfg)
- if err != nil {
- fmt.Println("err:", err)
- return
- }
- global.FILE_LOG.Debug("记录上一次启动时的fileName:", fileName, ";position:", position)
- binlogHandler := &MyEventHandler{}
- binlogHandler.SetBinlogFileName(fileName, position)
- c.SetEventHandler(binlogHandler)
- //c.Run()
- pos := mysql.Position{
- Name: fileName,
- Pos: position,
- }
- err = c.RunFrom(pos)
- }
- // getBinlogNamePosition
- // @Description: 获取当前binlog文件名称和位置
- // @author: Roc
- // @datetime 2024-05-17 13:18:19
- // @return fileName string
- // @return position uint32
- // @return err error
- func getBinlogNamePosition() (fileName string, position uint32, err error) {
- // 优先从redis获取
- fileName = global.Rc.GetStr(utils.CACHE_MYSQL_MASTER_FILENAME)
- position64, err := global.Rc.GetUInt64(utils.CACHE_MYSQL_MASTER_POSITION)
- if err != nil {
- if err.Error() != utils.RedisNoKeyErr {
- panic("mysql binlog position is not found,err:" + err.Error())
- return
- }
- err = nil
- }
- position = uint32(position64)
- // 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
- if fileName == `` || position == 0 {
- // binlog文件名
- fileNameKey := index.BinlogFileNameKey
- fileNameLog, tmpErr := index.GetBusinessSysInteractionLogByKey(fileNameKey)
- if tmpErr == nil {
- fileName = fileNameLog.InteractionKey
- }
- // binlog位置
- positionKey := index.BinlogPositionKey
- positionLog, tmpErr := index.GetBusinessSysInteractionLogByKey(positionKey)
- if tmpErr == nil {
- positionStr := positionLog.InteractionKey
- positionInt, tmpErr := strconv.Atoi(positionStr)
- if tmpErr == nil {
- position = uint32(positionInt)
- }
- }
- }
- // 如果从表中没有取到数据,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
- if fileName == `` || position == 0 {
- item, tmpErr := index.GetShowMaster()
- if tmpErr != nil {
- err = tmpErr
- return
- }
- fileName = item.File
- position = item.Position
- }
- return
- }
- // timingModifyBinlogNamePosition
- // @Description: 定时修改binlog文件名称和位置
- // @author: Roc
- // @datetime 2024-05-17 13:08:13
- func timingModifyBinlogNamePosition() {
- for {
- // 延时30s执行
- time.Sleep(30 * time.Second)
- // 获取最新的binlog文件名称和位置
- fileName, position, err := getBinlogNamePosition()
- if err != nil {
- return
- }
- if fileName != `` && position != 0 {
- // 修改记录本次启动时的binlog文件名称和位置
- modifyBinlogNamePosition(fileName, position)
- }
- }
- }
- // modifyBinlogNamePosition
- // @Description: 修改记录本次启动时的binlog文件名称和位置
- // @author: Roc
- // @datetime 2024-05-17 11:32:32
- // @param fileName string
- // @param position uint32
- // @return err error
- func modifyBinlogNamePosition(fileName string, position uint32) {
- var err error
- defer func() {
- if err != nil {
- global.FILE_LOG.Error("修改binlog文件名称和位置异常,fileName", fileName, ",position:", position, ",err:", err)
- }
- }()
- // fileName 变更
- fileNameKey := index.BinlogFileNameKey
- fileNameLog, err := index.GetBusinessSysInteractionLogByKey(fileNameKey)
- if err != nil {
- if !errors.Is(err, utils.ErrNoRow) {
- return
- }
- err = nil
- fileNameLog = &index.BusinessSysInteractionLog{
- //ID: 0,
- InteractionKey: fileNameKey,
- InteractionVal: fileName,
- Remark: "mysql中binlog的filename名称",
- ModifyTime: time.Now(),
- CreateTime: time.Now(),
- }
- err = fileNameLog.Create()
- if err != nil {
- return
- }
- } else {
- fileNameLog.InteractionVal = fileName
- fileNameLog.ModifyTime = time.Now()
- err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"})
- if err != nil {
- return
- }
- }
- // position 变更
- positionKey := index.BinlogPositionKey
- positionLog, err := index.GetBusinessSysInteractionLogByKey(positionKey)
- if err != nil {
- if !errors.Is(err, utils.ErrNoRow) {
- return
- }
- err = nil
- positionLog = &index.BusinessSysInteractionLog{
- //ID: 0,
- InteractionKey: positionKey,
- InteractionVal: fmt.Sprint(position),
- Remark: "mysql中binlog的position位置",
- ModifyTime: time.Now(),
- CreateTime: time.Now(),
- }
- err = positionLog.Create()
- if err != nil {
- return
- }
- } else {
- positionLog.InteractionVal = fmt.Sprint(position)
- positionLog.ModifyTime = time.Now()
- err = positionLog.Update([]string{"InteractionVal", "ModifyTime"})
- if err != nil {
- return
- }
- }
- return
- }
|