Pārlūkot izejas kodu

add:增加binlog监听

zqbao 4 mēneši atpakaļ
vecāks
revīzija
32d9e308f7

+ 20 - 3
go.mod

@@ -20,10 +20,12 @@ require (
 	github.com/beevik/etree v1.3.0
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/go-ldap/ldap v3.0.3+incompatible
+	github.com/go-mysql-org/go-mysql v1.9.1
 	github.com/go-redis/redis/v8 v8.11.6-0.20220405070650-99c79f7041fc
-	github.com/go-sql-driver/mysql v1.7.0
+	github.com/go-sql-driver/mysql v1.7.1
 	github.com/go-xorm/xorm v0.7.9
 	github.com/gonum/stat v0.0.0-20181125101827-41a0da705a5b
+	github.com/google/uuid v1.6.0
 	github.com/gorilla/websocket v1.5.1
 	github.com/h2non/filetype v1.1.3
 	github.com/jung-kurt/gofpdf v1.16.2
@@ -51,6 +53,8 @@ require (
 )
 
 require (
+	github.com/BurntSushi/toml v1.3.2 // indirect
+	github.com/Masterminds/semver v1.5.0 // indirect
 	github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.4 // indirect
 	github.com/alibabacloud-go/debug v1.0.0 // indirect
 	github.com/alibabacloud-go/endpoint-util v1.1.0 // indirect
@@ -69,6 +73,7 @@ require (
 	github.com/bradfitz/gomemcache v0.0.0-20220106215444-fb4bf637b56d // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/clbanning/mxj/v2 v2.5.5 // indirect
+	github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 	github.com/dustin/go-humanize v1.0.1 // indirect
 	github.com/fatih/structs v1.1.0 // indirect
@@ -76,6 +81,7 @@ require (
 	github.com/go-playground/locales v0.13.0 // indirect
 	github.com/go-playground/universal-translator v0.17.0 // indirect
 	github.com/go-playground/validator/v10 v10.4.1 // indirect
+	github.com/goccy/go-json v0.10.2 // indirect
 	github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
 	github.com/golang/snappy v0.0.1 // indirect
@@ -85,7 +91,6 @@ require (
 	github.com/gonum/internal v0.0.0-20181124074243-f884aa714029 // indirect
 	github.com/gonum/lapack v0.0.0-20181123203213-e4cdc5a0bff9 // indirect
 	github.com/gonum/matrix v0.0.0-20181209220409-c518dec07be9 // indirect
-	github.com/google/uuid v1.6.0 // indirect
 	github.com/gorilla/css v1.0.0 // indirect
 	github.com/hashicorp/golang-lru v0.5.4 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
@@ -94,7 +99,7 @@ require (
 	github.com/jmespath/go-jmespath v0.4.0 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
-	github.com/klauspost/compress v1.17.6 // indirect
+	github.com/klauspost/compress v1.17.8 // indirect
 	github.com/klauspost/cpuid/v2 v2.2.6 // indirect
 	github.com/leodido/go-urn v1.2.0 // indirect
 	github.com/magiconair/properties v1.8.1 // indirect
@@ -110,16 +115,23 @@ require (
 	github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
 	github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect
 	github.com/pelletier/go-toml v1.9.2 // indirect
+	github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 // indirect
+	github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
+	github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect
+	github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/prometheus/client_golang v1.16.0 // indirect
 	github.com/prometheus/client_model v0.3.0 // indirect
 	github.com/prometheus/common v0.42.0 // indirect
 	github.com/prometheus/procfs v0.10.1 // indirect
+	github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
 	github.com/richardlehane/mscfb v1.0.4 // indirect
 	github.com/richardlehane/msoleps v1.0.3 // indirect
 	github.com/rivo/uniseg v0.4.7 // indirect
 	github.com/rs/xid v1.5.0 // indirect
 	github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect
+	github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect
+	github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 // indirect
 	github.com/sirupsen/logrus v1.9.3 // indirect
 	github.com/spf13/afero v1.1.2 // indirect
 	github.com/spf13/cast v1.5.0 // indirect
@@ -136,7 +148,11 @@ require (
 	github.com/xuri/efp v0.0.0-20231025114914-d1ff6096ae53 // indirect
 	github.com/xuri/nfp v0.0.0-20230919160717-d98342af3f05 // indirect
 	github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
+	go.uber.org/atomic v1.11.0 // indirect
+	go.uber.org/multierr v1.11.0 // indirect
+	go.uber.org/zap v1.26.0 // indirect
 	golang.org/x/crypto v0.19.0 // indirect
+	golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
 	golang.org/x/image v0.15.0 // indirect
 	golang.org/x/sync v0.2.0 // indirect
 	golang.org/x/sys v0.17.0 // indirect
@@ -146,6 +162,7 @@ require (
 	gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
 	gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
+	gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	xorm.io/builder v0.3.6 // indirect

+ 56 - 6
go.sum

@@ -16,9 +16,13 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k
 cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
+github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
 github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
+github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
+github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/PuerkitoBio/goquery v1.9.1 h1:mTL6XjbJTZdpfL+Gwl5U2h1l9yEkJjhmlTeV9VPW7UI=
 github.com/PuerkitoBio/goquery v1.9.1/go.mod h1:cW1n6TmIMDoORQU5IU/P1T3tGFunOeXEpGP2WHRwkbY=
@@ -115,6 +119,7 @@ github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd/go.mod h1:1b+Y/CofkY
 github.com/beego/x2j v0.0.0-20131220205130-a0352aadc542/go.mod h1:kSeGC/p1AbBiEp5kat81+DSQrZenVBZXklMLaELspWU=
 github.com/beevik/etree v1.3.0 h1:hQTc+pylzIKDb23yYprodCWWTt+ojFfUZyzU09a/hmU=
 github.com/beevik/etree v1.3.0/go.mod h1:aiPf89g/1k3AShMVAzriilpcE4R/Vuor90y83zVZWFc=
+github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -149,6 +154,8 @@ github.com/couchbase/gomemcached v0.0.0-20200526233749-ec430f949808/go.mod h1:sr
 github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs=
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
 github.com/cupcake/rdb v0.0.0-20161107195141-43ba34106c76/go.mod h1:vYwsqCOLxGiisLwp9rITslkFNpZD5rz43tf41QFkTWY=
+github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1vaoKsclOGD3ADKpshg3SRtYBbwso=
+github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -191,6 +198,8 @@ github.com/go-ldap/ldap v3.0.3+incompatible h1:HTeSZO8hWMS1Rgb2Ziku6b8a7qRIZZMHj
 github.com/go-ldap/ldap v3.0.3+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc=
 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-mysql-org/go-mysql v1.9.1 h1:W2ZKkHkoM4mmkasJCoSYfaE4RQNxXTb6VqiaMpKFrJc=
+github.com/go-mysql-org/go-mysql v1.9.1/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs=
 github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
 github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
 github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
@@ -206,14 +215,16 @@ github.com/go-redis/redis/v8 v8.11.6-0.20220405070650-99c79f7041fc/go.mod h1:25m
 github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
 github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
 github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
-github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
-github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
+github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
+github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
 github.com/go-xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a h1:9wScpmSP5A3Bk8V3XHWUcJmYTh+ZnlHVyc+A4oZYS3Y=
 github.com/go-xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:56xuuqnHyryaerycW3BfssRdxQstACi0Epw/yC5E2xM=
 github.com/go-xorm/xorm v0.7.9 h1:LZze6n1UvRmM5gpL9/U9Gucwqo6aWlFVlfcHKH10qA0=
 github.com/go-xorm/xorm v0.7.9/go.mod h1:XiVxrMMIhFkwSkh96BW7PACl7UhLtx2iJIHMdmjh5sQ=
+github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
+github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
 github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -352,8 +363,8 @@ github.com/kgiannakakis/mp3duration v0.0.0-20191013070830-d834f8d5ed53/go.mod h1
 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
-github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
-github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
+github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
+github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
 github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
 github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
 github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
@@ -458,6 +469,16 @@ github.com/pelletier/go-toml v1.9.2/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko
 github.com/peterh/liner v1.0.1-0.20171122030339-3681c2a91233/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
 github.com/phpdave11/gofpdi v1.0.7/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
+github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
+github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8=
+github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
+github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ=
+github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
+github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY=
+github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
+github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67 h1:m0RZ583HjzG3NweDi4xAcK54NBBPJh+zXp5Fp60dHtw=
+github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67/go.mod h1:yRkiqLFwIqibYg2P7h4bclHjHcJiIFRLKhGRyBcKYus=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -498,6 +519,8 @@ github.com/qiniu/qmgo v1.1.8/go.mod h1:QvZkzWNEv0buWPx0kdZsSs6URhESVubacxFPlITmv
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/rdlucklib/rdluck_tools v1.0.3 h1:iOtK2QPlPQ6CL6c1htCk5VnFCHzyG6DCfJtunrMswK0=
 github.com/rdlucklib/rdluck_tools v1.0.3/go.mod h1:9Onw9o4w19C8KE5lxb8GyxgRBbZweRVkQSc79v38EaA=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/richardlehane/mscfb v1.0.4 h1:WULscsljNPConisD5hR0+OyZjwK46Pfyr6mPu5ZawpM=
 github.com/richardlehane/mscfb v1.0.4/go.mod h1:YzVpcZg9czvAuhk9T+a3avCpcFPMUWm7gK3DypaEsUk=
 github.com/richardlehane/msoleps v1.0.1/go.mod h1:BWev5JBpU9Ko2WAgmZEuiz4/u3ZYTKbjLycmwiWUfWg=
@@ -517,6 +540,7 @@ github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfF
 github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
 github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
+github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
 github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644/go.mod h1:nkxAfR/5quYxwPZhyDxgasBMnRtBZd0FCEpawpjMUFg=
 github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 h1:DAYUYH5869yV94zvCES9F51oYtN5oGlwjxJJz7ZCnik=
 github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18/go.mod h1:nkxAfR/5quYxwPZhyDxgasBMnRtBZd0FCEpawpjMUFg=
@@ -524,6 +548,10 @@ github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9Nz
 github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
 github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
 github.com/siddontang/go v0.0.0-20170517070808-cb568a3e5cc0/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
+github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q=
+github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4=
 github.com/siddontang/goredis v0.0.0-20150324035039-760763f78400/go.mod h1:DDcKzU3qCuvj/tPnimWSsZZzvk9qvkvrIL5naVBPh5s=
 github.com/siddontang/rdb v0.0.0-20150307021120-fc89ed2e418d/go.mod h1:AMEsy7v5z92TR1JKMkLLoaOQk++LVnOKL3ScbJ8GNGA=
 github.com/silenceper/wechat/v2 v2.1.6 h1:2br2DxNzhksmvIBJ+PfMqjqsvoZmd/5BnMIfjKYUBgc=
@@ -554,8 +582,9 @@ github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q
 github.com/ssdb/gossdb v0.0.0-20180723034631-88f6b59b84ec/go.mod h1:QBvMkMya+gXctz3kmljlUCu/yB3GZ6oee+dUozsezQE=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
 github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
+github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
@@ -631,10 +660,23 @@ go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
 go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
 go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
-go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
+go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
+go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
+go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
+go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
+go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
+go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
+go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
+go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
+go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
+go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -660,6 +702,8 @@ golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
 golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek=
 golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
+golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
+golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
 golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
 golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
 golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
@@ -820,6 +864,8 @@ golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgw
 golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20200509030707-2212a7e161a5/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
@@ -892,6 +938,9 @@ gopkg.in/ini.v1 v1.66.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
 gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
+gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
 gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
@@ -905,6 +954,7 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 37 - 0
models/binlog/binlog.go

@@ -0,0 +1,37 @@
+package binlog
+
+import "github.com/beego/beego/v2/client/orm"
+
+// BinlogFormatStruct
+// @Description: 数据库的binlog格式
+type BinlogFormatStruct struct {
+	VariableName string `json:"Variable_name"`
+	Value        string `json:"Value"`
+}
+
+// GetBinlogFormat
+// @Description: 获取数据库的binlog格式
+// @return item
+// @return err
+func GetBinlogFormat() (item *BinlogFormatStruct, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SHOW VARIABLES LIKE 'binlog_format';`
+	err = o.Raw(sql).QueryRow(&item)
+	return
+}
+
+type BinlogFileStruct struct {
+	File     string `json:"File"`
+	Position uint32 `json:"Position"`
+}
+
+// GetShowMaster
+// @Description: 获取master的状态
+// @return item
+// @return err
+func GetShowMaster() (item *BinlogFileStruct, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `show master status;`
+	err = o.Raw(sql).QueryRow(&item)
+	return
+}

+ 64 - 0
models/binlog/business_sys_interaction_log.go

@@ -0,0 +1,64 @@
+package binlog
+
+import (
+	"time"
+
+	"github.com/beego/beego/v2/client/orm"
+)
+
+// BusinessSysInteractionLog 商家系统交互记录表
+type BusinessSysInteractionLog struct {
+	ID             uint32    `gorm:"primaryKey;column:id;type:int(10) unsigned;not null" json:"-"`
+	InteractionKey string    `gorm:"unique;column:interaction_key;type:varchar(128);not null;default:''" json:"interactionKey"` // 记录Key
+	InteractionVal string    `gorm:"column:interaction_val;type:text;default:null" json:"interactionVal"`                       // 记录值
+	Remark         string    `gorm:"column:remark;type:varchar(128);not null;default:''" json:"remark"`                         // 备注
+	ModifyTime     time.Time `gorm:"column:modify_time;type:datetime;default:null" json:"modifyTime"`                           // 修改日期
+	CreateTime     time.Time `gorm:"column:create_time;type:datetime;default:null" json:"createTime"`                           // 创建时间
+}
+
+// TableName get sql table name.获取数据库表名
+func (m *BusinessSysInteractionLog) TableName() string {
+	return "business_sys_interaction_log"
+}
+
+// BusinessSysInteractionLogColumns get sql column name.获取数据库列名
+var BusinessSysInteractionLogColumns = struct {
+	ID             string
+	InteractionKey string
+	InteractionVal string
+	Remark         string
+	ModifyTime     string
+	CreateTime     string
+}{
+	ID:             "id",
+	InteractionKey: "interaction_key",
+	InteractionVal: "interaction_val",
+	Remark:         "remark",
+	ModifyTime:     "modify_time",
+	CreateTime:     "create_time",
+}
+
+// Create 添加数据
+func (m *BusinessSysInteractionLog) Create() (err error) {
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.Insert(m)
+	return
+}
+
+// Update 更新数据
+func (m *BusinessSysInteractionLog) Update(cols []string) (err error) {
+	o := orm.NewOrmUsingDB("data")
+	_, err = o.Update(m, cols...)
+	return
+}
+
+var BinlogFileNameKey = "binlog_edb_info_filename" // binlog文件名
+var BinlogPositionKey = "binlog_edb_info_position" // binlog位置
+
+// GetBusinessSysInteractionLogByKey 根据记录key获取数据
+func GetBusinessSysInteractionLogByKey(key string) (item *BusinessSysInteractionLog, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := "SELECT * FROM business_sys_interaction_log WHERE interaction_key = ?"
+	err = o.Raw(sql, key).QueryRow(&item)
+	return
+}

+ 276 - 0
services/binlog/binlog.go

@@ -0,0 +1,276 @@
+package binlog
+
+import (
+	"eta/eta_api/models/binlog"
+	"eta/eta_api/utils"
+	"fmt"
+	"math/rand"
+	"strconv"
+	"time"
+
+	"github.com/go-mysql-org/go-mysql/canal"
+	"github.com/go-mysql-org/go-mysql/mysql"
+	_ "github.com/go-sql-driver/mysql"
+)
+
+func ListenMysql() {
+	var err error
+	defer func() {
+		if err != nil {
+			fmt.Println("数据库监听服务异常,err:", err)
+		}
+	}()
+	if utils.MYSQL_DATA_BINLOG_URL == "" {
+		panic("mysqlHost is empty")
+		//err = errors.New("mysqlHost is empty")
+		//return
+	}
+
+	if utils.MYSQL_DATA_BINLOG_USER == "" {
+		panic("user is empty")
+	}
+	if utils.MYSQL_DATA_BINLOG_PWD == "" {
+		panic("password is empty")
+	}
+	if utils.MYSQL_DATA_BINLOG_DB == "" {
+		panic("db is empty")
+	}
+	//includeTableRegex := []string{
+	//	"test_hz_data.edb_info",
+	//}
+
+	includeTableRegex := []string{
+		utils.MYSQL_DATA_BINLOG_DB + ".edb_info$",
+		// utils.MYSQL_DATA_BINLOG_DB + ".edb_classify$",
+		// utils.MYSQL_DATA_BINLOG_DB + ".base_from_mysteel_chemical_index$",
+		// utils.MYSQL_DATA_BINLOG_DB + ".base_from_smm_index$",
+		// utils.MYSQL_DATA_BINLOG_DB + ".edb_data*",
+	}
+
+	// 主从复制的身份id配置,必须全局唯一,如果没有配置的话,那么会随机生成一个
+	var serverId uint32
+	if utils.MYSQL_DATA_BINLOG_SERVER_ID != "" {
+		id, _ := strconv.ParseUint(utils.MYSQL_DATA_BINLOG_SERVER_ID, 10, 32)
+		serverId = uint32(id)
+	}
+	if serverId == 0 {
+		serverId = uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001
+	}
+	cfg := &canal.Config{
+		// 一个32位无符号整数,用于标识当前 Canal 实例在 MySQL 主从复制体系中的身份。这里使用了一个随机数生成器确保每次启动时分配的 ServerID 是唯一的(在1001到1099之间)。在实际生产环境中,你需要手动指定一个全局唯一的 ServerID。
+		ServerID: serverId,
+		// 指定 Canal 要连接的数据库类型,默认为 "mysql",表明这是一个 MySQL 数据库。
+		Flavor: "mysql",
+		// 设置 MySQL 服务器地址(主机名或 IP 地址)和端口,例如 "127.0.0.1:3306"。
+		Addr:     utils.MYSQL_DATA_BINLOG_URL,
+		User:     utils.MYSQL_DATA_BINLOG_USER,
+		Password: utils.MYSQL_DATA_BINLOG_PWD,
+		// 如果设置为 true,Canal 将以原始二进制格式获取 binlog,否则将以解析后的 SQL 语句形式提供。
+		//RawModeEnabled:  false,
+		// 是否启用半同步复制。当设置为 true 时,MySQL 主库在事务提交后会等待至少一个从库确认已接收并写入 binlog 才返回成功,提高了数据一致性。
+		SemiSyncEnabled: false,
+		//  是否将 MySQL 中的 decimal 类型字段解析为 Go 的 Decimal 类型,而不是 float 或者 string。如果业务中有精确小数计算的需求,应开启此选项以避免精度丢失问题。
+		UseDecimal: true,
+		// 用于控制初始数据导出的相关配置,在 Canal 启动时是否需要全量同步表数据。
+		//Dump:              dumpConf,
+		// 正则表达式字符串,用于定义 Canal 应该监听哪些表的 binlog 事件。只有名称匹配该正则表达式的表才会被 Canal 同步处理。
+		IncludeTableRegex: includeTableRegex,
+	}
+
+	// 校验mysql binlog format,目前仅支持row格式
+	{
+		binlogFormat, tmpErr := binlog.GetBinlogFormat()
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+
+		if binlogFormat.Value != "ROW" {
+			panic("mysql binlog format is not ROW")
+			return
+		}
+	}
+
+	//  获取上一次启动时的binlog文件名称和位置
+	fileName, position, err := getBinlogNamePosition()
+	if err != nil {
+		return
+	}
+	// 修改记录本次启动时的binlog文件名称和位置
+	modifyBinlogNamePosition(fileName, position)
+	// 定时修改binlog文件名称和位置
+	go timingModifyBinlogNamePosition()
+
+	c, err := canal.NewCanal(cfg)
+	if err != nil {
+		fmt.Println("err:", err)
+		return
+	}
+	utils.FileLog.Debug("记录上一次启动时的fileName:", fileName, ";position:", position)
+
+	binlogHandler := &edbEventHandler{}
+	binlogHandler.SetBinlogFileName(fileName, position)
+	c.SetEventHandler(binlogHandler)
+	//c.Run()
+
+	pos := mysql.Position{
+		Name: fileName,
+		Pos:  position,
+	}
+	err = c.RunFrom(pos)
+}
+
+// getBinlogNamePosition
+// @Description: 获取当前binlog文件名称和位置
+// @author: Roc
+// @datetime 2024-05-17 13:18:19
+// @return fileName string
+// @return position uint32
+// @return err error
+func getBinlogNamePosition() (fileName string, position uint32, err error) {
+	// 优先从redis获取
+	fileName = utils.Rc.GetStr(utils.CACHE_MYSQL_DATA_FILENAME)
+	position64, err := utils.Rc.GetUInt64(utils.CACHE_MYSQL_DATA_POSITION)
+	if err != nil {
+		if err.Error() != utils.RedisNoKeyErr {
+			panic("mysql binlog position is not found,err:" + err.Error())
+			return
+		}
+		err = nil
+	}
+
+	position = uint32(position64)
+
+	// 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
+	if fileName == `` || position == 0 {
+
+		// binlog文件名
+		fileNameKey := binlog.BinlogFileNameKey
+		fileNameLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(fileNameKey)
+		if tmpErr == nil {
+			fileName = fileNameLog.InteractionKey
+		}
+
+		// binlog位置
+		positionKey := binlog.BinlogPositionKey
+		positionLog, tmpErr := binlog.GetBusinessSysInteractionLogByKey(positionKey)
+		if tmpErr == nil {
+			positionStr := positionLog.InteractionKey
+			positionInt, tmpErr := strconv.Atoi(positionStr)
+			if tmpErr == nil {
+				position = uint32(positionInt)
+			}
+		}
+	}
+
+	// 如果从表中没有取到数据,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
+	if fileName == `` || position == 0 {
+		item, tmpErr := binlog.GetShowMaster()
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+		fileName = item.File
+		position = item.Position
+	}
+
+	return
+}
+
+// modifyBinlogNamePosition
+// @Description: 修改记录本次启动时的binlog文件名称和位置
+// @author: Roc
+// @datetime 2024-05-17 11:32:32
+// @param fileName string
+// @param position uint32
+// @return err error
+func modifyBinlogNamePosition(fileName string, position uint32) {
+	var err error
+	defer func() {
+		if err != nil {
+			utils.FileLog.Error("修改binlog文件名称和位置异常,fileName", fileName, ",position:", position, ",err:", err)
+		}
+	}()
+
+	// fileName 变更
+	fileNameKey := binlog.BinlogFileNameKey
+	fileNameLog, err := binlog.GetBusinessSysInteractionLogByKey(fileNameKey)
+	if err != nil {
+		if err.Error() != utils.ErrNoRow() {
+			return
+		}
+		err = nil
+		fileNameLog = &binlog.BusinessSysInteractionLog{
+			//ID:             0,
+			InteractionKey: fileNameKey,
+			InteractionVal: fileName,
+			Remark:         "mysql中binlog的filename名称",
+			ModifyTime:     time.Now(),
+			CreateTime:     time.Now(),
+		}
+		err = fileNameLog.Create()
+		if err != nil {
+			return
+		}
+	} else {
+		fileNameLog.InteractionVal = fileName
+		fileNameLog.ModifyTime = time.Now()
+		err = fileNameLog.Update([]string{"InteractionVal", "ModifyTime"})
+		if err != nil {
+			return
+		}
+	}
+
+	// position 变更
+	positionKey := binlog.BinlogPositionKey
+	positionLog, err := binlog.GetBusinessSysInteractionLogByKey(positionKey)
+	if err != nil {
+		if err.Error() != utils.ErrNoRow() {
+			return
+		}
+		err = nil
+		positionLog = &binlog.BusinessSysInteractionLog{
+			//ID:             0,
+			InteractionKey: positionKey,
+			InteractionVal: fmt.Sprint(position),
+			Remark:         "mysql中binlog的position位置",
+			ModifyTime:     time.Now(),
+			CreateTime:     time.Now(),
+		}
+		err = positionLog.Create()
+		if err != nil {
+			return
+		}
+	} else {
+		positionLog.InteractionVal = fmt.Sprint(position)
+		positionLog.ModifyTime = time.Now()
+		err = positionLog.Update([]string{"InteractionVal", "ModifyTime"})
+		if err != nil {
+			return
+		}
+	}
+
+	return
+}
+
+// timingModifyBinlogNamePosition
+// @Description: 定时修改binlog文件名称和位置
+// @author: Roc
+// @datetime 2024-05-17 13:08:13
+func timingModifyBinlogNamePosition() {
+	for {
+		// 延时30s执行
+		time.Sleep(30 * time.Second)
+
+		// 获取最新的binlog文件名称和位置
+		fileName, position, err := getBinlogNamePosition()
+		if err != nil {
+			return
+		}
+
+		if fileName != `` && position != 0 {
+			// 修改记录本次启动时的binlog文件名称和位置
+			modifyBinlogNamePosition(fileName, position)
+		}
+	}
+}

+ 222 - 0
services/binlog/handler.go

@@ -0,0 +1,222 @@
+package binlog
+
+import (
+	"eta/eta_api/utils"
+	"fmt"
+	"reflect"
+	"time"
+
+	"github.com/go-mysql-org/go-mysql/canal"
+	"github.com/go-mysql-org/go-mysql/mysql"
+	"github.com/go-mysql-org/go-mysql/replication"
+	"github.com/pingcap/errors"
+)
+
+type edbEventHandler struct {
+	canal.DummyEventHandler
+	fileName string
+	position uint32
+}
+
+func (h *edbEventHandler) OnRow(e *canal.RowsEvent) (err error) {
+	//fmt.Printf("%s %v\n", e.Action, e.Rows)
+	//fmt.Println(e.Table.Columns)
+	//fmt.Println(e.Action)
+
+	// 监听逻辑
+	switch e.Action {
+	case canal.InsertAction:
+		err = h.Insert(e)
+	case canal.UpdateAction:
+		err = h.Update(e)
+	case canal.DeleteAction:
+		err = h.Delete(e)
+	default:
+		return errors.New("操作异常")
+	}
+
+	fmt.Println("fileName:", h.fileName, ";position:", h.position)
+
+	// 每次操作完成后都将当前位置记录到缓存
+	utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
+	utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
+
+	return nil
+}
+
+func (h *edbEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
+	h.fileName = p.Name
+	h.position = p.Pos
+
+	// 旋转binlog日志的时候,需要将当前位置记录到缓存
+	utils.Rc.Put(utils.CACHE_MYSQL_DATA_FILENAME, h.fileName, 31*24*time.Hour)
+	utils.Rc.Put(utils.CACHE_MYSQL_DATA_POSITION, h.position, 31*24*time.Hour)
+
+	return nil
+}
+
+func (h *edbEventHandler) String() string {
+	return "MyEventHandler"
+}
+
+func (h *edbEventHandler) Insert(e *canal.RowsEvent) error {
+	// 批量插入的时候,e.Rows的长度会大于0
+	//if len(e.Rows) != 1 {
+	//	fmt.Println("新增数据异常,没有新数据:", e.Rows)
+	//	return nil
+	//}
+	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
+
+	for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
+		logData := make(map[string]interface{})
+		dataLen := len(row)
+		for i, v := range e.Table.Columns {
+			if i < dataLen {
+				tmpData := row[i]
+				if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
+					tmpOld := tmpData.([]byte)
+					tmpData = string(tmpOld)
+				}
+				logData[v.Name] = tmpData
+				//tmpV = fmt.Sprintf("%v", tmpData)
+			}
+		}
+	}
+
+	return nil
+}
+
+func (h *edbEventHandler) Update(e *canal.RowsEvent) error {
+	if len(e.Rows) != 2 {
+		fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
+		return nil
+	}
+
+	//fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
+
+	logOldData := make(map[string]interface{})
+	logNewData := make(map[string]interface{})
+
+	oldDataLen := len(e.Rows[0])
+	newDataLen := len(e.Rows[0])
+	//maxDataLen := oldDataLen
+	//if maxDataLen < newDataLen {
+	//	maxDataLen = newDataLen
+	//}
+	for i, v := range e.Table.Columns {
+		//if v.IsUnsigned
+		//var tmpV string
+		//if i < dataLen {
+		//	tmpV = fmt.Sprintf("原数据:%v;新数据:%v", e.Rows[0][i], e.Rows[1][i])
+		//}
+		//fmt.Println(v.Name, ":", tmpV)
+
+		if i < oldDataLen {
+			oldData := e.Rows[0][i]
+			if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
+				tmpOld := oldData.([]byte)
+				oldData = string(tmpOld)
+			}
+			logOldData[v.Name] = oldData
+		}
+		if i < newDataLen {
+			newData := e.Rows[1][i]
+			if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
+				tmpNew := newData.([]byte)
+				newData = string(tmpNew)
+			}
+			logNewData[v.Name] = newData
+		}
+
+		//if i < maxDataLen {
+		//	oldData := e.Rows[0][i]
+		//	newData := e.Rows[1][i]
+		//
+		//	if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
+		//		tmpOld := oldData.([]byte)
+		//		oldData = string(tmpOld)
+		//	}
+		//	if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
+		//		tmpNew := newData.([]byte)
+		//		newData = string(tmpNew)
+		//	}
+		//
+		//
+		//	//if oldData != newData {
+		//	//	tmpV = fmt.Sprintf("原数据:%v;新数据:%v", oldData, newData)
+		//	//}
+		//}
+		//if tmpV != `` {
+		//	fmt.Println(v.Name, ":", tmpV)
+		//}
+	}
+
+	return nil
+}
+
+func (h *edbEventHandler) Delete(e *canal.RowsEvent) error {
+	// 批量删除的时候,e.Rows的长度会大于0
+	//if len(e.Rows) != 1 {
+	//	fmt.Println("删除数据异常,没有原始数据:", e.Rows)
+	//	return nil
+	//}
+	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
+
+	for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
+		logData := make(map[string]interface{})
+		dataLen := len(row)
+		for i, v := range e.Table.Columns {
+			if i < dataLen {
+				tmpData := row[i]
+				if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
+					tmpOld := tmpData.([]byte)
+					tmpData = string(tmpOld)
+				}
+				logData[v.Name] = tmpData
+				//tmpV = fmt.Sprintf("%v", tmpData)
+			}
+		}
+	}
+
+	return nil
+}
+
+func (h *edbEventHandler) Delete3(e *canal.RowsEvent) error {
+	if len(e.Rows) != 1 {
+		fmt.Println("删除数据异常,没有原始数据:", e.Rows)
+		return nil
+	}
+	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
+
+	dataLen := len(e.Rows[0])
+	logData := make(map[string]interface{})
+	for i, v := range e.Table.Columns {
+		//var tmpV interface{}
+		if i < dataLen {
+			//tmpV = fmt.Sprintf("%v", e.Rows[0][i])
+			tmpData := e.Rows[0][i]
+			if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
+				tmpOld := tmpData.([]byte)
+				tmpData = string(tmpOld)
+			}
+			logData[v.Name] = tmpData
+			//fmt.Println(oldData)
+		}
+	}
+
+	return nil
+}
+
+// SetBinlogFileName
+// @Description: 设置当前的binlog文件名和位置
+// @author: Roc
+// @receiver h
+// @datetime 2024-02-29 18:09:36
+// @param fileName string
+// @param position uint32
+func (h *edbEventHandler) SetBinlogFileName(fileName string, position uint32) {
+	h.fileName = fileName
+	h.position = position
+
+	fmt.Println("init fileName:", h.fileName, ";position:", h.position)
+}

+ 18 - 0
utils/config.go

@@ -31,6 +31,15 @@ var (
 	MgoDataDbName string       // mongodb指标数据的库名
 )
 
+// binlog
+var (
+	MYSQL_DATA_BINLOG_URL       string // 数据指标库binlog地址
+	MYSQL_DATA_BINLOG_USER      string // 数据指标库binlog用户名
+	MYSQL_DATA_BINLOG_PWD       string // 数据指标库binlog密码
+	MYSQL_DATA_BINLOG_DB        string // 数据指标库binlog位置
+	MYSQL_DATA_BINLOG_SERVER_ID string // 数据指标库binlog server_id
+)
+
 // 基础配置
 var (
 	STATIC_DIR       string
@@ -315,6 +324,15 @@ func init() {
 	//ETA-AI
 	MYSQL_AI_URL = config["mysql_url_ai"]
 
+	// binlog配置
+	{
+		MYSQL_DATA_BINLOG_URL = config["mysql_data_binlog_url"]
+		MYSQL_DATA_BINLOG_USER = config["mysql_data_binlog_user"]
+		MYSQL_DATA_BINLOG_PWD = config["mysql_data_binlog_pwd"]
+		MYSQL_DATA_BINLOG_DB = config["mysql_data_binlog_db"]
+		MYSQL_DATA_BINLOG_SERVER_ID = config["mysql_data_binlog_server_id"]
+	}
+
 	// mongodb数据库连接配置
 	MgoUrlData = config["mgo_url_data"]
 

+ 3 - 0
utils/constants.go

@@ -513,3 +513,6 @@ var (
 	BASE_START_DATE_UnSpace = "19000101"                                            //基础数据开始日期
 	BASE_END_DATE_UnSpace   = time.Now().AddDate(4, 0, 0).Format(FormatDateUnSpace) //基础数据结束日期
 )
+
+const CACHE_MYSQL_DATA_FILENAME = "eta:mysql:eta_index:binlog:filename"
+const CACHE_MYSQL_DATA_POSITION = "eta:mysql:eta_index:binlog:position"

+ 5 - 0
utils/redis.go

@@ -7,6 +7,8 @@ import (
 
 type RedisClient interface {
 	Get(key string) interface{}
+	GetStr(key string) string
+	GetUInt64(key string) (uint64, error)
 	RedisBytes(key string) (data []byte, err error)
 	RedisString(key string) (data string, err error)
 	RedisInt(key string) (data int, err error)
@@ -31,3 +33,6 @@ func initRedis(redisType string, conf string) (redisClient RedisClient, err erro
 
 	return
 }
+
+// redis没有key的错误
+const RedisNoKeyErr = "redis: nil"

+ 21 - 1
utils/redis/cluster_redis.go

@@ -5,9 +5,10 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/go-redis/redis/v8"
 	"strings"
 	"time"
+
+	"github.com/go-redis/redis/v8"
 )
 
 // ClusterRedisClient
@@ -86,6 +87,25 @@ func (rc *ClusterRedisClient) Get(key string) interface{} {
 	return data
 }
 
+// GetStr
+// @Description: 根据key获取字符串数据
+// @receiver rc
+// @param key
+// @return string
+func (rc *ClusterRedisClient) GetStr(key string) string {
+	return rc.redisClient.Get(context.TODO(), key).Val()
+}
+
+// GetUInt64
+// @Description: 根据key获取uint64数据
+// @receiver rc
+// @param key
+// @return int
+// @return error
+func (rc *ClusterRedisClient) GetUInt64(key string) (uint64, error) {
+	return rc.redisClient.Get(context.TODO(), key).Uint64()
+}
+
 // RedisBytes
 // @Description: 根据key获取字节编码数据
 // @receiver rc

+ 21 - 1
utils/redis/standalone_redis.go

@@ -5,9 +5,10 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"github.com/go-redis/redis/v8"
 	"strconv"
 	"time"
+
+	"github.com/go-redis/redis/v8"
 )
 
 // StandaloneRedisClient
@@ -78,6 +79,25 @@ func (rc *StandaloneRedisClient) Get(key string) interface{} {
 	return data
 }
 
+// GetStr
+// @Description: 根据key获取字符串数据
+// @receiver rc
+// @param key
+// @return string
+func (rc *StandaloneRedisClient) GetStr(key string) string {
+	return rc.redisClient.Get(context.TODO(), key).Val()
+}
+
+// GetUInt64
+// @Description: 根据key获取uint64数据
+// @receiver rc
+// @param key
+// @return int
+// @return error
+func (rc *StandaloneRedisClient) GetUInt64(key string) (uint64, error) {
+	return rc.redisClient.Get(context.TODO(), key).Uint64()
+}
+
 // RedisBytes
 // @Description: 根据key获取字节编码数据
 // @receiver rc