|
@@ -70,27 +70,27 @@ func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
|
|
|
// 批量插入的时候,e.Rows的长度会大于0
|
|
|
fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
|
|
|
for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
|
|
|
- edbInfo := h.MapRowToStruct(e.Table.Columns, row)
|
|
|
- if ok := edbmonitorSvr.EdbLocalSet.IsExist(edbInfo.EdbInfoId); ok {
|
|
|
- err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
|
|
|
+ newEdbInfo := h.MapRowToStruct(e.Table.Columns, row)
|
|
|
+ if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
|
|
|
+ err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
} else {
|
|
|
- ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, edbInfo)
|
|
|
+ ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
if ok {
|
|
|
- err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
|
|
|
+ err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- // if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
|
|
|
+ // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
|
|
|
// for _, monitor := range monitors {
|
|
|
- // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
|
|
|
+ // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
|
|
|
// if err != nil {
|
|
|
// continue
|
|
|
// }
|
|
@@ -105,28 +105,31 @@ func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
|
|
|
fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
|
|
|
return nil
|
|
|
}
|
|
|
- edbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[1])
|
|
|
- if ok := edbmonitorSvr.EdbLocalSet.IsExist(edbInfo.EdbInfoId); ok {
|
|
|
- err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- } else {
|
|
|
- ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, edbInfo)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- if ok {
|
|
|
- err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
|
|
|
+ oldEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[0])
|
|
|
+ newEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[1])
|
|
|
+ if oldEdbInfo.EndValue != newEdbInfo.EndValue {
|
|
|
+ if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
|
|
|
+ err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ } else {
|
|
|
+ ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if ok {
|
|
|
+ err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
|
|
|
+ // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
|
|
|
// for _, monitor := range monitors {
|
|
|
- // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
|
|
|
+ // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
|
|
|
// if err != nil {
|
|
|
// continue
|
|
|
// }
|
|
@@ -136,53 +139,53 @@ func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
|
|
|
}
|
|
|
|
|
|
func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog {
|
|
|
- edbInfo := edbmonitorSvr.EdbInfoBingLog{}
|
|
|
+ newEdbInfo := edbmonitorSvr.EdbInfoBingLog{}
|
|
|
for i, column := range columns {
|
|
|
value := reflect.ValueOf(row[i])
|
|
|
switch column.Name {
|
|
|
case "edb_info_id":
|
|
|
- edbInfo.EdbInfoId = int(value.Int())
|
|
|
+ newEdbInfo.EdbInfoId = int(value.Int())
|
|
|
case "edb_info_type":
|
|
|
- edbInfo.EdbInfoType = int(value.Uint())
|
|
|
+ newEdbInfo.EdbInfoType = int(value.Uint())
|
|
|
case "source":
|
|
|
- edbInfo.Source = int(value.Int())
|
|
|
+ newEdbInfo.Source = int(value.Int())
|
|
|
case "edb_code":
|
|
|
- edbInfo.EdbCode = value.String()
|
|
|
+ newEdbInfo.EdbCode = value.String()
|
|
|
case "start_date":
|
|
|
- edbInfo.StartDate = value.String()
|
|
|
+ newEdbInfo.StartDate = value.String()
|
|
|
case "end_date":
|
|
|
- edbInfo.EndDate = value.String()
|
|
|
+ newEdbInfo.EndDate = value.String()
|
|
|
case "unique_code":
|
|
|
- edbInfo.UniqueCode = value.String()
|
|
|
+ newEdbInfo.UniqueCode = value.String()
|
|
|
case "create_time":
|
|
|
- edbInfo.CreateTime = value.String()
|
|
|
+ newEdbInfo.CreateTime = value.String()
|
|
|
case "modify_time":
|
|
|
- edbInfo.ModifyTime = value.String()
|
|
|
+ newEdbInfo.ModifyTime = value.String()
|
|
|
case "base_modify_time":
|
|
|
if value.IsValid() {
|
|
|
- edbInfo.BaseModifyTime = value.String()
|
|
|
+ newEdbInfo.BaseModifyTime = value.String()
|
|
|
}
|
|
|
case "min_value":
|
|
|
- edbInfo.MinValue = value.Float()
|
|
|
+ newEdbInfo.MinValue = value.Float()
|
|
|
case "max_value":
|
|
|
- edbInfo.MaxValue = value.Float()
|
|
|
+ newEdbInfo.MaxValue = value.Float()
|
|
|
case "latest_date":
|
|
|
- edbInfo.LatestDate = value.String()
|
|
|
+ newEdbInfo.LatestDate = value.String()
|
|
|
case "latest_value":
|
|
|
- edbInfo.LatestValue = value.Float()
|
|
|
+ newEdbInfo.LatestValue = value.Float()
|
|
|
case "end_value":
|
|
|
- edbInfo.EndValue = value.Float()
|
|
|
+ newEdbInfo.EndValue = value.Float()
|
|
|
case "data_update_time":
|
|
|
- edbInfo.DataUpdateTime = value.String()
|
|
|
+ newEdbInfo.DataUpdateTime = value.String()
|
|
|
case "er_data_update_date":
|
|
|
- edbInfo.ErDataUpdateDate = value.String()
|
|
|
+ newEdbInfo.ErDataUpdateDate = value.String()
|
|
|
case "sub_source":
|
|
|
- edbInfo.SubSource = int(value.Int())
|
|
|
+ newEdbInfo.SubSource = int(value.Int())
|
|
|
default:
|
|
|
continue
|
|
|
}
|
|
|
}
|
|
|
- return edbInfo
|
|
|
+ return newEdbInfo
|
|
|
}
|
|
|
|
|
|
// SetBinlogFileName
|