handler.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package binlog
  2. import (
  3. "encoding/json"
  4. "eta/eta_api/services/alarm_msg"
  5. edbmonitorSvr "eta/eta_api/services/edb_monitor"
  6. "eta/eta_api/utils"
  7. "fmt"
  8. "reflect"
  9. "time"
  10. "github.com/go-mysql-org/go-mysql/canal"
  11. "github.com/go-mysql-org/go-mysql/mysql"
  12. "github.com/go-mysql-org/go-mysql/replication"
  13. "github.com/go-mysql-org/go-mysql/schema"
  14. )
  15. type EdbEventHandler struct {
  16. canal.DummyEventHandler
  17. fileName string
  18. position uint32
  19. }
  20. func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
  21. // 监听逻辑
  22. switch e.Action {
  23. case canal.InsertAction:
  24. err = h.Insert(e)
  25. if err != nil {
  26. utils.FileLog.Info("binlog insert error:", err)
  27. }
  28. case canal.UpdateAction:
  29. err = h.Update(e)
  30. if err != nil {
  31. utils.FileLog.Info("binlog update error:", err)
  32. }
  33. default:
  34. return nil
  35. }
  36. fmt.Println("fileName:", h.fileName, ";position:", h.position)
  37. // 每次操作完成后都将当前位置记录到缓存
  38. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  39. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  40. return nil
  41. }
  42. func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
  43. h.fileName = p.Name
  44. h.position = p.Pos
  45. return nil
  46. }
  47. func (h *EdbEventHandler) SyncToRedis() {
  48. for {
  49. // 旋转binlog日志的时候,需要将当前位置记录到缓存
  50. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  51. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  52. time.Sleep(3 * time.Second)
  53. }
  54. }
  55. func (h *EdbEventHandler) String() string {
  56. return "EdbEventHandler"
  57. }
  58. func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
  59. // 批量插入的时候,e.Rows的长度会大于0
  60. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  61. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  62. edbInfo := h.MapRowToStruct(e.Table.Columns, row)
  63. if ok := edbmonitorSvr.EdbLocalSet.IsExist(edbInfo.EdbInfoId); ok {
  64. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
  65. if err != nil {
  66. return err
  67. }
  68. } else {
  69. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, edbInfo)
  70. if err != nil {
  71. return err
  72. }
  73. if ok {
  74. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
  75. if err != nil {
  76. return err
  77. }
  78. }
  79. }
  80. // if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
  81. // for _, monitor := range monitors {
  82. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
  83. // if err != nil {
  84. // continue
  85. // }
  86. // }
  87. // }
  88. }
  89. return nil
  90. }
  91. func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
  92. if len(e.Rows) != 2 {
  93. fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
  94. return nil
  95. }
  96. edbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[1])
  97. if ok := edbmonitorSvr.EdbLocalSet.IsExist(edbInfo.EdbInfoId); ok {
  98. sendBody, _ := json.Marshal(edbInfo)
  99. alarm_msg.SendAlarmMsg(string(sendBody), 1)
  100. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
  101. if err != nil {
  102. return err
  103. }
  104. } else {
  105. sendBody, _ := json.Marshal(edbInfo)
  106. alarm_msg.SendAlarmMsg(string(sendBody), 1)
  107. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, edbInfo)
  108. if err != nil {
  109. return err
  110. }
  111. if ok {
  112. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
  113. if err != nil {
  114. return err
  115. }
  116. }
  117. }
  118. // if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
  119. // for _, monitor := range monitors {
  120. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
  121. // if err != nil {
  122. // continue
  123. // }
  124. // }
  125. // }
  126. return nil
  127. }
  128. func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog {
  129. edbInfo := edbmonitorSvr.EdbInfoBingLog{}
  130. for i, column := range columns {
  131. value := reflect.ValueOf(row[i])
  132. switch column.Name {
  133. case "edb_info_id":
  134. edbInfo.EdbInfoId = int(value.Int())
  135. case "edb_info_type":
  136. edbInfo.EdbInfoType = int(value.Uint())
  137. case "source":
  138. edbInfo.Source = int(value.Int())
  139. case "edb_code":
  140. edbInfo.EdbCode = value.String()
  141. case "start_date":
  142. edbInfo.StartDate = value.String()
  143. case "end_date":
  144. edbInfo.EndDate = value.String()
  145. case "unique_code":
  146. edbInfo.UniqueCode = value.String()
  147. case "create_time":
  148. edbInfo.CreateTime = value.String()
  149. case "modify_time":
  150. edbInfo.ModifyTime = value.String()
  151. case "base_modify_time":
  152. if value.IsValid() {
  153. edbInfo.BaseModifyTime = value.String()
  154. }
  155. case "min_value":
  156. edbInfo.MinValue = value.Float()
  157. case "max_value":
  158. edbInfo.MaxValue = value.Float()
  159. case "latest_date":
  160. edbInfo.LatestDate = value.String()
  161. case "latest_value":
  162. edbInfo.LatestValue = value.Float()
  163. case "end_value":
  164. edbInfo.EndValue = value.Float()
  165. case "data_update_time":
  166. edbInfo.DataUpdateTime = value.String()
  167. case "er_data_update_date":
  168. edbInfo.ErDataUpdateDate = value.String()
  169. case "sub_source":
  170. edbInfo.SubSource = int(value.Int())
  171. default:
  172. continue
  173. }
  174. }
  175. return edbInfo
  176. }
  177. // SetBinlogFileName
  178. // @Description: 设置当前的binlog文件名和位置
  179. // @author: Roc
  180. // @receiver h
  181. // @datetime 2024-02-29 18:09:36
  182. // @param fileName string
  183. // @param position uint32
  184. func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
  185. h.fileName = fileName
  186. h.position = position
  187. fmt.Println("init fileName:", h.fileName, ";position:", h.position)
  188. }