123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- package binlog
- import (
- "errors"
- "eta/eta_bridge/global"
- "eta/eta_bridge/models/index"
- "eta/eta_bridge/utils"
- "fmt"
- "github.com/go-mysql-org/go-mysql/canal"
- "github.com/go-mysql-org/go-mysql/mysql"
- _ "github.com/go-sql-driver/mysql"
- "math/rand"
- "net/url"
- "strings"
- "time"
- )
// Connection settings of the `index` MySQL data source. They are populated
// once by init from the configured DSN and read by ListenMysql.
var (
	mysqlHost string // address taken from inside tcp(...) in the DSN
	mysqlUser string // user name part of the DSN credentials
	mysqlPwd  string // password part of the DSN credentials
	mysqlDb   string // database name taken from the DSN path
)
- func init() {
- for _, sqlConfig := range global.CONFIG.Mysql.List {
- if sqlConfig.AliasName == `index` {
-
- tmpList := strings.Split(sqlConfig.Dsn, ":")
- mysqlUser = tmpList[0]
-
- tmpList = strings.Split(strings.Join(tmpList[1:], ":"), "@")
- lenTmp := len(tmpList)
- mysqlPwd = strings.Join(tmpList[:lenTmp-1], "@")
-
- tmpList = strings.Split(tmpList[lenTmp-1], "tcp(")
- tmpList = strings.Split(tmpList[1], ")")
- mysqlHost = tmpList[0]
-
- u, err := url.Parse(tmpList[1])
- if err != nil {
- panic(err)
- }
-
- mysqlDb = u.Path[1:]
- }
- }
- if mysqlHost == "" {
- panic("mysqlHost is empty")
- }
-
-
-
- }
- func ListenMysql() {
- var err error
- defer func() {
- if err != nil {
- fmt.Println("数据库监听服务异常,err:", err)
- }
- }()
- if mysqlHost == "" {
- err = errors.New("mysqlHost is empty")
- return
- }
-
-
-
- includeTableRegex := []string{
- mysqlDb + ".edb_info$",
- mysqlDb + ".edb_classify$",
- mysqlDb + ".base_from_mysteel_chemical_index$",
- mysqlDb + ".base_from_smm_index$",
- mysqlDb + ".edb_data*",
- }
- cfg := &canal.Config{
-
- ServerID: uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001,
-
- Flavor: "mysql",
-
- Addr: mysqlHost,
- User: mysqlUser,
- Password: mysqlPwd,
-
-
-
- SemiSyncEnabled: false,
-
- UseDecimal: true,
-
-
-
- IncludeTableRegex: includeTableRegex,
- }
-
- {
- binlogFormat, tmpErr := index.GetBinlogFormat()
- if tmpErr != nil {
- err = tmpErr
- return
- }
- if binlogFormat.Value != "ROW" {
- panic("mysql binlog format is not ROW")
- return
- }
- }
- var fileName string
- var position uint32
- fileName = global.Rc.GetStr(utils.CACHE_MYSQL_MASTER_FILENAME)
- position64, tmpErr := global.Rc.GetUInt64(utils.CACHE_MYSQL_MASTER_POSITION)
- if tmpErr != nil && tmpErr.Error() != utils.RedisNoKeyErr {
- panic("mysql binlog position is not found,err:" + tmpErr.Error())
- return
- }
- position = uint32(position64)
-
- if fileName == `` || position == 0 {
- item, tmpErr := index.GetShowMaster()
- if tmpErr != nil {
- err = tmpErr
- return
- }
- fileName = item.File
- position = item.Position
- }
- c, err := canal.NewCanal(cfg)
- if err != nil {
- fmt.Println("err:", err)
- return
- }
- binlogHandler := &MyEventHandler{}
- binlogHandler.SetBinlogFileName(fileName, position)
- c.SetEventHandler(binlogHandler)
-
- pos := mysql.Position{
- Name: fileName,
- Pos: position,
- }
- err = c.RunFrom(pos)
- }
|