123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- package binlog
- import (
- "encoding/json"
- "eta/eta_bridge/global"
- "eta/eta_bridge/models/index"
- "eta/eta_bridge/utils"
- "fmt"
- "github.com/go-mysql-org/go-mysql/canal"
- "github.com/go-mysql-org/go-mysql/mysql"
- "github.com/go-mysql-org/go-mysql/replication"
- "github.com/pingcap/errors"
- "reflect"
- "time"
- )
- type MyEventHandler struct {
- canal.DummyEventHandler
- fileName string
- position uint32
- }
- func (h *MyEventHandler) OnRow(e *canal.RowsEvent) (err error) {
- //fmt.Printf("%s %v\n", e.Action, e.Rows)
- //fmt.Println(e.Table.Columns)
- //fmt.Println(e.Action)
- switch e.Action {
- case canal.InsertAction:
- err = h.Insert(e)
- case canal.UpdateAction:
- err = h.Update(e)
- case canal.DeleteAction:
- err = h.Delete(e)
- default:
- return errors.New("操作异常")
- }
- fmt.Println("fileName:", h.fileName, ";position:", h.position)
- // 每次操作完成后都将当前位置记录到缓存
- global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 31*24*time.Hour)
- global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 31*24*time.Hour)
- return nil
- }
- func (h *MyEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
- h.fileName = p.Name
- h.position = p.Pos
- // 旋转binlog日志的时候,需要将当前位置记录到缓存
- global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 31*24*time.Hour)
- global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 31*24*time.Hour)
- return nil
- }
- func (h *MyEventHandler) String() string {
- return "MyEventHandler"
- }
- func (h *MyEventHandler) Insert(e *canal.RowsEvent) error {
- // 批量插入的时候,e.Rows的长度会大于0
- //if len(e.Rows) != 1 {
- // fmt.Println("新增数据异常,没有新数据:", e.Rows)
- // return nil
- //}
- fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
- for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
- logData := make(map[string]interface{})
- dataLen := len(row)
- for i, v := range e.Table.Columns {
- if i < dataLen {
- tmpData := row[i]
- if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
- tmpOld := tmpData.([]byte)
- tmpData = string(tmpOld)
- }
- logData[v.Name] = tmpData
- //tmpV = fmt.Sprintf("%v", tmpData)
- }
- }
- dataByte, _ := json.Marshal(logData)
- log(e.Table.Schema, e.Table.Name, e.Action, ``, string(dataByte))
- }
- return nil
- }
- func (h *MyEventHandler) Update(e *canal.RowsEvent) error {
- if len(e.Rows) != 2 {
- fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
- return nil
- }
- //fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
- logOldData := make(map[string]interface{})
- logNewData := make(map[string]interface{})
- oldDataLen := len(e.Rows[0])
- newDataLen := len(e.Rows[0])
- //maxDataLen := oldDataLen
- //if maxDataLen < newDataLen {
- // maxDataLen = newDataLen
- //}
- for i, v := range e.Table.Columns {
- //if v.IsUnsigned
- //var tmpV string
- //if i < dataLen {
- // tmpV = fmt.Sprintf("原数据:%v;新数据:%v", e.Rows[0][i], e.Rows[1][i])
- //}
- //fmt.Println(v.Name, ":", tmpV)
- if i < oldDataLen {
- oldData := e.Rows[0][i]
- if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
- tmpOld := oldData.([]byte)
- oldData = string(tmpOld)
- }
- logOldData[v.Name] = oldData
- }
- if i < newDataLen {
- newData := e.Rows[1][i]
- if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
- tmpNew := newData.([]byte)
- newData = string(tmpNew)
- }
- logNewData[v.Name] = newData
- }
- //if i < maxDataLen {
- // oldData := e.Rows[0][i]
- // newData := e.Rows[1][i]
- //
- // if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
- // tmpOld := oldData.([]byte)
- // oldData = string(tmpOld)
- // }
- // if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
- // tmpNew := newData.([]byte)
- // newData = string(tmpNew)
- // }
- //
- //
- // //if oldData != newData {
- // // tmpV = fmt.Sprintf("原数据:%v;新数据:%v", oldData, newData)
- // //}
- //}
- //if tmpV != `` {
- // fmt.Println(v.Name, ":", tmpV)
- //}
- }
- logOldDataByte, _ := json.Marshal(logOldData)
- logNewDataByte, _ := json.Marshal(logNewData)
- log(e.Table.Schema, e.Table.Name, e.Action, string(logOldDataByte), string(logNewDataByte))
- return nil
- }
- func (h *MyEventHandler) Delete(e *canal.RowsEvent) error {
- // 批量删除的时候,e.Rows的长度会大于0
- //if len(e.Rows) != 1 {
- // fmt.Println("删除数据异常,没有原始数据:", e.Rows)
- // return nil
- //}
- fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
- for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
- logData := make(map[string]interface{})
- dataLen := len(row)
- for i, v := range e.Table.Columns {
- if i < dataLen {
- tmpData := row[i]
- if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
- tmpOld := tmpData.([]byte)
- tmpData = string(tmpOld)
- }
- logData[v.Name] = tmpData
- //tmpV = fmt.Sprintf("%v", tmpData)
- }
- }
- dataByte, _ := json.Marshal(logData)
- log(e.Table.Schema, e.Table.Name, e.Action, string(dataByte), ``)
- }
- return nil
- }
- func (h *MyEventHandler) Delete3(e *canal.RowsEvent) error {
- if len(e.Rows) != 1 {
- fmt.Println("删除数据异常,没有原始数据:", e.Rows)
- return nil
- }
- fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
- dataLen := len(e.Rows[0])
- logData := make(map[string]interface{})
- for i, v := range e.Table.Columns {
- //var tmpV interface{}
- if i < dataLen {
- //tmpV = fmt.Sprintf("%v", e.Rows[0][i])
- tmpData := e.Rows[0][i]
- if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
- tmpOld := tmpData.([]byte)
- tmpData = string(tmpOld)
- }
- logData[v.Name] = tmpData
- //fmt.Println(oldData)
- }
- }
- dataByte, _ := json.Marshal(logData)
- log(e.Table.Schema, e.Table.Name, e.Action, string(dataByte), ``)
- return nil
- }
- // SetBinlogFileName
- // @Description: 设置当前的binlog文件名和位置
- // @author: Roc
- // @receiver h
- // @datetime 2024-02-29 18:09:36
- // @param fileName string
- // @param position uint32
- func (h *MyEventHandler) SetBinlogFileName(fileName string, position uint32) {
- h.fileName = fileName
- h.position = position
- fmt.Println("init fileName:", h.fileName, ";position:", h.position)
- }
- // log 简单的日志记录
- func log(dbName, tableName, opType, oldData, newData string) {
- item := index.EdbUpdateLog{
- OpDbName: dbName,
- OpTableName: tableName,
- OpType: opType,
- OldData: oldData,
- NewData: newData,
- CreateTime: time.Now(),
- }
- err := item.Create()
- if err != nil {
- fmt.Println("log create err:", err.Error())
- }
- }
|