handler.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package binlog
  2. import (
  3. edbmonitorSvr "eta/eta_api/services/edb_monitor"
  4. "eta/eta_api/utils"
  5. "fmt"
  6. "reflect"
  7. "time"
  8. "github.com/go-mysql-org/go-mysql/canal"
  9. "github.com/go-mysql-org/go-mysql/mysql"
  10. "github.com/go-mysql-org/go-mysql/replication"
  11. "github.com/go-mysql-org/go-mysql/schema"
  12. )
  13. type EdbEventHandler struct {
  14. canal.DummyEventHandler
  15. fileName string
  16. position uint32
  17. }
  18. func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
  19. // 监听逻辑
  20. switch e.Action {
  21. case canal.InsertAction:
  22. err = h.Insert(e)
  23. if err != nil {
  24. utils.FileLog.Info("binlog insert error:", err)
  25. }
  26. case canal.UpdateAction:
  27. err = h.Update(e)
  28. if err != nil {
  29. utils.FileLog.Info("binlog update error:", err)
  30. }
  31. default:
  32. return nil
  33. }
  34. fmt.Println("fileName:", h.fileName, ";position:", h.position)
  35. // 每次操作完成后都将当前位置记录到缓存
  36. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  37. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  38. return nil
  39. }
  40. func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
  41. h.fileName = p.Name
  42. h.position = p.Pos
  43. return nil
  44. }
  45. func (h *EdbEventHandler) SyncToRedis() {
  46. for {
  47. // 旋转binlog日志的时候,需要将当前位置记录到缓存
  48. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  49. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  50. time.Sleep(3 * time.Second)
  51. }
  52. }
  53. func (h *EdbEventHandler) String() string {
  54. return "EdbEventHandler"
  55. }
  56. func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
  57. // 批量插入的时候,e.Rows的长度会大于0
  58. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  59. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  60. newEdbInfo := h.MapRowToStruct(e.Table.Columns, row)
  61. if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
  62. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  63. if err != nil {
  64. return err
  65. }
  66. } else {
  67. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo)
  68. if err != nil {
  69. return err
  70. }
  71. if ok {
  72. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  73. if err != nil {
  74. return err
  75. }
  76. }
  77. }
  78. // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
  79. // for _, monitor := range monitors {
  80. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
  81. // if err != nil {
  82. // continue
  83. // }
  84. // }
  85. // }
  86. }
  87. return nil
  88. }
  89. func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
  90. if len(e.Rows) != 2 {
  91. fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
  92. return nil
  93. }
  94. oldEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[0])
  95. newEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[1])
  96. if oldEdbInfo.EndValue != newEdbInfo.EndValue {
  97. if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
  98. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  99. if err != nil {
  100. return err
  101. }
  102. } else {
  103. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo)
  104. if err != nil {
  105. return err
  106. }
  107. if ok {
  108. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  109. if err != nil {
  110. return err
  111. }
  112. }
  113. }
  114. }
  115. // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
  116. // for _, monitor := range monitors {
  117. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
  118. // if err != nil {
  119. // continue
  120. // }
  121. // }
  122. // }
  123. return nil
  124. }
  125. func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog {
  126. newEdbInfo := edbmonitorSvr.EdbInfoBingLog{}
  127. for i, column := range columns {
  128. value := reflect.ValueOf(row[i])
  129. switch column.Name {
  130. case "edb_info_id":
  131. newEdbInfo.EdbInfoId = int(value.Int())
  132. case "edb_info_type":
  133. newEdbInfo.EdbInfoType = int(value.Uint())
  134. case "source":
  135. newEdbInfo.Source = int(value.Int())
  136. case "edb_code":
  137. newEdbInfo.EdbCode = value.String()
  138. case "start_date":
  139. newEdbInfo.StartDate = value.String()
  140. case "end_date":
  141. newEdbInfo.EndDate = value.String()
  142. case "unique_code":
  143. newEdbInfo.UniqueCode = value.String()
  144. case "create_time":
  145. newEdbInfo.CreateTime = value.String()
  146. case "modify_time":
  147. newEdbInfo.ModifyTime = value.String()
  148. case "base_modify_time":
  149. if value.IsValid() {
  150. newEdbInfo.BaseModifyTime = value.String()
  151. }
  152. case "min_value":
  153. newEdbInfo.MinValue = value.Float()
  154. case "max_value":
  155. newEdbInfo.MaxValue = value.Float()
  156. case "latest_date":
  157. newEdbInfo.LatestDate = value.String()
  158. case "latest_value":
  159. newEdbInfo.LatestValue = value.Float()
  160. case "end_value":
  161. newEdbInfo.EndValue = value.Float()
  162. case "data_update_time":
  163. newEdbInfo.DataUpdateTime = value.String()
  164. case "er_data_update_date":
  165. newEdbInfo.ErDataUpdateDate = value.String()
  166. case "sub_source":
  167. newEdbInfo.SubSource = int(value.Int())
  168. default:
  169. continue
  170. }
  171. }
  172. return newEdbInfo
  173. }
  174. // SetBinlogFileName
  175. // @Description: 设置当前的binlog文件名和位置
  176. // @author: Roc
  177. // @receiver h
  178. // @datetime 2024-02-29 18:09:36
  179. // @param fileName string
  180. // @param position uint32
  181. func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
  182. h.fileName = fileName
  183. h.position = position
  184. fmt.Println("init fileName:", h.fileName, ";position:", h.position)
  185. }