Browse Source

fix:优化binlog监听逻辑

zqbao 4 months ago
parent
commit
0cb44566dc

+ 7 - 0
models/edb_monitor/edb_monitor.go

@@ -83,6 +83,13 @@ func GetEdbMonitorInfoById(id int) (item *EdbMonitorInfo, err error) {
 	return
 }
 
+func GetEdbMonitorInfoByEdbInfoId(edbInfoId int) (items []*EdbMonitorInfo, err error) {
+	o := orm.NewOrmUsingDB("data")
+	sql := `SELECT * FROM edb_monitor_info WHERE edb_info_id =?`
+	_, err = o.Raw(sql, edbInfoId).QueryRows(&items)
+	return
+}
+
 func GetEdbMonitorInfoCountByClassifyId(classifyId []int) (count int, err error) {
 	if len(classifyId) == 0 {
 		return

+ 45 - 57
services/binlog/handler.go

@@ -1,7 +1,6 @@
 package binlog
 
 import (
-	edbmonitor "eta/eta_api/models/edb_monitor"
 	edbmonitorSvr "eta/eta_api/services/edb_monitor"
 	"eta/eta_api/utils"
 	"fmt"
@@ -14,27 +13,6 @@ import (
 	"github.com/go-mysql-org/go-mysql/schema"
 )
 
-type EdbInfoBingLog struct {
-	EdbInfoId        int    `orm:"column(edb_info_id);pk"`
-	EdbInfoType      int    `description:"指标类型,0:普通指标,1:预测指标"`
-	Source           int    `description:"来源id"`
-	EdbCode          string `description:"指标编码"`
-	StartDate        string `description:"起始日期"`
-	EndDate          string `description:"终止日期"`
-	UniqueCode       string `description:"指标唯一编码"`
-	CreateTime       string
-	ModifyTime       string
-	BaseModifyTime   string
-	MinValue         float64 `description:"指标最小值"`
-	MaxValue         float64 `description:"指标最大值"`
-	LatestDate       string  `description:"数据最新日期(实际日期)"`
-	LatestValue      float64 `description:"数据最新值(实际值)"`
-	EndValue         float64 `description:"数据的最新值(预测日期的最新值)"`
-	DataUpdateTime   string  `description:"最近一次数据发生变化的时间"`
-	ErDataUpdateDate string  `description:"本次更新,数据发生变化的最早日期"`
-	SubSource        int     `description:"子数据来源:0:经济数据库,1:日期序列"`
-}
-
 type EdbEventHandler struct {
 	canal.DummyEventHandler
 	fileName string
@@ -86,28 +64,33 @@ func (h *EdbEventHandler) String() string {
 func (h *EdbEventHandler) Insert(e *canal.RowsEvent) error {
 	// 批量插入的时候,e.Rows的长度会大于0
 	fmt.Println(e.Header.ServerID, ";", e.Table.Schema, ".", e.Table.Name)
-	edbMonitorList, err := edbmonitor.GetEdbMonitorEdbInfoList()
-	if err != nil {
-		return err
-	}
-	edbMonitorMap := make(map[int][]*edbmonitor.EdbMonitorInfo)
-	for _, v := range edbMonitorList {
-		if _, ok := edbMonitorMap[v.EdbInfoId]; !ok {
-			edbMonitorMap[v.EdbInfoId] = make([]*edbmonitor.EdbMonitorInfo, 0)
-		}
-		edbMonitorMap[v.EdbInfoId] = append(edbMonitorMap[v.EdbInfoId], v)
-	}
-
 	for _, row := range e.Rows { // 遍历当前插入的数据列表(存在批量插入的情况,所以是list)
 		edbInfo := h.MapRowToStruct(e.Table.Columns, row)
-		if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
-			for _, monitor := range monitors {
-				err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
+		if ok := edbmonitorSvr.EdbLocalSet.IsExist(edbInfo.EdbInfoId); ok {
+			err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
+			if err != nil {
+				return err
+			}
+		} else {
+			ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, edbInfo)
+			if err != nil {
+				return err
+			}
+			if ok {
+				err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
 				if err != nil {
-					continue
+					return err
 				}
 			}
 		}
+		// if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
+		// 	for _, monitor := range monitors {
+		// 		err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
+		// 		if err != nil {
+		// 			continue
+		// 		}
+		// 	}
+		// }
 	}
 	return nil
 }
@@ -117,33 +100,38 @@ func (h *EdbEventHandler) Update(e *canal.RowsEvent) error {
 		fmt.Println("更新数据异常,没有原始数据和新数据:", e.Rows)
 		return nil
 	}
-
-	edbMonitorList, err := edbmonitor.GetEdbMonitorEdbInfoList()
-	if err != nil {
-		return err
-	}
-	edbMonitorMap := make(map[int][]*edbmonitor.EdbMonitorInfo)
-	for _, v := range edbMonitorList {
-		if _, ok := edbMonitorMap[v.EdbInfoId]; !ok {
-			edbMonitorMap[v.EdbInfoId] = make([]*edbmonitor.EdbMonitorInfo, 0)
-		}
-		edbMonitorMap[v.EdbInfoId] = append(edbMonitorMap[v.EdbInfoId], v)
-	}
-
 	edbInfo := h.MapRowToStruct(e.Table.Columns, e.Rows[1])
-	if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
-		for _, monitor := range monitors {
-			err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
+	if ok := edbmonitorSvr.EdbLocalSet.IsExist(edbInfo.EdbInfoId); ok {
+		err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
+		if err != nil {
+			return err
+		}
+	} else {
+		ok, err := utils.Rc.SIsMember(edbmonitorSvr.EDB_MONITOR_ID_SET_CACHE, edbInfo)
+		if err != nil {
+			return err
+		}
+		if ok {
+			err := utils.Rc.LPush(edbmonitorSvr.EDB_MONITOR_HANDLE_LIST_CACHE, edbInfo)
 			if err != nil {
-				continue
+				return err
 			}
 		}
 	}
+
+	// if monitors, ok := edbMonitorMap[edbInfo.EdbInfoId]; ok {
+	// 	for _, monitor := range monitors {
+	// 		err = edbmonitorSvr.ModifyEdbMonitorState(monitor, edbInfo.EdbCode, edbInfo.Source, edbInfo.SubSource)
+	// 		if err != nil {
+	// 			continue
+	// 		}
+	// 	}
+	// }
 	return nil
 }
 
-func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) EdbInfoBingLog {
-	edbInfo := EdbInfoBingLog{}
+func (h *EdbEventHandler) MapRowToStruct(columns []schema.TableColumn, row []interface{}) edbmonitorSvr.EdbInfoBingLog {
+	edbInfo := edbmonitorSvr.EdbInfoBingLog{}
 	for i, column := range columns {
 		value := reflect.ValueOf(row[i])
 		switch column.Name {

+ 38 - 0
services/edb_monitor/constants.go

@@ -0,0 +1,38 @@
+package edbmonitor
+
+const EDB_MONITOR_ID_SET_CACHE = "eta:edb_monitor:monitor:id"
+const EDB_MONITOR_HANDLE_LIST_CACHE = "eta:edb_monitor:handle:id"
+
+// 预警触发状态
+const (
+	EDB_MONITOR_STATE_CLOSE = iota
+	EDB_MONITOR_STATE_NO_TRIGGER
+	EDB_MONITOR_STATE_TRIGGER_SUCCESS
+)
+
+// 预警突破方式
+const (
+	EDB_MONITOR_TYPE_UP = iota
+	EDB_MONITOR_TYPE_DOWN
+)
+
+type EdbInfoBingLog struct {
+	EdbInfoId        int
+	EdbInfoType      int    `description:"指标类型,0:普通指标,1:预测指标"`
+	Source           int    `description:"来源id"`
+	EdbCode          string `description:"指标编码"`
+	StartDate        string `description:"起始日期"`
+	EndDate          string `description:"终止日期"`
+	UniqueCode       string `description:"指标唯一编码"`
+	CreateTime       string
+	ModifyTime       string
+	BaseModifyTime   string
+	MinValue         float64 `description:"指标最小值"`
+	MaxValue         float64 `description:"指标最大值"`
+	LatestDate       string  `description:"数据最新日期(实际日期)"`
+	LatestValue      float64 `description:"数据最新值(实际值)"`
+	EndValue         float64 `description:"数据的最新值(预测日期的最新值)"`
+	DataUpdateTime   string  `description:"最近一次数据发生变化的时间"`
+	ErDataUpdateDate string  `description:"本次更新,数据发生变化的最早日期"`
+	SubSource        int     `description:"子数据来源:0:经济数据库,1:日期序列"`
+}

+ 128 - 23
services/edb_monitor/edb_monitor.go

@@ -1,6 +1,7 @@
 package edbmonitor
 
 import (
+	"encoding/json"
 	"eta/eta_api/models/data_manage"
 	edbmonitor "eta/eta_api/models/edb_monitor"
 	"eta/eta_api/models/edb_monitor/request"
@@ -10,23 +11,43 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/rdlucklib/rdluck_tools/paging"
 )
 
-// 预警触发状态
-const (
-	EDB_MONITOR_STATE_CLOSE = iota
-	EDB_MONITOR_STATE_NO_TRIGGER
-	EDB_MONITOR_STATE_TRIGGER_SUCCESS
-)
+type SafeEdbMonitorSet struct {
+	m  map[int]struct{}
+	mu sync.Mutex
+}
 
-// 预警突破方式
-const (
-	EDB_MONITOR_TYPE_UP = iota
-	EDB_MONITOR_TYPE_DOWN
-)
+func NewSafeEdbMonitorSet() *SafeEdbMonitorSet {
+	return &SafeEdbMonitorSet{
+		m: make(map[int]struct{}),
+	}
+}
+
+func (e *SafeEdbMonitorSet) Add(id int) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	e.m[id] = struct{}{}
+}
+
+func (e *SafeEdbMonitorSet) IsExist(id int) (ok bool) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	_, ok = e.m[id]
+	return
+}
+
+func (e *SafeEdbMonitorSet) Remove(id int) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	delete(e.m, id)
+}
+
+var EdbLocalSet = NewSafeEdbMonitorSet()
 
 func GetMonitorList(classifyId, level, state, userId string, pageSize, currentIndex int) (resp response.EdbMonitorInfoListResp, msg string, err error) {
 	if pageSize <= 0 {
@@ -269,7 +290,12 @@ func SaveEdbMonitorInfo(req request.EdbMonitorInfoSaveReq, adminId int) (msg str
 		}
 		edbMonitorInfo.EdbMonitorId = int(insertId)
 	}
-	err = ModifyEdbMonitorState(edbMonitorInfo, edb.EdbCode, edb.Source, edb.SubSource)
+	err = utils.Rc.SAdd(EDB_MONITOR_ID_SET_CACHE, edbMonitorInfo.EdbInfoId)
+	if err != nil {
+		utils.FileLog.Info("监控指标添加缓存失败", err)
+	}
+	EdbLocalSet.Add(edbMonitorInfo.EdbInfoId)
+	err = ModifyEdbMonitorStateAndSendMsg(edbMonitorInfo, edb.EdbCode, edb.Source, edb.SubSource)
 	if err != nil {
 		msg = "更新指标预警失败"
 		err = fmt.Errorf("ModifyEdbMonitorState err:%w", err)
@@ -278,7 +304,7 @@ func SaveEdbMonitorInfo(req request.EdbMonitorInfoSaveReq, adminId int) (msg str
 	return
 }
 
-func ModifyEdbMonitorState(edbMonitorInfo *edbmonitor.EdbMonitorInfo, edbCode string, source, subSource int) (err error) {
+func ModifyEdbMonitorStateAndSendMsg(edbMonitorInfo *edbmonitor.EdbMonitorInfo, edbCode string, source, subSource int) (err error) {
 	cond := ` AND edb_code = ? `
 	pars := []interface{}{edbCode}
 	latestTwoData, er := data_manage.GetEdbDataListByCondition(cond, pars, source, subSource, 2, 0)
@@ -286,6 +312,29 @@ func ModifyEdbMonitorState(edbMonitorInfo *edbmonitor.EdbMonitorInfo, edbCode st
 		err = fmt.Errorf("GetEdbDataListByCondition err:%w", er)
 		return
 	}
+	err = ModifyEdbMonitorState(edbMonitorInfo, latestTwoData)
+	if err != nil {
+		return
+	}
+
+	if edbMonitorInfo.State == EDB_MONITOR_STATE_TRIGGER_SUCCESS {
+		SendAndLogMessage(edbMonitorInfo)
+	}
+	return
+}
+
+func GetTwoEdbDataByDataTime(edbCode, dataTime string, source, subSource int) (twoData []*data_manage.EdbData, err error) {
+	cond := ` AND edb_code = ? AND data_time <= ? `
+	pars := []interface{}{edbCode, dataTime}
+	twoData, er := data_manage.GetEdbDataListByCondition(cond, pars, source, subSource, 2, 0)
+	if er != nil {
+		err = fmt.Errorf("GetEdbDataListByCondition err:%w", er)
+		return
+	}
+	return
+}
+
+func ModifyEdbMonitorState(edbMonitorInfo *edbmonitor.EdbMonitorInfo, latestTwoData []*data_manage.EdbData) (err error) {
 	var updateCols []string
 	edbMonitorInfo.EdbLatestDate = latestTwoData[0].DataTime
 	edbMonitorInfo.EdbLatestValue = latestTwoData[0].Value
@@ -306,17 +355,18 @@ func ModifyEdbMonitorState(edbMonitorInfo *edbmonitor.EdbMonitorInfo, edbCode st
 	if err != nil {
 		return
 	}
+	return
+}
 
-	if triggerState == EDB_MONITOR_STATE_TRIGGER_SUCCESS {
-		triggerTime := edbMonitorInfo.MonitorTriggerTime.Format(utils.FormatDateTime)
-		err = SendMessages(edbMonitorInfo.CreateUserId, edbMonitorInfo.EdbInfoId, edbMonitorInfo.EdbInfoType, edbMonitorInfo.EdbMonitorClassifyId, edbMonitorInfo.EdbUniqueCode, edbMonitorInfo.EdbMonitorName, triggerTime)
-		isRead := 1
-		if err != nil {
-			isRead = 0
-		}
-		err = LogMessage(edbMonitorInfo.EdbMonitorName, edbMonitorInfo.EdbUniqueCode, edbMonitorInfo.MonitorTriggerTime, edbMonitorInfo.EdbInfoId, edbMonitorInfo.EdbInfoType, edbMonitorInfo.CreateUserId, isRead, edbMonitorInfo.EdbClassifyId)
+func SendAndLogMessage(edbMonitorInfo *edbmonitor.EdbMonitorInfo) (err error) {
+	triggerTime := edbMonitorInfo.MonitorTriggerTime.Format(utils.FormatDateTime)
+	err = SendMessages(edbMonitorInfo.CreateUserId, edbMonitorInfo.EdbInfoId, edbMonitorInfo.EdbInfoType, edbMonitorInfo.EdbClassifyId, edbMonitorInfo.EdbUniqueCode, edbMonitorInfo.EdbMonitorName, triggerTime)
+	isRead := 1
+	if err != nil {
+		err = nil
+		isRead = 0
 	}
-
+	err = LogMessage(edbMonitorInfo.EdbMonitorName, edbMonitorInfo.EdbUniqueCode, edbMonitorInfo.MonitorTriggerTime, edbMonitorInfo.EdbInfoId, edbMonitorInfo.EdbInfoType, edbMonitorInfo.CreateUserId, isRead, edbMonitorInfo.EdbClassifyId)
 	return
 }
 
@@ -466,7 +516,7 @@ func RestartEdbMonitorInfo(req request.EdbMonitorInfoRestartReq, adminId int) (m
 	edbMonitor.EdbTriggerDate = time.Time{}
 	edbMonitor.MonitorTriggerTime = time.Time{}
 	edbMonitor.ModifyTime = time.Now()
-	err = ModifyEdbMonitorState(edbMonitor, edbMonitor.EdbCode, edbMonitor.Source, edbMonitor.SubSource)
+	err = ModifyEdbMonitorStateAndSendMsg(edbMonitor, edbMonitor.EdbCode, edbMonitor.Source, edbMonitor.SubSource)
 	if err != nil {
 		msg = "重启失败"
 		err = fmt.Errorf("修改预警状态失败, err:%w", err)
@@ -497,6 +547,61 @@ func GetEdbMonitorInfoUserList() (resp response.EdbMonitorInfoCreateUserResp, ms
 	return
 }
 
+func InitEdbMonitorInfo() {
+	edbMonitorList, err := edbmonitor.GetEdbMonitorEdbInfoList()
+	if err != nil {
+		utils.FileLog.Error("获取预警列表失败, err:%w", err)
+	}
+	edbInfoIdList := make([]interface{}, 0)
+	for _, v := range edbMonitorList {
+		if v.EdbInfoId != 0 {
+			EdbLocalSet.Add(v.EdbInfoId)
+			edbInfoIdList = append(edbInfoIdList, v.EdbInfoId)
+		}
+	}
+	utils.Rc.SAdd(EDB_MONITOR_ID_SET_CACHE, edbInfoIdList...)
+}
+
+func HandleEdbMonitorEdbInfo() {
+	InitEdbMonitorInfo()
+	for {
+		utils.Rc.Brpop(EDB_MONITOR_HANDLE_LIST_CACHE, func(b []byte) {
+			edbInfo := new(EdbInfoBingLog)
+			if err := json.Unmarshal(b, &edbInfo); err != nil {
+				return
+			}
+			edbMonitorList, err := edbmonitor.GetEdbMonitorInfoByEdbInfoId(edbInfo.EdbInfoId)
+			if err != nil {
+				utils.FileLog.Error("获取预警列表失败, err:%w, edbInfoId:%d", err, edbInfo.EdbInfoId)
+				return
+			}
+			if len(edbMonitorList) == 0 {
+				utils.Rc.SRem(EDB_MONITOR_ID_SET_CACHE, edbInfo.EdbInfoId)
+				EdbLocalSet.Remove(edbInfo.EdbInfoId)
+				return
+			}
+			for _, v := range edbMonitorList {
+				twoData, err := GetTwoEdbDataByDataTime(v.EdbCode, edbInfo.EndDate, v.Source, v.SubSource)
+				if err != nil {
+					utils.FileLog.Error("获取数据失败, err:%w, edbInfoId:%d", err, edbInfo.EdbInfoId)
+					continue
+				}
+				err = ModifyEdbMonitorState(v, twoData)
+				if err != nil {
+					utils.FileLog.Error("更新预警状态失败, err:%w, edbMonitorId:%d", err, v.EdbMonitorId)
+					continue
+				}
+				if v.State == EDB_MONITOR_STATE_TRIGGER_SUCCESS {
+					err = SendAndLogMessage(v)
+					if err != nil {
+						utils.FileLog.Error("发送预警消息失败, err:%w, edbMonitorId:%d", err, v.EdbMonitorId)
+						continue
+					}
+				}
+			}
+		})
+	}
+}
 func toEdbMonitorInfoItems(edbmonitor []*edbmonitor.EdbMonitorInfo, userMap, classifyPathMap, infoMap map[int]string) []*response.EdbMonitorInfoItem {
 	res := make([]*response.EdbMonitorInfoItem, 0, len(edbmonitor))
 	for _, v := range edbmonitor {

+ 0 - 13
services/edb_monitor/edb_monitor_classify.go

@@ -45,19 +45,6 @@ func SaveEdbMonitorClassify(req request.EdbMonitorClassifySaveReq) (msg string,
 	if parentClassify != nil {
 		rootId = parentClassify.RootId
 	}
-	var parentId []int
-	if req.ParentId != rootId {
-		if req.ParentId > 0 {
-			parentId = append(parentId, req.ParentId)
-		}
-		if rootId > 0 {
-			parentId = append(parentId, rootId)
-		}
-	} else {
-		if req.ParentId > 0 {
-			parentId = append(parentId, req.ParentId)
-		}
-	}
 
 	if req.ClassifyId > 0 {
 		classifyInfo, er := edbmonitor.GetEdbMonitorClassifyById(req.ClassifyId)

+ 3 - 0
services/task.go

@@ -3,6 +3,7 @@ package services
 import (
 	"eta/eta_api/models"
 	"eta/eta_api/services/data"
+	edbmonitor "eta/eta_api/services/edb_monitor"
 	"eta/eta_api/utils"
 	"fmt"
 	"strings"
@@ -48,6 +49,8 @@ func Task() {
 	// 进行指标替换操作
 	go DealReplaceEdbCache()
 
+	go edbmonitor.HandleEdbMonitorEdbInfo()
+
 	// TODO:修复权限
 	//FixEnCompanyPermission()
 	fmt.Println("task end")

+ 2 - 2
utils/constants.go

@@ -514,5 +514,5 @@ var (
 	BASE_END_DATE_UnSpace   = time.Now().AddDate(4, 0, 0).Format(FormatDateUnSpace) //基础数据结束日期
 )
 
-const CACHE_MYSQL_DATA_FILENAME = "eta:mysql:test:binlog:filename"
-const CACHE_MYSQL_DATA_POSITION = "eta:mysql:test:binlog:position"
+const CACHE_MYSQL_DATA_FILENAME = "eta:mysql:test1:binlog:filename"
+const CACHE_MYSQL_DATA_POSITION = "eta:mysql:test1:binlog:position"

+ 3 - 0
utils/redis.go

@@ -21,6 +21,9 @@ type RedisClient interface {
 	GetRedisTTL(key string) time.Duration
 	Incrby(key string, num int) (interface{}, error)
 	Do(commandName string, args ...interface{}) (reply interface{}, err error)
+	SAdd(key string, args ...interface{}) (err error)
+	SRem(key string, args ...interface{}) (err error)
+	SIsMember(key string, args interface{}) (bool, error)
 }
 
 func initRedis(redisType string, conf string) (redisClient RedisClient, err error) {

+ 34 - 0
utils/redis/cluster_redis.go

@@ -283,3 +283,37 @@ func (rc *ClusterRedisClient) Do(commandName string, args ...interface{}) (reply
 	newArgs = append(newArgs, args...)
 	return rc.redisClient.Do(context.TODO(), newArgs...).Result()
 }
+
+// SAdd
+// @Description: 写入set元素
+// @receiver rc
+// @param commandName
+// @param args
+// @return reply
+// @return err
+func (rc *ClusterRedisClient) SAdd(key string, args ...interface{}) (err error) {
+	return rc.redisClient.SAdd(context.TODO(), key, args...).Err()
+}
+
+// SAdd
+// @Description: 删除set集合中指定的元素
+// @receiver rc
+// @param commandName
+// @param args
+// @return reply
+// @return err
+func (rc *ClusterRedisClient) SRem(key string, args ...interface{}) (err error) {
+	return rc.redisClient.SRem(context.TODO(), key, args...).Err()
+}
+
+// SAdd
+// @Description: 判断元素是否在集合中
+// @receiver rc
+// @param commandName
+// @param args
+// @return reply
+// @return err
+func (rc *ClusterRedisClient) SIsMember(key string, args interface{}) (isMember bool, err error) {
+	isMember, err = rc.redisClient.SIsMember(context.TODO(), key, args).Result()
+	return
+}

+ 34 - 0
utils/redis/standalone_redis.go

@@ -275,3 +275,37 @@ func (rc *StandaloneRedisClient) Do(commandName string, args ...interface{}) (re
 	newArgs = append(newArgs, args...)
 	return rc.redisClient.Do(context.TODO(), newArgs...).Result()
 }
+
+// SAdd
+// @Description: 写入set元素
+// @receiver rc
+// @param commandName
+// @param args
+// @return reply
+// @return err
+func (rc *StandaloneRedisClient) SAdd(key string, args ...interface{}) (err error) {
+	return rc.redisClient.SAdd(context.TODO(), key, args...).Err()
+}
+
+// SAdd
+// @Description: 删除set集合中指定的元素
+// @receiver rc
+// @param commandName
+// @param args
+// @return reply
+// @return err
+func (rc *StandaloneRedisClient) SRem(key string, args ...interface{}) (err error) {
+	return rc.redisClient.SRem(context.TODO(), key, args...).Err()
+}
+
+// SAdd
+// @Description: 判断元素是否在集合中
+// @receiver rc
+// @param commandName
+// @param args
+// @return reply
+// @return err
+func (rc *StandaloneRedisClient) SIsMember(key string, args interface{}) (isMember bool, err error) {
+	isMember, err = rc.redisClient.SIsMember(context.TODO(), key, args).Result()
+	return
+}