zqbao 4 meses atrás
pai
commit
e4ede1f19e
2 arquivos alterados com 14 adições e 68 exclusões
  1. 5 10
      services/binlog/binlog.go
  2. 9 58
      services/binlog/handler.go

+ 5 - 10
services/binlog/binlog.go

@@ -21,23 +21,18 @@ func ListenMysql() {
 		}
 	}()
 	if utils.MYSQL_DATA_BINLOG_URL == "" {
-		panic("mysqlHost is empty")
-		//err = errors.New("mysqlHost is empty")
-		//return
+		panic("mysql url is empty")
 	}
 
 	if utils.MYSQL_DATA_BINLOG_USER == "" {
-		panic("user is empty")
+		panic("mysql user is empty")
 	}
 	if utils.MYSQL_DATA_BINLOG_PWD == "" {
-		panic("password is empty")
+		panic("mysql password is empty")
 	}
 	if utils.MYSQL_DATA_BINLOG_DB == "" {
-		panic("db is empty")
+		panic("mysql db is empty")
 	}
-	//includeTableRegex := []string{
-	//	"test_hz_data.edb_info",
-	//}
 
 	includeTableRegex := []string{
 		utils.MYSQL_DATA_BINLOG_DB + ".edb_info$",
@@ -108,7 +103,7 @@ func ListenMysql() {
 	}
 	utils.FileLog.Debug("记录上一次启动时的fileName:", fileName, ";position:", position)
 
-	binlogHandler := &edbEventHandler{}
+	binlogHandler := &EdbEventHandler{}
 	binlogHandler.SetBinlogFileName(fileName, position)
 	c.SetEventHandler(binlogHandler)
 	//c.Run()

+ 9 - 58
services/binlog/handler.go

@@ -12,16 +12,13 @@ import (
 	"github.com/pingcap/errors"
 )
 
-type edbEventHandler struct {
+type EdbEventHandler struct {
 	canal.DummyEventHandler
 	fileName string
 	position uint32
 }
 
-func (h *edbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
-	//fmt.Printf("%s %v\n", e.Action, e.Rows)
-	//fmt.Println(e.Table.Columns)
-	//fmt.Println(e.Action)
+func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
 
 	// 监听逻辑
 	switch e.Action {
@@ -44,7 +41,7 @@ func (h *edbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
 	return nil
 }
 
-func (h *edbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
+func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
 	h.fileName = p.Name
 	h.position = p.Pos
 
@@ -55,16 +52,12 @@ func (h *edbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.P
 	return nil
 }
 
-func (h *edbEventHandler) String() string {
+func (h *EdbEventHandler) String() string {
 	return "MyEventHandler"
 }
 
-func (h *edbEventHandler) Insert(e *canal.RowsEvent) error {
+func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
 	// 批量插入的时候,e.Rows的长度会大于0
-	//if len(e.Rows) != 1 {
-	//	fmt.Println("新增数据异常,没有新数据:", e.Rows)
-	//	return nil
-	//}
 	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
 
 	for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
@@ -78,7 +71,6 @@ func (h *edbEventHandler) Insert(e *canal.RowsEvent) error {
 					tmpData = string(tmpOld)
 				}
 				logData[v.Name] = tmpData
-				//tmpV = fmt.Sprintf("%v", tmpData)
 			}
 		}
 	}
@@ -86,30 +78,18 @@ func (h *edbEventHandler) Insert(e *canal.RowsEvent) error {
 	return nil
 }
 
-func (h *edbEventHandler) Update(e *canal.RowsEvent) error {
+func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
 	if len(e.Rows) != 2 {
 		fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
 		return nil
 	}
 
-	//fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
-
 	logOldData := make(map[string]interface{})
 	logNewData := make(map[string]interface{})
 
 	oldDataLen := len(e.Rows[0])
 	newDataLen := len(e.Rows[0])
-	//maxDataLen := oldDataLen
-	//if maxDataLen < newDataLen {
-	//	maxDataLen = newDataLen
-	//}
 	for i, v := range e.Table.Columns {
-		//if v.IsUnsigned
-		//var tmpV string
-		//if i < dataLen {
-		//	tmpV = fmt.Sprintf("原数据:%v;新数据:%v", e.Rows[0][i], e.Rows[1][i])
-		//}
-		//fmt.Println(v.Name, ":", tmpV)
 
 		if i < oldDataLen {
 			oldData := e.Rows[0][i]
@@ -128,38 +108,13 @@ func (h *edbEventHandler) Update(e *canal.RowsEvent) error {
 			logNewData[v.Name] = newData
 		}
 
-		//if i < maxDataLen {
-		//	oldData := e.Rows[0][i]
-		//	newData := e.Rows[1][i]
-		//
-		//	if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
-		//		tmpOld := oldData.([]byte)
-		//		oldData = string(tmpOld)
-		//	}
-		//	if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
-		//		tmpNew := newData.([]byte)
-		//		newData = string(tmpNew)
-		//	}
-		//
-		//
-		//	//if oldData != newData {
-		//	//	tmpV = fmt.Sprintf("原数据:%v;新数据:%v", oldData, newData)
-		//	//}
-		//}
-		//if tmpV != `` {
-		//	fmt.Println(v.Name, ":", tmpV)
-		//}
 	}
 
 	return nil
 }
 
-func (h *edbEventHandler) Delete(e *canal.RowsEvent) error {
+func (h *EdbEventHandler) Delete(e *canal.RowsEvent) error {
 	// 批量删除的时候,e.Rows的长度会大于0
-	//if len(e.Rows) != 1 {
-	//	fmt.Println("删除数据异常,没有原始数据:", e.Rows)
-	//	return nil
-	//}
 	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
 
 	for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
@@ -173,7 +128,6 @@ func (h *edbEventHandler) Delete(e *canal.RowsEvent) error {
 					tmpData = string(tmpOld)
 				}
 				logData[v.Name] = tmpData
-				//tmpV = fmt.Sprintf("%v", tmpData)
 			}
 		}
 	}
@@ -181,7 +135,7 @@ func (h *edbEventHandler) Delete(e *canal.RowsEvent) error {
 	return nil
 }
 
-func (h *edbEventHandler) Delete3(e *canal.RowsEvent) error {
+func (h *EdbEventHandler) Delete3(e *canal.RowsEvent) error {
 	if len(e.Rows) != 1 {
 		fmt.Println("删除数据异常,没有原始数据:", e.Rows)
 		return nil
@@ -191,16 +145,13 @@ func (h *edbEventHandler) Delete3(e *canal.RowsEvent) error {
 	dataLen := len(e.Rows[0])
 	logData := make(map[string]interface{})
 	for i, v := range e.Table.Columns {
-		//var tmpV interface{}
 		if i < dataLen {
-			//tmpV = fmt.Sprintf("%v", e.Rows[0][i])
 			tmpData := e.Rows[0][i]
 			if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
 				tmpOld := tmpData.([]byte)
 				tmpData = string(tmpOld)
 			}
 			logData[v.Name] = tmpData
-			//fmt.Println(oldData)
 		}
 	}
 
@@ -214,7 +165,7 @@ func (h *edbEventHandler) Delete3(e *canal.RowsEvent) error {
 // @datetime 2024-02-29 18:09:36
 // @param fileName string
 // @param position uint32
-func (h *edbEventHandler) SetBinlogFileName(fileName string, position uint32) {
+func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
 	h.fileName = fileName
 	h.position = position