Forráskód Böngészése

add:增加binlog监听

zqbao 4 hónapja
szülő
commit
429e54e1c6

+ 36 - 12
controllers/edb_monitor/edb_monitor_message.go

@@ -26,8 +26,6 @@ var upgrader = websocket.Upgrader{
 	},
 }
 
-var edbMonitorMessageClient = make(map[int]*websocket.Conn)
-
 // GetMonitorLevel
 // @Title 预警管理消息
 // @Description 预警管理消息
@@ -52,7 +50,7 @@ func (m *EdbMonitorMessageController) Connect() {
 	connKey := edbmonitor.EDB_MONITOR_MESSAGE_CONNECT_CACHE + strconv.Itoa(sysUser.AdminId)
 	ok := utils.Rc.IsExist(connKey)
 	if !ok {
-		conn = edbMonitorMessageClient[sysUser.AdminId]
+		conn = edbmonitor.MonitorMessageConn[sysUser.AdminId]
 		if conn != nil {
 			conn.Close()
 		}
@@ -71,23 +69,49 @@ func (m *EdbMonitorMessageController) Connect() {
 	}
 	defer conn.Close()
 
-	edbMonitorMessageClient[sysUser.AdminId] = conn
+	edbmonitor.MonitorMessageConn[sysUser.AdminId] = conn
 	conn.SetCloseHandler(func(code int, text string) error {
-		delete(edbMonitorMessageClient, sysUser.AdminId)
+		delete(edbmonitor.MonitorMessageConn, sysUser.AdminId)
 		utils.Rc.Delete(connKey)
 		return nil
 	})
+	messageList, err := edbmonitor.GetHistoryMessages(sysUser.AdminId)
+	if err != nil {
+		utils.FileLog.Error("获取指标预警信息历史失败,err:%s, adminId:%d", err.Error(), sysUser.AdminId)
+	}
+	success := make(chan int, 10)
+	go func() {
+		defer close(success)
+		for _, msg := range messageList {
+			err = edbmonitor.SendMessages(sysUser.AdminId, msg.EdbInfoId, msg.EdbInfoType, msg.Message, msg.TriggerTime)
+			if err != nil {
+				utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), sysUser.AdminId)
+			} else {
+				success <- msg.EdbMonitorMessageId
+			}
+			time.Sleep(3 * time.Second)
+		}
+	}()
+	go func() {
+		for {
+			msgId, ok := <-success
+			if !ok {
+				return
+			}
+			_, err = edbmonitor.ReadEdbMonitorMessage(msgId, sysUser.AdminId)
+			if err != nil {
+				utils.FileLog.Error("指标预警信息已读失败,err:%s, adminId:%d", err.Error(), sysUser.AdminId)
+			}
+		}
+	}()
+
 	for {
 		ok = utils.Rc.IsExist(connKey)
 		if !ok {
 			br.Msg = "连接已断开"
 			return
 		}
-		err = edbmonitor.SendMessages(conn, sysUser.AdminId)
-		if err != nil {
-			utils.FileLog.Error("指标预警信息发送失败,err:%s, adminId:%d", err.Error(), sysUser.AdminId)
-		}
-		time.Sleep(1 * time.Minute)
+		time.Sleep(10 * time.Second)
 	}
 }
 
@@ -111,7 +135,7 @@ func (m *EdbMonitorMessageController) Health() {
 		return
 	}
 
-	conn := edbMonitorMessageClient[sysUser.AdminId]
+	conn := edbmonitor.MonitorMessageConn[sysUser.AdminId]
 	if conn == nil {
 		br.Msg = "系统错误"
 		br.ErrMsg = "健康检查失败,err:连接已断开"
@@ -151,7 +175,7 @@ func (m *EdbMonitorMessageController) Close() {
 		return
 	}
 
-	conn := edbMonitorMessageClient[sysUser.AdminId]
+	conn := edbmonitor.MonitorMessageConn[sysUser.AdminId]
 	if conn != nil {
 		conn.Close()
 	}

+ 6 - 6
models/binlog/binlog.go

@@ -5,8 +5,8 @@ import "github.com/beego/beego/v2/client/orm"
 // BinlogFormatStruct
 // @Description: 数据库的binlog格式
 type BinlogFormatStruct struct {
-	VariableName string `json:"Variable_name"`
-	Value        string `json:"Value"`
+	VariableName string `orm:"column(Variable_name)"`
+	Value        string `orm:"column(Value)"`
 }
 
 // GetBinlogFormat
@@ -15,14 +15,14 @@ type BinlogFormatStruct struct {
 // @return err
 func GetBinlogFormat() (item *BinlogFormatStruct, err error) {
 	o := orm.NewOrmUsingDB("data")
-	sql := `SHOW VARIABLES LIKE 'binlog_format';`
+	sql := `SHOW VARIABLES LIKE 'binlog_format'`
 	err = o.Raw(sql).QueryRow(&item)
 	return
 }
 
 type BinlogFileStruct struct {
-	File     string `json:"File"`
-	Position uint32 `json:"Position"`
+	File     string `orm:"column(File)"`
+	Position uint32 `orm:"column(Position)"`
 }
 
 // GetShowMaster
@@ -31,7 +31,7 @@ type BinlogFileStruct struct {
 // @return err
 func GetShowMaster() (item *BinlogFileStruct, err error) {
 	o := orm.NewOrmUsingDB("data")
-	sql := `show master status;`
+	sql := `show master status`
 	err = o.Raw(sql).QueryRow(&item)
 	return
 }

+ 6 - 6
models/binlog/business_sys_interaction_log.go

@@ -8,12 +8,12 @@ import (
 
 // BusinessSysInteractionLog 商家系统交互记录表
 type BusinessSysInteractionLog struct {
-	ID             uint32    `gorm:"primaryKey;column:id;type:int(10) unsigned;not null" json:"-"`
-	InteractionKey string    `gorm:"unique;column:interaction_key;type:varchar(128);not null;default:''" json:"interactionKey"` // 记录Key
-	InteractionVal string    `gorm:"column:interaction_val;type:text;default:null" json:"interactionVal"`                       // 记录值
-	Remark         string    `gorm:"column:remark;type:varchar(128);not null;default:''" json:"remark"`                         // 备注
-	ModifyTime     time.Time `gorm:"column:modify_time;type:datetime;default:null" json:"modifyTime"`                           // 修改日期
-	CreateTime     time.Time `gorm:"column:create_time;type:datetime;default:null" json:"createTime"`                           // 创建时间
+	ID             uint32    `orm:"column(id);pk"`
+	InteractionKey string    // 记录Key
+	InteractionVal string    // 记录值
+	Remark         string    // 备注
+	ModifyTime     time.Time // 修改日期
+	CreateTime     time.Time // 创建时间
 }
 
 // TableName get sql table name.获取数据库表名

+ 14 - 0
models/db.go

@@ -3,6 +3,7 @@ package models
 import (
 	"eta/eta_api/models/ai_summary"
 	"eta/eta_api/models/aimod"
+	binlogDao "eta/eta_api/models/binlog"
 	"eta/eta_api/models/company"
 	"eta/eta_api/models/data_manage"
 	"eta/eta_api/models/data_manage/chart_theme"
@@ -25,6 +26,7 @@ import (
 	"eta/eta_api/models/speech_recognition"
 	"eta/eta_api/models/system"
 	"eta/eta_api/models/yb"
+	binlogSvr "eta/eta_api/services/binlog"
 	"eta/eta_api/utils"
 	"time"
 
@@ -207,6 +209,12 @@ func init() {
 
 	// 初始化指标监控
 	initEdbMonitor()
+
+	// 开启mysql binlog监听
+	if utils.MYSQL_DATA_BINLOG_URL != "" {
+		initBinlog()
+		go binlogSvr.ListenMysql()
+	}
 	// 初始化部分数据表变量(直接init会有顺序问题=_=!)
 	afterInitTable()
 }
@@ -648,6 +656,12 @@ func initEdbMonitor() {
 	)
 }
 
+func initBinlog() {
+	orm.RegisterModel(
+		new(binlogDao.BusinessSysInteractionLog), // binlog表
+	)
+}
+
 // afterInitTable
 // @Description: 初始化表结构的的后置操作
 // @author: Roc

+ 3 - 3
models/edb_monitor/edb_monitor_message.go

@@ -36,9 +36,9 @@ func GetEdbMonitorMessageById(id int) (item *EdbMonitorMessage, err error) {
 	return
 }
 
-func GetEdbMonitorMessageByAdminId(adminId int) (item *EdbMonitorMessage, err error) {
+func GetEdbMonitorMessageByAdminId(adminId int) (items []*EdbMonitorMessage, err error) {
 	o := orm.NewOrmUsingDB("data")
-	sql := "SELECT * FROM edb_monitor_message WHERE admin_id =? AND is_read = 0 ORDER BY create_time DESC LIMIT 1"
-	err = o.Raw(sql, adminId).QueryRow(&item)
+	sql := "SELECT * FROM edb_monitor_message WHERE admin_id =? AND is_read = 0 ORDER BY create_time ASC"
+	_, err = o.Raw(sql, adminId).QueryRows(&items)
 	return
 }

+ 109 - 75
services/binlog/handler.go

@@ -1,6 +1,8 @@
 package binlog
 
 import (
+	edbmonitor "eta/eta_api/models/edb_monitor"
+	edbmonitorSvr "eta/eta_api/services/edb_monitor"
 	"eta/eta_api/utils"
 	"fmt"
 	"reflect"
@@ -9,9 +11,30 @@ import (
 	"github.com/go-mysql-org/go-mysql/canal"
 	"github.com/go-mysql-org/go-mysql/mysql"
 	"github.com/go-mysql-org/go-mysql/replication"
-	"github.com/pingcap/errors"
+	"github.com/go-mysql-org/go-mysql/schema"
 )
 
+type EdbInfoBingLog struct {
+	EdbInfoId        int    `orm:"column(edb_info_id);pk"`
+	EdbInfoType      int    `description:"指标类型,0:普通指标,1:预测指标"`
+	Source           int    `description:"来源id"`
+	EdbCode          string `description:"指标编码"`
+	StartDate        string `description:"起始日期"`
+	EndDate          string `description:"终止日期"`
+	UniqueCode       string `description:"指标唯一编码"`
+	CreateTime       string
+	ModifyTime       string
+	BaseModifyTime   string
+	MinValue         float64 `description:"指标最小值"`
+	MaxValue         float64 `description:"指标最大值"`
+	LatestDate       string  `description:"数据最新日期(实际日期)"`
+	LatestValue      float64 `description:"数据最新值(实际值)"`
+	EndValue         float64 `description:"数据的最新值(预测日期的最新值)"`
+	DataUpdateTime   string  `description:"最近一次数据发生变化的时间"`
+	ErDataUpdateDate string  `description:"本次更新,数据发生变化的最早日期"`
+	SubSource        int     `description:"子数据来源:0:经济数据库,1:日期序列"`
+}
+
 type EdbEventHandler struct {
 	canal.DummyEventHandler
 	fileName string
@@ -24,12 +47,16 @@ func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
 	switch e.Action {
 	case canal.InsertAction:
 		err = h.Insert(e)
+		if err != nil {
+			utils.FileLog.Info("binlog insert error:", err)
+		}
 	case canal.UpdateAction:
 		err = h.Update(e)
-	case canal.DeleteAction:
-		err = h.Delete(e)
+		if err != nil {
+			utils.FileLog.Info("binlog update error:", err)
+		}
 	default:
-		return errors.New("操作异常")
+		return nil
 	}
 
 	fmt.Println("fileName:", h.fileName, ";position:", h.position)
@@ -53,28 +80,35 @@ func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.P
 }
 
 func (h *EdbEventHandler) String() string {
-	return "MyEventHandler"
+	return "EdbEventHandler"
 }
 
 func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
 	// 批量插入的时候,e.Rows的长度会大于0
 	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
+	edbMonitorList, err := edbmonitor.GetEdbMonitorEdbInfoList()
+	if err != nil {
+		return err
+	}
+	edbMonitorMap := make(map[int][]*edbmonitor.EdbMonitorInfo)
+	for _, v := range edbMonitorList {
+		if _, ok := edbMonitorMap[v.EdbInfoId]; !ok {
+			edbMonitorMap[v.EdbInfoId] = make([]*edbmonitor.EdbMonitorInfo, 0)
+		}
+		edbMonitorMap[v.EdbInfoId] = append(edbMonitorMap[v.EdbInfoId], v)
+	}
 
 	for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
-		logData := make(map[string]interface{})
-		dataLen := len(row)
-		for i, v := range e.Table.Columns {
-			if i < dataLen {
-				tmpData := row[i]
-				if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
-					tmpOld := tmpData.([]byte)
-					tmpData = string(tmpOld)
+		edbInfo := h.MapRowToStruct(e.Table.Columns, row)
+		if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
+			for _, monitor := range monitors {
+				err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
+				if err != nil {
+					continue
 				}
-				logData[v.Name] = tmpData
 			}
 		}
 	}
-
 	return nil
 }
 
@@ -84,78 +118,78 @@ func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
 		return nil
 	}
 
-	logOldData := make(map[string]interface{})
-	logNewData := make(map[string]interface{})
-
-	oldDataLen := len(e.Rows[0])
-	newDataLen := len(e.Rows[0])
-	for i, v := range e.Table.Columns {
-
-		if i < oldDataLen {
-			oldData := e.Rows[0][i]
-			if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
-				tmpOld := oldData.([]byte)
-				oldData = string(tmpOld)
-			}
-			logOldData[v.Name] = oldData
-		}
-		if i < newDataLen {
-			newData := e.Rows[1][i]
-			if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
-				tmpNew := newData.([]byte)
-				newData = string(tmpNew)
-			}
-			logNewData[v.Name] = newData
+	edbMonitorList, err := edbmonitor.GetEdbMonitorEdbInfoList()
+	if err != nil {
+		return err
+	}
+	edbMonitorMap := make(map[int][]*edbmonitor.EdbMonitorInfo)
+	for _, v := range edbMonitorList {
+		if _, ok := edbMonitorMap[v.EdbInfoId]; !ok {
+			edbMonitorMap[v.EdbInfoId] = make([]*edbmonitor.EdbMonitorInfo, 0)
 		}
-
+		edbMonitorMap[v.EdbInfoId] = append(edbMonitorMap[v.EdbInfoId], v)
 	}
 
-	return nil
-}
-
-func (h *EdbEventHandler) Delete(e *canal.RowsEvent) error {
-	// 批量删除的时候,e.Rows的长度会大于0
-	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
-
-	for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
-		logData := make(map[string]interface{})
-		dataLen := len(row)
-		for i, v := range e.Table.Columns {
-			if i < dataLen {
-				tmpData := row[i]
-				if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
-					tmpOld := tmpData.([]byte)
-					tmpData = string(tmpOld)
-				}
-				logData[v.Name] = tmpData
+	edbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[1])
+	if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
+		for _, monitor := range monitors {
+			err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
+			if err != nil {
+				continue
 			}
 		}
 	}
-
 	return nil
 }
 
-func (h *EdbEventHandler) Delete3(e *canal.RowsEvent) error {
-	if len(e.Rows) != 1 {
-		fmt.Println("删除数据异常,没有原始数据:", e.Rows)
-		return nil
-	}
-	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
-
-	dataLen := len(e.Rows[0])
-	logData := make(map[string]interface{})
-	for i, v := range e.Table.Columns {
-		if i < dataLen {
-			tmpData := e.Rows[0][i]
-			if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
-				tmpOld := tmpData.([]byte)
-				tmpData = string(tmpOld)
+func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) EdbInfoBingLog {
+	edbInfo := EdbInfoBingLog{}
+	for i, column := range columns {
+		value := reflect.ValueOf(row[i])
+		switch column.Name {
+		case "edb_info_id":
+			edbInfo.EdbInfoId = int(value.Int())
+		case "edb_info_type":
+			edbInfo.EdbInfoType = int(value.Uint())
+		case "source":
+			edbInfo.Source = int(value.Int())
+		case "edb_code":
+			edbInfo.EdbCode = value.String()
+		case "start_date":
+			edbInfo.StartDate = value.String()
+		case "end_date":
+			edbInfo.EndDate = value.String()
+		case "unique_code":
+			edbInfo.UniqueCode = value.String()
+		case "create_time":
+			edbInfo.CreateTime = value.String()
+		case "modify_time":
+			edbInfo.ModifyTime = value.String()
+		case "base_modify_time":
+			if value.IsValid() {
+				edbInfo.BaseModifyTime = value.String()
 			}
-			logData[v.Name] = tmpData
+		case "min_value":
+			edbInfo.MinValue = value.Float()
+		case "max_value":
+			edbInfo.MaxValue = value.Float()
+		case "latest_date":
+			edbInfo.LatestDate = value.String()
+		case "latest_value":
+			edbInfo.LatestValue = value.Float()
+		case "end_value":
+			edbInfo.EndValue = value.Float()
+		case "data_update_time":
+			edbInfo.DataUpdateTime = value.String()
+		case "er_data_update_date":
+			edbInfo.ErDataUpdateDate = value.String()
+		case "sub_source":
+			edbInfo.SubSource = int(value.Int())
+		default:
+			continue
 		}
 	}
-
-	return nil
+	return edbInfo
 }
 
 // SetBinlogFileName

+ 9 - 0
services/edb_monitor/edb_monitor.go

@@ -285,8 +285,17 @@ func ModifyEdbMonitorState(edbMonitorInfo *edbmonitor.EdbMonitorInfo, edbCode st
 	if err != nil {
 		return
 	}
+
 	if triggerState == EDB_MONITOR_STATE_TRIGGER_SUCCESS {
+		triggerTime := edbMonitorInfo.MonitorTriggerTime.Format(utils.FormatDateTime)
+		err = SendMessages(edbMonitorInfo.CreateUserId, edbMonitorInfo.EdbInfoId, edbMonitorInfo.EdbInfoType, edbMonitorInfo.EdbMonitorName, triggerTime)
+		isRead := 1
+		if err != nil {
+			isRead = 0
+		}
+		err = LogMessage(edbMonitorInfo.EdbMonitorName, edbMonitorInfo.MonitorTriggerTime, edbMonitorInfo.EdbInfoId, edbMonitorInfo.EdbInfoType, edbMonitorInfo.CreateUserId, isRead)
 	}
+
 	return
 }
 

+ 30 - 11
services/edb_monitor/edb_monitor_message.go

@@ -14,6 +14,8 @@ var (
 	EDB_MONITOR_MESSAGE_CONNECT_CACHE = "edb_monitor_message_cache:"
 )
 
+var MonitorMessageConn = make(map[int]*websocket.Conn)
+
 func ReadEdbMonitorMessage(messageId, adminId int) (msg string, err error) {
 	message, err := edbmonitor.GetEdbMonitorMessageById(messageId)
 	if err != nil {
@@ -38,12 +40,12 @@ func ReadEdbMonitorMessage(messageId, adminId int) (msg string, err error) {
 	return
 }
 
-func LogMessage(content string, triggerTime time.Time, edbInfoId, edbInfoType, adminId int) (err error) {
+func LogMessage(content string, triggerTime time.Time, edbInfoId, edbInfoType, adminId, isRead int) (err error) {
 	message := &edbmonitor.EdbMonitorMessage{
 		EdbInfoId:          edbInfoId,
 		EdbInfoType:        edbInfoType,
 		AdminId:            adminId,
-		IsRead:             0,
+		IsRead:             isRead,
 		Message:            content,
 		MonitorTriggerTime: triggerTime,
 		CreateTime:         time.Now(),
@@ -53,17 +55,34 @@ func LogMessage(content string, triggerTime time.Time, edbInfoId, edbInfoType, a
 	return err
 }
 
-func SendMessages(conn *websocket.Conn, adminId int) (err error) {
-	message, err := edbmonitor.GetEdbMonitorMessageByAdminId(adminId)
-	if err != nil {
-		return
+func SendMessages(adminId int, edbInfoId int, edbInfoType int, message string, triggerTime string) (err error) {
+	conn := MonitorMessageConn[adminId]
+	if conn == nil {
+		return errors.New("no connection")
 	}
 	msg := response.EdbMonitorMessageResp{
-		EdbMonitorMessageId: message.EdbMonitorMessageId,
-		EdbInfoId:           message.EdbInfoId,
-		EdbInfoType:         message.EdbInfoType,
-		Message:             message.Message,
-		TriggerTime:         utils.TimeTransferString(utils.FormatDateTime, message.MonitorTriggerTime),
+		EdbInfoId:   edbInfoId,
+		EdbInfoType: edbInfoType,
+		Message:     message,
+		TriggerTime: triggerTime,
 	}
 	return conn.WriteJSON(msg)
 }
+
+func GetHistoryMessages(adminId int) (items []*response.EdbMonitorMessageResp, err error) {
+	messageList, err := edbmonitor.GetEdbMonitorMessageByAdminId(adminId)
+	if err != nil {
+		return
+	}
+	for _, message := range messageList {
+		item := response.EdbMonitorMessageResp{
+			EdbMonitorMessageId: message.EdbMonitorMessageId,
+			EdbInfoId:           message.EdbInfoId,
+			EdbInfoType:         message.EdbInfoType,
+			Message:             message.Message,
+			TriggerTime:         utils.TimeTransferString(utils.FormatDateTime, message.MonitorTriggerTime),
+		}
+		items = append(items, &item)
+	}
+	return
+}