|
@@ -1,6 +1,7 @@
|
|
|
package binlog
|
|
|
|
|
|
import (
|
|
|
+ "errors"
|
|
|
"eta/eta_bridge/global"
|
|
|
"eta/eta_bridge/models/index"
|
|
|
"eta/eta_bridge/utils"
|
|
@@ -134,6 +135,8 @@ func ListenMysql() {
|
|
|
}
|
|
|
// 修改记录本次启动时的binlog文件名称和位置
|
|
|
modifyBinlogNamePosition(fileName, position)
|
|
|
+ // 定时修改binlog文件名称和位置
|
|
|
+ go timingModifyBinlogNamePosition()
|
|
|
|
|
|
c, err := canal.NewCanal(cfg)
|
|
|
if err != nil {
|
|
@@ -153,9 +156,6 @@ func ListenMysql() {
|
|
|
Pos: position,
|
|
|
}
|
|
|
err = c.RunFrom(pos)
|
|
|
-
|
|
|
- // 定时修改binlog文件名称和位置
|
|
|
- go timingModifyBinlogNamePosition()
|
|
|
}
|
|
|
|
|
|
// getBinlogNamePosition
|
|
@@ -197,7 +197,7 @@ func getBinlogNamePosition() (fileName string, position uint32, err error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
|
|
|
+ // 如果从表中没有取到数据,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
|
|
|
if fileName == `` || position == 0 {
|
|
|
item, tmpErr := index.GetShowMaster()
|
|
|
if tmpErr != nil {
|
|
@@ -252,7 +252,7 @@ func modifyBinlogNamePosition(fileName string, position uint32) {
|
|
|
fileNameKey := index.BinlogFileNameKey
|
|
|
fileNameLog, err := index.GetBusinessSysInteractionLogByKey(fileNameKey)
|
|
|
if err != nil {
|
|
|
- if err != utils.ErrNoRow {
|
|
|
+ if !errors.Is(err, utils.ErrNoRow) {
|
|
|
return
|
|
|
}
|
|
|
err = nil
|
|
@@ -281,7 +281,7 @@ func modifyBinlogNamePosition(fileName string, position uint32) {
|
|
|
positionKey := index.BinlogPositionKey
|
|
|
positionLog, err := index.GetBusinessSysInteractionLogByKey(positionKey)
|
|
|
if err != nil {
|
|
|
- if err != utils.ErrNoRow {
|
|
|
+ if !errors.Is(err, utils.ErrNoRow) {
|
|
|
return
|
|
|
}
|
|
|
err = nil
|