binlog.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. package binlog
  2. import (
  3. "eta/eta_api/models/binlog"
  4. "eta/eta_api/utils"
  5. "fmt"
  6. "math/rand"
  7. "strconv"
  8. "time"
  9. "github.com/go-mysql-org/go-mysql/canal"
  10. "github.com/go-mysql-org/go-mysql/mysql"
  11. _ "github.com/go-sql-driver/mysql"
  12. )
  13. func ListenMysql() {
  14. var err error
  15. defer func() {
  16. if err != nil {
  17. tips := fmt.Sprintf("ListenMysql-数据库监听服务异常, %v", err)
  18. fmt.Println(tips)
  19. utils.FileLog.Info(tips)
  20. }
  21. utils.FileLog.Info("ListenMysql end")
  22. }()
  23. if utils.MYSQL_DATA_BINLOG_URL == "" {
  24. err = fmt.Errorf("mysql url is empty")
  25. return
  26. }
  27. if utils.MYSQL_DATA_BINLOG_USER == "" {
  28. err = fmt.Errorf("mysql user is empty")
  29. return
  30. }
  31. if utils.MYSQL_DATA_BINLOG_PWD == "" {
  32. err = fmt.Errorf("mysql password is empty")
  33. return
  34. }
  35. if utils.MYSQL_DATA_BINLOG_DB == "" {
  36. err = fmt.Errorf("mysql db is empty")
  37. return
  38. }
  39. includeTableRegex := []string{
  40. utils.MYSQL_DATA_BINLOG_DB + ".edb_info$",
  41. // 数据源
  42. utils.MYSQL_DATA_BINLOG_DB + ".base_from_rzd_index$",
  43. utils.MYSQL_DATA_BINLOG_DB + ".base_from_hisugar_index$",
  44. utils.MYSQL_DATA_BINLOG_DB + ".base_from_ly_index$",
  45. utils.MYSQL_DATA_BINLOG_DB + ".base_from_sci_hq_index$",
  46. utils.MYSQL_DATA_BINLOG_DB + ".base_from_ths_hf_index$",
  47. utils.MYSQL_DATA_BINLOG_DB + ".base_from_oilchem_index$",
  48. utils.MYSQL_DATA_BINLOG_DB + ".base_from_ccf_index$",
  49. utils.MYSQL_DATA_BINLOG_DB + ".base_from_mysteel_chemical_index$",
  50. utils.MYSQL_DATA_BINLOG_DB + ".base_from_smm_index$",
  51. utils.MYSQL_DATA_BINLOG_DB + ".base_from_baiinfo_index$",
  52. utils.MYSQL_DATA_BINLOG_DB + ".base_from_sci_index$",
  53. utils.MYSQL_DATA_BINLOG_DB + ".base_from_coalmine_mapping$",
  54. utils.MYSQL_DATA_BINLOG_DB + ".base_from_eia_steo_index$",
  55. utils.MYSQL_DATA_BINLOG_DB + ".base_from_icpi_index$",
  56. utils.MYSQL_DATA_BINLOG_DB + ".base_from_yongyi_index$",
  57. utils.MYSQL_DATA_BINLOG_DB + ".base_from_fenwei_index$",
  58. utils.MYSQL_DATA_BINLOG_DB + ".base_from_sci99_index$",
  59. utils.MYSQL_DATA_BINLOG_DB + ".base_from_bloomberg_index$",
  60. utils.MYSQL_DATA_BINLOG_DB + ".base_from_mtjh_mapping$",
  61. utils.MYSQL_DATA_BINLOG_DB + ".base_from_usda_fas_index$",
  62. utils.MYSQL_DATA_BINLOG_DB + ".base_from_business_index$",
  63. }
  64. // 监听手工指标库和钢联库
  65. if utils.MYSQL_DATA_BINLOG_DB_EDB != "" {
  66. includeTableRegex = append(includeTableRegex, utils.MYSQL_DATA_BINLOG_DB_EDB+".edbinfo$")
  67. }
  68. if utils.MYSQL_DATA_BINLOG_DB_GL != "" {
  69. includeTableRegex = append(includeTableRegex, utils.MYSQL_DATA_BINLOG_DB_GL+".mb_index_main_info$")
  70. }
  71. // 主从复制的身份id配置,必须全局唯一,如果没有配置的话,那么会随机生成一个
  72. var serverId uint32
  73. if utils.MYSQL_DATA_BINLOG_SERVER_ID != "" {
  74. id, _ := strconv.ParseUint(utils.MYSQL_DATA_BINLOG_SERVER_ID, 10, 32)
  75. serverId = uint32(id)
  76. }
  77. if serverId == 0 {
  78. serverId = uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001
  79. }
  80. cfg := &canal.Config{
  81. // 一个32位无符号整数,用于标识当前 Canal 实例在 MySQL 主从复制体系中的身份。这里使用了一个随机数生成器确保每次启动时分配的 ServerID 是唯一的(在1001到1099之间)。在实际生产环境中,你需要手动指定一个全局唯一的 ServerID。
  82. ServerID: serverId,
  83. // 指定 Canal 要连接的数据库类型,默认为 "mysql",表明这是一个 MySQL 数据库。
  84. Flavor: "mysql",
  85. // 设置 MySQL 服务器地址(主机名或 IP 地址)和端口,例如 "127.0.0.1:3306"。
  86. Addr: utils.MYSQL_DATA_BINLOG_URL,
  87. User: utils.MYSQL_DATA_BINLOG_USER,
  88. Password: utils.MYSQL_DATA_BINLOG_PWD,
  89. // 如果设置为 true,Canal 将以原始二进制格式获取 binlog,否则将以解析后的 SQL 语句形式提供。
  90. //RawModeEnabled: false,
  91. // 是否启用半同步复制。当设置为 true 时,MySQL 主库在事务提交后会等待至少一个从库确认已接收并写入 binlog 才返回成功,提高了数据一致性。
  92. SemiSyncEnabled: false,
  93. // 是否将 MySQL 中的 decimal 类型字段解析为 Go 的 Decimal 类型,而不是 float 或者 string。如果业务中有精确小数计算的需求,应开启此选项以避免精度丢失问题。
  94. UseDecimal: true,
  95. // 用于控制初始数据导出的相关配置,在 Canal 启动时是否需要全量同步表数据。
  96. //Dump: dumpConf,
  97. // 正则表达式字符串,用于定义 Canal 应该监听哪些表的 binlog 事件。只有名称匹配该正则表达式的表才会被 Canal 同步处理。
  98. IncludeTableRegex: includeTableRegex,
  99. }
  100. // 校验mysql binlog format,目前仅支持row格式
  101. binlogFormat, e := binlog.GetBinlogFormat()
  102. if e != nil {
  103. err = fmt.Errorf("get binlog format err: %v", e)
  104. return
  105. }
  106. if binlogFormat.Value != "ROW" {
  107. err = fmt.Errorf("mysql binlog format is not ROW")
  108. return
  109. }
  110. // 获取上一次启动时的binlog文件名称和位置
  111. fileName, position, e := getBinlogNamePosition()
  112. if e != nil {
  113. err = fmt.Errorf("获取binlog文件名称和位置失败, %v", e)
  114. return
  115. }
  116. // 修改记录本次启动时的binlog文件名称和位置
  117. modifyBinlogNamePosition(fileName, position)
  118. c, e := canal.NewCanal(cfg)
  119. if e != nil {
  120. err = fmt.Errorf("new canal err: %v", e)
  121. return
  122. }
  123. utils.FileLog.Debug(fmt.Sprintf("记录上一次启动时的fileName: %s, position: %d", fileName, position))
  124. binlogHandler := &EdbEventHandler{}
  125. binlogHandler.SetBinlogFileName(fileName, position)
  126. c.SetEventHandler(binlogHandler)
  127. //c.Run()
  128. pos := mysql.Position{
  129. Name: fileName,
  130. Pos: position,
  131. }
  132. if e = c.RunFrom(pos); e != nil {
  133. fmt.Printf("启动监听异常: %v, 重新定位binlog\n", e)
  134. if c != nil {
  135. c.Close()
  136. }
  137. // 重新获取binlog位置
  138. rename, reposition, e := initBinlogNamePosition()
  139. if e != nil {
  140. err = fmt.Errorf("重新定位binlog失败, %v", e)
  141. return
  142. }
  143. pos.Name = rename
  144. pos.Pos = reposition
  145. // 重新尝试监听, 再起不来就退出报异常
  146. canalNew, e := canal.NewCanal(cfg)
  147. if e != nil {
  148. err = fmt.Errorf("renew canal err: %v", e)
  149. return
  150. }
  151. binlogHandler.SetBinlogFileName(rename, reposition)
  152. canalNew.SetEventHandler(binlogHandler)
  153. c = canalNew
  154. if e = c.RunFrom(pos); e != nil {
  155. err = fmt.Errorf("重新监听binlog失败, %v", e)
  156. return
  157. }
  158. }
  159. // 定时修改binlog文件名称和位置
  160. go timingModifyBinlogNamePosition()
  161. // 同步到redis
  162. go binlogHandler.SyncToRedis()
  163. }
  164. // getBinlogNamePosition
  165. // @Description: 获取当前binlog文件名称和位置
  166. // @author: Roc
  167. // @datetime 2024-05-17 13:18:19
  168. // @return fileName string
  169. // @return position uint32
  170. // @return err error
  171. func getBinlogNamePosition() (fileName string, position uint32, err error) {
  172. // 优先从redis获取
  173. fileName = utils.Rc.GetStr(utils.CACHE_MYSQL_DATA_FILENAME)
  174. position64, err := utils.Rc.GetUInt64(utils.CACHE_MYSQL_DATA_POSITION)
  175. if err != nil {
  176. if err.Error() != utils.RedisNoKeyErr {
  177. panic("mysql binlog position is not found,err:" + err.Error())
  178. return
  179. }
  180. err = nil
  181. }
  182. position = uint32(position64)
  183. // 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
  184. if fileName == `` || position == 0 {
  185. // binlog文件名
  186. fileNameKey := binlog.BinlogFileNameKey
  187. fileNameLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(fileNameKey)
  188. if tmpErr == nil {
  189. fileName = fileNameLog.InteractionVal
  190. }
  191. // binlog位置
  192. positionKey := binlog.BinlogPositionKey
  193. positionLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(positionKey)
  194. if tmpErr == nil {
  195. positionStr := positionLog.InteractionVal
  196. positionInt, tmpErr := strconv.Atoi(positionStr)
  197. if tmpErr == nil {
  198. position = uint32(positionInt)
  199. }
  200. }
  201. }
  202. // 从mysql中获取最新的binlog文件名称和位置,名称不一致则以mysql中为准
  203. if fileName == `` || position == 0 {
  204. item, tmpErr := binlog.GetShowMaster()
  205. if tmpErr != nil {
  206. err = tmpErr
  207. return
  208. }
  209. fileName = item.File
  210. position = item.Position
  211. }
  212. return
  213. }
  214. // modifyBinlogNamePosition
  215. // @Description: 修改记录本次启动时的binlog文件名称和位置
  216. // @author: Roc
  217. // @datetime 2024-05-17 11:32:32
  218. // @param fileName string
  219. // @param position uint32
  220. // @return err error
  221. func modifyBinlogNamePosition(fileName string, position uint32) {
  222. var err error
  223. defer func() {
  224. if err != nil {
  225. utils.FileLog.Error(fmt.Sprintf("修改binlog文件名称和位置异常, fileName: %s, position: %d, err: %v", fileName, position, err))
  226. }
  227. }()
  228. // fileName 变更
  229. fileNameKey := binlog.BinlogFileNameKey
  230. fileNameLog, err := binlog.GetBusinessSysInteractionLogByKey(fileNameKey)
  231. if err != nil {
  232. if err.Error() != utils.ErrNoRow() {
  233. return
  234. }
  235. err = nil
  236. fileNameLog = &binlog.BusinessSysInteractionLog{
  237. //ID: 0,
  238. InteractionKey: fileNameKey,
  239. InteractionVal: fileName,
  240. Remark: "mysql中binlog的filename名称",
  241. ModifyTime: time.Now(),
  242. CreateTime: time.Now(),
  243. }
  244. err = fileNameLog.Create()
  245. if err != nil {
  246. return
  247. }
  248. } else {
  249. fileNameLog.InteractionVal = fileName
  250. fileNameLog.ModifyTime = time.Now()
  251. err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"})
  252. if err != nil {
  253. return
  254. }
  255. }
  256. // position 变更
  257. positionKey := binlog.BinlogPositionKey
  258. positionLog, err := binlog.GetBusinessSysInteractionLogByKey(positionKey)
  259. if err != nil {
  260. if err.Error() != utils.ErrNoRow() {
  261. return
  262. }
  263. err = nil
  264. positionLog = &binlog.BusinessSysInteractionLog{
  265. //ID: 0,
  266. InteractionKey: positionKey,
  267. InteractionVal: fmt.Sprint(position),
  268. Remark: "mysql中binlog的position位置",
  269. ModifyTime: time.Now(),
  270. CreateTime: time.Now(),
  271. }
  272. err = positionLog.Create()
  273. if err != nil {
  274. return
  275. }
  276. } else {
  277. positionLog.InteractionVal = fmt.Sprint(position)
  278. positionLog.ModifyTime = time.Now()
  279. err = positionLog.Update([]string{"InteractionVal", "ModifyTime"})
  280. if err != nil {
  281. return
  282. }
  283. }
  284. return
  285. }
  286. // timingModifyBinlogNamePosition
  287. // @Description: 定时修改binlog文件名称和位置
  288. // @author: Roc
  289. // @datetime 2024-05-17 13:08:13
  290. func timingModifyBinlogNamePosition() {
  291. for {
  292. // 延时30s执行
  293. time.Sleep(30 * time.Second)
  294. // 获取最新的binlog文件名称和位置
  295. fileName, position, err := getBinlogNamePosition()
  296. if err != nil {
  297. return
  298. }
  299. if fileName != `` && position != 0 {
  300. // 修改记录本次启动时的binlog文件名称和位置
  301. modifyBinlogNamePosition(fileName, position)
  302. }
  303. }
  304. }
  305. // initBinlogNamePosition 初始化/重新定位binlog文件名称和位置
  306. func initBinlogNamePosition() (fileName string, position uint32, err error) {
  307. // 获取mysql当前binlog文件名称和位置
  308. status, e := binlog.GetShowMaster()
  309. if e != nil {
  310. err = fmt.Errorf("get mysql master status err: %v", e)
  311. return
  312. }
  313. fileName = status.File
  314. position = status.Position
  315. // 写入redis
  316. _ = utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, fileName, 31*24*time.Hour)
  317. _ = utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, position, 31*24*time.Hour)
  318. // 更新MySQL
  319. modifyBinlogNamePosition(fileName, position)
  320. return
  321. }