distrubtLock.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package lock
  2. import (
  3. "context"
  4. "eta/eta_api/utils"
  5. "fmt"
  6. "github.com/go-redis/redis/v8"
  7. "time"
  8. )
  9. const (
  10. lockName = "lock:"
  11. )
  12. var (
  13. ctx = context.Background()
  14. )
  15. func AcquireLock(key string, expiration int, Holder string) bool {
  16. script := redis.NewScript(`local key = KEYS[1]
  17. local clientId = ARGV[1]
  18. local expiration = tonumber(ARGV[2])
  19. if redis.call("EXISTS", key) == 0 then
  20. redis.call("SET", key, clientId, "EX", expiration)
  21. return 1
  22. else
  23. return 0
  24. end`)
  25. lockey := fmt.Sprintf("%s%s", lockName, key)
  26. result, err := script.Run(ctx, utils.Rc.RedisClient(), []string{lockey}, Holder, expiration).Int()
  27. if err != nil {
  28. fmt.Printf("加锁失败:err: %v", err)
  29. return false
  30. }
  31. if result == 1 {
  32. return true
  33. }
  34. return false
  35. }
  36. func TryLock(key string, expiration int, Holder string, timeout time.Duration) error {
  37. redisCtx, cancel := context.WithTimeout(context.Background(), timeout)
  38. defer cancel()
  39. for {
  40. select {
  41. case <-redisCtx.Done():
  42. return fmt.Errorf("等待超时")
  43. default:
  44. if AcquireLock(key, expiration, Holder) {
  45. // 启动异步续约
  46. go renewLock(key, Holder, expiration)
  47. return nil
  48. }
  49. // 等待重试
  50. time.Sleep(50 * time.Millisecond)
  51. }
  52. }
  53. }
  54. func renewLock(key, holder string, expiration int) {
  55. for {
  56. time.Sleep(5 * time.Second)
  57. script := redis.NewScript(`
  58. local key = KEYS[1]
  59. local clientId = ARGV[1]
  60. local expiration = tonumber(ARGV[2])
  61. if redis.call("get", KEYS[1]) == ARGV[1] then
  62. redis.call("SET", key, clientId, "EX", expiration)
  63. return 1
  64. else
  65. return 0
  66. end
  67. `)
  68. lockey := fmt.Sprintf("%s%s", lockName, key)
  69. result, err := script.Run(context.Background(), utils.Rc.RedisClient(), []string{lockey}, holder, expiration).Result()
  70. if err != nil {
  71. fmt.Println("锁续期失败:", err)
  72. return
  73. }
  74. if result.(int64) == 0 {
  75. fmt.Println("持有者变更,停止监听")
  76. return
  77. }
  78. fmt.Printf("锁续期成功")
  79. }
  80. }
  81. func ReleaseLock(key string, holder string) bool {
  82. script := redis.NewScript(`
  83. if redis.call("get", KEYS[1]) == ARGV[1] then
  84. return redis.call("del", KEYS[1])
  85. else
  86. return 0
  87. end
  88. `)
  89. lockey := fmt.Sprintf("%s%s", lockName, key)
  90. result, err := script.Run(ctx, utils.Rc.RedisClient(), []string{lockey}, holder).Int()
  91. if err != nil {
  92. fmt.Printf("解锁失败:err: %v", err)
  93. return false
  94. }
  95. if result == 1 {
  96. return true
  97. }
  98. return false
  99. }