handler.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package binlog
  2. import (
  3. dataSourceModel "eta/eta_api/models/data_source"
  4. edbmonitorSvr "eta/eta_api/services/edb_monitor"
  5. "eta/eta_api/utils"
  6. "fmt"
  7. "reflect"
  8. "runtime/debug"
  9. "time"
  10. "github.com/go-mysql-org/go-mysql/canal"
  11. "github.com/go-mysql-org/go-mysql/mysql"
  12. "github.com/go-mysql-org/go-mysql/replication"
  13. "github.com/go-mysql-org/go-mysql/schema"
  14. )
  15. type EdbEventHandler struct {
  16. canal.DummyEventHandler
  17. fileName string
  18. position uint32
  19. }
  20. func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
  21. // 下面有可能出现panic导致监听挂掉
  22. defer func() {
  23. if r := recover(); r != nil {
  24. utils.FileLog.Error("binlog OnRow panic: ", r)
  25. stackTrace := debug.Stack()
  26. utils.FileLog.Error("binlog OnRow panic stack: ", string(stackTrace))
  27. }
  28. }()
  29. // 监听逻辑
  30. switch e.Action {
  31. case canal.InsertAction:
  32. err = h.Insert(e)
  33. if err != nil {
  34. utils.FileLog.Info("binlog insert error:", err)
  35. }
  36. case canal.UpdateAction:
  37. err = h.Update(e)
  38. if err != nil {
  39. utils.FileLog.Info("binlog update error:", err)
  40. }
  41. default:
  42. return nil
  43. }
  44. fmt.Println("fileName:", h.fileName, ";position:", h.position)
  45. // 每次操作完成后都将当前位置记录到缓存
  46. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  47. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  48. return nil
  49. }
  50. func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
  51. h.fileName = p.Name
  52. h.position = p.Pos
  53. return nil
  54. }
  55. func (h *EdbEventHandler) SyncToRedis() {
  56. for {
  57. // 旋转binlog日志的时候,需要将当前位置记录到缓存
  58. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  59. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  60. time.Sleep(3 * time.Second)
  61. }
  62. }
  63. func (h *EdbEventHandler) String() string {
  64. return "EdbEventHandler"
  65. }
  66. func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
  67. // 批量插入的时候,e.Rows的长度会大于0
  68. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  69. // 指标库
  70. tableName := e.Table.Name
  71. if tableName == "edb_info" {
  72. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  73. newEdbInfo := h.MapRowToStruct(e.Table.Columns, row)
  74. if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
  75. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  76. if err != nil {
  77. return err
  78. }
  79. } else {
  80. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo.EdbInfoId)
  81. if err != nil {
  82. return err
  83. }
  84. if ok {
  85. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  86. if err != nil {
  87. return err
  88. }
  89. }
  90. }
  91. // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
  92. // for _, monitor := range monitors {
  93. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
  94. // if err != nil {
  95. // continue
  96. // }
  97. // }
  98. // }
  99. }
  100. return nil
  101. }
  102. // 数据源
  103. indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(tableName)
  104. if indexOb == nil {
  105. return fmt.Errorf("数据表无对应数据源: %s", tableName)
  106. }
  107. for _, row := range e.Rows {
  108. indexItem := DataSourceMapRowToStruct(e.Table.Columns, row, indexOb)
  109. // 写入队列(此处无需做去重处理)
  110. if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, indexItem); e != nil {
  111. return fmt.Errorf("写入redis队列失败, %v", e)
  112. }
  113. }
  114. return nil
  115. }
  116. func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
  117. tableName := e.Table.Name
  118. // 指标库
  119. lenRows := len(e.Rows)
  120. if tableName == "edb_info" {
  121. //if len(e.Rows) != 2 {
  122. // fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
  123. // return nil
  124. //}
  125. // 由于UPDATE语句影响行数超过1时e.Rows长度会大于2,所以此处遍历处理
  126. for i := 0; i < lenRows; i += 2 {
  127. if i+1 >= lenRows {
  128. continue
  129. }
  130. oldEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[i])
  131. newEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[i+1])
  132. if oldEdbInfo.EndValue != newEdbInfo.EndValue {
  133. if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
  134. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  135. if err != nil {
  136. return err
  137. }
  138. } else {
  139. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo.EdbInfoId)
  140. if err != nil {
  141. return err
  142. }
  143. if ok {
  144. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  145. if err != nil {
  146. return err
  147. }
  148. }
  149. }
  150. }
  151. }
  152. // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
  153. // for _, monitor := range monitors {
  154. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
  155. // if err != nil {
  156. // continue
  157. // }
  158. // }
  159. // }
  160. return nil
  161. }
  162. // 数据源
  163. indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(tableName)
  164. if indexOb == nil {
  165. return fmt.Errorf("数据表无对应数据源: %s", tableName)
  166. }
  167. for i := 0; i < lenRows; i += 2 {
  168. if i+1 >= lenRows {
  169. continue
  170. }
  171. // 这里只取[i+1]即UPDATE后的数据
  172. indexItem := DataSourceMapRowToStruct(e.Table.Columns, e.Rows[i+1], indexOb)
  173. if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, indexItem); e != nil {
  174. utils.FileLog.Info(fmt.Sprintf("binlog update data source lpush err: %v", e))
  175. continue
  176. }
  177. }
  178. return nil
  179. }
  180. func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog {
  181. defer func() {
  182. if r := recover(); r != nil {
  183. utils.FileLog.Info(fmt.Sprintf("binlog update data source panic;data:%v; err: %v", columns, r))
  184. }
  185. }()
  186. newEdbInfo := edbmonitorSvr.EdbInfoBingLog{}
  187. for i, column := range columns {
  188. value := reflect.ValueOf(row[i])
  189. // 数据无效的话,那么就过滤掉
  190. if !value.IsValid() {
  191. continue
  192. }
  193. switch column.Name {
  194. case "edb_info_id":
  195. newEdbInfo.EdbInfoId = int(value.Int())
  196. case "edb_info_type":
  197. newEdbInfo.EdbInfoType = int(value.Uint())
  198. case "source":
  199. newEdbInfo.Source = int(value.Int())
  200. case "edb_code":
  201. newEdbInfo.EdbCode = value.String()
  202. case "start_date":
  203. newEdbInfo.StartDate = value.String()
  204. case "end_date":
  205. newEdbInfo.EndDate = value.String()
  206. case "unique_code":
  207. newEdbInfo.UniqueCode = value.String()
  208. case "create_time":
  209. newEdbInfo.CreateTime = value.String()
  210. case "modify_time":
  211. newEdbInfo.ModifyTime = value.String()
  212. case "base_modify_time":
  213. newEdbInfo.BaseModifyTime = value.String()
  214. case "min_value":
  215. newEdbInfo.MinValue = value.Float()
  216. case "max_value":
  217. newEdbInfo.MaxValue = value.Float()
  218. case "latest_date":
  219. newEdbInfo.LatestDate = value.String()
  220. case "latest_value":
  221. newEdbInfo.LatestValue = value.Float()
  222. case "end_value":
  223. newEdbInfo.EndValue = value.Float()
  224. case "data_update_time":
  225. newEdbInfo.DataUpdateTime = value.String()
  226. case "er_data_update_date":
  227. newEdbInfo.ErDataUpdateDate = value.String()
  228. case "sub_source":
  229. newEdbInfo.SubSource = int(value.Int())
  230. default:
  231. continue
  232. }
  233. }
  234. return newEdbInfo
  235. }
  236. // SetBinlogFileName
  237. // @Description: 设置当前的binlog文件名和位置
  238. // @author: Roc
  239. // @receiver h
  240. // @datetime 2024-02-29 18:09:36
  241. // @param fileName string
  242. // @param position uint32
  243. func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
  244. h.fileName = fileName
  245. h.position = position
  246. fmt.Println("init fileName:", h.fileName, ";position:", h.position)
  247. }
  248. // DataSourceMapRowToStruct 数据源-binlog转为es结构体
  249. func DataSourceMapRowToStruct(columns []schema.TableColumn, row []interface{}, indexOb dataSourceModel.EsBaseFromIndex) dataSourceModel.SearchDataSource {
  250. item := dataSourceModel.SearchDataSource{}
  251. source, subSource, sourceName := indexOb.SourceInfo()
  252. item.Source = source
  253. item.SubSource = subSource
  254. item.SourceName = sourceName
  255. // 根据不同数据源匹配对应的字段名
  256. indexCols := indexOb.EsCols()
  257. for i, column := range columns {
  258. // 数据无效的话,那么就过滤掉
  259. value := reflect.ValueOf(row[i])
  260. if !value.IsValid() {
  261. continue
  262. }
  263. switch column.Name {
  264. case indexCols.PrimaryId:
  265. if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
  266. item.PrimaryId = int(value.Int())
  267. }
  268. if value.Kind() == reflect.Uint || value.Kind() == reflect.Uint32 || value.Kind() == reflect.Uint64 {
  269. item.PrimaryId = int(value.Uint())
  270. }
  271. case indexCols.IndexCode:
  272. item.IndexCode = value.String()
  273. case indexCols.IndexName:
  274. item.IndexName = value.String()
  275. case indexCols.ClassifyId:
  276. if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
  277. item.ClassifyId = int(value.Int())
  278. }
  279. if value.Kind() == reflect.Uint || value.Kind() == reflect.Uint32 || value.Kind() == reflect.Uint64 {
  280. item.ClassifyId = int(value.Uint())
  281. }
  282. case indexCols.Unit:
  283. item.Unit = value.String()
  284. case indexCols.Frequency:
  285. item.Frequency = value.String()
  286. case indexCols.StartDate:
  287. item.StartDate = value.String()
  288. case indexCols.EndDate:
  289. item.EndDate = value.String()
  290. case indexCols.LatestValue:
  291. if value.Kind() == reflect.String {
  292. item.LatestValue = value.String()
  293. }
  294. if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
  295. item.LatestValue = fmt.Sprint(value.Int())
  296. }
  297. if value.Kind() == reflect.Float64 {
  298. item.LatestValue = fmt.Sprint(value.Float())
  299. }
  300. case indexCols.CreateTime:
  301. item.CreateTime = value.String()
  302. case indexCols.ModifyTime:
  303. item.ModifyTime = value.String()
  304. default:
  305. continue
  306. }
  307. }
  308. return item
  309. }