handler.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package binlog
  2. import (
  3. "eta/eta_api/utils"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. "github.com/go-mysql-org/go-mysql/canal"
  8. "github.com/go-mysql-org/go-mysql/mysql"
  9. "github.com/go-mysql-org/go-mysql/replication"
  10. "github.com/pingcap/errors"
  11. )
  12. type EdbEventHandler struct {
  13. canal.DummyEventHandler
  14. fileName string
  15. position uint32
  16. }
  17. func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
  18. // 监听逻辑
  19. switch e.Action {
  20. case canal.InsertAction:
  21. err = h.Insert(e)
  22. case canal.UpdateAction:
  23. err = h.Update(e)
  24. case canal.DeleteAction:
  25. err = h.Delete(e)
  26. default:
  27. return errors.New("操作异常")
  28. }
  29. fmt.Println("fileName:", h.fileName, ";position:", h.position)
  30. // 每次操作完成后都将当前位置记录到缓存
  31. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  32. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  33. return nil
  34. }
  35. func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
  36. h.fileName = p.Name
  37. h.position = p.Pos
  38. // 旋转binlog日志的时候,需要将当前位置记录到缓存
  39. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  40. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  41. return nil
  42. }
  43. func (h *EdbEventHandler) String() string {
  44. return "MyEventHandler"
  45. }
  46. func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
  47. // 批量插入的时候,e.Rows的长度会大于0
  48. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  49. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  50. logData := make(map[string]interface{})
  51. dataLen := len(row)
  52. for i, v := range e.Table.Columns {
  53. if i < dataLen {
  54. tmpData := row[i]
  55. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  56. tmpOld := tmpData.([]byte)
  57. tmpData = string(tmpOld)
  58. }
  59. logData[v.Name] = tmpData
  60. }
  61. }
  62. }
  63. return nil
  64. }
  65. func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
  66. if len(e.Rows) != 2 {
  67. fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
  68. return nil
  69. }
  70. logOldData := make(map[string]interface{})
  71. logNewData := make(map[string]interface{})
  72. oldDataLen := len(e.Rows[0])
  73. newDataLen := len(e.Rows[0])
  74. for i, v := range e.Table.Columns {
  75. if i < oldDataLen {
  76. oldData := e.Rows[0][i]
  77. if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
  78. tmpOld := oldData.([]byte)
  79. oldData = string(tmpOld)
  80. }
  81. logOldData[v.Name] = oldData
  82. }
  83. if i < newDataLen {
  84. newData := e.Rows[1][i]
  85. if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
  86. tmpNew := newData.([]byte)
  87. newData = string(tmpNew)
  88. }
  89. logNewData[v.Name] = newData
  90. }
  91. }
  92. return nil
  93. }
  94. func (h *EdbEventHandler) Delete(e *canal.RowsEvent) error {
  95. // 批量删除的时候,e.Rows的长度会大于0
  96. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  97. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  98. logData := make(map[string]interface{})
  99. dataLen := len(row)
  100. for i, v := range e.Table.Columns {
  101. if i < dataLen {
  102. tmpData := row[i]
  103. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  104. tmpOld := tmpData.([]byte)
  105. tmpData = string(tmpOld)
  106. }
  107. logData[v.Name] = tmpData
  108. }
  109. }
  110. }
  111. return nil
  112. }
  113. func (h *EdbEventHandler) Delete3(e *canal.RowsEvent) error {
  114. if len(e.Rows) != 1 {
  115. fmt.Println("删除数据异常,没有原始数据:", e.Rows)
  116. return nil
  117. }
  118. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  119. dataLen := len(e.Rows[0])
  120. logData := make(map[string]interface{})
  121. for i, v := range e.Table.Columns {
  122. if i < dataLen {
  123. tmpData := e.Rows[0][i]
  124. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  125. tmpOld := tmpData.([]byte)
  126. tmpData = string(tmpOld)
  127. }
  128. logData[v.Name] = tmpData
  129. }
  130. }
  131. return nil
  132. }
  133. // SetBinlogFileName
  134. // @Description: 设置当前的binlog文件名和位置
  135. // @author: Roc
  136. // @receiver h
  137. // @datetime 2024-02-29 18:09:36
  138. // @param fileName string
  139. // @param position uint32
  140. func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
  141. h.fileName = fileName
  142. h.position = position
  143. fmt.Println("init fileName:", h.fileName, ";position:", h.position)
  144. }