handler.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. package binlog
  2. import (
  3. dataSourceModel "eta/eta_api/models/data_source"
  4. edbmonitorSvr "eta/eta_api/services/edb_monitor"
  5. "eta/eta_api/utils"
  6. "fmt"
  7. "reflect"
  8. "time"
  9. "github.com/go-mysql-org/go-mysql/canal"
  10. "github.com/go-mysql-org/go-mysql/mysql"
  11. "github.com/go-mysql-org/go-mysql/replication"
  12. "github.com/go-mysql-org/go-mysql/schema"
  13. )
  14. type EdbEventHandler struct {
  15. canal.DummyEventHandler
  16. fileName string
  17. position uint32
  18. }
  19. func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
  20. // 监听逻辑
  21. switch e.Action {
  22. case canal.InsertAction:
  23. err = h.Insert(e)
  24. if err != nil {
  25. utils.FileLog.Info("binlog insert error:", err)
  26. }
  27. case canal.UpdateAction:
  28. err = h.Update(e)
  29. if err != nil {
  30. utils.FileLog.Info("binlog update error:", err)
  31. }
  32. default:
  33. return nil
  34. }
  35. fmt.Println("fileName:", h.fileName, ";position:", h.position)
  36. // 每次操作完成后都将当前位置记录到缓存
  37. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  38. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  39. return nil
  40. }
  41. func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
  42. h.fileName = p.Name
  43. h.position = p.Pos
  44. return nil
  45. }
  46. func (h *EdbEventHandler) SyncToRedis() {
  47. for {
  48. // 旋转binlog日志的时候,需要将当前位置记录到缓存
  49. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  50. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  51. time.Sleep(3 * time.Second)
  52. }
  53. }
  54. func (h *EdbEventHandler) String() string {
  55. return "EdbEventHandler"
  56. }
  57. func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
  58. // 批量插入的时候,e.Rows的长度会大于0
  59. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  60. // 指标库
  61. tableName := e.Table.Name
  62. if tableName == "edb_info" {
  63. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  64. newEdbInfo := h.MapRowToStruct(e.Table.Columns, row)
  65. if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
  66. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  67. if err != nil {
  68. return err
  69. }
  70. } else {
  71. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo.EdbInfoId)
  72. if err != nil {
  73. return err
  74. }
  75. if ok {
  76. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  77. if err != nil {
  78. return err
  79. }
  80. }
  81. }
  82. // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
  83. // for _, monitor := range monitors {
  84. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
  85. // if err != nil {
  86. // continue
  87. // }
  88. // }
  89. // }
  90. }
  91. return nil
  92. }
  93. // 数据源
  94. indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(tableName)
  95. if indexOb == nil {
  96. return fmt.Errorf("数据表无对应数据源: %s", tableName)
  97. }
  98. for _, row := range e.Rows {
  99. indexItem := DataSourceMapRowToStruct(e.Table.Columns, row, indexOb)
  100. // 写入队列(此处无需做去重处理)
  101. if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, indexItem); e != nil {
  102. return fmt.Errorf("写入redis队列失败, %v", e)
  103. }
  104. }
  105. return nil
  106. }
  107. func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
  108. tableName := e.Table.Name
  109. // 指标库
  110. lenRows := len(e.Rows)
  111. if tableName == "edb_info" {
  112. //if len(e.Rows) != 2 {
  113. // fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
  114. // return nil
  115. //}
  116. // 由于UPDATE语句影响行数超过1时e.Rows长度会大于2,所以此处遍历处理
  117. for i := 0; i < lenRows; i += 2 {
  118. if i+1 >= lenRows {
  119. continue
  120. }
  121. oldEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[i])
  122. newEdbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[i+1])
  123. if oldEdbInfo.EndValue != newEdbInfo.EndValue {
  124. if ok := edbmonitorSvr.EdbLocalSet.IsExist(newEdbInfo.EdbInfoId); ok {
  125. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  126. if err != nil {
  127. return err
  128. }
  129. } else {
  130. ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, newEdbInfo.EdbInfoId)
  131. if err != nil {
  132. return err
  133. }
  134. if ok {
  135. err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, newEdbInfo)
  136. if err != nil {
  137. return err
  138. }
  139. }
  140. }
  141. }
  142. }
  143. // if monitors, ok := edbMonitorMap[newEdbInfo.EdbInfoId]; ok {
  144. // for _, monitor := range monitors {
  145. // err = edbmonitorSvr.ModifyEdbMonitorState(monitor, newEdbInfo.EdbCode, newEdbInfo.Source, newEdbInfo.SubSource)
  146. // if err != nil {
  147. // continue
  148. // }
  149. // }
  150. // }
  151. return nil
  152. }
  153. // 数据源
  154. indexOb := dataSourceModel.GetEsBaseFromIndexByTableName(tableName)
  155. if indexOb == nil {
  156. return fmt.Errorf("数据表无对应数据源: %s", tableName)
  157. }
  158. for i := 0; i < lenRows; i += 2 {
  159. if i+1 >= lenRows {
  160. continue
  161. }
  162. // 这里只取[i+1]即UPDATE后的数据
  163. indexItem := DataSourceMapRowToStruct(e.Table.Columns, e.Rows[i+1], indexOb)
  164. if e := utils.Rc.LPush(utils.CACHE_DATA_SOURCE_ES_HANDLE, indexItem); e != nil {
  165. utils.FileLog.Info(fmt.Sprintf("binlog update data source lpush err: %v", e))
  166. continue
  167. }
  168. }
  169. return nil
  170. }
  171. func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog {
  172. newEdbInfo := edbmonitorSvr.EdbInfoBingLog{}
  173. for i, column := range columns {
  174. value := reflect.ValueOf(row[i])
  175. switch column.Name {
  176. case "edb_info_id":
  177. newEdbInfo.EdbInfoId = int(value.Int())
  178. case "edb_info_type":
  179. newEdbInfo.EdbInfoType = int(value.Uint())
  180. case "source":
  181. newEdbInfo.Source = int(value.Int())
  182. case "edb_code":
  183. newEdbInfo.EdbCode = value.String()
  184. case "start_date":
  185. newEdbInfo.StartDate = value.String()
  186. case "end_date":
  187. newEdbInfo.EndDate = value.String()
  188. case "unique_code":
  189. newEdbInfo.UniqueCode = value.String()
  190. case "create_time":
  191. newEdbInfo.CreateTime = value.String()
  192. case "modify_time":
  193. newEdbInfo.ModifyTime = value.String()
  194. case "base_modify_time":
  195. if value.IsValid() {
  196. newEdbInfo.BaseModifyTime = value.String()
  197. }
  198. case "min_value":
  199. newEdbInfo.MinValue = value.Float()
  200. case "max_value":
  201. newEdbInfo.MaxValue = value.Float()
  202. case "latest_date":
  203. newEdbInfo.LatestDate = value.String()
  204. case "latest_value":
  205. newEdbInfo.LatestValue = value.Float()
  206. case "end_value":
  207. newEdbInfo.EndValue = value.Float()
  208. case "data_update_time":
  209. newEdbInfo.DataUpdateTime = value.String()
  210. case "er_data_update_date":
  211. newEdbInfo.ErDataUpdateDate = value.String()
  212. case "sub_source":
  213. newEdbInfo.SubSource = int(value.Int())
  214. default:
  215. continue
  216. }
  217. }
  218. return newEdbInfo
  219. }
  220. // SetBinlogFileName
  221. // @Description: 设置当前的binlog文件名和位置
  222. // @author: Roc
  223. // @receiver h
  224. // @datetime 2024-02-29 18:09:36
  225. // @param fileName string
  226. // @param position uint32
  227. func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
  228. h.fileName = fileName
  229. h.position = position
  230. fmt.Println("init fileName:", h.fileName, ";position:", h.position)
  231. }
  232. // DataSourceMapRowToStruct 数据源-binlog转为es结构体
  233. func DataSourceMapRowToStruct(columns []schema.TableColumn, row []interface{}, indexOb dataSourceModel.EsBaseFromIndex) dataSourceModel.SearchDataSource {
  234. item := dataSourceModel.SearchDataSource{}
  235. source, subSource, sourceName := indexOb.SourceInfo()
  236. item.Source = source
  237. item.SubSource = subSource
  238. item.SourceName = sourceName
  239. // 根据不同数据源匹配对应的字段名
  240. indexCols := indexOb.EsCols()
  241. for i, column := range columns {
  242. value := reflect.ValueOf(row[i])
  243. switch column.Name {
  244. case indexCols.PrimaryId:
  245. if !value.IsValid() {
  246. continue
  247. }
  248. if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
  249. item.PrimaryId = int(value.Int())
  250. }
  251. if value.Kind() == reflect.Uint || value.Kind() == reflect.Uint32 || value.Kind() == reflect.Uint64 {
  252. item.PrimaryId = int(value.Uint())
  253. }
  254. case indexCols.IndexCode:
  255. if value.IsValid() {
  256. item.IndexCode = value.String()
  257. }
  258. case indexCols.IndexName:
  259. if value.IsValid() {
  260. item.IndexName = value.String()
  261. }
  262. case indexCols.ClassifyId:
  263. if !value.IsValid() {
  264. continue
  265. }
  266. if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
  267. item.ClassifyId = int(value.Int())
  268. }
  269. if value.Kind() == reflect.Uint || value.Kind() == reflect.Uint32 || value.Kind() == reflect.Uint64 {
  270. item.ClassifyId = int(value.Uint())
  271. }
  272. case indexCols.Unit:
  273. if value.IsValid() {
  274. item.Unit = value.String()
  275. }
  276. case indexCols.Frequency:
  277. if value.IsValid() {
  278. item.Frequency = value.String()
  279. }
  280. case indexCols.StartDate:
  281. if value.IsValid() {
  282. item.StartDate = value.String()
  283. }
  284. case indexCols.EndDate:
  285. if value.IsValid() {
  286. item.EndDate = value.String()
  287. }
  288. case indexCols.LatestValue:
  289. if !value.IsValid() {
  290. continue
  291. }
  292. if value.Kind() == reflect.String {
  293. item.LatestValue = value.String()
  294. }
  295. if value.Kind() == reflect.Int || value.Kind() == reflect.Int32 || value.Kind() == reflect.Int64 {
  296. item.LatestValue = fmt.Sprint(value.Int())
  297. }
  298. if value.Kind() == reflect.Float64 {
  299. item.LatestValue = fmt.Sprint(value.Float())
  300. }
  301. case indexCols.CreateTime:
  302. if value.IsValid() {
  303. item.CreateTime = value.String()
  304. }
  305. case indexCols.ModifyTime:
  306. if value.IsValid() {
  307. item.ModifyTime = value.String()
  308. }
  309. default:
  310. continue
  311. }
  312. }
  313. return item
  314. }