binlog.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package binlog
  2. import (
  3. "eta/eta_api/models/binlog"
  4. "eta/eta_api/utils"
  5. "fmt"
  6. "math/rand"
  7. "strconv"
  8. "time"
  9. "github.com/go-mysql-org/go-mysql/canal"
  10. "github.com/go-mysql-org/go-mysql/mysql"
  11. _ "github.com/go-sql-driver/mysql"
  12. )
  13. func ListenMysql() {
  14. var err error
  15. defer func() {
  16. if err != nil {
  17. fmt.Println("数据库监听服务异常,err:", err)
  18. }
  19. }()
  20. if utils.MYSQL_DATA_BINLOG_URL == "" {
  21. panic("mysql url is empty")
  22. }
  23. if utils.MYSQL_DATA_BINLOG_USER == "" {
  24. panic("mysql user is empty")
  25. }
  26. if utils.MYSQL_DATA_BINLOG_PWD == "" {
  27. panic("mysql password is empty")
  28. }
  29. if utils.MYSQL_DATA_BINLOG_DB == "" {
  30. panic("mysql db is empty")
  31. }
  32. includeTableRegex := []string{
  33. utils.MYSQL_DATA_BINLOG_DB + ".edb_info$",
  34. // utils.MYSQL_DATA_BINLOG_DB + ".edb_classify$",
  35. // utils.MYSQL_DATA_BINLOG_DB + ".base_from_mysteel_chemical_index$",
  36. // utils.MYSQL_DATA_BINLOG_DB + ".base_from_smm_index$",
  37. // utils.MYSQL_DATA_BINLOG_DB + ".edb_data*",
  38. }
  39. // 主从复制的身份id配置,必须全局唯一,如果没有配置的话,那么会随机生成一个
  40. var serverId uint32
  41. if utils.MYSQL_DATA_BINLOG_SERVER_ID != "" {
  42. id, _ := strconv.ParseUint(utils.MYSQL_DATA_BINLOG_SERVER_ID, 10, 32)
  43. serverId = uint32(id)
  44. }
  45. if serverId == 0 {
  46. serverId = uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001
  47. }
  48. cfg := &canal.Config{
  49. // 一个32位无符号整数,用于标识当前 Canal 实例在 MySQL 主从复制体系中的身份。这里使用了一个随机数生成器确保每次启动时分配的 ServerID 是唯一的(在1001到1099之间)。在实际生产环境中,你需要手动指定一个全局唯一的 ServerID。
  50. ServerID: serverId,
  51. // 指定 Canal 要连接的数据库类型,默认为 "mysql",表明这是一个 MySQL 数据库。
  52. Flavor: "mysql",
  53. // 设置 MySQL 服务器地址(主机名或 IP 地址)和端口,例如 "127.0.0.1:3306"。
  54. Addr: utils.MYSQL_DATA_BINLOG_URL,
  55. User: utils.MYSQL_DATA_BINLOG_USER,
  56. Password: utils.MYSQL_DATA_BINLOG_PWD,
  57. // 如果设置为 true,Canal 将以原始二进制格式获取 binlog,否则将以解析后的 SQL 语句形式提供。
  58. //RawModeEnabled: false,
  59. // 是否启用半同步复制。当设置为 true 时,MySQL 主库在事务提交后会等待至少一个从库确认已接收并写入 binlog 才返回成功,提高了数据一致性。
  60. SemiSyncEnabled: false,
  61. // 是否将 MySQL 中的 decimal 类型字段解析为 Go 的 Decimal 类型,而不是 float 或者 string。如果业务中有精确小数计算的需求,应开启此选项以避免精度丢失问题。
  62. UseDecimal: true,
  63. // 用于控制初始数据导出的相关配置,在 Canal 启动时是否需要全量同步表数据。
  64. //Dump: dumpConf,
  65. // 正则表达式字符串,用于定义 Canal 应该监听哪些表的 binlog 事件。只有名称匹配该正则表达式的表才会被 Canal 同步处理。
  66. IncludeTableRegex: includeTableRegex,
  67. }
  68. // 校验mysql binlog format,目前仅支持row格式
  69. {
  70. binlogFormat, tmpErr := binlog.GetBinlogFormat()
  71. if tmpErr != nil {
  72. err = tmpErr
  73. return
  74. }
  75. if binlogFormat.Value != "ROW" {
  76. panic("mysql binlog format is not ROW")
  77. }
  78. }
  79. // 获取上一次启动时的binlog文件名称和位置
  80. fileName, position, err := getBinlogNamePosition()
  81. if err != nil {
  82. return
  83. }
  84. // 修改记录本次启动时的binlog文件名称和位置
  85. modifyBinlogNamePosition(fileName, position)
  86. // 定时修改binlog文件名称和位置
  87. go timingModifyBinlogNamePosition()
  88. c, err := canal.NewCanal(cfg)
  89. if err != nil {
  90. fmt.Println("err:", err)
  91. return
  92. }
  93. utils.FileLog.Debug("记录上一次启动时的fileName:", fileName, ";position:", position)
  94. binlogHandler := &EdbEventHandler{}
  95. binlogHandler.SetBinlogFileName(fileName, position)
  96. c.SetEventHandler(binlogHandler)
  97. //c.Run()
  98. // 同步到redis
  99. go binlogHandler.SyncToRedis()
  100. pos := mysql.Position{
  101. Name: fileName,
  102. Pos: position,
  103. }
  104. err = c.RunFrom(pos)
  105. }
  106. // getBinlogNamePosition
  107. // @Description: 获取当前binlog文件名称和位置
  108. // @author: Roc
  109. // @datetime 2024-05-17 13:18:19
  110. // @return fileName string
  111. // @return position uint32
  112. // @return err error
  113. func getBinlogNamePosition() (fileName string, position uint32, err error) {
  114. // 优先从redis获取
  115. fileName = utils.Rc.GetStr(utils.CACHE_MYSQL_DATA_FILENAME)
  116. position64, err := utils.Rc.GetUInt64(utils.CACHE_MYSQL_DATA_POSITION)
  117. if err != nil {
  118. if err.Error() != utils.RedisNoKeyErr {
  119. panic("mysql binlog position is not found,err:" + err.Error())
  120. return
  121. }
  122. err = nil
  123. }
  124. position = uint32(position64)
  125. // 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
  126. if fileName == `` || position == 0 {
  127. // binlog文件名
  128. fileNameKey := binlog.BinlogFileNameKey
  129. fileNameLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(fileNameKey)
  130. if tmpErr == nil {
  131. fileName = fileNameLog.InteractionKey
  132. }
  133. // binlog位置
  134. positionKey := binlog.BinlogPositionKey
  135. positionLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(positionKey)
  136. if tmpErr == nil {
  137. positionStr := positionLog.InteractionKey
  138. positionInt, tmpErr := strconv.Atoi(positionStr)
  139. if tmpErr == nil {
  140. position = uint32(positionInt)
  141. }
  142. }
  143. }
  144. // 从mysql中获取最新的binlog文件名称和位置,名称不一致则以mysql中为准
  145. if fileName == `` || position == 0 {
  146. item, tmpErr := binlog.GetShowMaster()
  147. if tmpErr != nil {
  148. err = tmpErr
  149. return
  150. }
  151. fileName = item.File
  152. position = item.Position
  153. }
  154. return
  155. }
  156. // modifyBinlogNamePosition
  157. // @Description: 修改记录本次启动时的binlog文件名称和位置
  158. // @author: Roc
  159. // @datetime 2024-05-17 11:32:32
  160. // @param fileName string
  161. // @param position uint32
  162. // @return err error
  163. func modifyBinlogNamePosition(fileName string, position uint32) {
  164. var err error
  165. defer func() {
  166. if err != nil {
  167. utils.FileLog.Error("修改binlog文件名称和位置异常,fileName", fileName, ",position:", position, ",err:", err)
  168. }
  169. }()
  170. // fileName 变更
  171. fileNameKey := binlog.BinlogFileNameKey
  172. fileNameLog, err := binlog.GetBusinessSysInteractionLogByKey(fileNameKey)
  173. if err != nil {
  174. if err.Error() != utils.ErrNoRow() {
  175. return
  176. }
  177. err = nil
  178. fileNameLog = &binlog.BusinessSysInteractionLog{
  179. //ID: 0,
  180. InteractionKey: fileNameKey,
  181. InteractionVal: fileName,
  182. Remark: "mysql中binlog的filename名称",
  183. ModifyTime: time.Now(),
  184. CreateTime: time.Now(),
  185. }
  186. err = fileNameLog.Create()
  187. if err != nil {
  188. return
  189. }
  190. } else {
  191. fileNameLog.InteractionVal = fileName
  192. fileNameLog.ModifyTime = time.Now()
  193. err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"})
  194. if err != nil {
  195. return
  196. }
  197. }
  198. // position 变更
  199. positionKey := binlog.BinlogPositionKey
  200. positionLog, err := binlog.GetBusinessSysInteractionLogByKey(positionKey)
  201. if err != nil {
  202. if err.Error() != utils.ErrNoRow() {
  203. return
  204. }
  205. err = nil
  206. positionLog = &binlog.BusinessSysInteractionLog{
  207. //ID: 0,
  208. InteractionKey: positionKey,
  209. InteractionVal: fmt.Sprint(position),
  210. Remark: "mysql中binlog的position位置",
  211. ModifyTime: time.Now(),
  212. CreateTime: time.Now(),
  213. }
  214. err = positionLog.Create()
  215. if err != nil {
  216. return
  217. }
  218. } else {
  219. positionLog.InteractionVal = fmt.Sprint(position)
  220. positionLog.ModifyTime = time.Now()
  221. err = positionLog.Update([]string{"InteractionVal", "ModifyTime"})
  222. if err != nil {
  223. return
  224. }
  225. }
  226. return
  227. }
  228. // timingModifyBinlogNamePosition
  229. // @Description: 定时修改binlog文件名称和位置
  230. // @author: Roc
  231. // @datetime 2024-05-17 13:08:13
  232. func timingModifyBinlogNamePosition() {
  233. for {
  234. // 延时30s执行
  235. time.Sleep(30 * time.Second)
  236. // 获取最新的binlog文件名称和位置
  237. fileName, position, err := getBinlogNamePosition()
  238. if err != nil {
  239. return
  240. }
  241. if fileName != `` && position != 0 {
  242. // 修改记录本次启动时的binlog文件名称和位置
  243. modifyBinlogNamePosition(fileName, position)
  244. }
  245. }
  246. }