123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- package binlog
- import (
- "eta/eta_bridge/global"
- "eta/eta_bridge/models/index"
- "eta/eta_bridge/utils"
- "fmt"
- "github.com/go-mysql-org/go-mysql/canal"
- "github.com/go-mysql-org/go-mysql/mysql"
- _ "github.com/go-sql-driver/mysql"
- "math/rand"
- "time"
- )
- func ListenMysql() {
- var err error
- defer func() {
- if err != nil {
- fmt.Println("数据库监听服务异常,err:", err)
- }
- }()
- if global.CONFIG.Mysql.Binlog.Host == "" {
- panic("mysqlHost is empty")
-
-
- }
- if global.CONFIG.Mysql.Binlog.User == "" {
- panic("user is empty")
- }
- if global.CONFIG.Mysql.Binlog.Password == "" {
- panic("password is empty")
- }
- if global.CONFIG.Mysql.Binlog.Db == "" {
- panic("db is empty")
- }
-
-
-
- includeTableRegex := []string{
- global.CONFIG.Mysql.Binlog.Db + ".edb_info$",
- global.CONFIG.Mysql.Binlog.Db + ".edb_classify$",
- global.CONFIG.Mysql.Binlog.Db + ".base_from_mysteel_chemical_index$",
- global.CONFIG.Mysql.Binlog.Db + ".base_from_smm_index$",
- global.CONFIG.Mysql.Binlog.Db + ".edb_data*",
- }
-
- serverId := global.CONFIG.Mysql.Binlog.ServerID
- if serverId == 0 {
- serverId = uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001
- }
- cfg := &canal.Config{
-
- ServerID: serverId,
-
- Flavor: "mysql",
-
- Addr: global.CONFIG.Mysql.Binlog.Host,
- User: global.CONFIG.Mysql.Binlog.User,
- Password: global.CONFIG.Mysql.Binlog.Password,
-
-
-
- SemiSyncEnabled: false,
-
- UseDecimal: true,
-
-
-
- IncludeTableRegex: includeTableRegex,
- Logger: global.FILE_LOG,
- }
-
- {
- binlogFormat, tmpErr := index.GetBinlogFormat()
- if tmpErr != nil {
- err = tmpErr
- return
- }
- if binlogFormat.Value != "ROW" {
- panic("mysql binlog format is not ROW")
- return
- }
- }
- var fileName string
- var position uint32
- fileName = global.Rc.GetStr(utils.CACHE_MYSQL_MASTER_FILENAME)
- position64, tmpErr := global.Rc.GetUInt64(utils.CACHE_MYSQL_MASTER_POSITION)
- if tmpErr != nil && tmpErr.Error() != utils.RedisNoKeyErr {
- panic("mysql binlog position is not found,err:" + tmpErr.Error())
- return
- }
- position = uint32(position64)
-
- if fileName == `` || position == 0 {
- item, tmpErr := index.GetShowMaster()
- if tmpErr != nil {
- err = tmpErr
- return
- }
- fileName = item.File
- position = item.Position
- }
- c, err := canal.NewCanal(cfg)
- if err != nil {
- fmt.Println("err:", err)
- return
- }
- binlogHandler := &MyEventHandler{}
- binlogHandler.SetBinlogFileName(fileName, position)
- c.SetEventHandler(binlogHandler)
-
- pos := mysql.Position{
- Name: fileName,
- Pos: position,
- }
- err = c.RunFrom(pos)
- }
|