handler.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. package binlog
  2. import (
  3. dataSourceModel "eta/eta_api/models/data_source"
  4. edbmonitorSvr "eta/eta_api/services/edb_monitor"
  5. "eta/eta_api/utils"
  6. "fmt"
  7. "reflect"
  8. "time"
  9. "github.com/go-mysql-org/go-mysql/canal"
  10. "github.com/go-mysql-org/go-mysql/mysql"
  11. "github.com/go-mysql-org/go-mysql/replication"
  12. "github.com/go-mysql-org/go-mysql/schema"
  13. )
  14. type EdbEventHandler struct {
  15. canal.DummyEventHandler
  16. fileName string
  17. position uint32
  18. }
  19. func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
  20. // 监听逻辑
  21. switch e.Action {
  22. case canal.InsertAction:
  23. err = h.Insert(e)
  24. if err != nil {
  25. utils.FileLog.Info("binlog insert error:", err)
  26. }
  27. case canal.UpdateAction:
  28. err = h.Update(e)
  29. if err != nil {
  30. utils.FileLog.Info("binlog update error:", err)
  31. }
  32. default:
  33. return nil
  34. }
  35. fmt.Println("fileName:", h.fileName, ";position:", h.position)
  36. // 每次操作完成后都将当前位置记录到缓存
  37. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  38. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  39. return nil
  40. }
  41. func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
  42. h.fileName = p.Name
  43. h.position = p.Pos
  44. return nil
  45. }
  46. func (h *EdbEventHandler) SyncToRedis() {
  47. for {
  48. // 旋转binlog日志的时候,需要将当前位置记录到缓存
  49. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  50. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  51. time.Sleep(3 * time.Second)
  52. }
  53. }
  54. func (h *EdbEventHandler) String() string {
  55. return "EdbEventHandler"
  56. }
  57. func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
  58. // 批量插入的时候,e.Rows的长度会大于0
  59. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  60. // 指标库
  61. tableName := e.Table.Name
  62. if tableName == "edb_info" {
  63. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  64. newEdbInfo := h.MapRowToStruct(e.Table.Columns, row)
  65. if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
  66. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  67. if err != nil {
  68. return err
  69. }
  70. } else {
  71. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo.EdbInfoId)
  72. if err != nil {
  73. return err
  74. }
  75. if ok {
  76. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  77. if err != nil {
  78. return err
  79. }
  80. }
  81. }
  82. // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
  83. // for _, monitor := range monitors {
  84. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
  85. // if err != nil {
  86. // continue
  87. // }
  88. // }
  89. // }
  90. }
  91. return nil
  92. }
  93. // 数据源
  94. indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(tableName)
  95. if indexOb == nil {
  96. return fmt.Errorf("数据表无对应数据源: %s", tableName)
  97. }
  98. for _, row := range e.Rows {
  99. indexItem := DataSourceMapRowToStruct(e.Table.Columns, row, indexOb)
  100. // 写入队列(此处无需做去重处理)
  101. if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, indexItem); e != nil {
  102. return fmt.Errorf("写入redis队列失败, %v", e)
  103. }
  104. }
  105. return nil
  106. }
  107. func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
  108. tableName := e.Table.Name
  109. // 指标库
  110. lenRows := len(e.Rows)
  111. if tableName == "edb_info" {
  112. //if len(e.Rows) != 2 {
  113. // fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
  114. // return nil
  115. //}
  116. // 由于UPDATE语句影响行数超过1时e.Rows长度会大于2,所以此处遍历处理
  117. for i := 0; i < lenRows; i += 2 {
  118. if i+1 >= lenRows {
  119. continue
  120. }
  121. oldEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[i])
  122. newEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[i+1])
  123. if oldEdbInfo.EndValue != newEdbInfo.EndValue {
  124. if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
  125. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  126. if err != nil {
  127. return err
  128. }
  129. } else {
  130. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo.EdbInfoId)
  131. if err != nil {
  132. return err
  133. }
  134. if ok {
  135. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  136. if err != nil {
  137. return err
  138. }
  139. }
  140. }
  141. }
  142. }
  143. // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
  144. // for _, monitor := range monitors {
  145. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
  146. // if err != nil {
  147. // continue
  148. // }
  149. // }
  150. // }
  151. return nil
  152. }
  153. // 数据源
  154. indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(tableName)
  155. if indexOb == nil {
  156. return fmt.Errorf("数据表无对应数据源: %s", tableName)
  157. }
  158. for i := 0; i < lenRows; i += 2 {
  159. if i+1 >= lenRows {
  160. continue
  161. }
  162. // 这里只取[i+1]即UPDATE后的数据
  163. indexItem := DataSourceMapRowToStruct(e.Table.Columns, e.Rows[i+1], indexOb)
  164. if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, indexItem); e != nil {
  165. utils.FileLog.Info(fmt.Sprintf("binlog update data source lpush err: %v", e))
  166. continue
  167. }
  168. }
  169. return nil
  170. }
  171. func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog {
  172. defer func() {
  173. if r := recover(); r != nil {
  174. utils.FileLog.Info(fmt.Sprintf("binlog update data source panic;data:%v; err: %v", columns, r))
  175. }
  176. }()
  177. newEdbInfo := edbmonitorSvr.EdbInfoBingLog{}
  178. for i, column := range columns {
  179. value := reflect.ValueOf(row[i])
  180. // 数据无效的话,那么就过滤掉
  181. if !value.IsValid() {
  182. continue
  183. }
  184. switch column.Name {
  185. case "edb_info_id":
  186. newEdbInfo.EdbInfoId = int(value.Int())
  187. case "edb_info_type":
  188. newEdbInfo.EdbInfoType = int(value.Uint())
  189. case "source":
  190. newEdbInfo.Source = int(value.Int())
  191. case "edb_code":
  192. newEdbInfo.EdbCode = value.String()
  193. case "start_date":
  194. newEdbInfo.StartDate = value.String()
  195. case "end_date":
  196. newEdbInfo.EndDate = value.String()
  197. case "unique_code":
  198. newEdbInfo.UniqueCode = value.String()
  199. case "create_time":
  200. newEdbInfo.CreateTime = value.String()
  201. case "modify_time":
  202. newEdbInfo.ModifyTime = value.String()
  203. case "base_modify_time":
  204. if value.IsValid() {
  205. newEdbInfo.BaseModifyTime = value.String()
  206. }
  207. case "min_value":
  208. newEdbInfo.MinValue = value.Float()
  209. case "max_value":
  210. newEdbInfo.MaxValue = value.Float()
  211. case "latest_date":
  212. newEdbInfo.LatestDate = value.String()
  213. case "latest_value":
  214. newEdbInfo.LatestValue = value.Float()
  215. case "end_value":
  216. newEdbInfo.EndValue = value.Float()
  217. case "data_update_time":
  218. newEdbInfo.DataUpdateTime = value.String()
  219. case "er_data_update_date":
  220. newEdbInfo.ErDataUpdateDate = value.String()
  221. case "sub_source":
  222. newEdbInfo.SubSource = int(value.Int())
  223. default:
  224. continue
  225. }
  226. }
  227. return newEdbInfo
  228. }
  229. // SetBinlogFileName
  230. // @Description: 设置当前的binlog文件名和位置
  231. // @author: Roc
  232. // @receiver h
  233. // @datetime 2024-02-29 18:09:36
  234. // @param fileName string
  235. // @param position uint32
  236. func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
  237. h.fileName = fileName
  238. h.position = position
  239. fmt.Println("init fileName:", h.fileName, ";position:", h.position)
  240. }
  241. // DataSourceMapRowToStruct 数据源-binlog转为es结构体
  242. func DataSourceMapRowToStruct(columns []schema.TableColumn, row []interface{}, indexOb dataSourceModel.EsBaseFromIndex) dataSourceModel.SearchDataSource {
  243. item := dataSourceModel.SearchDataSource{}
  244. source, subSource, sourceName := indexOb.SourceInfo()
  245. item.Source = source
  246. item.SubSource = subSource
  247. item.SourceName = sourceName
  248. // 根据不同数据源匹配对应的字段名
  249. indexCols := indexOb.EsCols()
  250. for i, column := range columns {
  251. value := reflect.ValueOf(row[i])
  252. switch column.Name {
  253. case indexCols.PrimaryId:
  254. if !value.IsValid() {
  255. continue
  256. }
  257. if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
  258. item.PrimaryId = int(value.Int())
  259. }
  260. if value.Kind() == reflect.Uint || value.Kind() == reflect.Uint32 || value.Kind() == reflect.Uint64 {
  261. item.PrimaryId = int(value.Uint())
  262. }
  263. case indexCols.IndexCode:
  264. if value.IsValid() {
  265. item.IndexCode = value.String()
  266. }
  267. case indexCols.IndexName:
  268. if value.IsValid() {
  269. item.IndexName = value.String()
  270. }
  271. case indexCols.ClassifyId:
  272. if !value.IsValid() {
  273. continue
  274. }
  275. if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
  276. item.ClassifyId = int(value.Int())
  277. }
  278. if value.Kind() == reflect.Uint || value.Kind() == reflect.Uint32 || value.Kind() == reflect.Uint64 {
  279. item.ClassifyId = int(value.Uint())
  280. }
  281. case indexCols.Unit:
  282. if value.IsValid() {
  283. item.Unit = value.String()
  284. }
  285. case indexCols.Frequency:
  286. if value.IsValid() {
  287. item.Frequency = value.String()
  288. }
  289. case indexCols.StartDate:
  290. if value.IsValid() {
  291. item.StartDate = value.String()
  292. }
  293. case indexCols.EndDate:
  294. if value.IsValid() {
  295. item.EndDate = value.String()
  296. }
  297. case indexCols.LatestValue:
  298. if !value.IsValid() {
  299. continue
  300. }
  301. if value.Kind() == reflect.String {
  302. item.LatestValue = value.String()
  303. }
  304. if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
  305. item.LatestValue = fmt.Sprint(value.Int())
  306. }
  307. if value.Kind() == reflect.Float64 {
  308. item.LatestValue = fmt.Sprint(value.Float())
  309. }
  310. case indexCols.CreateTime:
  311. if value.IsValid() {
  312. item.CreateTime = value.String()
  313. }
  314. case indexCols.ModifyTime:
  315. if value.IsValid() {
  316. item.ModifyTime = value.String()
  317. }
  318. default:
  319. continue
  320. }
  321. }
  322. return item
  323. }