123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- package binlog
- import (
- dataSourceModel "eta/eta_api/models/data_source"
- edbmonitorSvr "eta/eta_api/services/edb_monitor"
- "eta/eta_api/utils"
- "fmt"
- "reflect"
- "time"
- "github.com/go-mysql-org/go-mysql/canal"
- "github.com/go-mysql-org/go-mysql/mysql"
- "github.com/go-mysql-org/go-mysql/replication"
- "github.com/go-mysql-org/go-mysql/schema"
- )
- type EdbEventHandler struct {
- canal.DummyEventHandler
- fileName string
- position uint32
- }
- func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
- // 监听逻辑
- switch e.Action {
- case canal.InsertAction:
- err = h.Insert(e)
- if err != nil {
- utils.FileLog.Info("binlog insert error:", err)
- }
- case canal.UpdateAction:
- err = h.Update(e)
- if err != nil {
- utils.FileLog.Info("binlog update error:", err)
- }
- default:
- return nil
- }
- fmt.Println("fileName:", h.fileName, ";position:", h.position)
- // 每次操作完成后都将当前位置记录到缓存
- utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
- utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
- return nil
- }
- func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
- h.fileName = p.Name
- h.position = p.Pos
- return nil
- }
- func (h *EdbEventHandler) SyncToRedis() {
- for {
- // 旋转binlog日志的时候,需要将当前位置记录到缓存
- utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
- utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
- time.Sleep(3 * time.Second)
- }
- }
- func (h *EdbEventHandler) String() string {
- return "EdbEventHandler"
- }
- func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
- // 批量插入的时候,e.Rows的长度会大于0
- fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
- // 指标库
- tableName := e.Table.Name
- if tableName == "edb_info" {
- for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
- newEdbInfo := h.MapRowToStruct(e.Table.Columns, row)
- if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
- err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
- if err != nil {
- return err
- }
- } else {
- ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo.EdbInfoId)
- if err != nil {
- return err
- }
- if ok {
- err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
- if err != nil {
- return err
- }
- }
- }
- // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
- // for _, monitor := range monitors {
- // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
- // if err != nil {
- // continue
- // }
- // }
- // }
- }
- return nil
- }
- // 数据源
- indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(tableName)
- if indexOb == nil {
- return fmt.Errorf("数据表无对应数据源: %s", tableName)
- }
- for _, row := range e.Rows {
- indexItem := DataSourceMapRowToStruct(e.Table.Columns, row, indexOb)
- // 写入队列(此处无需做去重处理)
- if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, indexItem); e != nil {
- return fmt.Errorf("写入redis队列失败, %v", e)
- }
- }
- return nil
- }
- func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
- tableName := e.Table.Name
- // 指标库
- lenRows := len(e.Rows)
- if tableName == "edb_info" {
- //if len(e.Rows) != 2 {
- // fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
- // return nil
- //}
- // 由于UPDATE语句影响行数超过1时e.Rows长度会大于2,所以此处遍历处理
- for i := 0; i < lenRows; i += 2 {
- if i+1 >= lenRows {
- continue
- }
- oldEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[i])
- newEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[i+1])
- if oldEdbInfo.EndValue != newEdbInfo.EndValue {
- if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
- err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
- if err != nil {
- return err
- }
- } else {
- ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo.EdbInfoId)
- if err != nil {
- return err
- }
- if ok {
- err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
- if err != nil {
- return err
- }
- }
- }
- }
- }
- // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
- // for _, monitor := range monitors {
- // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
- // if err != nil {
- // continue
- // }
- // }
- // }
- return nil
- }
- // 数据源
- indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(tableName)
- if indexOb == nil {
- return fmt.Errorf("数据表无对应数据源: %s", tableName)
- }
- for i := 0; i < lenRows; i += 2 {
- if i+1 >= lenRows {
- continue
- }
- // 这里只取[i+1]即UPDATE后的数据
- indexItem := DataSourceMapRowToStruct(e.Table.Columns, e.Rows[i+1], indexOb)
- if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, indexItem); e != nil {
- utils.FileLog.Info(fmt.Sprintf("binlog update data source lpush err: %v", e))
- continue
- }
- }
- return nil
- }
- func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog {
- newEdbInfo := edbmonitorSvr.EdbInfoBingLog{}
- for i, column := range columns {
- value := reflect.ValueOf(row[i])
- switch column.Name {
- case "edb_info_id":
- newEdbInfo.EdbInfoId = int(value.Int())
- case "edb_info_type":
- newEdbInfo.EdbInfoType = int(value.Uint())
- case "source":
- newEdbInfo.Source = int(value.Int())
- case "edb_code":
- newEdbInfo.EdbCode = value.String()
- case "start_date":
- newEdbInfo.StartDate = value.String()
- case "end_date":
- newEdbInfo.EndDate = value.String()
- case "unique_code":
- newEdbInfo.UniqueCode = value.String()
- case "create_time":
- newEdbInfo.CreateTime = value.String()
- case "modify_time":
- newEdbInfo.ModifyTime = value.String()
- case "base_modify_time":
- if value.IsValid() {
- newEdbInfo.BaseModifyTime = value.String()
- }
- case "min_value":
- newEdbInfo.MinValue = value.Float()
- case "max_value":
- newEdbInfo.MaxValue = value.Float()
- case "latest_date":
- newEdbInfo.LatestDate = value.String()
- case "latest_value":
- newEdbInfo.LatestValue = value.Float()
- case "end_value":
- newEdbInfo.EndValue = value.Float()
- case "data_update_time":
- newEdbInfo.DataUpdateTime = value.String()
- case "er_data_update_date":
- newEdbInfo.ErDataUpdateDate = value.String()
- case "sub_source":
- newEdbInfo.SubSource = int(value.Int())
- default:
- continue
- }
- }
- return newEdbInfo
- }
- // SetBinlogFileName
- // @Description: 设置当前的binlog文件名和位置
- // @author: Roc
- // @receiver h
- // @datetime 2024-02-29 18:09:36
- // @param fileName string
- // @param position uint32
- func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
- h.fileName = fileName
- h.position = position
- fmt.Println("init fileName:", h.fileName, ";position:", h.position)
- }
- // DataSourceMapRowToStruct 数据源-binlog转为es结构体
- func DataSourceMapRowToStruct(columns []schema.TableColumn, row []interface{}, indexOb dataSourceModel.EsBaseFromIndex) dataSourceModel.SearchDataSource {
- item := dataSourceModel.SearchDataSource{}
- source, subSource, sourceName := indexOb.SourceInfo()
- item.Source = source
- item.SubSource = subSource
- item.SourceName = sourceName
- // 根据不同数据源匹配对应的字段名
- indexCols := indexOb.EsCols()
- for i, column := range columns {
- value := reflect.ValueOf(row[i])
- switch column.Name {
- case indexCols.PrimaryId:
- if !value.IsValid() {
- continue
- }
- if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
- item.PrimaryId = int(value.Int())
- }
- if value.Kind() == reflect.Uint || value.Kind() == reflect.Uint32 || value.Kind() == reflect.Uint64 {
- item.PrimaryId = int(value.Uint())
- }
- case indexCols.IndexCode:
- if value.IsValid() {
- item.IndexCode = value.String()
- }
- case indexCols.IndexName:
- if value.IsValid() {
- item.IndexName = value.String()
- }
- case indexCols.ClassifyId:
- if !value.IsValid() {
- continue
- }
- if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
- item.ClassifyId = int(value.Int())
- }
- if value.Kind() == reflect.Uint || value.Kind() == reflect.Uint32 || value.Kind() == reflect.Uint64 {
- item.ClassifyId = int(value.Uint())
- }
- case indexCols.Unit:
- if value.IsValid() {
- item.Unit = value.String()
- }
- case indexCols.Frequency:
- if value.IsValid() {
- item.Frequency = value.String()
- }
- case indexCols.StartDate:
- if value.IsValid() {
- item.StartDate = value.String()
- }
- case indexCols.EndDate:
- if value.IsValid() {
- item.EndDate = value.String()
- }
- case indexCols.LatestValue:
- if !value.IsValid() {
- continue
- }
- if value.Kind() == reflect.String {
- item.LatestValue = value.String()
- }
- if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
- item.LatestValue = fmt.Sprint(value.Int())
- }
- if value.Kind() == reflect.Float64 {
- item.LatestValue = fmt.Sprint(value.Float())
- }
- case indexCols.CreateTime:
- if value.IsValid() {
- item.CreateTime = value.String()
- }
- case indexCols.ModifyTime:
- if value.IsValid() {
- item.ModifyTime = value.String()
- }
- default:
- continue
- }
- }
- return item
- }
|