package binlog import ( edbmonitor "eta/eta_api/models/edb_monitor" edbmonitorSvr "eta/eta_api/services/edb_monitor" "eta/eta_api/utils" "fmt" "reflect" "time" "github.com/go-mysql-org/go-mysql/canal" "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "github.com/go-mysql-org/go-mysql/schema" ) type EdbInfoBingLog struct { EdbInfoId int `orm:"column(edb_info_id);pk"` EdbInfoType int `description:"指标类型,0:普通指标,1:预测指标"` Source int `description:"来源id"` EdbCode string `description:"指标编码"` StartDate string `description:"起始日期"` EndDate string `description:"终止日期"` UniqueCode string `description:"指标唯一编码"` CreateTime string ModifyTime string BaseModifyTime string MinValue float64 `description:"指标最小值"` MaxValue float64 `description:"指标最大值"` LatestDate string `description:"数据最新日期(实际日期)"` LatestValue float64 `description:"数据最新值(实际值)"` EndValue float64 `description:"数据的最新值(预测日期的最新值)"` DataUpdateTime string `description:"最近一次数据发生变化的时间"` ErDataUpdateDate string `description:"本次更新,数据发生变化的最早日期"` SubSource int `description:"子数据来源:0:经济数据库,1:日期序列"` } type EdbEventHandler struct { canal.DummyEventHandler fileName string position uint32 } func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) { // 监听逻辑 switch e.Action { case canal.InsertAction: err = h.Insert(e) if err != nil { utils.FileLog.Info("binlog insert error:", err) } case canal.UpdateAction: err = h.Update(e) if err != nil { utils.FileLog.Info("binlog update error:", err) } default: return nil } fmt.Println("fileName:", h.fileName, ";position:", h.position) // 每次操作完成后都将当前位置记录到缓存 utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour) utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour) return nil } func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error { h.fileName = p.Name h.position = p.Pos // 旋转binlog日志的时候,需要将当前位置记录到缓存 utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour) utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour) return nil } func (h *EdbEventHandler) String() string { return "EdbEventHandler" } func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error { // 批量插入的时候,e.Rows的长度会大于0 fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name) edbMonitorList, err := edbmonitor.GetEdbMonitorEdbInfoList() if err != nil { return err } edbMonitorMap := make(map[int][]*edbmonitor.EdbMonitorInfo) for _, v := range edbMonitorList { if _, ok := edbMonitorMap[v.EdbInfoId]; !ok { edbMonitorMap[v.EdbInfoId] = make([]*edbmonitor.EdbMonitorInfo, 0) } edbMonitorMap[v.EdbInfoId] = append(edbMonitorMap[v.EdbInfoId], v) } for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list) edbInfo := h.MapRowToStruct(e.Table.Columns, row) if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok { for _, monitor := range monitors { err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource) if err != nil { continue } } } } return nil } func (h *EdbEventHandler) Update(e *canal.RowsEvent) error { if len(e.Rows) != 2 { fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows) return nil } edbMonitorList, err := edbmonitor.GetEdbMonitorEdbInfoList() if err != nil { return err } edbMonitorMap := make(map[int][]*edbmonitor.EdbMonitorInfo) for _, v := range edbMonitorList { if _, ok := edbMonitorMap[v.EdbInfoId]; !ok { edbMonitorMap[v.EdbInfoId] = make([]*edbmonitor.EdbMonitorInfo, 0) } edbMonitorMap[v.EdbInfoId] = append(edbMonitorMap[v.EdbInfoId], v) } edbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[1]) if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok { for _, monitor := range monitors { err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource) if err != nil { continue } } } return nil } func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) EdbInfoBingLog { edbInfo := EdbInfoBingLog{} for i, column := range columns { value := reflect.ValueOf(row[i]) switch column.Name { case "edb_info_id": edbInfo.EdbInfoId = int(value.Int()) case "edb_info_type": edbInfo.EdbInfoType = int(value.Uint()) case "source": edbInfo.Source = int(value.Int()) case "edb_code": edbInfo.EdbCode = value.String() case "start_date": edbInfo.StartDate = value.String() case "end_date": edbInfo.EndDate = value.String() case "unique_code": edbInfo.UniqueCode = value.String() case "create_time": edbInfo.CreateTime = value.String() case "modify_time": edbInfo.ModifyTime = value.String() case "base_modify_time": if value.IsValid() { edbInfo.BaseModifyTime = value.String() } case "min_value": edbInfo.MinValue = value.Float() case "max_value": edbInfo.MaxValue = value.Float() case "latest_date": edbInfo.LatestDate = value.String() case "latest_value": edbInfo.LatestValue = value.Float() case "end_value": edbInfo.EndValue = value.Float() case "data_update_time": edbInfo.DataUpdateTime = value.String() case "er_data_update_date": edbInfo.ErDataUpdateDate = value.String() case "sub_source": edbInfo.SubSource = int(value.Int()) default: continue } } return edbInfo } // SetBinlogFileName // @Description: 设置当前的binlog文件名和位置 // @author: Roc // @receiver h // @datetime 2024-02-29 18:09:36 // @param fileName string // @param position uint32 func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) { h.fileName = fileName h.position = position fmt.Println("init fileName:", h.fileName, ";position:", h.position) }