浏览代码

feat:象屿数仓

Roc 1 年之前
父节点
当前提交
ad4afe70f4

+ 5 - 3
config/config.go

@@ -46,6 +46,7 @@ type Mysql struct {
 	//LogMode             bool        `mapstructure:"log-mode" json:"log-mode" yaml:"log-mode" description:"是否开启日志"`
 	Stdout              bool        `mapstructure:"stdout" json:"stdout" yaml:"stdout" description:"日志是否输出在控制台"`
 	DefaultDsnAliasName string      `mapstructure:"default-dsn-alias-name" json:"default-dsn-alias-name" yaml:"default-dsn-alias-name" description:"默认的数据库连接别名"`
+	IsListenMysqlBinlog bool        `mapstructure:"is-listen-mysql-binlog" json:"is-listen-mysql-binlog" yaml:"is-listen-mysql-binlog" description:"是否监听mysql binlog"`
 	List                []MysqlConn `mapstructure:"list" json:"list" yaml:"list" description:"数据库链接配置列表"`
 }
 
@@ -59,9 +60,10 @@ type MysqlConn struct {
 
 // Redis redis配置
 type Redis struct {
-	Address  string `mapstructure:"address" json:"address" yaml:"address" description:"redis服务链接地址"`
-	Password string `mapstructure:"password" json:"password" yaml:"password" description:"redis服务密码"`
-	Db       int    `mapstructure:"db" json:"db" yaml:"db" description:"默认使用的redis库"`
+	Address   string `mapstructure:"address" json:"address" yaml:"address" description:"redis服务链接地址"`
+	Password  string `mapstructure:"password" json:"password" yaml:"password" description:"redis服务密码"`
+	Db        int    `mapstructure:"db" json:"db" yaml:"db" description:"默认使用的redis库"`
+	ServeType string `mapstructure:"serve-type" json:"serve-type" yaml:"serve-type" description:"redis服务类型,单机或者cluster;默认单机"`
 }
 
 // OracleJY 嘉悦物产数据库配置

+ 92 - 5
controller/xiangyu/index.go

@@ -9,14 +9,14 @@ import (
 	"github.com/go-playground/validator/v10"
 )
 
-// PushData
-// @Description: 数据推送
+// PushIndexDataResp
+// @Description: 指标信息推送
 // @author: Roc
 // @receiver xc
 // @datetime 2024-02-27 17:53:24
 // @param c *gin.Context
-func (xc *XiangyuController) PushData(c *gin.Context) {
-	var req xiangyuSrc.PushDataParamReq
+func (xc *XiangyuController) PushIndexDataResp(c *gin.Context) {
+	var req xiangyuSrc.PushBaseParamReq
 	if e := c.Bind(&req); e != nil {
 		err, ok := e.(validator.ValidationErrors)
 		if !ok {
@@ -27,7 +27,94 @@ func (xc *XiangyuController) PushData(c *gin.Context) {
 		return
 	}
 
-	err, errMsg := xiangyu.PushData(req)
+	err, errMsg := xiangyu.PushIndexDataResp(req)
+
+	if err != nil {
+		resp.FailData(errMsg, err.Error(), c)
+		return
+	}
+	resp.OkData("同步成功", "", c)
+
+	return
+}
+
+// PushIndexValueDataResp
+// @Description: 指标日期值信息推送
+// @author: Roc
+// @receiver xc
+// @datetime 2024-02-27 17:53:24
+// @param c *gin.Context
+func (xc *XiangyuController) PushIndexValueDataResp(c *gin.Context) {
+	var req xiangyuSrc.PushBaseParamReq
+	if e := c.Bind(&req); e != nil {
+		err, ok := e.(validator.ValidationErrors)
+		if !ok {
+			resp.FailData("参数解析失败", "Err:"+e.Error(), c)
+			return
+		}
+		resp.FailData("参数解析失败", err.Translate(global.Trans), c)
+		return
+	}
+
+	err, errMsg := xiangyu.PushEdbValueDataResp(req)
+
+	if err != nil {
+		resp.FailData(errMsg, err.Error(), c)
+		return
+	}
+	resp.OkData("同步成功", "", c)
+
+	return
+}
+
+// PushClassifyDataResp
+// @Description: 指标分类信息推送
+// @author: Roc
+// @receiver xc
+// @datetime 2024-02-27 17:53:24
+// @param c *gin.Context
+func (xc *XiangyuController) PushClassifyDataResp(c *gin.Context) {
+	var req xiangyuSrc.PushBaseParamReq
+	if e := c.Bind(&req); e != nil {
+		err, ok := e.(validator.ValidationErrors)
+		if !ok {
+			resp.FailData("参数解析失败", "Err:"+e.Error(), c)
+			return
+		}
+		resp.FailData("参数解析失败", err.Translate(global.Trans), c)
+		return
+	}
+
+	err, errMsg := xiangyu.PushClassifyDataResp(req)
+
+	if err != nil {
+		resp.FailData(errMsg, err.Error(), c)
+		return
+	}
+	resp.OkData("同步成功", "", c)
+
+	return
+}
+
+// PushEdbClassifyDataResp
+// @Description: 指标与分类的关系信息推送
+// @author: Roc
+// @receiver xc
+// @datetime 2024-02-27 17:53:24
+// @param c *gin.Context
+func (xc *XiangyuController) PushEdbClassifyDataResp(c *gin.Context) {
+	var req xiangyuSrc.PushBaseParamReq
+	if e := c.Bind(&req); e != nil {
+		err, ok := e.(validator.ValidationErrors)
+		if !ok {
+			resp.FailData("参数解析失败", "Err:"+e.Error(), c)
+			return
+		}
+		resp.FailData("参数解析失败", err.Translate(global.Trans), c)
+		return
+	}
+
+	err, errMsg := xiangyu.PushEdbClassifyDataResp(req)
 
 	if err != nil {
 		resp.FailData(errMsg, err.Error(), c)

+ 4 - 4
core/run_server.go

@@ -10,15 +10,15 @@ func RunServe() {
 	// 初始化路由
 	r := init_serve.InitRouter()
 
-	// 初始化mysql数据库
-	init_serve.Mysql()
-
 	// 如果配置了redis,那么链接redis
 	if global.CONFIG.Serve.UseRedis {
 		//初始化redis
-		init_serve.RedisTool()
+		init_serve.Redis()
 	}
 
+	// 初始化mysql数据库
+	init_serve.Mysql()
+
 	if global.CONFIG.OracleJY.Account != "" {
 		//初始化oracle
 		init_serve.OracleJy()

+ 6 - 8
global/global.go

@@ -6,13 +6,10 @@ import (
 	"eta/eta_bridge/utils"
 	"fmt"
 	"github.com/fsnotify/fsnotify"
-	"github.com/go-redis/redis/v8"
 	oplogging "github.com/op/go-logging"
 	"github.com/spf13/viper"
 	"gorm.io/gorm"
 	"io"
-
-	"github.com/rdlucklib/rdluck_tools/cache"
 )
 
 var (
@@ -21,12 +18,13 @@ var (
 	FILE_LOG      *oplogging.Logger   // 自定义的输出日志
 	MYSQL         map[string]*gorm.DB //数据库连接配置
 	MYSQL_LOG     io.Writer
-	DEFAULT_MYSQL *gorm.DB      //默认数据库连接配置
-	Redis         *redis.Client //redis链接
-	OracleJy      *sql.DB       //嘉悦物产数据库连接
+	DEFAULT_MYSQL *gorm.DB //默认数据库连接配置
+	//Redis         *redis.Client //redis链接
+	OracleJy *sql.DB //嘉悦物产数据库连接
 
-	Rc *cache.Cache //redis缓存
-	Re error        //redis错误
+	//Rc *cache.Cache //redis缓存
+	Rc utils.RedisClient //redis缓存
+	Re error             //redis错误
 )
 
 const ConfigFile = "config/config_debug.yaml" //本地(测试)环境下的配置文件地址

+ 14 - 0
go.mod

@@ -25,9 +25,11 @@ require (
 )
 
 require (
+	github.com/BurntSushi/toml v0.3.1 // indirect
 	github.com/KyleBanks/depth v1.2.1 // indirect
 	github.com/PuerkitoBio/purell v1.1.1 // indirect
 	github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
+	github.com/benbjohnson/clock v1.1.0 // indirect
 	github.com/bytedance/sonic v1.9.1 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
@@ -36,6 +38,7 @@ require (
 	github.com/garyburd/redigo v1.6.3 // indirect
 	github.com/gin-contrib/sse v0.1.0 // indirect
 	github.com/go-logfmt/logfmt v0.6.0 // indirect
+	github.com/go-mysql-org/go-mysql v1.7.0 // indirect
 	github.com/go-openapi/jsonpointer v0.19.5 // indirect
 	github.com/go-openapi/jsonreference v0.19.6 // indirect
 	github.com/go-openapi/spec v0.20.4 // indirect
@@ -43,6 +46,7 @@ require (
 	github.com/go-sql-driver/mysql v1.7.0 // indirect
 	github.com/goccy/go-json v0.10.2 // indirect
 	github.com/godror/knownpb v0.1.1 // indirect
+	github.com/google/uuid v1.3.0 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/jinzhu/inflection v1.0.0 // indirect
 	github.com/jinzhu/now v1.1.5 // indirect
@@ -59,9 +63,15 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/pelletier/go-toml/v2 v2.0.8 // indirect
+	github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 // indirect
+	github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7 // indirect
+	github.com/pingcap/tidb/parser v0.0.0-20221126021158-6b02a5d8ba7d // indirect
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/robfig/cron/v3 v3.0.1 // indirect
 	github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect
+	github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
+	github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect
+	github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 // indirect
 	github.com/spf13/afero v1.9.5 // indirect
 	github.com/spf13/cast v1.5.1 // indirect
 	github.com/spf13/jwalterweatherman v1.1.0 // indirect
@@ -69,6 +79,9 @@ require (
 	github.com/subosito/gotenv v1.4.2 // indirect
 	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
 	github.com/ugorji/go/codec v1.2.11 // indirect
+	go.uber.org/atomic v1.9.0 // indirect
+	go.uber.org/multierr v1.8.0 // indirect
+	go.uber.org/zap v1.21.0 // indirect
 	golang.org/x/arch v0.3.0 // indirect
 	golang.org/x/crypto v0.13.0 // indirect
 	golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
@@ -79,6 +92,7 @@ require (
 	google.golang.org/protobuf v1.30.0 // indirect
 	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
+	gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 )

+ 6 - 0
init_serve/mysql.go

@@ -2,6 +2,7 @@ package init_serve
 
 import (
 	"eta/eta_bridge/global"
+	"eta/eta_bridge/services/binlog"
 	"fmt"
 	"gorm.io/driver/mysql"
 	"gorm.io/gorm"
@@ -71,5 +72,10 @@ func Mysql() {
 	//全局赋值数据库链接
 	global.MYSQL = mysqlMap
 
+	// 开启mysql binlog监听
+	if mysqlConf.IsListenMysqlBinlog {
+		go binlog.ListenMysql()
+	}
+
 	fmt.Println("mysql init end")
 }

+ 46 - 31
init_serve/redis.go

@@ -1,42 +1,57 @@
 package init_serve
 
 import (
-	"context"
 	"eta/eta_bridge/global"
+	"eta/eta_bridge/utils"
 	"fmt"
-	"github.com/go-redis/redis/v8"
-
-	"github.com/rdlucklib/rdluck_tools/cache"
 )
 
-func Redis() {
-	redisConf := global.CONFIG.Redis
-	client := redis.NewClient(&redis.Options{
-		Addr:     redisConf.Address,
-		Password: redisConf.Password,
-		DB:       redisConf.Db,
-		//PoolSize: 10, //连接池最大socket连接数,默认为10倍CPU数, 10 * runtime.NumCPU(暂不配置)
-	})
-	_, err := client.Ping(context.TODO()).Result()
-	if err != nil {
-		global.LOG.Error("redis 链接失败:", err)
-		panic("redis 链接失败:" + err.Error())
-	}
+//func Redis() {
+//	redisConf := global.CONFIG.Redis
+//	client := redis.NewClient(&redis.Options{
+//		Addr:     redisConf.Address,
+//		Password: redisConf.Password,
+//		DB:       redisConf.Db,
+//		//PoolSize: 10, //连接池最大socket连接数,默认为10倍CPU数, 10 * runtime.NumCPU(暂不配置)
+//	})
+//	_, err := client.Ping(context.TODO()).Result()
+//	if err != nil {
+//		global.LOG.Error("redis 链接失败:", err)
+//		panic("redis 链接失败:" + err.Error())
+//	}
+//
+//	//全局赋值redis链接
+//	global.Redis = client
+//}
+//
+//func RedisTool() {
+//	fmt.Println("init RedisTool start")
+//	redisConf := global.CONFIG.Redis
+//	fmt.Println(redisConf)
+//	REDIS_CACHE := fmt.Sprintf(`{"key":"redis","conn":"%s","password":"%s"}`, redisConf.Address, redisConf.Password)
+//	fmt.Println("REDIS_CACHE:" + REDIS_CACHE)
+//	global.Rc, global.Re = cache.NewCache(REDIS_CACHE) //初始化缓存
+//	if global.Re != nil {
+//		fmt.Println(global.Re)
+//		panic(global.Re)
+//	}
+//	fmt.Println("init RedisTool end")
+//}
 
-	//全局赋值redis链接
-	global.Redis = client
-}
+func Redis() {
+	var conf string
+	//switch global.CONFIG.Redis.ServeType {
+	//case "cluster": // 集群
+	//	conf = fmt.Sprintf(`{"key":"redis","conn":"%s","password":"%s"}`,global.CONFIG.Redis.Address, global.CONFIG.Redis.Password)
+	//default: // 默认走单机
+	//	redisClient, err = redis.InitStandaloneRedis(conf)
+	//}
+	conf = fmt.Sprintf(`{"key":"redis","conn":"%s","password":"%s"}`, global.CONFIG.Redis.Address, global.CONFIG.Redis.Password)
+	redisClient, err := utils.InitRedis(global.CONFIG.Redis.ServeType, conf)
 
-func RedisTool() {
-	fmt.Println("init RedisTool start")
-	redisConf := global.CONFIG.Redis
-	fmt.Println(redisConf)
-	REDIS_CACHE := fmt.Sprintf(`{"key":"redis","conn":"%s","password":"%s"}`, redisConf.Address, redisConf.Password)
-	fmt.Println("REDIS_CACHE:" + REDIS_CACHE)
-	global.Rc, global.Re = cache.NewCache(REDIS_CACHE) //初始化缓存
-	if global.Re != nil {
-		fmt.Println(global.Re)
-		panic(global.Re)
+	if err != nil {
+		fmt.Println("redis链接异常:", err)
+		panic(any(err))
 	}
-	fmt.Println("init RedisTool end")
+	global.Rc = redisClient
 }

+ 56 - 7
logic/xiangyu/index.go

@@ -4,20 +4,69 @@ import (
 	"eta/eta_bridge/services/xiangyu"
 )
 
-// LoginEta
-// @Description: 获取eta的session
+// PushIndexDataResp
+// @Description: 推送指标信息数据
 // @author: Roc
-// @datetime 2024-01-23 17:44:15
-// @param code string
-// @return resp response.LoginResp
+// @datetime 2024-02-28 17:48:01
+// @param data xiangyu.PushBaseParamReq
 // @return err error
 // @return errMsg string
-func PushData(data xiangyu.PushDataParamReq) (err error, errMsg string) {
+func PushIndexDataResp(data xiangyu.PushBaseParamReq) (err error, errMsg string) {
 	//  获取xiangyu token
 
 	errMsg = `同步失败`
 	data.TableCode = "CY_DATA_ASSETS_INFO" // 固定字段
-	_, err = xiangyu.PushData(data)
+	_, err = xiangyu.PushIndexData(data)
+	//fmt.Println(resp)
+
+	return
+}
+
+// PushEdbValueDataResp
+// @Description: 推送指标对应的日期值数据
+// @author: Roc
+// @datetime 2024-02-29 09:39:07
+// @param data xiangyu.PushBaseParamReq
+// @return err error
+// @return errMsg string
+func PushEdbValueDataResp(data xiangyu.PushBaseParamReq) (err error, errMsg string) {
+	//  获取xiangyu token
+
+	errMsg = `同步失败`
+	data.TableCode = "CY_DATA_ASSETS_DETAIL" // 固定字段
+	_, err = xiangyu.PushEdbValue(data)
+	//fmt.Println(resp)
+
+	return
+}
+
+// PushClassifyDataResp
+// @Description: 推送分类数据
+// @author: Roc
+// @datetime 2024-02-29 09:39:07
+// @param data xiangyu.PushBaseParamReq
+// @return err error
+// @return errMsg string
+func PushClassifyDataResp(data xiangyu.PushBaseParamReq) (err error, errMsg string) {
+	errMsg = `同步失败`
+	data.TableCode = "CY_CATALOG_CLASSIFY" // 固定字段
+	_, err = xiangyu.PushClassify(data)
+	//fmt.Println(resp)
+
+	return
+}
+
+// PushEdbClassifyDataResp
+// @Description: 推送指标分类数据
+// @author: Roc
+// @datetime 2024-02-29 09:39:07
+// @param data xiangyu.PushBaseParamReq
+// @return err error
+// @return errMsg string
+func PushEdbClassifyDataResp(data xiangyu.PushBaseParamReq) (err error, errMsg string) {
+	errMsg = `同步失败`
+	data.TableCode = "CY_CATALOG_INDEX_ASSOCIATE" // 固定字段
+	_, err = xiangyu.PushBase(data)
 	//fmt.Println(resp)
 
 	return

+ 36 - 0
models/index/base.go

@@ -0,0 +1,36 @@
+package index
+
+import "eta/eta_bridge/global"
+
+// BinlogFormatStruct
+// @Description: 数据库的binlog格式
+type BinlogFormatStruct struct {
+	VariableName string `json:"Variable_name"`
+	Value        string `json:"Value"`
+}
+
+// GetBinlogFormat
+// @Description: 获取数据库的binlog格式
+// @return item
+// @return err
+func GetBinlogFormat() (item *BinlogFormatStruct, err error) {
+	sql := `SHOW VARIABLES LIKE 'binlog_format';`
+	err = global.MYSQL["index"].Raw(sql).First(&item).Error
+	return
+}
+
+type BinlogFileStruct struct {
+	File     string `json:"File"`
+	Position uint32 `json:"Position"`
+}
+
+// GetShowMaster
+// @Description: 获取master的状态
+// @return item
+// @return err
+func GetShowMaster() (item *BinlogFileStruct, err error) {
+	sql := `show master status;`
+	err = global.MYSQL["index"].Raw(sql).First(&item).Error
+
+	return
+}

+ 26 - 0
models/index/edb_update_log.go

@@ -0,0 +1,26 @@
+package index
+
+import (
+	"eta/eta_bridge/global"
+	"time"
+)
+
+type EdbUpdateLog struct {
+	Id          int64     `gorm:"column:id;type:bigint(20) UNSIGNED;primaryKey;not null;" json:"id"`
+	OpDbName    string    `gorm:"column:op_db_name;type:varchar(255);comment:库名;default:NULL;" json:"op_db_name"` // 库名
+	OpTableName string    `gorm:"column:op_table_name;type:varchar(255);comment:表名;" json:"op_table_name"`        // 表名
+	OpType      string    `gorm:"column:op_type;type:varchar(16);comment:变更类型;default:NULL;" json:"op_type"`      // 变更类型
+	OldData     string    `gorm:"column:old_data;type:text;comment:历史数据;" json:"old_data"`                        // 历史数据
+	NewData     string    `gorm:"column:new_data;type:text;comment:新数据;" json:"new_data"`                         // 新数据
+	CreateTime  time.Time `gorm:"column:create_time;type:timestamp;default:CURRENT_TIMESTAMP;" json:"create_time"`
+}
+
+func (m *EdbUpdateLog) TableName() string {
+	return "edb_update_log"
+}
+
+// Create 添加数据
+func (m *EdbUpdateLog) Create() (err error) {
+	err = global.MYSQL["index"].Create(m).Error
+	return
+}

+ 2 - 2
models/request/xiangyu/index.go

@@ -1,8 +1,8 @@
 package xiangyu
 
-// PushDataParamReq
+// PushIndexParamReq
 // @Description: 业务报文
-type PushDataParamReq struct {
+type PushIndexParamReq struct {
 	SerialID    string                 `json:"serialID" description:"流水号"`
 	TableCode   string                 `json:"tableCode" description:"数据表编码"`
 	Total       int                    `json:"total" description:"本次落表数据总数"`

+ 4 - 1
routers/xiangyu.go

@@ -31,5 +31,8 @@ func initAuthXiangyu(r *gin.RouterGroup) {
 func initIndexXiangyu(r *gin.RouterGroup) {
 	control := new(xiangyu.XiangyuController)
 	group := r.Group("xy/").Use(middleware.InternalToken())
-	group.POST("index/pushData", control.PushData)
+	group.POST("index/pushIndexData", control.PushIndexDataResp)
+	group.POST("index/pushIndexValue", control.PushIndexValueDataResp)
+	group.POST("index/pushClassify", control.PushClassifyDataResp)
+	group.POST("index/pushEdbClassify", control.PushEdbClassifyDataResp)
 }

+ 144 - 0
services/binlog/binlog.go

@@ -0,0 +1,144 @@
+package binlog
+
+import (
+	"eta/eta_bridge/global"
+	"eta/eta_bridge/models/index"
+	"eta/eta_bridge/utils"
+	"fmt"
+	"github.com/go-mysql-org/go-mysql/canal"
+	"github.com/go-mysql-org/go-mysql/mysql"
+	_ "github.com/go-sql-driver/mysql"
+	"math/rand"
+	"net/url"
+	"strings"
+	"time"
+)
+
+var mysqlHost, mysqlUser, mysqlPwd, mysqlDb string
+
+func init() {
+	//eta()
+	//local()
+
+	for _, sqlConfig := range global.CONFIG.Mysql.List {
+		if sqlConfig.AliasName == `index` {
+			// 找出用户
+			tmpList := strings.Split(sqlConfig.Dsn, ":")
+			mysqlUser = tmpList[0]
+			// 找出密码
+			tmpList = strings.Split(strings.Join(tmpList[1:], ":"), "@")
+			lenTmp := len(tmpList)
+			mysqlPwd = strings.Join(tmpList[:lenTmp-1], "@")
+			// 找出地址
+			tmpList = strings.Split(tmpList[lenTmp-1], "tcp(")
+			tmpList = strings.Split(tmpList[1], ")")
+			mysqlHost = tmpList[0]
+
+			// 找出数据库名称
+			u, err := url.Parse(tmpList[1])
+			if err != nil {
+				panic(err) // 如果解析失败,处理错误
+			}
+
+			// 获取Path部分的内容
+			mysqlDb = u.Path[1:]
+
+		}
+	}
+
+	fmt.Println("HOST:", mysqlHost)
+	fmt.Println("user:", mysqlUser)
+	fmt.Println("password:", mysqlPwd)
+
+}
+
+func ListenMysql() {
+	var err error
+	defer func() {
+		if err != nil {
+			fmt.Println("数据库监听服务异常,err:", err)
+		}
+	}()
+	//includeTableRegex := []string{
+	//	"test_hz_data.edb_info",
+	//}
+
+	includeTableRegex := []string{
+		mysqlDb + ".edb_info$",
+		mysqlDb + ".edb_classify$",
+		mysqlDb + ".edb_data*",
+	}
+	cfg := &canal.Config{
+		// 一个32位无符号整数,用于标识当前 Canal 实例在 MySQL 主从复制体系中的身份。这里使用了一个随机数生成器确保每次启动时分配的 ServerID 是唯一的(在1001到1099之间)。在实际生产环境中,你需要手动指定一个全局唯一的 ServerID。
+		ServerID: uint32(rand.New(rand.NewSource(time.Now().Unix())).Intn(1000)) + 1001,
+		// 指定 Canal 要连接的数据库类型,默认为 "mysql",表明这是一个 MySQL 数据库。
+		Flavor: "mysql",
+		// 设置 MySQL 服务器地址(主机名或 IP 地址)和端口,例如 "127.0.0.1:3306"。
+		Addr:     mysqlHost,
+		User:     mysqlUser,
+		Password: mysqlPwd,
+		// 如果设置为 true,Canal 将以原始二进制格式获取 binlog,否则将以解析后的 SQL 语句形式提供。
+		//RawModeEnabled:  false,
+		// 是否启用半同步复制。当设置为 true 时,MySQL 主库在事务提交后会等待至少一个从库确认已接收并写入 binlog 才返回成功,提高了数据一致性。
+		SemiSyncEnabled: false,
+		//  是否将 MySQL 中的 decimal 类型字段解析为 Go 的 Decimal 类型,而不是 float 或者 string。如果业务中有精确小数计算的需求,应开启此选项以避免精度丢失问题。
+		UseDecimal: true,
+		// 用于控制初始数据导出的相关配置,在 Canal 启动时是否需要全量同步表数据。
+		//Dump:              dumpConf,
+		// 正则表达式字符串,用于定义 Canal 应该监听哪些表的 binlog 事件。只有名称匹配该正则表达式的表才会被 Canal 同步处理。
+		IncludeTableRegex: includeTableRegex,
+	}
+
+	// 校验mysql binlog format,目前仅支持row格式
+	{
+		binlogFormat, tmpErr := index.GetBinlogFormat()
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+
+		if binlogFormat.Value != "ROW" {
+			panic("mysql binlog format is not ROW")
+			return
+		}
+	}
+
+	var fileName string
+	var position uint32
+
+	fileName = global.Rc.GetStr(utils.CACHE_MYSQL_MASTER_FILENAME)
+	position64, tmpErr := global.Rc.GetUInt64(utils.CACHE_MYSQL_MASTER_POSITION)
+	if tmpErr != nil && tmpErr.Error() != utils.RedisNoKeyErr {
+		panic("mysql binlog position is not found,err:" + tmpErr.Error())
+		return
+	}
+	position = uint32(position64)
+
+	// 如果没有从redis中获取到上次监听到的binlog的文件名称,或者位置为0,则从mysql中获取,则从 MySQL 中获取最新的文件名和位置。
+	if fileName == `` || position == 0 {
+		item, tmpErr := index.GetShowMaster()
+		if tmpErr != nil {
+			err = tmpErr
+			return
+		}
+		fileName = item.File
+		position = item.Position
+	}
+
+	c, err := canal.NewCanal(cfg)
+	if err != nil {
+		fmt.Println("err:", err)
+		return
+	}
+
+	binlogHandler := &MyEventHandler{}
+	binlogHandler.SetBinlogFileName(fileName, position)
+	c.SetEventHandler(binlogHandler)
+	//c.Run()
+
+	pos := mysql.Position{
+		Name: fileName,
+		Pos:  position,
+	}
+	err = c.RunFrom(pos)
+}

+ 250 - 0
services/binlog/handler.go

@@ -0,0 +1,250 @@
+package binlog
+
+import (
+	"encoding/json"
+	"eta/eta_bridge/global"
+	"eta/eta_bridge/models/index"
+	"eta/eta_bridge/utils"
+	"fmt"
+	"github.com/go-mysql-org/go-mysql/canal"
+	"github.com/go-mysql-org/go-mysql/mysql"
+	"github.com/go-mysql-org/go-mysql/replication"
+	"github.com/pingcap/errors"
+	"reflect"
+	"time"
+)
+
+type MyEventHandler struct {
+	canal.DummyEventHandler
+	fileName string
+	position uint32
+}
+
+func (h *MyEventHandler) OnRow(e *canal.RowsEvent) (err error) {
+	//fmt.Printf("%s %v\n", e.Action, e.Rows)
+	//fmt.Println(e.Table.Columns)
+	//fmt.Println(e.Action)
+
+	switch e.Action {
+	case canal.InsertAction:
+		err = h.Insert(e)
+	case canal.UpdateAction:
+		err = h.Update(e)
+	case canal.DeleteAction:
+		err = h.Delete(e)
+	default:
+		return errors.New("操作异常")
+	}
+
+	fmt.Println("fileName:", h.fileName, ";position:", h.position)
+
+	// 每次操作完成后都将当前位置记录到缓存
+	global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 24*time.Hour)
+	global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 24*time.Hour)
+
+	return nil
+}
+
+func (h *MyEventHandler) OnPosSynced(header *replication.EventHeader, p mysql.Position, set mysql.GTIDSet, f bool) error {
+	h.fileName = p.Name
+	h.position = p.Pos
+
+	// 旋转binlog日志的时候,需要将当前位置记录到缓存
+	global.Rc.Put(utils.CACHE_MYSQL_MASTER_FILENAME, h.fileName, 24*time.Hour)
+	global.Rc.Put(utils.CACHE_MYSQL_MASTER_POSITION, h.position, 24*time.Hour)
+
+	return nil
+}
+
+func (h *MyEventHandler) String() string {
+	return "MyEventHandler"
+}
+
+func (h *MyEventHandler) Insert(e *canal.RowsEvent) error {
+	// 批量插入的时候,e.Rows的长度会大于0
+	//if len(e.Rows) != 1 {
+	//	fmt.Println("新增数据异常,没有新数据:", e.Rows)
+	//	return nil
+	//}
+	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
+
+	for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
+		logData := make(map[string]interface{})
+		dataLen := len(row)
+		for i, v := range e.Table.Columns {
+			if i < dataLen {
+				tmpData := row[i]
+				if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
+					tmpOld := tmpData.([]byte)
+					tmpData = string(tmpOld)
+				}
+				logData[v.Name] = tmpData
+				//tmpV = fmt.Sprintf("%v", tmpData)
+			}
+		}
+		dataByte, _ := json.Marshal(logData)
+		log(e.Table.Schema, e.Table.Name, e.Action, ``, string(dataByte))
+	}
+
+	return nil
+}
+
+func (h *MyEventHandler) Update(e *canal.RowsEvent) error {
+	if len(e.Rows) != 2 {
+		fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
+		return nil
+	}
+
+	//fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
+
+	logOldData := make(map[string]interface{})
+	logNewData := make(map[string]interface{})
+
+	oldDataLen := len(e.Rows[0])
+	newDataLen := len(e.Rows[0])
+	//maxDataLen := oldDataLen
+	//if maxDataLen < newDataLen {
+	//	maxDataLen = newDataLen
+	//}
+	for i, v := range e.Table.Columns {
+		//if v.IsUnsigned
+		//var tmpV string
+		//if i < dataLen {
+		//	tmpV = fmt.Sprintf("原数据:%v;新数据:%v", e.Rows[0][i], e.Rows[1][i])
+		//}
+		//fmt.Println(v.Name, ":", tmpV)
+
+		if i < oldDataLen {
+			oldData := e.Rows[0][i]
+			if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
+				tmpOld := oldData.([]byte)
+				oldData = string(tmpOld)
+			}
+			logOldData[v.Name] = oldData
+		}
+		if i < newDataLen {
+			newData := e.Rows[1][i]
+			if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
+				tmpNew := newData.([]byte)
+				newData = string(tmpNew)
+			}
+			logNewData[v.Name] = newData
+		}
+
+		//if i < maxDataLen {
+		//	oldData := e.Rows[0][i]
+		//	newData := e.Rows[1][i]
+		//
+		//	if oldData != nil && reflect.TypeOf(oldData).Kind() == reflect.Slice {
+		//		tmpOld := oldData.([]byte)
+		//		oldData = string(tmpOld)
+		//	}
+		//	if newData != nil && reflect.TypeOf(newData).Kind() == reflect.Slice {
+		//		tmpNew := newData.([]byte)
+		//		newData = string(tmpNew)
+		//	}
+		//
+		//
+		//	//if oldData != newData {
+		//	//	tmpV = fmt.Sprintf("原数据:%v;新数据:%v", oldData, newData)
+		//	//}
+		//}
+		//if tmpV != `` {
+		//	fmt.Println(v.Name, ":", tmpV)
+		//}
+	}
+
+	logOldDataByte, _ := json.Marshal(logOldData)
+	logNewDataByte, _ := json.Marshal(logNewData)
+	log(e.Table.Schema, e.Table.Name, e.Action, string(logOldDataByte), string(logNewDataByte))
+
+	return nil
+}
+
+func (h *MyEventHandler) Delete(e *canal.RowsEvent) error {
+	// 批量删除的时候,e.Rows的长度会大于0
+	//if len(e.Rows) != 1 {
+	//	fmt.Println("删除数据异常,没有原始数据:", e.Rows)
+	//	return nil
+	//}
+	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
+
+	for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
+		logData := make(map[string]interface{})
+		dataLen := len(row)
+		for i, v := range e.Table.Columns {
+			if i < dataLen {
+				tmpData := row[i]
+				if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
+					tmpOld := tmpData.([]byte)
+					tmpData = string(tmpOld)
+				}
+				logData[v.Name] = tmpData
+				//tmpV = fmt.Sprintf("%v", tmpData)
+			}
+		}
+		dataByte, _ := json.Marshal(logData)
+		log(e.Table.Schema, e.Table.Name, e.Action, string(dataByte), ``)
+	}
+
+	return nil
+}
+
+func (h *MyEventHandler) Delete3(e *canal.RowsEvent) error {
+	if len(e.Rows) != 1 {
+		fmt.Println("删除数据异常,没有原始数据:", e.Rows)
+		return nil
+	}
+	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
+
+	dataLen := len(e.Rows[0])
+	logData := make(map[string]interface{})
+	for i, v := range e.Table.Columns {
+		//var tmpV interface{}
+		if i < dataLen {
+			//tmpV = fmt.Sprintf("%v", e.Rows[0][i])
+			tmpData := e.Rows[0][i]
+			if tmpData != nil && reflect.TypeOf(tmpData).Kind() == reflect.Slice {
+				tmpOld := tmpData.([]byte)
+				tmpData = string(tmpOld)
+			}
+			logData[v.Name] = tmpData
+			//fmt.Println(oldData)
+		}
+	}
+
+	dataByte, _ := json.Marshal(logData)
+	log(e.Table.Schema, e.Table.Name, e.Action, string(dataByte), ``)
+
+	return nil
+}
+
+// SetBinlogFileName
+// @Description: 设置当前的binlog文件名和位置
+// @author: Roc
+// @receiver h
+// @datetime 2024-02-29 18:09:36
+// @param fileName string
+// @param position uint32
+func (h *MyEventHandler) SetBinlogFileName(fileName string, position uint32) {
+	h.fileName = fileName
+	h.position = position
+
+	fmt.Println("init fileName:", h.fileName, ";position:", h.position)
+}
+
+// log 简单的日志记录
+func log(dbName, tableName, opType, oldData, newData string) {
+	item := index.EdbUpdateLog{
+		OpDbName:    dbName,
+		OpTableName: tableName,
+		OpType:      opType,
+		OldData:     oldData,
+		NewData:     newData,
+		CreateTime:  time.Now(),
+	}
+	err := item.Create()
+	if err != nil {
+		fmt.Println("log create err:", err.Error())
+	}
+}

+ 230 - 18
services/xiangyu/index.go

@@ -27,26 +27,28 @@ type PushResp struct {
 	ExpiresIn    int    `json:"expires_in"`
 }
 
-type PushDataReq struct {
-	In0        In0              `json:"in0"`
-	Parameters PushDataParamReq `json:"parameters" description:"业务报文"`
+// PushIndexReq
+// @Description: 指标推送请求
+type PushIndexReq struct {
+	In0        In0               `json:"in0"`
+	Parameters PushIndexParamReq `json:"parameters" description:"业务报文"`
 }
 
-// PushDataParamReq
+// PushIndexParamReq
 // @Description: 业务报文
-type PushDataParamReq struct {
-	SerialID    string                 `json:"serialID" description:"流水号"`
-	TableCode   string                 `json:"tableCode" description:"数据表编码"`
-	Total       int                    `json:"total" description:"本次落表数据总数"`
-	IsEmailWarn int                    `json:"isEmailWarn" description:"是否发送预警邮件,(1-是 0-否)"`
-	Data        []PushDataParamDataReq `json:"data" description:"报文体,指标数据列表"`
+type PushIndexParamReq struct {
+	SerialID    string             `json:"serialID" description:"流水号"`
+	TableCode   string             `json:"tableCode" description:"数据表编码"`
+	Total       int                `json:"total" description:"本次落表数据总数"`
+	IsEmailWarn int                `json:"isEmailWarn" description:"是否发送预警邮件,(1-是 0-否)"`
+	Data        []PushIndexItemReq `json:"data" description:"报文体,指标数据列表"`
 }
 
-// PushDataParamDataReq
+// PushIndexItemReq
 // @Description: 指标数据结构
-type PushDataParamDataReq struct {
+type PushIndexItemReq struct {
 	SourceIndexCode string `json:"source_index_code" description:"上游来源指标ID"`
-	IndexCode       string `json:"Index_code" description:""`
+	IndexCode       string `json:"index_code" description:""`
 	IndexName       string `json:"index_name" description:""`
 	IndexShortName  string `json:"index_short_name" description:""`
 	FrequenceName   string `json:"frequence_name" description:""`
@@ -82,7 +84,7 @@ type PushDataParamDataReq struct {
 	Status         int    `json:"status" description:""`
 }
 
-type PushDataResp struct {
+type PushIndexDataResp struct {
 	Out struct {
 		ReturnCode string `json:"returnCode"`
 		ReturnMsg  string `json:"returnMsg"`
@@ -90,16 +92,169 @@ type PushDataResp struct {
 	} `json:"out"`
 }
 
-// GetToken
+// PushIndexData
+// @Description: 指标信息推送
+// @author: Roc
+// @datetime 2024-02-28 17:45:03
+// @param data PushIndexParamReq
+// @return resp *PushDataResp
+// @return err error
+func PushIndexData(data PushBaseParamReq) (resp *PushIndexDataResp, err error) {
+	urlPath := `/DAQ/CY/ProxyServices/pushMarketPricePS`
+	req := PushBaseReq{
+		In0: In0{
+			PageTotal: "",
+			PageNo:    "",
+			DocType:   "pushMarketPricePS",
+			Property:  "",
+			//DocCode:   getDocCode(),
+			Source: global.CONFIG.Xiangyu.SystemCode,
+			Target: global.CONFIG.Xiangyu.IndexSyncTarget,
+		},
+		Parameters: data,
+	}
+
+	postData, err := json.Marshal(req)
+	if err != nil {
+		return
+	}
+	result, err := HttpPostIndex(urlPath, string(postData))
+	if err != nil {
+		return
+	}
+
+	//  解析响应结果
+	err = json.Unmarshal(result, &resp)
+	if err != nil {
+		return
+	}
+
+	if resp.Out.ReturnCode != "S" {
+		err = errors.New(fmt.Sprintf("响应代码:%s,错误信息:%s", resp.Out.ReturnCode, resp.Out.ReturnMsg))
+		return
+	}
+
+	return
+}
+
+// PushIndexValueReq
+// @Description: 指标日期值推送请求
+type PushIndexValueReq struct {
+	In0        In0                    `json:"in0"`
+	Parameters PushIndexValueParamReq `json:"parameters" description:"业务报文"`
+}
+
+// PushIndexValueParamReq
+// @Description: 业务报文
+type PushIndexValueParamReq struct {
+	SerialID    string                  `json:"serialID" description:"流水号"`
+	TableCode   string                  `json:"tableCode" description:"数据表编码"`
+	Total       int                     `json:"total" description:"本次落表数据总数"`
+	IsEmailWarn int                     `json:"isEmailWarn" description:"是否发送预警邮件,(1-是 0-否)"`
+	Data        []PushIndexValueItemReq `json:"data" description:"报文体,指标日期值数据列表"`
+}
+
+// PushIndexValueItemReq
+// @Description: 指标日期值数据结构
+type PushIndexValueItemReq struct {
+	Id           string `json:"id"`
+	IndexCode    string `json:"index_code" description:"指标代码"`
+	Value        string `json:"value" description:"数值"`
+	BusinessDate string `json:"business_date" description:"业务日期(数据日期)"`
+	CreateTime   string `json:"create_time" description:"数据进入ETA的时间"`
+	UpdateTime   string `json:"update_time" description:"eta库中修改数据的时间"`
+	Status       string `json:"status" description:"逻辑删除使用,0-禁用,1-启用"`
+}
+
+// PushEdbValue
 // @Description: 获取token信息
 // @author: Roc
 // @datetime 2024-01-23 15:40:56
 // @param code string
 // @return resp *GetTokenResp
 // @return err error
-func PushData(data PushDataParamReq) (resp *PushDataResp, err error) {
+func PushEdbValue(data PushBaseParamReq) (resp *PushIndexDataResp, err error) {
+	urlPath := `/DAQ/CY/ProxyServices/pushMarketPricePS`
+	req := PushBaseReq{
+		In0: In0{
+			PageTotal: "",
+			PageNo:    "",
+			DocType:   "pushMarketPricePS",
+			Property:  "",
+			//DocCode:   getDocCode(),
+			Source: global.CONFIG.Xiangyu.SystemCode,
+			Target: global.CONFIG.Xiangyu.IndexSyncTarget,
+		},
+		Parameters: data,
+	}
+
+	postData, err := json.Marshal(req)
+	if err != nil {
+		return
+	}
+	result, err := HttpPostIndex(urlPath, string(postData))
+	if err != nil {
+		return
+	}
+
+	//  解析响应结果
+	err = json.Unmarshal(result, &resp)
+	if err != nil {
+		return
+	}
+
+	if resp.Out.ReturnCode != "S" {
+		err = errors.New(fmt.Sprintf("响应代码:%s,错误信息:%s", resp.Out.ReturnCode, resp.Out.ReturnMsg))
+		return
+	}
+
+	return
+}
+
+// PushBaseReq
+// @Description: 基础请求
+type PushBaseReq struct {
+	In0        In0              `json:"in0"`
+	Parameters PushBaseParamReq `json:"parameters" description:"业务报文"`
+}
+
+// PushBaseParamReq
+// @Description: 基础业务报文
+type PushBaseParamReq struct {
+	SerialID    string      `json:"serialID" description:"流水号"`
+	TableCode   string      `json:"tableCode" description:"数据表编码"`
+	Total       int         `json:"total" description:"本次落表数据总数"`
+	IsEmailWarn int         `json:"isEmailWarn" description:"是否发送预警邮件,(1-是 0-否)"`
+	Data        interface{} `json:"data" description:"报文体,指标日期值数据列表"`
+}
+
+// PushClassifyItemReq
+// @Description: 指标分类数据结构
+type PushClassifyItemReq struct {
+	ClassifyId      int    `json:"classify_id" description:"自增id"`
+	ClassifyType    int    `json:"classify_type" description:"分类类型,0:普通指标分类,1:预测指标分类"`
+	ClassifyName    string `json:"classify_name" description:"分类名称"`
+	ParentId        int    `json:"parent_id" description:"父级id"`
+	HasData         int    `json:"has_data" description:"是否存在指标数据,1:有,2:无"`
+	CreateTime      string `json:"create_time" description:"创建时间"`
+	UpdateTime      string `json:"update_time" description:"修改时间"`
+	SysUserId       int    `json:"sys_user_id" description:"创建人id"`
+	SysUserRealName string `json:"sys_user_real_name" description:"创建人姓名"`
+	Level           int    `json:"level" description:"层级"`
+	UniqueCode      string `json:"unique_code" description:"唯一编码"`
+	SortColumn      int    `json:"sort_column" description:"排序字段,越小越靠前,默认值:10"`
+}
+
+// PushClassify
+// @Description: 推送指标分类
+// @author: Roc
+// @datetime 2024-01-23 15:40:56
+// @param code string
+// @return resp *GetTokenResp
+// @return err error
+func PushClassify(data PushBaseParamReq) (resp *PushIndexDataResp, err error) {
 	urlPath := `/DAQ/CY/ProxyServices/pushMarketPricePS`
-	req := PushDataReq{
+	req := PushBaseReq{
 		In0: In0{
 			PageTotal: "",
 			PageNo:    "",
@@ -107,7 +262,7 @@ func PushData(data PushDataParamReq) (resp *PushDataResp, err error) {
 			Property:  "",
 			//DocCode:   getDocCode(),
 			Source: global.CONFIG.Xiangyu.SystemCode,
-			Target: global.CONFIG.Xiangyu.UserSyncTarget,
+			Target: global.CONFIG.Xiangyu.IndexSyncTarget,
 		},
 		Parameters: data,
 	}
@@ -135,6 +290,63 @@ func PushData(data PushDataParamReq) (resp *PushDataResp, err error) {
 	return
 }
 
+// PushBase
+// @Description: 基础推送接口
+// @author: Roc
+// @datetime 2024-01-23 15:40:56
+// @param code string
+// @return resp *GetTokenResp
+// @return err error
+func PushBase(data PushBaseParamReq) (resp *PushIndexDataResp, err error) {
+	urlPath := `/DAQ/CY/ProxyServices/pushMarketPricePS`
+	req := PushBaseReq{
+		In0: In0{
+			PageTotal: "",
+			PageNo:    "",
+			DocType:   "pushMarketPricePS",
+			Property:  "",
+			//DocCode:   getDocCode(),
+			Source: global.CONFIG.Xiangyu.SystemCode,
+			Target: global.CONFIG.Xiangyu.IndexSyncTarget,
+		},
+		Parameters: data,
+	}
+
+	postData, err := json.Marshal(req)
+	if err != nil {
+		return
+	}
+	result, err := HttpPostIndex(urlPath, string(postData))
+	if err != nil {
+		return
+	}
+
+	//  解析响应结果
+	err = json.Unmarshal(result, &resp)
+	if err != nil {
+		return
+	}
+
+	if resp.Out.ReturnCode != "S" {
+		err = errors.New(fmt.Sprintf("响应代码:%s,错误信息:%s", resp.Out.ReturnCode, resp.Out.ReturnMsg))
+		return
+	}
+
+	return
+}
+
+// PushEdbClassifyItemReq
+// @Description: 指标与目录的关系请求结构
+type PushEdbClassifyItemReq struct {
+	Id         string `json:"id" description:"唯一主键"`
+	ClassifyId int    `json:"classify_id" description:"目录分类ID"`
+	IndexCode  string `json:"index_code" description:"指标ID"`
+	CreateTime string `json:"create_time" description:"创建时间"`
+	CreateUser string `json:"create_user" description:"创建人"`
+	UpdateTime string `json:"update_time" description:"修改时间"`
+	UpdateUser string `json:"update_user" description:"修改人"`
+}
+
 // HttpPostIndex
 // @Description: post请求
 // @author: Roc

+ 3 - 0
utils/constants.go

@@ -97,3 +97,6 @@ const (
 	BusinessCodeSandbox = "E2023080700" // ETA体验版
 	BusinessCodeRelease = "E2023080900" // 弘则ETA
 )
+
+const CACHE_MYSQL_MASTER_FILENAME = "eta:mysql:binlog:filename"
+const CACHE_MYSQL_MASTER_POSITION = "eta:mysql:binlog:position"

+ 41 - 0
utils/redis.go

@@ -0,0 +1,41 @@
+package utils
+
+import (
+	"eta/eta_bridge/utils/redis"
+	"time"
+)
+
+type RedisClient interface {
+	Get(key string) interface{}
+	GetStr(key string) string
+	GetInt64(key string) (int64, error)
+	GetUInt64(key string) (uint64, error)
+	RedisBytes(key string) (data []byte, err error)
+	RedisString(key string) (data string, err error)
+	RedisInt(key string) (data int, err error)
+	Put(key string, val interface{}, timeout time.Duration) error
+	SetNX(key string, val interface{}, timeout time.Duration) bool
+	Delete(key string) error
+	IsExist(key string) bool
+	LPush(key string, val interface{}) error
+	Brpop(key string, callback func([]byte))
+	GetRedisTTL(key string) time.Duration
+	Incrby(key string, num int) (interface{}, error)
+	Do(commandName string, args ...interface{}) (reply interface{}, err error)
+}
+
+func InitRedis(redisType string, conf string) (redisClient RedisClient, err error) {
+	switch redisType {
+	case "cluster": // 集群
+		redisClient, err = redis.InitClusterRedis(conf)
+	default: // 默认走单机
+		redisClient, err = redis.InitStandaloneRedis(conf)
+	}
+
+	return
+}
+
+// redis没有key的错误
+const RedisNoKeyErr = "redis: nil"
+
+//const redisNoKeyErr = redis2.Error(redis2.Nil)

+ 294 - 0
utils/redis/cluster_redis.go

@@ -0,0 +1,294 @@
+package redis
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/go-redis/redis/v8"
+	"strings"
+	"time"
+)
+
+// ClusterRedisClient
+// @Description: 集群的redis客户端
+type ClusterRedisClient struct {
+	redisClient *redis.ClusterClient
+}
+
+var DefaultKey = "zcmRedis"
+
+// InitClusterRedis
+// @Description: 初始化集群redis客户端
+// @param config
+// @return clusterRedisClient
+// @return err
+func InitClusterRedis(config string) (clusterRedisClient *ClusterRedisClient, err error) {
+	var cf map[string]string
+	err = json.Unmarshal([]byte(config), &cf)
+	if err != nil {
+		return
+	}
+	//if _, ok := cf["key"]; !ok {
+	//	cf["key"] = DefaultKey
+	//}
+
+	// 集群地址
+	connList := make([]string, 0)
+	if _, ok := cf["conn"]; !ok {
+		err = errors.New("config has no conn key")
+		return
+	} else {
+		connList = strings.Split(cf["conn"], ",")
+		if len(connList) <= 1 {
+			err = errors.New("conn address less than or equal to 1")
+			return
+		}
+	}
+
+	// 密码
+	if _, ok := cf["password"]; !ok {
+		cf["password"] = ""
+	}
+
+	// 创建 Redis 客户端配置对象
+	clusterOptions := &redis.ClusterOptions{
+		Addrs:    connList, // 设置 Redis 节点的 IP 地址和端口号
+		Password: cf["password"],
+	}
+
+	// 创建 Redis 集群客户端
+	client := redis.NewClusterClient(clusterOptions)
+
+	// 测试连接并获取信息
+	_, err = client.Ping(context.TODO()).Result()
+	if err != nil {
+		err = errors.New("redis 链接失败:" + err.Error())
+		return
+	}
+
+	clusterRedisClient = &ClusterRedisClient{redisClient: client}
+
+	return
+}
+
+// Get
+// @Description: 根据key获取数据(其实是返回的字节编码)
+// @receiver rc
+// @param key
+// @return interface{}
+func (rc *ClusterRedisClient) Get(key string) interface{} {
+	data, err := rc.redisClient.Get(context.TODO(), key).Bytes()
+	if err != nil {
+		return nil
+	}
+
+	return data
+}
+
+// GetStr
+// @Description: 根据key获取字符串数据
+// @receiver rc
+// @param key
+// @return string
+func (rc *ClusterRedisClient) GetStr(key string) string {
+	return rc.redisClient.Get(context.TODO(), key).Val()
+}
+
+// GetInt
+// @Description: 根据key获取int数据
+// @receiver rc
+// @param key
+// @return int
+// @return error
+func (rc *ClusterRedisClient) GetInt64(key string) (int64, error) {
+	return rc.redisClient.Get(context.TODO(), key).Int64()
+}
+
+// GetInt
+// @Description: 根据key获取int数据
+// @receiver rc
+// @param key
+// @return int
+// @return error
+func (rc *ClusterRedisClient) GetUInt64(key string) (uint64, error) {
+	return rc.redisClient.Get(context.TODO(), key).Uint64()
+}
+
+// RedisBytes
+// @Description: 根据key获取字节编码数据
+// @receiver rc
+// @param key
+// @return data
+// @return err
+func (rc *ClusterRedisClient) RedisBytes(key string) (data []byte, err error) {
+	data, err = rc.redisClient.Get(context.TODO(), key).Bytes()
+
+	return
+}
+
+// RedisString
+// @Description: 根据key获取字符串数据
+// @receiver rc
+// @param key
+// @return data
+// @return err
+func (rc *ClusterRedisClient) RedisString(key string) (data string, err error) {
+	data, err = rc.redisClient.Get(context.TODO(), key).Result()
+
+	return
+}
+
+// RedisInt
+// @Description: 根据key获取int数据
+// @receiver rc
+// @param key
+// @return data
+// @return err
+func (rc *ClusterRedisClient) RedisInt(key string) (data int, err error) {
+	data, err = rc.redisClient.Get(context.TODO(), key).Int()
+
+	return
+}
+
+// Put
+// @Description: put一个数据到redis
+// @receiver rc
+// @param key
+// @param val
+// @param timeout
+// @return error
+func (rc *ClusterRedisClient) Put(key string, val interface{}, timeout time.Duration) error {
+	var err error
+	err = rc.redisClient.SetEX(context.TODO(), key, val, timeout).Err()
+	if err != nil {
+		return err
+	}
+
+	err = rc.redisClient.HSet(context.TODO(), DefaultKey, key, true).Err()
+
+	return err
+}
+
+// SetNX
+// @Description: 设置一个会过期时间的值
+// @receiver rc
+// @param key
+// @param val
+// @param timeout
+// @return bool
+func (rc *ClusterRedisClient) SetNX(key string, val interface{}, timeout time.Duration) bool {
+	result, err := rc.redisClient.SetEX(context.TODO(), key, val, timeout).Result()
+	if err != nil || result != "OK" {
+		return false
+	}
+
+	return true
+}
+
+// Delete
+// @Description: 删除redis中的键值对
+// @receiver rc
+// @param key
+// @return error
+func (rc *ClusterRedisClient) Delete(key string) error {
+	var err error
+
+	err = rc.redisClient.Del(context.TODO(), key).Err()
+	if err != nil {
+		return err
+	}
+
+	err = rc.redisClient.HDel(context.TODO(), DefaultKey, key).Err()
+
+	return err
+}
+
+// IsExist
+// @Description: 根据key判断是否写入缓存中
+// @receiver rc
+// @param key
+// @return bool
+func (rc *ClusterRedisClient) IsExist(key string) bool {
+	result, err := rc.redisClient.Exists(context.TODO(), key).Result()
+	if err != nil {
+		return false
+	}
+	if result == 0 {
+		_ = rc.redisClient.HDel(context.TODO(), DefaultKey, key).Err()
+		return false
+	}
+
+	return true
+}
+
+// LPush
+// @Description: 写入list
+// @receiver rc
+// @param key
+// @param val
+// @return error
+func (rc *ClusterRedisClient) LPush(key string, val interface{}) error {
+	data, _ := json.Marshal(val)
+	err := rc.redisClient.LPush(context.TODO(), key, data).Err()
+
+	return err
+}
+
+// Brpop
+// @Description: 从list中读取
+// @receiver rc
+// @param key
+// @param callback
+func (rc *ClusterRedisClient) Brpop(key string, callback func([]byte)) {
+	values, err := rc.redisClient.BRPop(context.TODO(), 1*time.Second, key).Result()
+	if err != nil {
+		return
+	}
+	if len(values) < 2 {
+		fmt.Println("assert is wrong")
+		return
+	}
+
+	callback([]byte(values[1]))
+
+}
+
+// GetRedisTTL
+// @Description: 获取key的过期时间
+// @receiver rc
+// @param key
+// @return time.Duration
+func (rc *ClusterRedisClient) GetRedisTTL(key string) time.Duration {
+	value, err := rc.redisClient.TTL(context.TODO(), key).Result()
+	if err != nil {
+		return 0
+	}
+
+	return value
+
+}
+
+// Incrby
+// @Description: 设置自增值
+// @receiver rc
+// @param key
+// @param num
+// @return interface{}
+// @return error
+func (rc *ClusterRedisClient) Incrby(key string, num int) (interface{}, error) {
+	return rc.redisClient.IncrBy(context.TODO(), key, int64(num)).Result()
+}
+
+// Do
+// @Description: cmd执行redis命令
+// @receiver rc
+// @param commandName
+// @param args
+// @return reply
+// @return err
+func (rc *ClusterRedisClient) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
+	newArgs := []interface{}{commandName}
+	newArgs = append(newArgs, args...)
+	return rc.redisClient.Do(context.TODO(), newArgs...).Result()
+}

+ 286 - 0
utils/redis/standalone_redis.go

@@ -0,0 +1,286 @@
+package redis
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/go-redis/redis/v8"
+	"strconv"
+	"time"
+)
+
+// StandaloneRedisClient
+// @Description: 单机redis客户端
+type StandaloneRedisClient struct {
+	redisClient *redis.Client
+}
+
+func InitStandaloneRedis(config string) (standaloneRedis *StandaloneRedisClient, err error) {
+	var cf map[string]string
+	err = json.Unmarshal([]byte(config), &cf)
+	if err != nil {
+		return
+	}
+	//if _, ok := cf["key"]; !ok {
+	//	cf["key"] = DefaultKey
+	//}
+
+	if _, ok := cf["conn"]; !ok {
+		err = errors.New("config has no conn key")
+		return
+	}
+
+	// db库
+	dbNum := 0
+	// 如果指定了db库
+	if _, ok := cf["dbNum"]; ok {
+		dbNum, err = strconv.Atoi(cf["dbNum"])
+		if err != nil {
+			return
+		}
+	}
+
+	// 密码
+	if _, ok := cf["password"]; !ok {
+		cf["password"] = ""
+	}
+
+	client := redis.NewClient(&redis.Options{
+		Addr:     cf["conn"],
+		Password: cf["password"],
+		DB:       dbNum,
+		//PoolSize: 10, //连接池最大socket连接数,默认为10倍CPU数, 10 * runtime.NumCPU(暂不配置)
+	})
+
+	_, err = client.Ping(context.TODO()).Result()
+	if err != nil {
+		err = errors.New("redis 链接失败:" + err.Error())
+		return
+	}
+
+	standaloneRedis = &StandaloneRedisClient{redisClient: client}
+
+	return
+}
+
+// Get
+// @Description: 根据key获取数据(其实是返回的字节编码)
+// @receiver rc
+// @param key
+// @return interface{}
+func (rc *StandaloneRedisClient) Get(key string) interface{} {
+	data, err := rc.redisClient.Get(context.TODO(), key).Bytes()
+	if err != nil {
+		return nil
+	}
+
+	return data
+}
+
+// GetStr
+// @Description: 根据key获取字符串数据
+// @receiver rc
+// @param key
+// @return string
+func (rc *StandaloneRedisClient) GetStr(key string) string {
+	return rc.redisClient.Get(context.TODO(), key).Val()
+}
+
+// GetInt
+// @Description: 根据key获取int数据
+// @receiver rc
+// @param key
+// @return int
+// @return error
+func (rc *StandaloneRedisClient) GetInt64(key string) (int64, error) {
+	return rc.redisClient.Get(context.TODO(), key).Int64()
+}
+
+// GetInt
+// @Description: 根据key获取int数据
+// @receiver rc
+// @param key
+// @return int
+// @return error
+func (rc *StandaloneRedisClient) GetUInt64(key string) (uint64, error) {
+	return rc.redisClient.Get(context.TODO(), key).Uint64()
+}
+
+// RedisBytes
+// @Description: 根据key获取字节编码数据
+// @receiver rc
+// @param key
+// @return data
+// @return err
+func (rc *StandaloneRedisClient) RedisBytes(key string) (data []byte, err error) {
+	data, err = rc.redisClient.Get(context.TODO(), key).Bytes()
+
+	return
+}
+
+// RedisString
+// @Description: 根据key获取字符串数据
+// @receiver rc
+// @param key
+// @return data
+// @return err
+func (rc *StandaloneRedisClient) RedisString(key string) (data string, err error) {
+	data, err = rc.redisClient.Get(context.TODO(), key).Result()
+
+	return
+}
+
+// RedisInt
+// @Description: 根据key获取int数据
+// @receiver rc
+// @param key
+// @return data
+// @return err
+func (rc *StandaloneRedisClient) RedisInt(key string) (data int, err error) {
+	data, err = rc.redisClient.Get(context.TODO(), key).Int()
+
+	return
+}
+
+// Put
+// @Description: put一个数据到redis
+// @receiver rc
+// @param key
+// @param val
+// @param timeout
+// @return error
+func (rc *StandaloneRedisClient) Put(key string, val interface{}, timeout time.Duration) error {
+	var err error
+	err = rc.redisClient.SetEX(context.TODO(), key, val, timeout).Err()
+	if err != nil {
+		return err
+	}
+
+	err = rc.redisClient.HSet(context.TODO(), DefaultKey, key, true).Err()
+
+	return err
+}
+
+// SetNX
+// @Description: 设置一个会过期时间的值
+// @receiver rc
+// @param key
+// @param val
+// @param timeout
+// @return bool
+func (rc *StandaloneRedisClient) SetNX(key string, val interface{}, timeout time.Duration) bool {
+	result, err := rc.redisClient.SetEX(context.TODO(), key, val, timeout).Result()
+	if err != nil || result != "OK" {
+		return false
+	}
+
+	return true
+}
+
+// Delete
+// @Description: 删除redis中的键值对
+// @receiver rc
+// @param key
+// @return error
+func (rc *StandaloneRedisClient) Delete(key string) error {
+	var err error
+
+	err = rc.redisClient.Del(context.TODO(), key).Err()
+	if err != nil {
+		return err
+	}
+
+	err = rc.redisClient.HDel(context.TODO(), DefaultKey, key).Err()
+
+	return err
+}
+
+// IsExist
+// @Description: 根据key判断是否写入缓存中
+// @receiver rc
+// @param key
+// @return bool
+func (rc *StandaloneRedisClient) IsExist(key string) bool {
+	result, err := rc.redisClient.Exists(context.TODO(), key).Result()
+	if err != nil {
+		return false
+	}
+	if result == 0 {
+		_ = rc.redisClient.HDel(context.TODO(), DefaultKey, key).Err()
+		return false
+	}
+
+	return true
+}
+
+// LPush
+// @Description: 写入list
+// @receiver rc
+// @param key
+// @param val
+// @return error
+func (rc *StandaloneRedisClient) LPush(key string, val interface{}) error {
+	data, _ := json.Marshal(val)
+	err := rc.redisClient.LPush(context.TODO(), key, data).Err()
+
+	return err
+}
+
+// Brpop
+// @Description: 从list中读取
+// @receiver rc
+// @param key
+// @param callback
+func (rc *StandaloneRedisClient) Brpop(key string, callback func([]byte)) {
+	values, err := rc.redisClient.BRPop(context.TODO(), 1*time.Second, key).Result()
+	if err != nil {
+		return
+	}
+	if len(values) < 2 {
+		fmt.Println("assert is wrong")
+		return
+	}
+
+	callback([]byte(values[1]))
+
+}
+
+// GetRedisTTL
+// @Description: 获取key的过期时间
+// @receiver rc
+// @param key
+// @return time.Duration
+func (rc *StandaloneRedisClient) GetRedisTTL(key string) time.Duration {
+	value, err := rc.redisClient.TTL(context.TODO(), key).Result()
+	if err != nil {
+		return 0
+	}
+
+	return value
+
+}
+
+// Incrby
+// @Description: 设置自增值
+// @receiver rc
+// @param key
+// @param num
+// @return interface{}
+// @return error
+func (rc *StandaloneRedisClient) Incrby(key string, num int) (interface{}, error) {
+	return rc.redisClient.IncrBy(context.TODO(), key, int64(num)).Result()
+}
+
+// Do
+// @Description: cmd执行redis命令
+// @receiver rc
+// @param commandName
+// @param args
+// @return reply
+// @return err
+func (rc *StandaloneRedisClient) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
+	newArgs := []interface{}{commandName}
+	newArgs = append(newArgs, args...)
+	return rc.redisClient.Do(context.TODO(), newArgs...).Result()
+}