package binlog

import (
	"encoding/json"
	"eta/eta_bridge/global"
	"eta/eta_bridge/models/index"
	"eta/eta_bridge/utils"
	"fmt"
	"github.com/go-mysql-org/go-mysql/canal"
	"github.com/go-mysql-org/go-mysql/mysql"
	"github.com/go-mysql-org/go-mysql/replication"
	"github.com/pingcap/errors"
	"reflect"
	"time"
)

// MyEventHandler handles binlog row events and tracks the current binlog file
// name and position.
type MyEventHandler struct {
	canal.DummyEventHandler
	fileName string
	position uint32
}

// OnRow dispatches row events to the insert/update/delete handlers and then
// records the current binlog position in the cache.
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) (err error) {
	switch e.Action {
	case canal.InsertAction:
		err = h.Insert(e)
	case canal.UpdateAction:
		err = h.Update(e)
	case canal.DeleteAction:
		err = h.Delete(e)
	default:
		return errors.New("unsupported row action")
	}
	if err != nil {
		return err
	}

	fmt.Println("fileName:", h.fileName, ";position:", h.position)
	// After each operation, record the current position in the cache.
	global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 31*24*time.Hour)
	global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 31*24*time.Hour)

	return nil
}

// OnPosSynced stores the synced binlog file name and position on the handler
// and in the cache (e.g. when the binlog rotates).
func (h *MyEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
	h.fileName = p.Name
	h.position = p.Pos
	global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 31*24*time.Hour)
	global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 31*24*time.Hour)

	return nil
}

func (h *MyEventHandler) String() string {
	return "MyEventHandler"
}

// Insert logs every inserted row. For a batch insert e.Rows contains more than
// one row, so it is iterated as a list.
func (h *MyEventHandler) Insert(e *canal.RowsEvent) error {
	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)

	for _, row := range e.Rows {
		logData := make(map[string]interface{})
		dataLen := len(row)
		for i, v := range e.Table.Columns {
			if i < dataLen {
				tmpData := row[i]
				// []byte values are converted to string so they marshal as readable text.
				if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
					tmpOld := tmpData.([]byte)
					tmpData = string(tmpOld)
				}
				logData[v.Name] = tmpData
			}
		}
		dataByte, _ := json.Marshal(logData)
		log(e.Table.Schema, e.Table.Name, e.Action, ``, string(dataByte))
	}

	return nil
}

// Update logs the old and new values of an updated row. Update events deliver
// the row as an (old, new) pair, so e.Rows is expected to hold two entries.
func (h *MyEventHandler) Update(e *canal.RowsEvent) error {
	if len(e.Rows) != 2 {
		fmt.Println("unexpected update event, missing old and new row data:", e.Rows)
		return nil
	}

	logOldData := make(map[string]interface{})
	logNewData := make(map[string]interface{})
	oldDataLen := len(e.Rows[0])
	newDataLen := len(e.Rows[1])
	for i, v := range e.Table.Columns {
		if i < oldDataLen {
			oldData := e.Rows[0][i]
			if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
				tmpOld := oldData.([]byte)
				oldData = string(tmpOld)
			}
			logOldData[v.Name] = oldData
		}
		if i < newDataLen {
			newData := e.Rows[1][i]
			if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
				tmpNew := newData.([]byte)
				newData = string(tmpNew)
			}
			logNewData[v.Name] = newData
		}
	}

	logOldDataByte, _ := json.Marshal(logOldData)
	logNewDataByte, _ := json.Marshal(logNewData)
	log(e.Table.Schema, e.Table.Name, e.Action, string(logOldDataByte), string(logNewDataByte))

	return nil
}

// Delete logs every deleted row. For a batch delete e.Rows contains more than
// one row, so it is iterated as a list.
func (h *MyEventHandler) Delete(e *canal.RowsEvent) error {
	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)

	for _, row := range e.Rows {
		logData := make(map[string]interface{})
		dataLen := len(row)
		for i, v := range e.Table.Columns {
			if i < dataLen {
				tmpData := row[i]
				// []byte values are converted to string so they marshal as readable text.
				if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
					tmpOld := tmpData.([]byte)
					tmpData = string(tmpOld)
				}
				logData[v.Name] = tmpData
			}
		}
		dataByte, _ := json.Marshal(logData)
		log(e.Table.Schema, e.Table.Name, e.Action, string(dataByte), ``)
	}

	return nil
}

// Delete3 is an earlier single-row variant of Delete; it is not wired into OnRow.
func (h *MyEventHandler) Delete3(e *canal.RowsEvent) error {
	if len(e.Rows) != 1 {
		fmt.Println("unexpected delete event, missing original row data:", e.Rows)
		return nil
	}
	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)

	dataLen := len(e.Rows[0])
	logData := make(map[string]interface{})
	for i, v := range e.Table.Columns {
		if i < dataLen {
			tmpData := e.Rows[0][i]
			if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
				tmpOld := tmpData.([]byte)
				tmpData = string(tmpOld)
			}
			logData[v.Name] = tmpData
		}
	}
	dataByte, _ := json.Marshal(logData)
	log(e.Table.Schema, e.Table.Name, e.Action, string(dataByte), ``)

	return nil
}

// SetBinlogFileName
// @Description: set the current binlog file name and position
// @author: Roc
// @receiver h
// @datetime 2024-02-29 18:09:36
// @param fileName string
// @param position uint32
func (h *MyEventHandler) SetBinlogFileName(fileName string, position uint32) {
	h.fileName = fileName
	h.position = position

	fmt.Println("init fileName:", h.fileName, ";position:", h.position)
}

// log writes a simple change record to the index.EdbUpdateLog table.
func log(dbName, tableName, opType, oldData, newData string) {
	item := index.EdbUpdateLog{
		OpDbName:    dbName,
		OpTableName: tableName,
		OpType:      opType,
		OldData:     oldData,
		NewData:     newData,
		CreateTime:  time.Now(),
	}
	err := item.Create()
	if err != nil {
		fmt.Println("log create err:", err.Error())
	}
}
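
// The function below is a minimal usage sketch, not part of the original file:
// it shows how MyEventHandler could be attached to a canal instance and resumed
// from a previously cached binlog position. The runBinlogSync name and the
// connection parameters are illustrative assumptions; the project's actual
// startup code may differ.
func runBinlogSync(addr, user, password, fileName string, pos uint32) error {
	// Assumed MySQL master connection settings; replace with the project's config.
	cfg := canal.NewDefaultConfig()
	cfg.Addr = addr
	cfg.User = user
	cfg.Password = password

	c, err := canal.NewCanal(cfg)
	if err != nil {
		return err
	}

	// Seed the handler with the cached coordinates so OnRow keeps writing a
	// consistent checkpoint back to the cache, then register it with canal.
	h := &MyEventHandler{}
	h.SetBinlogFileName(fileName, pos)
	c.SetEventHandler(h)

	// Resume replication from the cached binlog file name and position.
	return c.RunFrom(mysql.Position{Name: fileName, Pos: pos})
}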