handler.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package binlog
  2. import (
  3. "eta/eta_api/utils"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. "github.com/go-mysql-org/go-mysql/canal"
  8. "github.com/go-mysql-org/go-mysql/mysql"
  9. "github.com/go-mysql-org/go-mysql/replication"
  10. "github.com/pingcap/errors"
  11. )
  12. type edbEventHandler struct {
  13. canal.DummyEventHandler
  14. fileName string
  15. position uint32
  16. }
  17. func (h *edbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
  18. //fmt.Printf("%s %v\n", e.Action, e.Rows)
  19. //fmt.Println(e.Table.Columns)
  20. //fmt.Println(e.Action)
  21. // 监听逻辑
  22. switch e.Action {
  23. case canal.InsertAction:
  24. err = h.Insert(e)
  25. case canal.UpdateAction:
  26. err = h.Update(e)
  27. case canal.DeleteAction:
  28. err = h.Delete(e)
  29. default:
  30. return errors.New("操作异常")
  31. }
  32. fmt.Println("fileName:", h.fileName, ";position:", h.position)
  33. // 每次操作完成后都将当前位置记录到缓存
  34. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  35. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  36. return nil
  37. }
  38. func (h *edbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
  39. h.fileName = p.Name
  40. h.position = p.Pos
  41. // 旋转binlog日志的时候,需要将当前位置记录到缓存
  42. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  43. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  44. return nil
  45. }
  46. func (h *edbEventHandler) String() string {
  47. return "MyEventHandler"
  48. }
  49. func (h *edbEventHandler) Insert(e *canal.RowsEvent) error {
  50. // 批量插入的时候,e.Rows的长度会大于0
  51. //if len(e.Rows) != 1 {
  52. // fmt.Println("新增数据异常,没有新数据:", e.Rows)
  53. // return nil
  54. //}
  55. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  56. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  57. logData := make(map[string]interface{})
  58. dataLen := len(row)
  59. for i, v := range e.Table.Columns {
  60. if i < dataLen {
  61. tmpData := row[i]
  62. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  63. tmpOld := tmpData.([]byte)
  64. tmpData = string(tmpOld)
  65. }
  66. logData[v.Name] = tmpData
  67. //tmpV = fmt.Sprintf("%v", tmpData)
  68. }
  69. }
  70. }
  71. return nil
  72. }
  73. func (h *edbEventHandler) Update(e *canal.RowsEvent) error {
  74. if len(e.Rows) != 2 {
  75. fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
  76. return nil
  77. }
  78. //fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  79. logOldData := make(map[string]interface{})
  80. logNewData := make(map[string]interface{})
  81. oldDataLen := len(e.Rows[0])
  82. newDataLen := len(e.Rows[0])
  83. //maxDataLen := oldDataLen
  84. //if maxDataLen < newDataLen {
  85. // maxDataLen = newDataLen
  86. //}
  87. for i, v := range e.Table.Columns {
  88. //if v.IsUnsigned
  89. //var tmpV string
  90. //if i < dataLen {
  91. // tmpV = fmt.Sprintf("原数据:%v;新数据:%v", e.Rows[0][i], e.Rows[1][i])
  92. //}
  93. //fmt.Println(v.Name, ":", tmpV)
  94. if i < oldDataLen {
  95. oldData := e.Rows[0][i]
  96. if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
  97. tmpOld := oldData.([]byte)
  98. oldData = string(tmpOld)
  99. }
  100. logOldData[v.Name] = oldData
  101. }
  102. if i < newDataLen {
  103. newData := e.Rows[1][i]
  104. if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
  105. tmpNew := newData.([]byte)
  106. newData = string(tmpNew)
  107. }
  108. logNewData[v.Name] = newData
  109. }
  110. //if i < maxDataLen {
  111. // oldData := e.Rows[0][i]
  112. // newData := e.Rows[1][i]
  113. //
  114. // if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
  115. // tmpOld := oldData.([]byte)
  116. // oldData = string(tmpOld)
  117. // }
  118. // if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
  119. // tmpNew := newData.([]byte)
  120. // newData = string(tmpNew)
  121. // }
  122. //
  123. //
  124. // //if oldData != newData {
  125. // // tmpV = fmt.Sprintf("原数据:%v;新数据:%v", oldData, newData)
  126. // //}
  127. //}
  128. //if tmpV != `` {
  129. // fmt.Println(v.Name, ":", tmpV)
  130. //}
  131. }
  132. return nil
  133. }
  134. func (h *edbEventHandler) Delete(e *canal.RowsEvent) error {
  135. // 批量删除的时候,e.Rows的长度会大于0
  136. //if len(e.Rows) != 1 {
  137. // fmt.Println("删除数据异常,没有原始数据:", e.Rows)
  138. // return nil
  139. //}
  140. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  141. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  142. logData := make(map[string]interface{})
  143. dataLen := len(row)
  144. for i, v := range e.Table.Columns {
  145. if i < dataLen {
  146. tmpData := row[i]
  147. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  148. tmpOld := tmpData.([]byte)
  149. tmpData = string(tmpOld)
  150. }
  151. logData[v.Name] = tmpData
  152. //tmpV = fmt.Sprintf("%v", tmpData)
  153. }
  154. }
  155. }
  156. return nil
  157. }
  158. func (h *edbEventHandler) Delete3(e *canal.RowsEvent) error {
  159. if len(e.Rows) != 1 {
  160. fmt.Println("删除数据异常,没有原始数据:", e.Rows)
  161. return nil
  162. }
  163. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  164. dataLen := len(e.Rows[0])
  165. logData := make(map[string]interface{})
  166. for i, v := range e.Table.Columns {
  167. //var tmpV interface{}
  168. if i < dataLen {
  169. //tmpV = fmt.Sprintf("%v", e.Rows[0][i])
  170. tmpData := e.Rows[0][i]
  171. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  172. tmpOld := tmpData.([]byte)
  173. tmpData = string(tmpOld)
  174. }
  175. logData[v.Name] = tmpData
  176. //fmt.Println(oldData)
  177. }
  178. }
  179. return nil
  180. }
  181. // SetBinlogFileName
  182. // @Description: 设置当前的binlog文件名和位置
  183. // @author: Roc
  184. // @receiver h
  185. // @datetime 2024-02-29 18:09:36
  186. // @param fileName string
  187. // @param position uint32
  188. func (h *edbEventHandler) SetBinlogFileName(fileName string, position uint32) {
  189. h.fileName = fileName
  190. h.position = position
  191. fmt.Println("init fileName:", h.fileName, ";position:", h.position)
  192. }