handler.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package binlog
  2. import (
  3. edbmonitorSvr "eta/eta_api/services/edb_monitor"
  4. "eta/eta_api/utils"
  5. "fmt"
  6. "reflect"
  7. "time"
  8. "github.com/go-mysql-org/go-mysql/canal"
  9. "github.com/go-mysql-org/go-mysql/mysql"
  10. "github.com/go-mysql-org/go-mysql/replication"
  11. "github.com/go-mysql-org/go-mysql/schema"
  12. )
  13. type EdbEventHandler struct {
  14. canal.DummyEventHandler
  15. fileName string
  16. position uint32
  17. }
  18. func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
  19. // 监听逻辑
  20. switch e.Action {
  21. case canal.InsertAction:
  22. err = h.Insert(e)
  23. if err != nil {
  24. utils.FileLog.Info("binlog insert error:", err)
  25. }
  26. case canal.UpdateAction:
  27. err = h.Update(e)
  28. if err != nil {
  29. utils.FileLog.Info("binlog update error:", err)
  30. }
  31. default:
  32. return nil
  33. }
  34. fmt.Println("fileName:", h.fileName, ";position:", h.position)
  35. // 每次操作完成后都将当前位置记录到缓存
  36. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  37. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  38. return nil
  39. }
  40. func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
  41. h.fileName = p.Name
  42. h.position = p.Pos
  43. return nil
  44. }
  45. func (h *EdbEventHandler) SyncToRedis() {
  46. for {
  47. // 旋转binlog日志的时候,需要将当前位置记录到缓存
  48. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  49. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  50. time.Sleep(3 * time.Second)
  51. }
  52. }
  53. func (h *EdbEventHandler) String() string {
  54. return "EdbEventHandler"
  55. }
  56. func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
  57. // 批量插入的时候,e.Rows的长度会大于0
  58. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  59. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  60. edbInfo := h.MapRowToStruct(e.Table.Columns, row)
  61. if ok := edbmonitorSvr.EdbLocalSet.IsExist(edbInfo.EdbInfoId); ok {
  62. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
  63. if err != nil {
  64. return err
  65. }
  66. } else {
  67. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, edbInfo)
  68. if err != nil {
  69. return err
  70. }
  71. if ok {
  72. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
  73. if err != nil {
  74. return err
  75. }
  76. }
  77. }
  78. // if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
  79. // for _, monitor := range monitors {
  80. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
  81. // if err != nil {
  82. // continue
  83. // }
  84. // }
  85. // }
  86. }
  87. return nil
  88. }
  89. func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
  90. if len(e.Rows) != 2 {
  91. fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
  92. return nil
  93. }
  94. edbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[1])
  95. if ok := edbmonitorSvr.EdbLocalSet.IsExist(edbInfo.EdbInfoId); ok {
  96. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
  97. if err != nil {
  98. return err
  99. }
  100. } else {
  101. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, edbInfo)
  102. if err != nil {
  103. return err
  104. }
  105. if ok {
  106. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
  107. if err != nil {
  108. return err
  109. }
  110. }
  111. }
  112. // if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
  113. // for _, monitor := range monitors {
  114. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
  115. // if err != nil {
  116. // continue
  117. // }
  118. // }
  119. // }
  120. return nil
  121. }
  122. func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog {
  123. edbInfo := edbmonitorSvr.EdbInfoBingLog{}
  124. for i, column := range columns {
  125. value := reflect.ValueOf(row[i])
  126. switch column.Name {
  127. case "edb_info_id":
  128. edbInfo.EdbInfoId = int(value.Int())
  129. case "edb_info_type":
  130. edbInfo.EdbInfoType = int(value.Uint())
  131. case "source":
  132. edbInfo.Source = int(value.Int())
  133. case "edb_code":
  134. edbInfo.EdbCode = value.String()
  135. case "start_date":
  136. edbInfo.StartDate = value.String()
  137. case "end_date":
  138. edbInfo.EndDate = value.String()
  139. case "unique_code":
  140. edbInfo.UniqueCode = value.String()
  141. case "create_time":
  142. edbInfo.CreateTime = value.String()
  143. case "modify_time":
  144. edbInfo.ModifyTime = value.String()
  145. case "base_modify_time":
  146. if value.IsValid() {
  147. edbInfo.BaseModifyTime = value.String()
  148. }
  149. case "min_value":
  150. edbInfo.MinValue = value.Float()
  151. case "max_value":
  152. edbInfo.MaxValue = value.Float()
  153. case "latest_date":
  154. edbInfo.LatestDate = value.String()
  155. case "latest_value":
  156. edbInfo.LatestValue = value.Float()
  157. case "end_value":
  158. edbInfo.EndValue = value.Float()
  159. case "data_update_time":
  160. edbInfo.DataUpdateTime = value.String()
  161. case "er_data_update_date":
  162. edbInfo.ErDataUpdateDate = value.String()
  163. case "sub_source":
  164. edbInfo.SubSource = int(value.Int())
  165. default:
  166. continue
  167. }
  168. }
  169. return edbInfo
  170. }
  171. // SetBinlogFileName
  172. // @Description: 设置当前的binlog文件名和位置
  173. // @author: Roc
  174. // @receiver h
  175. // @datetime 2024-02-29 18:09:36
  176. // @param fileName string
  177. // @param position uint32
  178. func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
  179. h.fileName = fileName
  180. h.position = position
  181. fmt.Println("init fileName:", h.fileName, ";position:", h.position)
  182. }