ソースを参照

Merge remote-tracking branch 'origin/master'

Roc 1 ヶ月 前
コミット
a947739071
2 ファイル変更110 行追加72 行削除
  1. 87 36
      services/binlog/binlog.go
  2. 23 36
      services/binlog/handler.go

+ 87 - 36
services/binlog/binlog.go

@@ -17,29 +17,31 @@ func ListenMysql() {
 	var err error
 	defer func() {
 		if err != nil {
-			fmt.Println("数据库监听服务异常,err:", err)
+			tips := fmt.Sprintf("ListenMysql-数据库监听服务异常, %v", err)
+			fmt.Println(tips)
+			utils.FileLog.Info(tips)
 		}
+		utils.FileLog.Info("ListenMysql end")
 	}()
 	if utils.MYSQL_DATA_BINLOG_URL == "" {
-		panic("mysql url is empty")
+		err = fmt.Errorf("mysql url is empty")
+		return
 	}
-
 	if utils.MYSQL_DATA_BINLOG_USER == "" {
-		panic("mysql user is empty")
+		err = fmt.Errorf("mysql user is empty")
+		return
 	}
 	if utils.MYSQL_DATA_BINLOG_PWD == "" {
-		panic("mysql password is empty")
+		err = fmt.Errorf("mysql password is empty")
+		return
 	}
 	if utils.MYSQL_DATA_BINLOG_DB == "" {
-		panic("mysql db is empty")
+		err = fmt.Errorf("mysql db is empty")
+		return
 	}
 
 	includeTableRegex := []string{
 		utils.MYSQL_DATA_BINLOG_DB + ".edb_info$",
-		// utils.MYSQL_DATA_BINLOG_DB + ".edb_classify$",
-		// utils.MYSQL_DATA_BINLOG_DB + ".base_from_mysteel_chemical_index$",
-		// utils.MYSQL_DATA_BINLOG_DB + ".base_from_smm_index$",
-		// utils.MYSQL_DATA_BINLOG_DB + ".edb_data*",
 
 		// 数据源
 		utils.MYSQL_DATA_BINLOG_DB + ".base_from_rzd_index$",
@@ -104,47 +106,77 @@ func ListenMysql() {
 	}
 
 	// 校验mysql binlog format,目前仅支持row格式
-	{
-		binlogFormat, tmpErr := binlog.GetBinlogFormat()
-		if tmpErr != nil {
-			err = tmpErr
-			return
-		}
-
-		if binlogFormat.Value != "ROW" {
-			panic("mysql binlog format is not ROW")
-		}
+	binlogFormat, e := binlog.GetBinlogFormat()
+	if e != nil {
+		err = fmt.Errorf("get binlog format err: %v", e)
+		return
+	}
+	if binlogFormat.Value != "ROW" {
+		err = fmt.Errorf("mysql binlog format is not ROW")
+		return
 	}
 
-	//  获取上一次启动时的binlog文件名称和位置
-	fileName, position, err := getBinlogNamePosition()
-	if err != nil {
+	// 获取上一次启动时的binlog文件名称和位置
+	fileName, position, e := getBinlogNamePosition()
+	if e != nil {
+		err = fmt.Errorf("获取binlog文件名称和位置失败, %v", e)
 		return
 	}
+
 	// 修改记录本次启动时的binlog文件名称和位置
 	modifyBinlogNamePosition(fileName, position)
-	// 定时修改binlog文件名称和位置
-	go timingModifyBinlogNamePosition()
 
-	c, err := canal.NewCanal(cfg)
-	if err != nil {
-		fmt.Println("err:", err)
+	c, e := canal.NewCanal(cfg)
+	if e != nil {
+		err = fmt.Errorf("new canal err: %v", e)
 		return
 	}
-	utils.FileLog.Debug("记录上一次启动时的fileName:", fileName, ";position:", position)
+	utils.FileLog.Debug(fmt.Sprintf("记录上一次启动时的fileName: %s, position: %d", fileName, position))
 
 	binlogHandler := &EdbEventHandler{}
 	binlogHandler.SetBinlogFileName(fileName, position)
 	c.SetEventHandler(binlogHandler)
 	//c.Run()
-	// 同步到redis
-	go binlogHandler.SyncToRedis()
 
 	pos := mysql.Position{
 		Name: fileName,
 		Pos:  position,
 	}
-	err = c.RunFrom(pos)
+	if e = c.RunFrom(pos); e != nil {
+		fmt.Printf("启动监听异常: %v, 重新定位binlog\n", e)
+		if c != nil {
+			c.Close()
+		}
+
+		// 重新获取binlog位置
+		rename, reposition, e := initBinlogNamePosition()
+		if e != nil {
+			err = fmt.Errorf("重新定位binlog失败, %v", e)
+			return
+		}
+		pos.Name = rename
+		pos.Pos = reposition
+
+		// 重新尝试监听, 再起不来就退出报异常
+		canalNew, e := canal.NewCanal(cfg)
+		if e != nil {
+			err = fmt.Errorf("renew canal err: %v", e)
+			return
+		}
+		binlogHandler.SetBinlogFileName(rename, reposition)
+		canalNew.SetEventHandler(binlogHandler)
+		c = canalNew
+		if e = c.RunFrom(pos); e != nil {
+			err = fmt.Errorf("重新监听binlog失败, %v", e)
+			return
+		}
+	}
+
+	// 定时修改binlog文件名称和位置
+	go timingModifyBinlogNamePosition()
+
+	// 同步到redis
+	go binlogHandler.SyncToRedis()
 }
 
 // getBinlogNamePosition
@@ -170,19 +202,18 @@ func getBinlogNamePosition() (fileName string, position uint32, err error) {
 
 	// 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
 	if fileName == `` || position == 0 {
-
 		// binlog文件名
 		fileNameKey := binlog.BinlogFileNameKey
 		fileNameLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(fileNameKey)
 		if tmpErr == nil {
-			fileName = fileNameLog.InteractionKey
+			fileName = fileNameLog.InteractionVal
 		}
 
 		// binlog位置
 		positionKey := binlog.BinlogPositionKey
 		positionLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(positionKey)
 		if tmpErr == nil {
-			positionStr := positionLog.InteractionKey
+			positionStr := positionLog.InteractionVal
 			positionInt, tmpErr := strconv.Atoi(positionStr)
 			if tmpErr == nil {
 				position = uint32(positionInt)
@@ -214,7 +245,7 @@ func modifyBinlogNamePosition(fileName string, position uint32) {
 	var err error
 	defer func() {
 		if err != nil {
-			utils.FileLog.Error("修改binlog文件名称和位置异常,fileName", fileName, ",position:", position, ",err:", err)
+			utils.FileLog.Error(fmt.Sprintf("修改binlog文件名称和位置异常, fileName: %s, position: %d, err: %v", fileName, position, err))
 		}
 	}()
 
@@ -300,3 +331,23 @@ func timingModifyBinlogNamePosition() {
 		}
 	}
 }
+
+// initBinlogNamePosition 初始化/重新定位binlog文件名称和位置
+func initBinlogNamePosition() (fileName string, position uint32, err error) {
+	// 获取mysql当前binlog文件名称和位置
+	status, e := binlog.GetShowMaster()
+	if e != nil {
+		err = fmt.Errorf("get mysql master status err: %v", e)
+		return
+	}
+	fileName = status.File
+	position = status.Position
+
+	// 写入redis
+	_ = utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, fileName, 31*24*time.Hour)
+	_ = utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, position, 31*24*time.Hour)
+
+	// 更新MySQL
+	modifyBinlogNamePosition(fileName, position)
+	return
+}

+ 23 - 36
services/binlog/handler.go

@@ -6,6 +6,7 @@ import (
 	"eta/eta_api/utils"
 	"fmt"
 	"reflect"
+	"runtime/debug"
 	"time"
 
 	"github.com/go-mysql-org/go-mysql/canal"
@@ -21,6 +22,15 @@ type EdbEventHandler struct {
 }
 
 func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
+	// 下面有可能出现panic导致监听挂掉
+	defer func() {
+		if r := recover(); r != nil {
+			utils.FileLog.Error("binlog OnRow panic: ", r)
+
+			stackTrace := debug.Stack()
+			utils.FileLog.Error("binlog OnRow panic stack: ", string(stackTrace))
+		}
+	}()
 
 	// 监听逻辑
 	switch e.Action {
@@ -222,9 +232,7 @@ func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []int
 		case "modify_time":
 			newEdbInfo.ModifyTime = value.String()
 		case "base_modify_time":
-			if value.IsValid() {
-				newEdbInfo.BaseModifyTime = value.String()
-			}
+			newEdbInfo.BaseModifyTime = value.String()
 		case "min_value":
 			newEdbInfo.MinValue = value.Float()
 		case "max_value":
@@ -273,13 +281,14 @@ func DataSourceMapRowToStruct(columns []schema.TableColumn, row []interface{}, i
 	// 根据不同数据源匹配对应的字段名
 	indexCols := indexOb.EsCols()
 	for i, column := range columns {
+		// 数据无效的话,那么就过滤掉
 		value := reflect.ValueOf(row[i])
+		if !value.IsValid() {
+			continue
+		}
 
 		switch column.Name {
 		case indexCols.PrimaryId:
-			if !value.IsValid() {
-				continue
-			}
 			if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
 				item.PrimaryId = int(value.Int())
 			}
@@ -287,17 +296,10 @@ func DataSourceMapRowToStruct(columns []schema.TableColumn, row []interface{}, i
 				item.PrimaryId = int(value.Uint())
 			}
 		case indexCols.IndexCode:
-			if value.IsValid() {
-				item.IndexCode = value.String()
-			}
+			item.IndexCode = value.String()
 		case indexCols.IndexName:
-			if value.IsValid() {
-				item.IndexName = value.String()
-			}
+			item.IndexName = value.String()
 		case indexCols.ClassifyId:
-			if !value.IsValid() {
-				continue
-			}
 			if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
 				item.ClassifyId = int(value.Int())
 			}
@@ -305,25 +307,14 @@ func DataSourceMapRowToStruct(columns []schema.TableColumn, row []interface{}, i
 				item.ClassifyId = int(value.Uint())
 			}
 		case indexCols.Unit:
-			if value.IsValid() {
-				item.Unit = value.String()
-			}
+			item.Unit = value.String()
 		case indexCols.Frequency:
-			if value.IsValid() {
-				item.Frequency = value.String()
-			}
+			item.Frequency = value.String()
 		case indexCols.StartDate:
-			if value.IsValid() {
-				item.StartDate = value.String()
-			}
+			item.StartDate = value.String()
 		case indexCols.EndDate:
-			if value.IsValid() {
-				item.EndDate = value.String()
-			}
+			item.EndDate = value.String()
 		case indexCols.LatestValue:
-			if !value.IsValid() {
-				continue
-			}
 			if value.Kind() == reflect.String {
 				item.LatestValue = value.String()
 			}
@@ -334,13 +325,9 @@ func DataSourceMapRowToStruct(columns []schema.TableColumn, row []interface{}, i
 				item.LatestValue = fmt.Sprint(value.Float())
 			}
 		case indexCols.CreateTime:
-			if value.IsValid() {
-				item.CreateTime = value.String()
-			}
+			item.CreateTime = value.String()
 		case indexCols.ModifyTime:
-			if value.IsValid() {
-				item.ModifyTime = value.String()
-			}
+			item.ModifyTime = value.String()
 		default:
 			continue
 		}