binlog.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. package binlog
  2. import (
  3. "errors"
  4. "eta/eta_bridge/global"
  5. "eta/eta_bridge/models/index"
  6. "eta/eta_bridge/utils"
  7. "fmt"
  8. "github.com/go-mysql-org/go-mysql/canal"
  9. "github.com/go-mysql-org/go-mysql/mysql"
  10. _ "github.com/go-sql-driver/mysql"
  11. "math/rand"
  12. "strconv"
  13. "time"
  14. )
  15. //var mysqlHost, mysqlUser, mysqlPwd, mysqlDb string
  16. //func init() {
  17. //
  18. // for _, sqlConfig := range global.CONFIG.Mysql.List {
  19. // if sqlConfig.AliasName == `index` {
  20. // // 找出用户
  21. // tmpList := strings.Split(sqlConfig.Dsn, ":")
  22. // mysqlUser = tmpList[0]
  23. // // 找出密码
  24. // tmpList = strings.Split(strings.Join(tmpList[1:], ":"), "@")
  25. // lenTmp := len(tmpList)
  26. // mysqlPwd = strings.Join(tmpList[:lenTmp-1], "@")
  27. // // 找出地址
  28. // tmpList = strings.Split(tmpList[lenTmp-1], "tcp(")
  29. // tmpList = strings.Split(tmpList[1], ")")
  30. // mysqlHost = tmpList[0]
  31. //
  32. // // 找出数据库名称
  33. // u, err := url.Parse(tmpList[1])
  34. // if err != nil {
  35. // panic(err) // 如果解析失败,处理错误
  36. // }
  37. //
  38. // // 获取Path部分的内容
  39. // mysqlDb = u.Path[1:]
  40. //
  41. // }
  42. // }
  43. //
  44. // if mysqlHost == "" {
  45. // panic("mysqlHost is empty")
  46. // }
  47. //
  48. // //fmt.Println("HOST:", mysqlHost)
  49. // //fmt.Println("user:", mysqlUser)
  50. // //fmt.Println("password:", mysqlPwd)
  51. //
  52. //}
  53. func ListenMysql() {
  54. var err error
  55. defer func() {
  56. if err != nil {
  57. fmt.Println("数据库监听服务异常,err:", err)
  58. }
  59. }()
  60. if global.CONFIG.Mysql.Binlog.Host == "" {
  61. panic("mysqlHost is empty")
  62. //err = errors.New("mysqlHost is empty")
  63. //return
  64. }
  65. if global.CONFIG.Mysql.Binlog.User == "" {
  66. panic("user is empty")
  67. }
  68. if global.CONFIG.Mysql.Binlog.Password == "" {
  69. panic("password is empty")
  70. }
  71. if global.CONFIG.Mysql.Binlog.Db == "" {
  72. panic("db is empty")
  73. }
  74. //includeTableRegex := []string{
  75. // "test_hz_data.edb_info",
  76. //}
  77. includeTableRegex := []string{
  78. global.CONFIG.Mysql.Binlog.Db + ".edb_info$",
  79. global.CONFIG.Mysql.Binlog.Db + ".edb_classify$",
  80. global.CONFIG.Mysql.Binlog.Db + ".base_from_mysteel_chemical_index$",
  81. global.CONFIG.Mysql.Binlog.Db + ".base_from_smm_index$",
  82. global.CONFIG.Mysql.Binlog.Db + ".edb_data*",
  83. }
  84. // 主从复制的身份id配置,必须全局唯一,如果没有配置的话,那么会随机生成一个
  85. serverId := global.CONFIG.Mysql.Binlog.ServerID
  86. if serverId == 0 {
  87. serverId = uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001
  88. }
  89. cfg := &canal.Config{
  90. // 一个32位无符号整数,用于标识当前 Canal 实例在 MySQL 主从复制体系中的身份。这里使用了一个随机数生成器确保每次启动时分配的 ServerID 是唯一的(在1001到1099之间)。在实际生产环境中,你需要手动指定一个全局唯一的 ServerID。
  91. ServerID: serverId,
  92. // 指定 Canal 要连接的数据库类型,默认为 "mysql",表明这是一个 MySQL 数据库。
  93. Flavor: "mysql",
  94. // 设置 MySQL 服务器地址(主机名或 IP 地址)和端口,例如 "127.0.0.1:3306"。
  95. Addr: global.CONFIG.Mysql.Binlog.Host,
  96. User: global.CONFIG.Mysql.Binlog.User,
  97. Password: global.CONFIG.Mysql.Binlog.Password,
  98. // 如果设置为 true,Canal 将以原始二进制格式获取 binlog,否则将以解析后的 SQL 语句形式提供。
  99. //RawModeEnabled: false,
  100. // 是否启用半同步复制。当设置为 true 时,MySQL 主库在事务提交后会等待至少一个从库确认已接收并写入 binlog 才返回成功,提高了数据一致性。
  101. SemiSyncEnabled: false,
  102. // 是否将 MySQL 中的 decimal 类型字段解析为 Go 的 Decimal 类型,而不是 float 或者 string。如果业务中有精确小数计算的需求,应开启此选项以避免精度丢失问题。
  103. UseDecimal: true,
  104. // 用于控制初始数据导出的相关配置,在 Canal 启动时是否需要全量同步表数据。
  105. //Dump: dumpConf,
  106. // 正则表达式字符串,用于定义 Canal 应该监听哪些表的 binlog 事件。只有名称匹配该正则表达式的表才会被 Canal 同步处理。
  107. IncludeTableRegex: includeTableRegex,
  108. }
  109. // 校验mysql binlog format,目前仅支持row格式
  110. {
  111. binlogFormat, tmpErr := index.GetBinlogFormat()
  112. if tmpErr != nil {
  113. err = tmpErr
  114. return
  115. }
  116. if binlogFormat.Value != "ROW" {
  117. panic("mysql binlog format is not ROW")
  118. return
  119. }
  120. }
  121. // 获取上一次启动时的binlog文件名称和位置
  122. fileName, position, err := getBinlogNamePosition()
  123. if err != nil {
  124. return
  125. }
  126. // 修改记录本次启动时的binlog文件名称和位置
  127. modifyBinlogNamePosition(fileName, position)
  128. // 定时修改binlog文件名称和位置
  129. go timingModifyBinlogNamePosition()
  130. c, err := canal.NewCanal(cfg)
  131. if err != nil {
  132. fmt.Println("err:", err)
  133. return
  134. }
  135. global.FILE_LOG.Debug("记录上一次启动时的fileName:", fileName, ";position:", position)
  136. binlogHandler := &MyEventHandler{}
  137. binlogHandler.SetBinlogFileName(fileName, position)
  138. c.SetEventHandler(binlogHandler)
  139. //c.Run()
  140. pos := mysql.Position{
  141. Name: fileName,
  142. Pos: position,
  143. }
  144. err = c.RunFrom(pos)
  145. }
  146. // getBinlogNamePosition
  147. // @Description: 获取当前binlog文件名称和位置
  148. // @author: Roc
  149. // @datetime 2024-05-17 13:18:19
  150. // @return fileName string
  151. // @return position uint32
  152. // @return err error
  153. func getBinlogNamePosition() (fileName string, position uint32, err error) {
  154. // 优先从redis获取
  155. fileName = global.Rc.GetStr(utils.CACHE_MYSQL_MASTER_FILENAME)
  156. position64, err := global.Rc.GetUInt64(utils.CACHE_MYSQL_MASTER_POSITION)
  157. if err != nil {
  158. if err.Error() != utils.RedisNoKeyErr {
  159. panic("mysql binlog position is not found,err:" + err.Error())
  160. return
  161. }
  162. err = nil
  163. }
  164. position = uint32(position64)
  165. // 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
  166. if fileName == `` || position == 0 {
  167. // binlog文件名
  168. fileNameKey := index.BinlogFileNameKey
  169. fileNameLog, tmpErr := index.GetBusinessSysInteractionLogByKey(fileNameKey)
  170. if tmpErr == nil {
  171. fileName = fileNameLog.InteractionKey
  172. }
  173. // binlog位置
  174. positionKey := index.BinlogPositionKey
  175. positionLog, tmpErr := index.GetBusinessSysInteractionLogByKey(positionKey)
  176. if tmpErr == nil {
  177. positionStr := positionLog.InteractionKey
  178. positionInt, tmpErr := strconv.Atoi(positionStr)
  179. if tmpErr == nil {
  180. position = uint32(positionInt)
  181. }
  182. }
  183. }
  184. // 如果从表中没有取到数据,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
  185. if fileName == `` || position == 0 {
  186. item, tmpErr := index.GetShowMaster()
  187. if tmpErr != nil {
  188. err = tmpErr
  189. return
  190. }
  191. fileName = item.File
  192. position = item.Position
  193. }
  194. return
  195. }
  196. // timingModifyBinlogNamePosition
  197. // @Description: 定时修改binlog文件名称和位置
  198. // @author: Roc
  199. // @datetime 2024-05-17 13:08:13
  200. func timingModifyBinlogNamePosition() {
  201. for {
  202. // 延时30s执行
  203. time.Sleep(30 * time.Second)
  204. // 获取最新的binlog文件名称和位置
  205. fileName, position, err := getBinlogNamePosition()
  206. if err != nil {
  207. return
  208. }
  209. if fileName != `` && position != 0 {
  210. // 修改记录本次启动时的binlog文件名称和位置
  211. modifyBinlogNamePosition(fileName, position)
  212. }
  213. }
  214. }
  215. // modifyBinlogNamePosition
  216. // @Description: 修改记录本次启动时的binlog文件名称和位置
  217. // @author: Roc
  218. // @datetime 2024-05-17 11:32:32
  219. // @param fileName string
  220. // @param position uint32
  221. // @return err error
  222. func modifyBinlogNamePosition(fileName string, position uint32) {
  223. var err error
  224. defer func() {
  225. if err != nil {
  226. global.FILE_LOG.Error("修改binlog文件名称和位置异常,fileName", fileName, ",position:", position, ",err:", err)
  227. }
  228. }()
  229. // fileName 变更
  230. fileNameKey := index.BinlogFileNameKey
  231. fileNameLog, err := index.GetBusinessSysInteractionLogByKey(fileNameKey)
  232. if err != nil {
  233. if !errors.Is(err, utils.ErrNoRow) {
  234. return
  235. }
  236. err = nil
  237. fileNameLog = &index.BusinessSysInteractionLog{
  238. //ID: 0,
  239. InteractionKey: fileNameKey,
  240. InteractionVal: fileName,
  241. Remark: "mysql中binlog的filename名称",
  242. ModifyTime: time.Now(),
  243. CreateTime: time.Now(),
  244. }
  245. err = fileNameLog.Create()
  246. if err != nil {
  247. return
  248. }
  249. } else {
  250. fileNameLog.InteractionVal = fileName
  251. fileNameLog.ModifyTime = time.Now()
  252. err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"})
  253. if err != nil {
  254. return
  255. }
  256. }
  257. // position 变更
  258. positionKey := index.BinlogPositionKey
  259. positionLog, err := index.GetBusinessSysInteractionLogByKey(positionKey)
  260. if err != nil {
  261. if !errors.Is(err, utils.ErrNoRow) {
  262. return
  263. }
  264. err = nil
  265. positionLog = &index.BusinessSysInteractionLog{
  266. //ID: 0,
  267. InteractionKey: positionKey,
  268. InteractionVal: fmt.Sprint(position),
  269. Remark: "mysql中binlog的position位置",
  270. ModifyTime: time.Now(),
  271. CreateTime: time.Now(),
  272. }
  273. err = positionLog.Create()
  274. if err != nil {
  275. return
  276. }
  277. } else {
  278. positionLog.InteractionVal = fmt.Sprint(position)
  279. positionLog.ModifyTime = time.Now()
  280. err = positionLog.Update([]string{"InteractionVal", "ModifyTime"})
  281. if err != nil {
  282. return
  283. }
  284. }
  285. return
  286. }