handler.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package binlog
  2. import (
  3. edbmonitor "eta/eta_api/models/edb_monitor"
  4. edbmonitorSvr "eta/eta_api/services/edb_monitor"
  5. "eta/eta_api/utils"
  6. "fmt"
  7. "reflect"
  8. "time"
  9. "github.com/go-mysql-org/go-mysql/canal"
  10. "github.com/go-mysql-org/go-mysql/mysql"
  11. "github.com/go-mysql-org/go-mysql/replication"
  12. "github.com/go-mysql-org/go-mysql/schema"
  13. )
  14. type EdbInfoBingLog struct {
  15. EdbInfoId int `orm:"column(edb_info_id);pk"`
  16. EdbInfoType int `description:"指标类型,0:普通指标,1:预测指标"`
  17. Source int `description:"来源id"`
  18. EdbCode string `description:"指标编码"`
  19. StartDate string `description:"起始日期"`
  20. EndDate string `description:"终止日期"`
  21. UniqueCode string `description:"指标唯一编码"`
  22. CreateTime string
  23. ModifyTime string
  24. BaseModifyTime string
  25. MinValue float64 `description:"指标最小值"`
  26. MaxValue float64 `description:"指标最大值"`
  27. LatestDate string `description:"数据最新日期(实际日期)"`
  28. LatestValue float64 `description:"数据最新值(实际值)"`
  29. EndValue float64 `description:"数据的最新值(预测日期的最新值)"`
  30. DataUpdateTime string `description:"最近一次数据发生变化的时间"`
  31. ErDataUpdateDate string `description:"本次更新,数据发生变化的最早日期"`
  32. SubSource int `description:"子数据来源:0:经济数据库,1:日期序列"`
  33. }
  34. type EdbEventHandler struct {
  35. canal.DummyEventHandler
  36. fileName string
  37. position uint32
  38. }
  39. func (h *EdbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
  40. // 监听逻辑
  41. switch e.Action {
  42. case canal.InsertAction:
  43. err = h.Insert(e)
  44. if err != nil {
  45. utils.FileLog.Info("binlog insert error:", err)
  46. }
  47. case canal.UpdateAction:
  48. err = h.Update(e)
  49. if err != nil {
  50. utils.FileLog.Info("binlog update error:", err)
  51. }
  52. default:
  53. return nil
  54. }
  55. fmt.Println("fileName:", h.fileName, ";position:", h.position)
  56. // 每次操作完成后都将当前位置记录到缓存
  57. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  58. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  59. return nil
  60. }
  61. func (h *EdbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
  62. h.fileName = p.Name
  63. h.position = p.Pos
  64. // 旋转binlog日志的时候,需要将当前位置记录到缓存
  65. utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
  66. utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
  67. return nil
  68. }
  69. func (h *EdbEventHandler) String() string {
  70. return "EdbEventHandler"
  71. }
  72. func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
  73. // 批量插入的时候,e.Rows的长度会大于0
  74. fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
  75. edbMonitorList, err := edbmonitor.GetEdbMonitorEdbInfoList()
  76. if err != nil {
  77. return err
  78. }
  79. edbMonitorMap := make(map[int][]*edbmonitor.EdbMonitorInfo)
  80. for _, v := range edbMonitorList {
  81. if _, ok := edbMonitorMap[v.EdbInfoId]; !ok {
  82. edbMonitorMap[v.EdbInfoId] = make([]*edbmonitor.EdbMonitorInfo, 0)
  83. }
  84. edbMonitorMap[v.EdbInfoId] = append(edbMonitorMap[v.EdbInfoId], v)
  85. }
  86. for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
  87. edbInfo := h.MapRowToStruct(e.Table.Columns, row)
  88. if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
  89. for _, monitor := range monitors {
  90. err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
  91. if err != nil {
  92. continue
  93. }
  94. }
  95. }
  96. }
  97. return nil
  98. }
  99. func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
  100. if len(e.Rows) != 2 {
  101. fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
  102. return nil
  103. }
  104. edbMonitorList, err := edbmonitor.GetEdbMonitorEdbInfoList()
  105. if err != nil {
  106. return err
  107. }
  108. edbMonitorMap := make(map[int][]*edbmonitor.EdbMonitorInfo)
  109. for _, v := range edbMonitorList {
  110. if _, ok := edbMonitorMap[v.EdbInfoId]; !ok {
  111. edbMonitorMap[v.EdbInfoId] = make([]*edbmonitor.EdbMonitorInfo, 0)
  112. }
  113. edbMonitorMap[v.EdbInfoId] = append(edbMonitorMap[v.EdbInfoId], v)
  114. }
  115. edbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[1])
  116. if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
  117. for _, monitor := range monitors {
  118. err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
  119. if err != nil {
  120. continue
  121. }
  122. }
  123. }
  124. return nil
  125. }
  126. func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) EdbInfoBingLog {
  127. edbInfo := EdbInfoBingLog{}
  128. for i, column := range columns {
  129. value := reflect.ValueOf(row[i])
  130. switch column.Name {
  131. case "edb_info_id":
  132. edbInfo.EdbInfoId = int(value.Int())
  133. case "edb_info_type":
  134. edbInfo.EdbInfoType = int(value.Uint())
  135. case "source":
  136. edbInfo.Source = int(value.Int())
  137. case "edb_code":
  138. edbInfo.EdbCode = value.String()
  139. case "start_date":
  140. edbInfo.StartDate = value.String()
  141. case "end_date":
  142. edbInfo.EndDate = value.String()
  143. case "unique_code":
  144. edbInfo.UniqueCode = value.String()
  145. case "create_time":
  146. edbInfo.CreateTime = value.String()
  147. case "modify_time":
  148. edbInfo.ModifyTime = value.String()
  149. case "base_modify_time":
  150. if value.IsValid() {
  151. edbInfo.BaseModifyTime = value.String()
  152. }
  153. case "min_value":
  154. edbInfo.MinValue = value.Float()
  155. case "max_value":
  156. edbInfo.MaxValue = value.Float()
  157. case "latest_date":
  158. edbInfo.LatestDate = value.String()
  159. case "latest_value":
  160. edbInfo.LatestValue = value.Float()
  161. case "end_value":
  162. edbInfo.EndValue = value.Float()
  163. case "data_update_time":
  164. edbInfo.DataUpdateTime = value.String()
  165. case "er_data_update_date":
  166. edbInfo.ErDataUpdateDate = value.String()
  167. case "sub_source":
  168. edbInfo.SubSource = int(value.Int())
  169. default:
  170. continue
  171. }
  172. }
  173. return edbInfo
  174. }
  175. // SetBinlogFileName
  176. // @Description: 设置当前的binlog文件名和位置
  177. // @author: Roc
  178. // @receiver h
  179. // @datetime 2024-02-29 18:09:36
  180. // @param fileName string
  181. // @param position uint32
  182. func (h *EdbEventHandler) SetBinlogFileName(fileName string, position uint32) {
  183. h.fileName = fileName
  184. h.position = position
  185. fmt.Println("init fileName:", h.fileName, ";position:", h.position)
  186. }