handler.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package binlog
  2. import (
  3. "encoding/json"
  4. "eta/eta_bridge/global"
  5. "eta/eta_bridge/models/index"
  6. "eta/eta_bridge/utils"
  7. "fmt"
  8. "github.com/go-mysql-org/go-mysql/canal"
  9. "github.com/go-mysql-org/go-mysql/mysql"
  10. "github.com/go-mysql-org/go-mysql/replication"
  11. "github.com/pingcap/errors"
  12. "reflect"
  13. "time"
  14. )
  15. type MyEventHandler struct {
  16. canal.DummyEventHandler
  17. fileName string
  18. position uint32
  19. }
  20. func (h *MyEventHandler) OnRow(e *canal.RowsEvent) (err error) {
  21. //fmt.Printf("%s %v\n", e.Action, e.Rows)
  22. //fmt.Println(e.Table.Columns)
  23. //fmt.Println(e.Action)
  24. switch e.Action {
  25. case canal.InsertAction:
  26. err = h.Insert(e)
  27. case canal.UpdateAction:
  28. err = h.Update(e)
  29. case canal.DeleteAction:
  30. err = h.Delete(e)
  31. default:
  32. return errors.New("操作异常")
  33. }
  34. fmt.Println("fileName:", h.fileName, ";position:", h.position)
  35. // 每次操作完成后都将当前位置记录到缓存
  36. global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 31*24*time.Hour)
  37. global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 31*24*time.Hour)
  38. return nil
  39. }
  40. func (h *MyEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
  41. h.fileName = p.Name
  42. h.position = p.Pos
  43. // 旋转binlog日志的时候,需要将当前位置记录到缓存
  44. global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 31*24*time.Hour)
  45. global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 31*24*time.Hour)
  46. return nil
  47. }
  48. func (h *MyEventHandler) String() string {
  49. return "MyEventHandler"
  50. }
  51. func (h *MyEventHandler) Insert(e *canal.RowsEvent) error {
  52. // 批量插入的时候,e.Rows的长度会大于0
  53. //if len(e.Rows) != 1 {
  54. // fmt.Println("新增数据异常,没有新数据:", e.Rows)
  55. // return nil
  56. //}
  57. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  58. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  59. logData := make(map[string]interface{})
  60. dataLen := len(row)
  61. for i, v := range e.Table.Columns {
  62. if i < dataLen {
  63. tmpData := row[i]
  64. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  65. tmpOld := tmpData.([]byte)
  66. tmpData = string(tmpOld)
  67. }
  68. logData[v.Name] = tmpData
  69. //tmpV = fmt.Sprintf("%v", tmpData)
  70. }
  71. }
  72. dataByte, _ := json.Marshal(logData)
  73. log(e.Table.Schema, e.Table.Name, e.Action, ``, string(dataByte))
  74. }
  75. return nil
  76. }
  77. func (h *MyEventHandler) Update(e *canal.RowsEvent) error {
  78. if len(e.Rows) != 2 {
  79. fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
  80. return nil
  81. }
  82. //fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  83. logOldData := make(map[string]interface{})
  84. logNewData := make(map[string]interface{})
  85. oldDataLen := len(e.Rows[0])
  86. newDataLen := len(e.Rows[0])
  87. //maxDataLen := oldDataLen
  88. //if maxDataLen < newDataLen {
  89. // maxDataLen = newDataLen
  90. //}
  91. for i, v := range e.Table.Columns {
  92. //if v.IsUnsigned
  93. //var tmpV string
  94. //if i < dataLen {
  95. // tmpV = fmt.Sprintf("原数据:%v;新数据:%v", e.Rows[0][i], e.Rows[1][i])
  96. //}
  97. //fmt.Println(v.Name, ":", tmpV)
  98. if i < oldDataLen {
  99. oldData := e.Rows[0][i]
  100. if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
  101. tmpOld := oldData.([]byte)
  102. oldData = string(tmpOld)
  103. }
  104. logOldData[v.Name] = oldData
  105. }
  106. if i < newDataLen {
  107. newData := e.Rows[1][i]
  108. if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
  109. tmpNew := newData.([]byte)
  110. newData = string(tmpNew)
  111. }
  112. logNewData[v.Name] = newData
  113. }
  114. //if i < maxDataLen {
  115. // oldData := e.Rows[0][i]
  116. // newData := e.Rows[1][i]
  117. //
  118. // if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
  119. // tmpOld := oldData.([]byte)
  120. // oldData = string(tmpOld)
  121. // }
  122. // if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
  123. // tmpNew := newData.([]byte)
  124. // newData = string(tmpNew)
  125. // }
  126. //
  127. //
  128. // //if oldData != newData {
  129. // // tmpV = fmt.Sprintf("原数据:%v;新数据:%v", oldData, newData)
  130. // //}
  131. //}
  132. //if tmpV != `` {
  133. // fmt.Println(v.Name, ":", tmpV)
  134. //}
  135. }
  136. logOldDataByte, _ := json.Marshal(logOldData)
  137. logNewDataByte, _ := json.Marshal(logNewData)
  138. log(e.Table.Schema, e.Table.Name, e.Action, string(logOldDataByte), string(logNewDataByte))
  139. return nil
  140. }
  141. func (h *MyEventHandler) Delete(e *canal.RowsEvent) error {
  142. // 批量删除的时候,e.Rows的长度会大于0
  143. //if len(e.Rows) != 1 {
  144. // fmt.Println("删除数据异常,没有原始数据:", e.Rows)
  145. // return nil
  146. //}
  147. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  148. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  149. logData := make(map[string]interface{})
  150. dataLen := len(row)
  151. for i, v := range e.Table.Columns {
  152. if i < dataLen {
  153. tmpData := row[i]
  154. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  155. tmpOld := tmpData.([]byte)
  156. tmpData = string(tmpOld)
  157. }
  158. logData[v.Name] = tmpData
  159. //tmpV = fmt.Sprintf("%v", tmpData)
  160. }
  161. }
  162. dataByte, _ := json.Marshal(logData)
  163. log(e.Table.Schema, e.Table.Name, e.Action, string(dataByte), ``)
  164. }
  165. return nil
  166. }
  167. func (h *MyEventHandler) Delete3(e *canal.RowsEvent) error {
  168. if len(e.Rows) != 1 {
  169. fmt.Println("删除数据异常,没有原始数据:", e.Rows)
  170. return nil
  171. }
  172. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  173. dataLen := len(e.Rows[0])
  174. logData := make(map[string]interface{})
  175. for i, v := range e.Table.Columns {
  176. //var tmpV interface{}
  177. if i < dataLen {
  178. //tmpV = fmt.Sprintf("%v", e.Rows[0][i])
  179. tmpData := e.Rows[0][i]
  180. if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
  181. tmpOld := tmpData.([]byte)
  182. tmpData = string(tmpOld)
  183. }
  184. logData[v.Name] = tmpData
  185. //fmt.Println(oldData)
  186. }
  187. }
  188. dataByte, _ := json.Marshal(logData)
  189. log(e.Table.Schema, e.Table.Name, e.Action, string(dataByte), ``)
  190. return nil
  191. }
  192. // SetBinlogFileName
  193. // @Description: 设置当前的binlog文件名和位置
  194. // @author: Roc
  195. // @receiver h
  196. // @datetime 2024-02-29 18:09:36
  197. // @param fileName string
  198. // @param position uint32
  199. func (h *MyEventHandler) SetBinlogFileName(fileName string, position uint32) {
  200. h.fileName = fileName
  201. h.position = position
  202. fmt.Println("init fileName:", h.fileName, ";position:", h.position)
  203. }
  204. // log 简单的日志记录
  205. func log(dbName, tableName, opType, oldData, newData string) {
  206. item := index.EdbUpdateLog{
  207. OpDbName: dbName,
  208. OpTableName: tableName,
  209. OpType: opType,
  210. OldData: oldData,
  211. NewData: newData,
  212. CreateTime: time.Now(),
  213. }
  214. err := item.Create()
  215. if err != nil {
  216. fmt.Println("log create err:", err.Error())
  217. }
  218. }