123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- package binlog
- import (
- "eta/eta_api/models/binlog"
- "eta/eta_api/utils"
- "fmt"
- "math/rand"
- "strconv"
- "time"
- "github.com/go-mysql-org/go-mysql/canal"
- "github.com/go-mysql-org/go-mysql/mysql"
- _ "github.com/go-sql-driver/mysql"
- )
- func ListenMysql() {
- var err error
- defer func() {
- if err != nil {
- tips := fmt.Sprintf("ListenMysql-数据库监听服务异常, %v", err)
- fmt.Println(tips)
- utils.FileLog.Info(tips)
- }
- utils.FileLog.Info("ListenMysql end")
- }()
- if utils.MYSQL_DATA_BINLOG_URL == "" {
- err = fmt.Errorf("mysql url is empty")
- return
- }
- if utils.MYSQL_DATA_BINLOG_USER == "" {
- err = fmt.Errorf("mysql user is empty")
- return
- }
- if utils.MYSQL_DATA_BINLOG_PWD == "" {
- err = fmt.Errorf("mysql password is empty")
- return
- }
- if utils.MYSQL_DATA_BINLOG_DB == "" {
- err = fmt.Errorf("mysql db is empty")
- return
- }
- includeTableRegex := []string{
- utils.MYSQL_DATA_BINLOG_DB + ".edb_info$",
-
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_rzd_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_hisugar_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_ly_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_sci_hq_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_ths_hf_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_oilchem_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_ccf_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_mysteel_chemical_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_smm_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_baiinfo_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_sci_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_coalmine_mapping$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_eia_steo_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_icpi_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_yongyi_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_fenwei_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_sci99_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_bloomberg_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_mtjh_mapping$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_usda_fas_index$",
- utils.MYSQL_DATA_BINLOG_DB + ".base_from_business_index$",
- }
-
- if utils.MYSQL_DATA_BINLOG_DB_EDB != "" {
- includeTableRegex = append(includeTableRegex, utils.MYSQL_DATA_BINLOG_DB_EDB+".edbinfo$")
- }
- if utils.MYSQL_DATA_BINLOG_DB_GL != "" {
- includeTableRegex = append(includeTableRegex, utils.MYSQL_DATA_BINLOG_DB_GL+".mb_index_main_info$")
- }
-
- var serverId uint32
- if utils.MYSQL_DATA_BINLOG_SERVER_ID != "" {
- id, _ := strconv.ParseUint(utils.MYSQL_DATA_BINLOG_SERVER_ID, 10, 32)
- serverId = uint32(id)
- }
- if serverId == 0 {
- serverId = uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001
- }
- cfg := &canal.Config{
-
- ServerID: serverId,
-
- Flavor: "mysql",
-
- Addr: utils.MYSQL_DATA_BINLOG_URL,
- User: utils.MYSQL_DATA_BINLOG_USER,
- Password: utils.MYSQL_DATA_BINLOG_PWD,
-
-
-
- SemiSyncEnabled: false,
-
- UseDecimal: true,
-
-
-
- IncludeTableRegex: includeTableRegex,
- }
-
- binlogFormat, e := binlog.GetBinlogFormat()
- if e != nil {
- err = fmt.Errorf("get binlog format err: %v", e)
- return
- }
- if binlogFormat.Value != "ROW" {
- err = fmt.Errorf("mysql binlog format is not ROW")
- return
- }
-
- fileName, position, e := getBinlogNamePosition()
- if e != nil {
- err = fmt.Errorf("获取binlog文件名称和位置失败, %v", e)
- return
- }
-
- modifyBinlogNamePosition(fileName, position)
- c, e := canal.NewCanal(cfg)
- if e != nil {
- err = fmt.Errorf("new canal err: %v", e)
- return
- }
- utils.FileLog.Debug(fmt.Sprintf("记录上一次启动时的fileName: %s, position: %d", fileName, position))
- binlogHandler := &EdbEventHandler{}
- binlogHandler.SetBinlogFileName(fileName, position)
- c.SetEventHandler(binlogHandler)
-
- pos := mysql.Position{
- Name: fileName,
- Pos: position,
- }
- if e = c.RunFrom(pos); e != nil {
- fmt.Printf("启动监听异常: %v, 重新定位binlog\n", e)
- if c != nil {
- c.Close()
- }
-
- rename, reposition, e := initBinlogNamePosition()
- if e != nil {
- err = fmt.Errorf("重新定位binlog失败, %v", e)
- return
- }
- pos.Name = rename
- pos.Pos = reposition
-
- canalNew, e := canal.NewCanal(cfg)
- if e != nil {
- err = fmt.Errorf("renew canal err: %v", e)
- return
- }
- binlogHandler.SetBinlogFileName(rename, reposition)
- canalNew.SetEventHandler(binlogHandler)
- c = canalNew
- if e = c.RunFrom(pos); e != nil {
- err = fmt.Errorf("重新监听binlog失败, %v", e)
- return
- }
- }
-
- go timingModifyBinlogNamePosition()
-
- go binlogHandler.SyncToRedis()
- }
- func getBinlogNamePosition() (fileName string, position uint32, err error) {
-
- fileName = utils.Rc.GetStr(utils.CACHE_MYSQL_DATA_FILENAME)
- position64, err := utils.Rc.GetUInt64(utils.CACHE_MYSQL_DATA_POSITION)
- if err != nil {
- if err.Error() != utils.RedisNoKeyErr {
- panic("mysql binlog position is not found,err:" + err.Error())
- return
- }
- err = nil
- }
- position = uint32(position64)
-
- if fileName == `` || position == 0 {
-
- fileNameKey := binlog.BinlogFileNameKey
- fileNameLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(fileNameKey)
- if tmpErr == nil {
- fileName = fileNameLog.InteractionVal
- }
-
- positionKey := binlog.BinlogPositionKey
- positionLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(positionKey)
- if tmpErr == nil {
- positionStr := positionLog.InteractionVal
- positionInt, tmpErr := strconv.Atoi(positionStr)
- if tmpErr == nil {
- position = uint32(positionInt)
- }
- }
- }
-
- if fileName == `` || position == 0 {
- item, tmpErr := binlog.GetShowMaster()
- if tmpErr != nil {
- err = tmpErr
- return
- }
- fileName = item.File
- position = item.Position
- }
- return
- }
- func modifyBinlogNamePosition(fileName string, position uint32) {
- var err error
- defer func() {
- if err != nil {
- utils.FileLog.Error(fmt.Sprintf("修改binlog文件名称和位置异常, fileName: %s, position: %d, err: %v", fileName, position, err))
- }
- }()
-
- fileNameKey := binlog.BinlogFileNameKey
- fileNameLog, err := binlog.GetBusinessSysInteractionLogByKey(fileNameKey)
- if err != nil {
- if !utils.IsErrNoRow(err) {
- return
- }
- err = nil
- fileNameLog = &binlog.BusinessSysInteractionLog{
-
- InteractionKey: fileNameKey,
- InteractionVal: fileName,
- Remark: "mysql中binlog的filename名称",
- ModifyTime: time.Now(),
- CreateTime: time.Now(),
- }
- err = fileNameLog.Create()
- if err != nil {
- return
- }
- } else {
- fileNameLog.InteractionVal = fileName
- fileNameLog.ModifyTime = time.Now()
- err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"})
- if err != nil {
- return
- }
- }
-
- positionKey := binlog.BinlogPositionKey
- positionLog, err := binlog.GetBusinessSysInteractionLogByKey(positionKey)
- if err != nil {
- if !utils.IsErrNoRow(err) {
- return
- }
- err = nil
- positionLog = &binlog.BusinessSysInteractionLog{
-
- InteractionKey: positionKey,
- InteractionVal: fmt.Sprint(position),
- Remark: "mysql中binlog的position位置",
- ModifyTime: time.Now(),
- CreateTime: time.Now(),
- }
- err = positionLog.Create()
- if err != nil {
- return
- }
- } else {
- positionLog.InteractionVal = fmt.Sprint(position)
- positionLog.ModifyTime = time.Now()
- err = positionLog.Update([]string{"InteractionVal", "ModifyTime"})
- if err != nil {
- return
- }
- }
- return
- }
- func timingModifyBinlogNamePosition() {
- for {
-
- time.Sleep(30 * time.Second)
-
- fileName, position, err := getBinlogNamePosition()
- if err != nil {
- return
- }
- if fileName != `` && position != 0 {
-
- modifyBinlogNamePosition(fileName, position)
- }
- }
- }
- func initBinlogNamePosition() (fileName string, position uint32, err error) {
-
- status, e := binlog.GetShowMaster()
- if e != nil {
- err = fmt.Errorf("get mysql master status err: %v", e)
- return
- }
- fileName = status.File
- position = status.Position
-
- _ = utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, fileName, 31*24*time.Hour)
- _ = utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, position, 31*24*time.Hour)
-
- modifyBinlogNamePosition(fileName, position)
- return
- }
|