binlog.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package binlog
  2. import (
  3. "errors"
  4. "eta_gn/eta_bridge/global"
  5. "eta_gn/eta_bridge/models/index"
  6. "eta_gn/eta_bridge/utils"
  7. "fmt"
  8. "github.com/go-mysql-org/go-mysql/canal"
  9. "github.com/go-mysql-org/go-mysql/mysql"
  10. _ "github.com/go-sql-driver/mysql"
  11. "math/rand"
  12. "strconv"
  13. "time"
  14. )
  15. func ListenMysql() {
  16. var err error
  17. defer func() {
  18. if err != nil {
  19. fmt.Println("数据库监听服务异常,err:", err)
  20. }
  21. }()
  22. if global.CONFIG.Mysql.Binlog.Host == "" {
  23. panic("mysqlHost is empty")
  24. }
  25. if global.CONFIG.Mysql.Binlog.User == "" {
  26. panic("user is empty")
  27. }
  28. if global.CONFIG.Mysql.Binlog.Password == "" {
  29. panic("password is empty")
  30. }
  31. if global.CONFIG.Mysql.Binlog.Db == "" {
  32. panic("db is empty")
  33. }
  34. includeTableRegex := []string{
  35. global.CONFIG.Mysql.Binlog.Db + ".edb_info$",
  36. global.CONFIG.Mysql.Binlog.Db + ".edb_classify$",
  37. global.CONFIG.Mysql.Binlog.Db + ".base_from_mysteel_chemical_index$",
  38. global.CONFIG.Mysql.Binlog.Db + ".base_from_smm_index$",
  39. global.CONFIG.Mysql.Binlog.Db + ".edb_data*",
  40. }
  41. serverId := global.CONFIG.Mysql.Binlog.ServerID
  42. if serverId == 0 {
  43. serverId = uint32(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(1000)) + 1001
  44. }
  45. cfg := &canal.Config{
  46. ServerID: serverId,
  47. Flavor: "mysql",
  48. Addr: global.CONFIG.Mysql.Binlog.Host,
  49. User: global.CONFIG.Mysql.Binlog.User,
  50. Password: global.CONFIG.Mysql.Binlog.Password,
  51. SemiSyncEnabled: false,
  52. UseDecimal: true,
  53. IncludeTableRegex: includeTableRegex,
  54. }
  55. {
  56. binlogFormat, tmpErr := index.GetBinlogFormat()
  57. if tmpErr != nil {
  58. err = tmpErr
  59. return
  60. }
  61. if binlogFormat.Value != "ROW" {
  62. panic("mysql binlog format is not ROW")
  63. return
  64. }
  65. }
  66. fileName, position, err := getBinlogNamePosition()
  67. if err != nil {
  68. return
  69. }
  70. modifyBinlogNamePosition(fileName, position)
  71. go timingModifyBinlogNamePosition()
  72. c, err := canal.NewCanal(cfg)
  73. if err != nil {
  74. fmt.Println("err:", err)
  75. return
  76. }
  77. global.FILE_LOG.Debug("记录上一次启动时的fileName:", fileName, ";position:", position)
  78. binlogHandler := &MyEventHandler{}
  79. binlogHandler.SetBinlogFileName(fileName, position)
  80. c.SetEventHandler(binlogHandler)
  81. pos := mysql.Position{
  82. Name: fileName,
  83. Pos: position,
  84. }
  85. err = c.RunFrom(pos)
  86. }
  87. func getBinlogNamePosition() (fileName string, position uint32, err error) {
  88. fileName = global.Rc.GetStr(utils.CACHE_MYSQL_MASTER_FILENAME)
  89. position64, err := global.Rc.GetUInt64(utils.CACHE_MYSQL_MASTER_POSITION)
  90. if err != nil {
  91. if err.Error() != utils.RedisNoKeyErr {
  92. panic("mysql binlog position is not found,err:" + err.Error())
  93. return
  94. }
  95. err = nil
  96. }
  97. position = uint32(position64)
  98. if fileName == `` || position == 0 {
  99. fileNameKey := index.BinlogFileNameKey
  100. fileNameLog, tmpErr := index.GetBusinessSysInteractionLogByKey(fileNameKey)
  101. if tmpErr == nil {
  102. fileName = fileNameLog.InteractionKey
  103. }
  104. positionKey := index.BinlogPositionKey
  105. positionLog, tmpErr := index.GetBusinessSysInteractionLogByKey(positionKey)
  106. if tmpErr == nil {
  107. positionStr := positionLog.InteractionKey
  108. positionInt, tmpErr := strconv.Atoi(positionStr)
  109. if tmpErr == nil {
  110. position = uint32(positionInt)
  111. }
  112. }
  113. }
  114. if fileName == `` || position == 0 {
  115. item, tmpErr := index.GetShowMaster()
  116. if tmpErr != nil {
  117. err = tmpErr
  118. return
  119. }
  120. fileName = item.File
  121. position = item.Position
  122. }
  123. return
  124. }
  125. func timingModifyBinlogNamePosition() {
  126. for {
  127. time.Sleep(30 * time.Second)
  128. fileName, position, err := getBinlogNamePosition()
  129. if err != nil {
  130. return
  131. }
  132. if fileName != `` && position != 0 {
  133. modifyBinlogNamePosition(fileName, position)
  134. }
  135. }
  136. }
  137. func modifyBinlogNamePosition(fileName string, position uint32) {
  138. var err error
  139. defer func() {
  140. if err != nil {
  141. global.FILE_LOG.Error("修改binlog文件名称和位置异常,fileName", fileName, ",position:", position, ",err:", err)
  142. }
  143. }()
  144. fileNameKey := index.BinlogFileNameKey
  145. fileNameLog, err := index.GetBusinessSysInteractionLogByKey(fileNameKey)
  146. if err != nil {
  147. if !errors.Is(err, utils.ErrNoRow) {
  148. return
  149. }
  150. err = nil
  151. fileNameLog = &index.BusinessSysInteractionLog{
  152. InteractionKey: fileNameKey,
  153. InteractionVal: fileName,
  154. Remark: "mysql中binlog的filename名称",
  155. ModifyTime: time.Now(),
  156. CreateTime: time.Now(),
  157. }
  158. err = fileNameLog.Create()
  159. if err != nil {
  160. return
  161. }
  162. } else {
  163. fileNameLog.InteractionVal = fileName
  164. fileNameLog.ModifyTime = time.Now()
  165. err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"})
  166. if err != nil {
  167. return
  168. }
  169. }
  170. positionKey := index.BinlogPositionKey
  171. positionLog, err := index.GetBusinessSysInteractionLogByKey(positionKey)
  172. if err != nil {
  173. if !errors.Is(err, utils.ErrNoRow) {
  174. return
  175. }
  176. err = nil
  177. positionLog = &index.BusinessSysInteractionLog{
  178. InteractionKey: positionKey,
  179. InteractionVal: fmt.Sprint(position),
  180. Remark: "mysql中binlog的position位置",
  181. ModifyTime: time.Now(),
  182. CreateTime: time.Now(),
  183. }
  184. err = positionLog.Create()
  185. if err != nil {
  186. return
  187. }
  188. } else {
  189. positionLog.InteractionVal = fmt.Sprint(position)
  190. positionLog.ModifyTime = time.Now()
  191. err = positionLog.Update([]string{"InteractionVal", "ModifyTime"})
  192. if err != nil {
  193. return
  194. }
  195. }
  196. return
  197. }