// Package binlog contains the canal event handler that reacts to MySQL binlog
// row events and forwards the affected rows to Redis queues.
package binlog

import (
	dataSourceModel "eta/eta_api/models/data_source"
	edbmonitorSvr "eta/eta_api/services/edb_monitor"
	"eta/eta_api/utils"
	"fmt"
	"reflect"
	"time"

	"github.com/go-mysql-org/go-mysql/canal"
	"github.com/go-mysql-org/go-mysql/mysql"
	"github.com/go-mysql-org/go-mysql/replication"
	"github.com/go-mysql-org/go-mysql/schema"
)

// EdbEventHandler is a canal event handler that remembers the most recently
// synced binlog file name and position and processes row events.
type EdbEventHandler struct {
	canal.DummyEventHandler
	fileName string
	position uint32
}

// OnRow dispatches INSERT and UPDATE row events to Insert and Update. Errors
// from those handlers are logged and not propagated; OnRow always returns nil.
// After a handled event the current binlog position is written to the cache.
// Events with any other action are ignored.
func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
	switch e.Action {
	case canal.InsertAction:
		err = h.Insert(e)
		if err != nil {
			utils.FileLog.Info("binlog insert error:", err)
		}
	case canal.UpdateAction:
		err = h.Update(e)
		if err != nil {
			utils.FileLog.Info("binlog update error:", err)
		}
	default:
		return nil
	}

	fmt.Println("fileName:", h.fileName, ";position:", h.position)

	// Record the current position in the cache after every handled event.
	h.savePosition()

	return nil
}

// OnPosSynced stores the binlog file name and offset reported by canal.
func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
	h.fileName = p.Name
	h.position = p.Pos
	return nil
}

// SyncToRedis writes the current binlog position to the cache every three
// seconds, so that the position is also persisted when the binlog rotates
// without a row event being handled. It never returns.
//
// NOTE(review): fileName/position are read here and written by OnPosSynced
// without synchronization; if this loop runs in its own goroutine that is a
// data race — confirm against the caller and guard if so.
func (h *EdbEventHandler) SyncToRedis() {
	for {
		h.savePosition()
		time.Sleep(3 * time.Second)
	}
}

// savePosition stores the current binlog file name and position in the cache
// with a 31-day expiry.
func (h *EdbEventHandler) savePosition() {
	utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
	utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
}

// String returns the handler name.
func (h *EdbEventHandler) String() string {
	return "EdbEventHandler"
}

// Insert handles an INSERT row event.
//
// Rows of the edb_info table are pushed to the monitor queue when the
// indicator is monitored. Rows of any other table are converted to a data
// source item and pushed to the ES handling queue; an error is returned when
// the table has no matching data source or a push fails.
func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)

	tableName := e.Table.Name

	// Indicator library.
	if tableName == "edb_info" {
		// A batch insert yields several rows in a single event.
		for _, row := range e.Rows {
			newEdbInfo := h.MapRowToStruct(e.Table.Columns, row)
			if err := h.pushIfMonitored(newEdbInfo); err != nil {
				return err
			}
		}
		return nil
	}

	// Data source.
	indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(tableName)
	if indexOb == nil {
		return fmt.Errorf("数据表无对应数据源: %s", tableName)
	}
	for _, row := range e.Rows {
		indexItem := DataSourceMapRowToStruct(e.Table.Columns, row, indexOb)
		// Push to the queue (no de-duplication is needed here).
		if pushErr := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, indexItem); pushErr != nil {
			return fmt.Errorf("写入redis队列失败, %v", pushErr)
		}
	}
	return nil
}

// Update handles an UPDATE row event. e.Rows is consumed as consecutive
// (before, after) pairs; a trailing row without a partner is ignored.
//
// For the edb_info table the new row is pushed to the monitor queue only when
// end_value changed and the indicator is monitored. For any other table the
// "after" row is converted to a data source item and pushed to the ES handling
// queue; push failures there are logged and skipped.
func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
	tableName := e.Table.Name
	lenRows := len(e.Rows)

	// Indicator library.
	if tableName == "edb_info" {
		// An UPDATE statement that affects more than one row produces more
		// than two entries in e.Rows, so walk them pair by pair.
		for i := 0; i+1 < lenRows; i += 2 {
			oldEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[i])
			newEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[i+1])
			if oldEdbInfo.EndValue == newEdbInfo.EndValue {
				continue
			}
			if err := h.pushIfMonitored(newEdbInfo); err != nil {
				return err
			}
		}
		return nil
	}

	// Data source.
	indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(tableName)
	if indexOb == nil {
		return fmt.Errorf("数据表无对应数据源: %s", tableName)
	}
	for i := 0; i+1 < lenRows; i += 2 {
		// Only the row at [i+1], i.e. the data after the UPDATE, is used.
		indexItem := DataSourceMapRowToStruct(e.Table.Columns, e.Rows[i+1], indexOb)
		if pushErr := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, indexItem); pushErr != nil {
			utils.FileLog.Info(fmt.Sprintf("binlog update data source lpush err: %v", pushErr))
			continue
		}
	}
	return nil
}

// pushIfMonitored pushes info onto the monitor handling queue when its id is
// found in the local monitor set or, failing that, in the Redis monitor id set.
// It returns the first cache error encountered.
func (h *EdbEventHandler) pushIfMonitored(info edbmonitorSvr.EdbInfoBingLog) error {
	if !edbmonitorSvr.EdbLocalSet.IsExist(info.EdbInfoId) {
		ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, info.EdbInfoId)
		if err != nil {
			return err
		}
		if !ok {
			return nil
		}
	}
	if err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, info); err != nil {
		return err
	}
	return nil
}

// MapRowToStruct converts one binlog row of the edb_info table into an
// EdbInfoBingLog, matching values to fields by column name.
//
// NULL values, columns without a value in row, and values whose Go kind does
// not fit the target field leave that field at its zero value instead of
// panicking.
func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog {
	newEdbInfo := edbmonitorSvr.EdbInfoBingLog{}

	for i, column := range columns {
		// The row can be shorter than the column list; stop rather than
		// index out of range.
		if i >= len(row) {
			break
		}
		value := reflect.ValueOf(row[i])
		// A NULL column arrives as nil, which yields the zero reflect.Value.
		if !value.IsValid() {
			continue
		}

		switch column.Name {
		case "edb_info_id":
			newEdbInfo.EdbInfoId = rowValueInt(value)
		case "edb_info_type":
			newEdbInfo.EdbInfoType = rowValueInt(value)
		case "source":
			newEdbInfo.Source = rowValueInt(value)
		case "edb_code":
			newEdbInfo.EdbCode = rowValueString(value)
		case "start_date":
			newEdbInfo.StartDate = rowValueString(value)
		case "end_date":
			newEdbInfo.EndDate = rowValueString(value)
		case "unique_code":
			newEdbInfo.UniqueCode = rowValueString(value)
		case "create_time":
			newEdbInfo.CreateTime = rowValueString(value)
		case "modify_time":
			newEdbInfo.ModifyTime = rowValueString(value)
		case "base_modify_time":
			newEdbInfo.BaseModifyTime = rowValueString(value)
		case "min_value":
			newEdbInfo.MinValue = rowValueFloat(value)
		case "max_value":
			newEdbInfo.MaxValue = rowValueFloat(value)
		case "latest_date":
			newEdbInfo.LatestDate = rowValueString(value)
		case "latest_value":
			newEdbInfo.LatestValue = rowValueFloat(value)
		case "end_value":
			newEdbInfo.EndValue = rowValueFloat(value)
		case "data_update_time":
			newEdbInfo.DataUpdateTime = rowValueString(value)
		case "er_data_update_date":
			newEdbInfo.ErDataUpdateDate = rowValueString(value)
		case "sub_source":
			newEdbInfo.SubSource = rowValueInt(value)
		default:
			continue
		}
	}

	return newEdbInfo
}

// SetBinlogFileName
// @Description: sets the current binlog file name and position
// @author: Roc
// @receiver h
// @datetime 2024-02-29 18:09:36
// @param fileName string
// @param position uint32
func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
	h.fileName = fileName
	h.position = position
	fmt.Println("init fileName:", h.fileName, ";position:", h.position)
}

// DataSourceMapRowToStruct converts one binlog row of a data source table
// into the ES search structure. Source, SubSource and SourceName come from
// indexOb.SourceInfo(); the remaining fields are matched by the column names
// returned from indexOb.EsCols().
//
// NULL values, columns without a value in row, and values whose Go kind does
// not fit the target field leave that field at its zero value.
func DataSourceMapRowToStruct(columns []schema.TableColumn, row []interface{}, indexOb dataSourceModel.EsBaseFromIndex) dataSourceModel.SearchDataSource {
	item := dataSourceModel.SearchDataSource{}

	source, subSource, sourceName := indexOb.SourceInfo()
	item.Source = source
	item.SubSource = subSource
	item.SourceName = sourceName

	// Column names differ per data source; look them up once.
	indexCols := indexOb.EsCols()

	for i, column := range columns {
		if i >= len(row) {
			break
		}
		value := reflect.ValueOf(row[i])
		if !value.IsValid() {
			continue
		}

		switch column.Name {
		case indexCols.PrimaryId:
			item.PrimaryId = rowValueInt(value)
		case indexCols.IndexCode:
			item.IndexCode = rowValueString(value)
		case indexCols.IndexName:
			item.IndexName = rowValueString(value)
		case indexCols.ClassifyId:
			item.ClassifyId = rowValueInt(value)
		case indexCols.Unit:
			item.Unit = rowValueString(value)
		case indexCols.Frequency:
			item.Frequency = rowValueString(value)
		case indexCols.StartDate:
			item.StartDate = rowValueString(value)
		case indexCols.EndDate:
			item.EndDate = rowValueString(value)
		case indexCols.LatestValue:
			// Strings are taken as-is, numbers are formatted with fmt.Sprint.
			item.LatestValue = rowValueString(value)
		case indexCols.CreateTime:
			item.CreateTime = rowValueString(value)
		case indexCols.ModifyTime:
			item.ModifyTime = rowValueString(value)
		default:
			continue
		}
	}

	return item
}

// rowValueInt returns v as an int when v holds any signed or unsigned integer
// kind, and 0 otherwise (including the invalid zero Value).
func rowValueInt(v reflect.Value) int {
	switch v.Kind() {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return int(v.Int())
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return int(v.Uint())
	}
	return 0
}

// rowValueFloat returns v as a float64 when v holds a float or integer kind,
// and 0 otherwise (including the invalid zero Value).
func rowValueFloat(v reflect.Value) float64 {
	switch v.Kind() {
	case reflect.Float32, reflect.Float64:
		return v.Float()
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return float64(v.Int())
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return float64(v.Uint())
	}
	return 0
}

// rowValueString returns the textual form of v: strings as-is, byte slices
// converted to a string, and any other valid value formatted with fmt.Sprint.
// The invalid zero Value yields "".
func rowValueString(v reflect.Value) string {
	if !v.IsValid() {
		return ""
	}
	switch v.Kind() {
	case reflect.String:
		return v.String()
	case reflect.Slice:
		if v.Type().Elem().Kind() == reflect.Uint8 {
			return string(v.Bytes())
		}
	}
	return fmt.Sprint(v.Interface())
}