package binlog import ( edbmonitorSvr "eta/eta_api/services/edb_monitor" "eta/eta_api/utils" "fmt" "reflect" "time" "github.com/go-mysql-org/go-mysql/canal" "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "github.com/go-mysql-org/go-mysql/schema" ) type EdbEventHandler struct { canal.DummyEventHandler fileName string position uint32 } func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) { // 监听逻辑 switch e.Action { case canal.InsertAction: err = h.Insert(e) if err != nil { utils.FileLog.Info("binlog insert error:", err) } case canal.UpdateAction: err = h.Update(e) if err != nil { utils.FileLog.Info("binlog update error:", err) } default: return nil } fmt.Println("fileName:", h.fileName, ";position:", h.position) // 每次操作完成后都将当前位置记录到缓存 utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour) utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour) return nil } func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error { h.fileName = p.Name h.position = p.Pos return nil } func (h *EdbEventHandler) SyncToRedis() { for { // 旋转binlog日志的时候,需要将当前位置记录到缓存 utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour) utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour) time.Sleep(3 * time.Second) } } func (h *EdbEventHandler) String() string { return "EdbEventHandler" } func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error { // 批量插入的时候,e.Rows的长度会大于0 fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name) for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list) newEdbInfo := h.MapRowToStruct(e.Table.Columns, row) if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok { err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo) if err != nil { return err } } else { ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo) if err != nil { return err } if ok { err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo) if err != nil { return err } } } // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok { // for _, monitor := range monitors { // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource) // if err != nil { // continue // } // } // } } return nil } func (h *EdbEventHandler) Update(e *canal.RowsEvent) error { if len(e.Rows) != 2 { fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows) return nil } oldEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[0]) newEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[1]) if oldEdbInfo.EndValue != newEdbInfo.EndValue { if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok { err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo) if err != nil { return err } } else { ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo) if err != nil { return err } if ok { err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo) if err != nil { return err } } } } // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok { // for _, monitor := range monitors { // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource) // if err != nil { // continue // } // } // } return nil } func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog { newEdbInfo := edbmonitorSvr.EdbInfoBingLog{} for i, column := range columns { value := reflect.ValueOf(row[i]) switch column.Name { case "edb_info_id": newEdbInfo.EdbInfoId = int(value.Int()) case "edb_info_type": newEdbInfo.EdbInfoType = int(value.Uint()) case "source": newEdbInfo.Source = int(value.Int()) case "edb_code": newEdbInfo.EdbCode = value.String() case "start_date": newEdbInfo.StartDate = value.String() case "end_date": newEdbInfo.EndDate = value.String() case "unique_code": newEdbInfo.UniqueCode = value.String() case "create_time": newEdbInfo.CreateTime = value.String() case "modify_time": newEdbInfo.ModifyTime = value.String() case "base_modify_time": if value.IsValid() { newEdbInfo.BaseModifyTime = value.String() } case "min_value": newEdbInfo.MinValue = value.Float() case "max_value": newEdbInfo.MaxValue = value.Float() case "latest_date": newEdbInfo.LatestDate = value.String() case "latest_value": newEdbInfo.LatestValue = value.Float() case "end_value": newEdbInfo.EndValue = value.Float() case "data_update_time": newEdbInfo.DataUpdateTime = value.String() case "er_data_update_date": newEdbInfo.ErDataUpdateDate = value.String() case "sub_source": newEdbInfo.SubSource = int(value.Int()) default: continue } } return newEdbInfo } // SetBinlogFileName // @Description: 设置当前的binlog文件名和位置 // @author: Roc // @receiver h // @datetime 2024-02-29 18:09:36 // @param fileName string // @param position uint32 func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) { h.fileName = fileName h.position = position fmt.Println("init fileName:", h.fileName, ";position:", h.position) }