handler.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package binlog
  2. import (
  3. "encoding/json"
  4. "eta_gn/eta_bridge/global"
  5. "eta_gn/eta_bridge/models/index"
  6. "eta_gn/eta_bridge/utils"
  7. "fmt"
  8. "github.com/go-mysql-org/go-mysql/canal"
  9. "github.com/go-mysql-org/go-mysql/mysql"
  10. "github.com/go-mysql-org/go-mysql/replication"
  11. "github.com/pingcap/errors"
  12. "reflect"
  13. "time"
  14. )
// MyEventHandler is the canal event handler of this package. It keeps the
// binlog file name and position most recently handed to it so they can be
// written to the cache.
type MyEventHandler struct {
	canal.DummyEventHandler        // embedded; its methods are promoted to MyEventHandler
	fileName                string // binlog file name, assigned in SetBinlogFileName and OnPosSynced
	position                uint32 // binlog position, assigned together with fileName
}
  20. func (h *MyEventHandler) OnRow(e *canal.RowsEvent) (err error) {
  21. switch e.Action {
  22. case canal.InsertAction:
  23. err = h.Insert(e)
  24. case canal.UpdateAction:
  25. err = h.Update(e)
  26. case canal.DeleteAction:
  27. err = h.Delete(e)
  28. default:
  29. return errors.New("操作异常")
  30. }
  31. fmt.Println("fileName:", h.fileName, ";position:", h.position)
  32. global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 31*24*time.Hour)
  33. global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 31*24*time.Hour)
  34. return nil
  35. }
  36. func (h *MyEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
  37. h.fileName = p.Name
  38. h.position = p.Pos
  39. global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 31*24*time.Hour)
  40. global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 31*24*time.Hour)
  41. return nil
  42. }
  43. func (h *MyEventHandler) String() string {
  44. return "MyEventHandler"
  45. }
  46. func (h *MyEventHandler) Insert(e *canal.RowsEvent) error {
  47. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  48. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  49. logData := make(map[string]interface{})
  50. dataLen := len(row)
  51. for i, v := range e.Table.Columns {
  52. if i < dataLen {
  53. tmpData := row[i]
  54. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  55. tmpOld := tmpData.([]byte)
  56. tmpData = string(tmpOld)
  57. }
  58. logData[v.Name] = tmpData
  59. }
  60. }
  61. dataByte, _ := json.Marshal(logData)
  62. log(e.Table.Schema, e.Table.Name, e.Action, ``, string(dataByte))
  63. }
  64. return nil
  65. }
  66. func (h *MyEventHandler) Update(e *canal.RowsEvent) error {
  67. if len(e.Rows) != 2 {
  68. fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
  69. return nil
  70. }
  71. logOldData := make(map[string]interface{})
  72. logNewData := make(map[string]interface{})
  73. oldDataLen := len(e.Rows[0])
  74. newDataLen := len(e.Rows[0])
  75. for i, v := range e.Table.Columns {
  76. if i < oldDataLen {
  77. oldData := e.Rows[0][i]
  78. if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
  79. tmpOld := oldData.([]byte)
  80. oldData = string(tmpOld)
  81. }
  82. logOldData[v.Name] = oldData
  83. }
  84. if i < newDataLen {
  85. newData := e.Rows[1][i]
  86. if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
  87. tmpNew := newData.([]byte)
  88. newData = string(tmpNew)
  89. }
  90. logNewData[v.Name] = newData
  91. }
  92. }
  93. logOldDataByte, _ := json.Marshal(logOldData)
  94. logNewDataByte, _ := json.Marshal(logNewData)
  95. log(e.Table.Schema, e.Table.Name, e.Action, string(logOldDataByte), string(logNewDataByte))
  96. return nil
  97. }
  98. func (h *MyEventHandler) Delete(e *canal.RowsEvent) error {
  99. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  100. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  101. logData := make(map[string]interface{})
  102. dataLen := len(row)
  103. for i, v := range e.Table.Columns {
  104. if i < dataLen {
  105. tmpData := row[i]
  106. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  107. tmpOld := tmpData.([]byte)
  108. tmpData = string(tmpOld)
  109. }
  110. logData[v.Name] = tmpData
  111. }
  112. }
  113. dataByte, _ := json.Marshal(logData)
  114. log(e.Table.Schema, e.Table.Name, e.Action, string(dataByte), ``)
  115. }
  116. return nil
  117. }
  118. func (h *MyEventHandler) Delete3(e *canal.RowsEvent) error {
  119. if len(e.Rows) != 1 {
  120. fmt.Println("删除数据异常,没有原始数据:", e.Rows)
  121. return nil
  122. }
  123. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  124. dataLen := len(e.Rows[0])
  125. logData := make(map[string]interface{})
  126. for i, v := range e.Table.Columns {
  127. if i < dataLen {
  128. tmpData := e.Rows[0][i]
  129. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  130. tmpOld := tmpData.([]byte)
  131. tmpData = string(tmpOld)
  132. }
  133. logData[v.Name] = tmpData
  134. }
  135. }
  136. dataByte, _ := json.Marshal(logData)
  137. log(e.Table.Schema, e.Table.Name, e.Action, string(dataByte), ``)
  138. return nil
  139. }
  140. func (h *MyEventHandler) SetBinlogFileName(fileName string, position uint32) {
  141. h.fileName = fileName
  142. h.position = position
  143. fmt.Println("init fileName:", h.fileName, ";position:", h.position)
  144. }
  145. func log(dbName, tableName, opType, oldData, newData string) {
  146. item := index.EdbUpdateLog{
  147. OpDbName: dbName,
  148. OpTableName: tableName,
  149. OpType: opType,
  150. OldData: oldData,
  151. NewData: newData,
  152. CreateTime: time.Now(),
  153. }
  154. err := item.Create()
  155. if err != nil {
  156. fmt.Println("log create err:", err.Error())
  157. }
  158. }