package binlog import ( "eta/eta_api/models/binlog" "eta/eta_api/utils" "fmt" "math/rand" "strconv" "time" "github.com/go-mysql-org/go-mysql/canal" "github.com/go-mysql-org/go-mysql/mysql" _ "github.com/go-sql-driver/mysql" ) func ListenMysql() { var err error defer func() { if err != nil { fmt.Println("数据库监听服务异常,err:", err) } }() if utils.MYSQL_DATA_BINLOG_URL == "" { panic("mysql url is empty") } if utils.MYSQL_DATA_BINLOG_USER == "" { panic("mysql user is empty") } if utils.MYSQL_DATA_BINLOG_PWD == "" { panic("mysql password is empty") } if utils.MYSQL_DATA_BINLOG_DB == "" { panic("mysql db is empty") } includeTableRegex := []string{ utils.MYSQL_DATA_BINLOG_DB + ".edb_info$", // utils.MYSQL_DATA_BINLOG_DB + ".edb_classify$", // utils.MYSQL_DATA_BINLOG_DB + ".base_from_mysteel_chemical_index$", // utils.MYSQL_DATA_BINLOG_DB + ".base_from_smm_index$", // utils.MYSQL_DATA_BINLOG_DB + ".edb_data*", // 数据源 utils.MYSQL_DATA_BINLOG_DB + ".base_from_rzd_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_hisugar_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_ly_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_sci_hq_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_ths_hf_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_oilchem_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_ccf_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_mysteel_chemical_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_smm_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_baiinfo_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_sci_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_coalmine_mapping$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_eia_steo_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_icpi_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_yongyi_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_fenwei_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_sci99_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_bloomberg_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_mtjh_mapping$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_usda_fas_index$", utils.MYSQL_DATA_BINLOG_DB + ".base_from_business_index$", } // 监听手工指标库和钢联库 if utils.MYSQL_DATA_BINLOG_DB_EDB != "" { includeTableRegex = append(includeTableRegex, utils.MYSQL_DATA_BINLOG_DB_EDB+".edbinfo$") } if utils.MYSQL_DATA_BINLOG_DB_GL != "" { includeTableRegex = append(includeTableRegex, utils.MYSQL_DATA_BINLOG_DB_GL+".mb_index_main_info$") } // 主从复制的身份id配置,必须全局唯一,如果没有配置的话,那么会随机生成一个 var serverId uint32 if utils.MYSQL_DATA_BINLOG_SERVER_ID != "" { id, _ := strconv.ParseUint(utils.MYSQL_DATA_BINLOG_SERVER_ID, 10, 32) serverId = uint32(id) } if serverId == 0 { serverId = uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001 } cfg := &canal.Config{ // 一个32位无符号整数,用于标识当前 Canal 实例在 MySQL 主从复制体系中的身份。这里使用了一个随机数生成器确保每次启动时分配的 ServerID 是唯一的(在1001到1099之间)。在实际生产环境中,你需要手动指定一个全局唯一的 ServerID。 ServerID: serverId, // 指定 Canal 要连接的数据库类型,默认为 "mysql",表明这是一个 MySQL 数据库。 Flavor: "mysql", // 设置 MySQL 服务器地址(主机名或 IP 地址)和端口,例如 "127.0.0.1:3306"。 Addr: utils.MYSQL_DATA_BINLOG_URL, User: utils.MYSQL_DATA_BINLOG_USER, Password: utils.MYSQL_DATA_BINLOG_PWD, // 如果设置为 true,Canal 将以原始二进制格式获取 binlog,否则将以解析后的 SQL 语句形式提供。 //RawModeEnabled: false, // 是否启用半同步复制。当设置为 true 时,MySQL 主库在事务提交后会等待至少一个从库确认已接收并写入 binlog 才返回成功,提高了数据一致性。 SemiSyncEnabled: false, // 是否将 MySQL 中的 decimal 类型字段解析为 Go 的 Decimal 类型,而不是 float 或者 string。如果业务中有精确小数计算的需求,应开启此选项以避免精度丢失问题。 UseDecimal: true, // 用于控制初始数据导出的相关配置,在 Canal 启动时是否需要全量同步表数据。 //Dump: dumpConf, // 正则表达式字符串,用于定义 Canal 应该监听哪些表的 binlog 事件。只有名称匹配该正则表达式的表才会被 Canal 同步处理。 IncludeTableRegex: includeTableRegex, } // 校验mysql binlog format,目前仅支持row格式 { binlogFormat, tmpErr := binlog.GetBinlogFormat() if tmpErr != nil { err = tmpErr return } if binlogFormat.Value != "ROW" { panic("mysql binlog format is not ROW") } } // 获取上一次启动时的binlog文件名称和位置 fileName, position, err := getBinlogNamePosition() if err != nil { return } // 修改记录本次启动时的binlog文件名称和位置 modifyBinlogNamePosition(fileName, position) // 定时修改binlog文件名称和位置 go timingModifyBinlogNamePosition() c, err := canal.NewCanal(cfg) if err != nil { fmt.Println("err:", err) return } utils.FileLog.Debug("记录上一次启动时的fileName:", fileName, ";position:", position) binlogHandler := &EdbEventHandler{} binlogHandler.SetBinlogFileName(fileName, position) c.SetEventHandler(binlogHandler) //c.Run() // 同步到redis go binlogHandler.SyncToRedis() pos := mysql.Position{ Name: fileName, Pos: position, } err = c.RunFrom(pos) } // getBinlogNamePosition // @Description: 获取当前binlog文件名称和位置 // @author: Roc // @datetime 2024-05-17 13:18:19 // @return fileName string // @return position uint32 // @return err error func getBinlogNamePosition() (fileName string, position uint32, err error) { // 优先从redis获取 fileName = utils.Rc.GetStr(utils.CACHE_MYSQL_DATA_FILENAME) position64, err := utils.Rc.GetUInt64(utils.CACHE_MYSQL_DATA_POSITION) if err != nil { if err.Error() != utils.RedisNoKeyErr { panic("mysql binlog position is not found,err:" + err.Error()) return } err = nil } position = uint32(position64) // 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。 if fileName == `` || position == 0 { // binlog文件名 fileNameKey := binlog.BinlogFileNameKey fileNameLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(fileNameKey) if tmpErr == nil { fileName = fileNameLog.InteractionKey } // binlog位置 positionKey := binlog.BinlogPositionKey positionLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(positionKey) if tmpErr == nil { positionStr := positionLog.InteractionKey positionInt, tmpErr := strconv.Atoi(positionStr) if tmpErr == nil { position = uint32(positionInt) } } } // 从mysql中获取最新的binlog文件名称和位置,名称不一致则以mysql中为准 if fileName == `` || position == 0 { item, tmpErr := binlog.GetShowMaster() if tmpErr != nil { err = tmpErr return } fileName = item.File position = item.Position } return } // modifyBinlogNamePosition // @Description: 修改记录本次启动时的binlog文件名称和位置 // @author: Roc // @datetime 2024-05-17 11:32:32 // @param fileName string // @param position uint32 // @return err error func modifyBinlogNamePosition(fileName string, position uint32) { var err error defer func() { if err != nil { utils.FileLog.Error("修改binlog文件名称和位置异常,fileName", fileName, ",position:", position, ",err:", err) } }() // fileName 变更 fileNameKey := binlog.BinlogFileNameKey fileNameLog, err := binlog.GetBusinessSysInteractionLogByKey(fileNameKey) if err != nil { if err.Error() != utils.ErrNoRow() { return } err = nil fileNameLog = &binlog.BusinessSysInteractionLog{ //ID: 0, InteractionKey: fileNameKey, InteractionVal: fileName, Remark: "mysql中binlog的filename名称", ModifyTime: time.Now(), CreateTime: time.Now(), } err = fileNameLog.Create() if err != nil { return } } else { fileNameLog.InteractionVal = fileName fileNameLog.ModifyTime = time.Now() err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"}) if err != nil { return } } // position 变更 positionKey := binlog.BinlogPositionKey positionLog, err := binlog.GetBusinessSysInteractionLogByKey(positionKey) if err != nil { if err.Error() != utils.ErrNoRow() { return } err = nil positionLog = &binlog.BusinessSysInteractionLog{ //ID: 0, InteractionKey: positionKey, InteractionVal: fmt.Sprint(position), Remark: "mysql中binlog的position位置", ModifyTime: time.Now(), CreateTime: time.Now(), } err = positionLog.Create() if err != nil { return } } else { positionLog.InteractionVal = fmt.Sprint(position) positionLog.ModifyTime = time.Now() err = positionLog.Update([]string{"InteractionVal", "ModifyTime"}) if err != nil { return } } return } // timingModifyBinlogNamePosition // @Description: 定时修改binlog文件名称和位置 // @author: Roc // @datetime 2024-05-17 13:08:13 func timingModifyBinlogNamePosition() { for { // 延时30s执行 time.Sleep(30 * time.Second) // 获取最新的binlog文件名称和位置 fileName, position, err := getBinlogNamePosition() if err != nil { return } if fileName != `` && position != 0 { // 修改记录本次启动时的binlog文件名称和位置 modifyBinlogNamePosition(fileName, position) } } }