binlog.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package binlog
  2. import (
  3. "eta/eta_bridge/global"
  4. "eta/eta_bridge/models/index"
  5. "eta/eta_bridge/utils"
  6. "fmt"
  7. "github.com/go-mysql-org/go-mysql/canal"
  8. "github.com/go-mysql-org/go-mysql/mysql"
  9. _ "github.com/go-sql-driver/mysql"
  10. "math/rand"
  11. "time"
  12. )
  13. //var mysqlHost, mysqlUser, mysqlPwd, mysqlDb string
  14. //func init() {
  15. //
  16. // for _, sqlConfig := range global.CONFIG.Mysql.List {
  17. // if sqlConfig.AliasName == `index` {
  18. // // 找出用户
  19. // tmpList := strings.Split(sqlConfig.Dsn, ":")
  20. // mysqlUser = tmpList[0]
  21. // // 找出密码
  22. // tmpList = strings.Split(strings.Join(tmpList[1:], ":"), "@")
  23. // lenTmp := len(tmpList)
  24. // mysqlPwd = strings.Join(tmpList[:lenTmp-1], "@")
  25. // // 找出地址
  26. // tmpList = strings.Split(tmpList[lenTmp-1], "tcp(")
  27. // tmpList = strings.Split(tmpList[1], ")")
  28. // mysqlHost = tmpList[0]
  29. //
  30. // // 找出数据库名称
  31. // u, err := url.Parse(tmpList[1])
  32. // if err != nil {
  33. // panic(err) // 如果解析失败,处理错误
  34. // }
  35. //
  36. // // 获取Path部分的内容
  37. // mysqlDb = u.Path[1:]
  38. //
  39. // }
  40. // }
  41. //
  42. // if mysqlHost == "" {
  43. // panic("mysqlHost is empty")
  44. // }
  45. //
  46. // //fmt.Println("HOST:", mysqlHost)
  47. // //fmt.Println("user:", mysqlUser)
  48. // //fmt.Println("password:", mysqlPwd)
  49. //
  50. //}
  51. func ListenMysql() {
  52. var err error
  53. defer func() {
  54. if err != nil {
  55. fmt.Println("数据库监听服务异常,err:", err)
  56. }
  57. }()
  58. if global.CONFIG.Mysql.Binlog.Host == "" {
  59. panic("mysqlHost is empty")
  60. //err = errors.New("mysqlHost is empty")
  61. //return
  62. }
  63. if global.CONFIG.Mysql.Binlog.User == "" {
  64. panic("user is empty")
  65. }
  66. if global.CONFIG.Mysql.Binlog.Password == "" {
  67. panic("password is empty")
  68. }
  69. if global.CONFIG.Mysql.Binlog.Db == "" {
  70. panic("db is empty")
  71. }
  72. //includeTableRegex := []string{
  73. // "test_hz_data.edb_info",
  74. //}
  75. includeTableRegex := []string{
  76. global.CONFIG.Mysql.Binlog.Db + ".edb_info$",
  77. global.CONFIG.Mysql.Binlog.Db + ".edb_classify$",
  78. global.CONFIG.Mysql.Binlog.Db + ".base_from_mysteel_chemical_index$",
  79. global.CONFIG.Mysql.Binlog.Db + ".base_from_smm_index$",
  80. global.CONFIG.Mysql.Binlog.Db + ".edb_data*",
  81. }
  82. // 主从复制的身份id配置,必须全局唯一,如果没有配置的话,那么会随机生成一个
  83. serverId := global.CONFIG.Mysql.Binlog.ServerID
  84. if serverId == 0 {
  85. serverId = uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001
  86. }
  87. cfg := &canal.Config{
  88. // 一个32位无符号整数,用于标识当前 Canal 实例在 MySQL 主从复制体系中的身份。这里使用了一个随机数生成器确保每次启动时分配的 ServerID 是唯一的(在1001到1099之间)。在实际生产环境中,你需要手动指定一个全局唯一的 ServerID。
  89. ServerID: serverId,
  90. // 指定 Canal 要连接的数据库类型,默认为 "mysql",表明这是一个 MySQL 数据库。
  91. Flavor: "mysql",
  92. // 设置 MySQL 服务器地址(主机名或 IP 地址)和端口,例如 "127.0.0.1:3306"。
  93. Addr: global.CONFIG.Mysql.Binlog.Host,
  94. User: global.CONFIG.Mysql.Binlog.User,
  95. Password: global.CONFIG.Mysql.Binlog.Password,
  96. // 如果设置为 true,Canal 将以原始二进制格式获取 binlog,否则将以解析后的 SQL 语句形式提供。
  97. //RawModeEnabled: false,
  98. // 是否启用半同步复制。当设置为 true 时,MySQL 主库在事务提交后会等待至少一个从库确认已接收并写入 binlog 才返回成功,提高了数据一致性。
  99. SemiSyncEnabled: false,
  100. // 是否将 MySQL 中的 decimal 类型字段解析为 Go 的 Decimal 类型,而不是 float 或者 string。如果业务中有精确小数计算的需求,应开启此选项以避免精度丢失问题。
  101. UseDecimal: true,
  102. // 用于控制初始数据导出的相关配置,在 Canal 启动时是否需要全量同步表数据。
  103. //Dump: dumpConf,
  104. // 正则表达式字符串,用于定义 Canal 应该监听哪些表的 binlog 事件。只有名称匹配该正则表达式的表才会被 Canal 同步处理。
  105. IncludeTableRegex: includeTableRegex,
  106. Logger: global.FILE_LOG,
  107. }
  108. // 校验mysql binlog format,目前仅支持row格式
  109. {
  110. binlogFormat, tmpErr := index.GetBinlogFormat()
  111. if tmpErr != nil {
  112. err = tmpErr
  113. return
  114. }
  115. if binlogFormat.Value != "ROW" {
  116. panic("mysql binlog format is not ROW")
  117. return
  118. }
  119. }
  120. var fileName string
  121. var position uint32
  122. fileName = global.Rc.GetStr(utils.CACHE_MYSQL_MASTER_FILENAME)
  123. position64, tmpErr := global.Rc.GetUInt64(utils.CACHE_MYSQL_MASTER_POSITION)
  124. if tmpErr != nil && tmpErr.Error() != utils.RedisNoKeyErr {
  125. panic("mysql binlog position is not found,err:" + tmpErr.Error())
  126. return
  127. }
  128. position = uint32(position64)
  129. // 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
  130. if fileName == `` || position == 0 {
  131. item, tmpErr := index.GetShowMaster()
  132. if tmpErr != nil {
  133. err = tmpErr
  134. return
  135. }
  136. fileName = item.File
  137. position = item.Position
  138. }
  139. c, err := canal.NewCanal(cfg)
  140. if err != nil {
  141. fmt.Println("err:", err)
  142. return
  143. }
  144. binlogHandler := &MyEventHandler{}
  145. binlogHandler.SetBinlogFileName(fileName, position)
  146. c.SetEventHandler(binlogHandler)
  147. //c.Run()
  148. pos := mysql.Position{
  149. Name: fileName,
  150. Pos: position,
  151. }
  152. err = c.RunFrom(pos)
  153. }