mirror of
https://github.com/dkam/suo.git
synced 2025-01-29 07:42:43 +00:00
Compare commits
60 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8a1d7d9ac | ||
|
|
c58a247156 | ||
|
|
8c37c24ee6 | ||
|
|
29da8cf090 | ||
|
|
8ed488f071 | ||
|
|
152b6acf9c | ||
|
|
5e10afe534 | ||
|
|
0423eb9e12 | ||
|
|
ca46f5f369 | ||
|
|
1022a6f9d3 | ||
|
|
6be3a5bdda | ||
|
|
aa4da5d739 | ||
|
|
fdb0b7f9d5 | ||
|
|
a13edcf7d1 | ||
|
|
af1c476f08 | ||
|
|
58fae54022 | ||
|
|
2088fd90b3 | ||
|
|
05661e143c | ||
|
|
a23282dcc6 | ||
|
|
323caaee9b | ||
|
|
745d49466f | ||
|
|
161d50deb9 | ||
|
|
81e4a3e143 | ||
|
|
2960c14a4d | ||
|
|
308e918e60 | ||
|
|
aaee69a2df | ||
|
|
c6d1c29ada | ||
|
|
14e442e99d | ||
|
|
498073b92e | ||
|
|
155a3ac40c | ||
|
|
10d3ab09cf | ||
|
|
2724ec6d9d | ||
|
|
185327f59c | ||
|
|
c8a972da31 | ||
|
|
7591c08a28 | ||
|
|
1dee338e16 | ||
|
|
900c723043 | ||
|
|
6e2afdf80a | ||
|
|
49a9757d44 | ||
|
|
754d7d8faf | ||
|
|
857fc63378 | ||
|
|
bb6762bbc6 | ||
|
|
f0977c89f2 | ||
|
|
4d5c96309f | ||
|
|
8d6061b137 | ||
|
|
ce0a4d8d86 | ||
|
|
1fd769eec2 | ||
|
|
37be5ae27b | ||
|
|
a1a226fb59 | ||
|
|
8d7ddaf35a | ||
|
|
1aacc0c1a1 | ||
|
|
8166c6b51d | ||
|
|
7662743123 | ||
|
|
a2fa281b86 | ||
|
|
887219b63d | ||
|
|
c0905fef91 | ||
|
|
a54b795e20 | ||
|
|
0828cd546b | ||
|
|
c2b9de4cf3 | ||
|
|
d4860423fa |
@@ -74,7 +74,7 @@ Style/SpaceInsideBrackets:
|
|||||||
Style/AndOr:
|
Style/AndOr:
|
||||||
Enabled: false
|
Enabled: false
|
||||||
|
|
||||||
Style/TrailingComma:
|
Style/TrailingCommaInLiteral:
|
||||||
Enabled: true
|
Enabled: true
|
||||||
|
|
||||||
Style/SpaceBeforeComma:
|
Style/SpaceBeforeComma:
|
||||||
@@ -98,7 +98,7 @@ Style/SpaceAfterColon:
|
|||||||
Style/SpaceAfterComma:
|
Style/SpaceAfterComma:
|
||||||
Enabled: true
|
Enabled: true
|
||||||
|
|
||||||
Style/SpaceAfterControlKeyword:
|
Style/SpaceAroundKeyword:
|
||||||
Enabled: true
|
Enabled: true
|
||||||
|
|
||||||
Style/SpaceAfterNot:
|
Style/SpaceAfterNot:
|
||||||
@@ -163,7 +163,7 @@ Style/StringLiterals:
|
|||||||
EnforcedStyle: double_quotes
|
EnforcedStyle: double_quotes
|
||||||
|
|
||||||
Metrics/CyclomaticComplexity:
|
Metrics/CyclomaticComplexity:
|
||||||
Max: 8
|
Max: 10
|
||||||
|
|
||||||
Metrics/LineLength:
|
Metrics/LineLength:
|
||||||
Max: 128
|
Max: 128
|
||||||
@@ -214,3 +214,6 @@ Metrics/ParameterLists:
|
|||||||
|
|
||||||
Metrics/PerceivedComplexity:
|
Metrics/PerceivedComplexity:
|
||||||
Enabled: false
|
Enabled: false
|
||||||
|
|
||||||
|
Style/Documentation:
|
||||||
|
Enabled: false
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
language: ruby
|
language: ruby
|
||||||
rvm:
|
rvm:
|
||||||
- 2.2.0
|
- 2.2.6
|
||||||
|
- 2.3.7
|
||||||
|
- 2.4.4
|
||||||
|
- 2.5.1
|
||||||
services:
|
services:
|
||||||
- memcached
|
- memcached
|
||||||
- redis-server
|
- redis-server
|
||||||
|
|||||||
49
CHANGELOG.md
49
CHANGELOG.md
@@ -1,7 +1,52 @@
|
|||||||
|
## 0.3.3
|
||||||
|
|
||||||
|
- Default TTL for keys to allow for short-lived locking keys (thanks to Ian Remillard) without leaking memory.
|
||||||
|
- Vastly improve initial lock acquisition, especially on Redis (thanks to Jeremy Wadscak).
|
||||||
|
|
||||||
|
## 0.3.2
|
||||||
|
|
||||||
|
- Custom lock tokens (thanks to avokhmin).
|
||||||
|
|
||||||
|
## 0.3.1
|
||||||
|
|
||||||
|
- Slight memory leak fix.
|
||||||
|
|
||||||
|
## 0.3.0
|
||||||
|
|
||||||
|
- Dramatically simplify the interface by forcing clients to specify the key & resources at lock initialization instead of every method call.
|
||||||
|
|
||||||
|
## 0.2.3
|
||||||
|
|
||||||
|
- Clarify documentation further with respect to semaphores.
|
||||||
|
|
||||||
|
## 0.2.2
|
||||||
|
|
||||||
|
- Fix bug with refresh - typo would've prevented real use.
|
||||||
|
- Clean up code.
|
||||||
|
- Improve documentation a bit.
|
||||||
|
- 100% test coverage.
|
||||||
|
|
||||||
|
## 0.2.1
|
||||||
|
|
||||||
|
- Fix bug when dealing with real-world Redis error conditions.
|
||||||
|
|
||||||
|
## 0.2.0
|
||||||
|
|
||||||
|
- Refactor class methods into instance methods to simplify implementation.
|
||||||
|
- Increase thread safety with Memcached implementation.
|
||||||
|
|
||||||
|
## 0.1.3
|
||||||
|
|
||||||
|
- Properly throw Suo::LockClientError when the connection itself fails (Memcache server not reachable, etc.)
|
||||||
|
|
||||||
|
## 0.1.2
|
||||||
|
|
||||||
|
- Fix retry_timeout to properly use the full time (was being calculated incorrectly).
|
||||||
|
- Refactor client implementations to re-use more code.
|
||||||
|
|
||||||
## 0.1.1
|
## 0.1.1
|
||||||
|
|
||||||
- Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for semaphore serialization.
|
- Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for lock serialization.
|
||||||
|
|
||||||
|
|
||||||
## 0.1.0
|
## 0.1.0
|
||||||
|
|
||||||
|
|||||||
66
README.md
66
README.md
@@ -1,8 +1,8 @@
|
|||||||
# Suo [](https://travis-ci.org/nickelser/suo)
|
# Suo [](https://travis-ci.org/nickelser/suo) [](https://codeclimate.com/github/nickelser/suo) [](https://codeclimate.com/github/nickelser/suo) [](http://badge.fury.io/rb/suo)
|
||||||
|
|
||||||
:lock: Distributed semaphores using Memcached or Redis in Ruby.
|
:lock: Distributed semaphores using Memcached or Redis in Ruby.
|
||||||
|
|
||||||
Suo provides a very performant distributed lock solution using Compare-And-Set (`CAS`) commands in Memcached, and `WATCH/MULTI` in Redis.
|
Suo provides a very performant distributed lock solution using Compare-And-Set (`CAS`) commands in Memcached, and `WATCH/MULTI` in Redis. It allows locking both single exclusion (like a mutex - sharing one resource), as well as multiple resources.
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
@@ -18,35 +18,75 @@ gem 'suo'
|
|||||||
|
|
||||||
```ruby
|
```ruby
|
||||||
# Memcached
|
# Memcached
|
||||||
suo = Suo::Client::Memcached.new(connection: "127.0.0.1:11211")
|
suo = Suo::Client::Memcached.new("foo_resource", connection: "127.0.0.1:11211")
|
||||||
|
|
||||||
# Redis
|
# Redis
|
||||||
suo = Suo::Client::Redis.new(connection: {host: "10.0.1.1"})
|
suo = Suo::Client::Redis.new("baz_resource", connection: {host: "10.0.1.1"})
|
||||||
|
|
||||||
# Pre-existing client
|
# Pre-existing client
|
||||||
suo = Suo::Client::Memcached.new(client: some_dalli_client)
|
suo = Suo::Client::Memcached.new("bar_resource", client: some_dalli_client)
|
||||||
|
|
||||||
suo.lock("some_key") do
|
suo.lock do
|
||||||
# critical code here
|
# critical code here
|
||||||
@puppies.pet!
|
@puppies.pet!
|
||||||
end
|
end
|
||||||
|
|
||||||
2.times do
|
# The resources argument is the number of resources the semaphore will allow to lock (defaulting to one - a mutex)
|
||||||
Thread.new do
|
suo = Suo::Client::Memcached.new("bar_resource", client: some_dalli_client, resources: 2)
|
||||||
# second argument is the number of resources - so this will run twice
|
|
||||||
suo.lock("other_key", 2, timeout: 0.5) { puts "Will run twice!" }
|
Thread.new { suo.lock { puts "One"; sleep 2 } }
|
||||||
|
Thread.new { suo.lock { puts "Two"; sleep 2 } }
|
||||||
|
Thread.new { suo.lock { puts "Three" } }
|
||||||
|
|
||||||
|
# will print "One" "Two", but not "Three", as there are only 2 resources
|
||||||
|
|
||||||
|
# custom acquisition timeouts (time to acquire)
|
||||||
|
suo = Suo::Client::Memcached.new("protected_key", client: some_dalli_client, acquisition_timeout: 1) # in seconds
|
||||||
|
|
||||||
|
# manually locking/unlocking
|
||||||
|
# the return value from lock without a block is a unique token valid only for the current lock
|
||||||
|
# which must be unlocked manually
|
||||||
|
token = suo.lock
|
||||||
|
foo.baz!
|
||||||
|
suo.unlock(token)
|
||||||
|
|
||||||
|
# custom stale lock expiration (cleaning of dead locks)
|
||||||
|
suo = Suo::Client::Redis.new("other_key", client: some_redis_client, stale_lock_expiration: 60*5)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Stale locks
|
||||||
|
|
||||||
|
"Stale locks" - those acquired more than `stale_lock_expiration` (defaulting to 3600 or one hour) ago - are automatically cleared during any operation on the key (`lock`, `unlock`, `refresh`). The `locked?` method will not return true if only stale locks exist, but will not modify the key itself.
|
||||||
|
|
||||||
|
To re-acquire a lock in the middle of a block, you can use the refresh method on client.
|
||||||
|
|
||||||
|
```ruby
|
||||||
|
suo = Suo::Client::Redis.new("foo")
|
||||||
|
|
||||||
|
# lock is the same token as seen in the manual example, above
|
||||||
|
suo.lock do |token|
|
||||||
|
5.times do
|
||||||
|
baz.bar!
|
||||||
|
suo.refresh(token)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Time To Live
|
||||||
|
|
||||||
|
```ruby
|
||||||
|
Suo::Client::Redis.new("bar_resource", ttl: 60) #ttl in seconds
|
||||||
|
```
|
||||||
|
|
||||||
|
A key representing a set of lockable resources is removed once the last resource lock is released and the `ttl` time runs out. When another lock is acquired and the key has been removed the key has to be recreated.
|
||||||
|
|
||||||
|
|
||||||
## TODO
|
## TODO
|
||||||
- better stale key handling (refresh blocks)
|
|
||||||
- more race condition tests
|
- more race condition tests
|
||||||
- refactor clients to re-use more code
|
|
||||||
|
|
||||||
## History
|
## History
|
||||||
|
|
||||||
View the [changelog](https://github.com/nickelser/suo/blob/master/CHANGELOG.md)
|
View the [changelog](https://github.com/nickelser/suo/blob/master/CHANGELOG.md).
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
||||||
|
|||||||
16
lib/suo.rb
16
lib/suo.rb
@@ -1,2 +1,16 @@
|
|||||||
|
require "securerandom"
|
||||||
|
require "monitor"
|
||||||
|
|
||||||
|
require "dalli"
|
||||||
|
require "dalli/cas/client"
|
||||||
|
|
||||||
|
require "redis"
|
||||||
|
|
||||||
|
require "msgpack"
|
||||||
|
|
||||||
require "suo/version"
|
require "suo/version"
|
||||||
require "suo/clients"
|
|
||||||
|
require "suo/errors"
|
||||||
|
require "suo/client/base"
|
||||||
|
require "suo/client/memcached"
|
||||||
|
require "suo/client/redis"
|
||||||
|
|||||||
@@ -2,104 +2,177 @@ module Suo
|
|||||||
module Client
|
module Client
|
||||||
class Base
|
class Base
|
||||||
DEFAULT_OPTIONS = {
|
DEFAULT_OPTIONS = {
|
||||||
retry_count: 3,
|
acquisition_timeout: 0.1,
|
||||||
retry_delay: 0.01,
|
acquisition_delay: 0.01,
|
||||||
stale_lock_expiration: 3600
|
stale_lock_expiration: 3600,
|
||||||
|
resources: 1,
|
||||||
|
ttl: 60,
|
||||||
}.freeze
|
}.freeze
|
||||||
|
|
||||||
def initialize(options = {})
|
BLANK_STR = "".freeze
|
||||||
@options = self.class.merge_defaults(options).merge(_initialized: true)
|
|
||||||
|
attr_accessor :client, :key, :resources, :options
|
||||||
|
|
||||||
|
include MonitorMixin
|
||||||
|
|
||||||
|
def initialize(key, options = {})
|
||||||
|
fail "Client required" unless options[:client]
|
||||||
|
|
||||||
|
@options = DEFAULT_OPTIONS.merge(options)
|
||||||
|
@retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil
|
||||||
|
@client = @options[:client]
|
||||||
|
@resources = @options[:resources].to_i
|
||||||
|
@key = key
|
||||||
|
|
||||||
|
super() # initialize Monitor mixin for thread safety
|
||||||
end
|
end
|
||||||
|
|
||||||
def lock(key, resources = 1, options = {})
|
def lock(custom_token = nil)
|
||||||
options = self.class.merge_defaults(@options.merge(options))
|
token = acquire_lock(custom_token)
|
||||||
token = self.class.lock(key, resources, options)
|
|
||||||
|
|
||||||
if token
|
if block_given? && token
|
||||||
begin
|
begin
|
||||||
yield if block_given?
|
yield
|
||||||
ensure
|
ensure
|
||||||
self.class.unlock(key, token, options)
|
unlock(token)
|
||||||
end
|
end
|
||||||
|
|
||||||
true
|
|
||||||
else
|
else
|
||||||
false
|
token
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def locked?(key, resources = 1)
|
def locked?
|
||||||
self.class.locked?(key, resources, @options)
|
|
||||||
end
|
|
||||||
|
|
||||||
class << self
|
|
||||||
def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
|
||||||
fail NotImplementedError
|
|
||||||
end
|
|
||||||
|
|
||||||
def locked?(key, resources = 1, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
client = options[:client]
|
|
||||||
locks = deserialize_locks(client.get(key))
|
|
||||||
|
|
||||||
locks.size >= resources
|
locks.size >= resources
|
||||||
end
|
end
|
||||||
|
|
||||||
def locks(key, options)
|
def locks
|
||||||
options = merge_defaults(options)
|
val, _ = get
|
||||||
client = options[:client]
|
cleared_locks = deserialize_and_clear_locks(val)
|
||||||
locks = deserialize_locks(client.get(key))
|
|
||||||
|
|
||||||
locks.size
|
cleared_locks
|
||||||
end
|
end
|
||||||
|
|
||||||
def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
def refresh(token)
|
||||||
|
retry_with_timeout do
|
||||||
|
val, cas = get
|
||||||
|
|
||||||
|
cas = initial_set if val.nil?
|
||||||
|
|
||||||
|
cleared_locks = deserialize_and_clear_locks(val)
|
||||||
|
|
||||||
|
refresh_lock(cleared_locks, token)
|
||||||
|
|
||||||
|
break if set(serialize_locks(cleared_locks), cas, expire: cleared_locks.empty?)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def unlock(token)
|
||||||
|
return unless token
|
||||||
|
|
||||||
|
retry_with_timeout do
|
||||||
|
val, cas = get
|
||||||
|
|
||||||
|
break if val.nil?
|
||||||
|
|
||||||
|
cleared_locks = deserialize_and_clear_locks(val)
|
||||||
|
|
||||||
|
acquisition_lock = remove_lock(cleared_locks, token)
|
||||||
|
|
||||||
|
break unless acquisition_lock
|
||||||
|
break if set(serialize_locks(cleared_locks), cas, expire: cleared_locks.empty?)
|
||||||
|
end
|
||||||
|
rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
|
||||||
|
# ignore - assume success due to optimistic locking
|
||||||
|
end
|
||||||
|
|
||||||
|
def clear
|
||||||
fail NotImplementedError
|
fail NotImplementedError
|
||||||
end
|
end
|
||||||
|
|
||||||
def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
|
||||||
fail NotImplementedError
|
|
||||||
end
|
|
||||||
|
|
||||||
def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
|
||||||
fail NotImplementedError
|
|
||||||
end
|
|
||||||
|
|
||||||
def merge_defaults(options = {})
|
|
||||||
unless options[:_initialized]
|
|
||||||
options = self::DEFAULT_OPTIONS.merge(options)
|
|
||||||
|
|
||||||
fail "Client required" unless options[:client]
|
|
||||||
end
|
|
||||||
|
|
||||||
if options[:retry_timeout]
|
|
||||||
options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).floor
|
|
||||||
end
|
|
||||||
|
|
||||||
options
|
|
||||||
end
|
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
|
attr_accessor :retry_count
|
||||||
|
|
||||||
|
def acquire_lock(token = nil)
|
||||||
|
token ||= SecureRandom.base64(16)
|
||||||
|
|
||||||
|
retry_with_timeout do
|
||||||
|
val, cas = get
|
||||||
|
|
||||||
|
cas = initial_set if val.nil?
|
||||||
|
|
||||||
|
cleared_locks = deserialize_and_clear_locks(val)
|
||||||
|
|
||||||
|
if cleared_locks.size < resources
|
||||||
|
add_lock(cleared_locks, token)
|
||||||
|
|
||||||
|
newval = serialize_locks(cleared_locks)
|
||||||
|
|
||||||
|
return token if set(newval, cas)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
|
||||||
|
def get
|
||||||
|
fail NotImplementedError
|
||||||
|
end
|
||||||
|
|
||||||
|
def set(newval, cas) # rubocop:disable Lint/UnusedMethodArgument
|
||||||
|
fail NotImplementedError
|
||||||
|
end
|
||||||
|
|
||||||
|
def initial_set(val = BLANK_STR) # rubocop:disable Lint/UnusedMethodArgument
|
||||||
|
fail NotImplementedError
|
||||||
|
end
|
||||||
|
|
||||||
|
def synchronize
|
||||||
|
mon_synchronize { yield }
|
||||||
|
end
|
||||||
|
|
||||||
|
def retry_with_timeout
|
||||||
|
start = Time.now.to_f
|
||||||
|
|
||||||
|
retry_count.times do
|
||||||
|
elapsed = Time.now.to_f - start
|
||||||
|
break if elapsed >= options[:acquisition_timeout]
|
||||||
|
|
||||||
|
synchronize do
|
||||||
|
yield
|
||||||
|
end
|
||||||
|
|
||||||
|
sleep(rand(options[:acquisition_delay] * 1000).to_f / 1000)
|
||||||
|
end
|
||||||
|
rescue => _
|
||||||
|
raise LockClientError
|
||||||
|
end
|
||||||
|
|
||||||
def serialize_locks(locks)
|
def serialize_locks(locks)
|
||||||
MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
|
MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def deserialize_and_clear_locks(val)
|
||||||
|
clear_expired_locks(deserialize_locks(val))
|
||||||
|
end
|
||||||
|
|
||||||
def deserialize_locks(val)
|
def deserialize_locks(val)
|
||||||
MessagePack.unpack(val).map do |time, token|
|
unpacked = (val.nil? || val == BLANK_STR) ? [] : MessagePack.unpack(val)
|
||||||
|
|
||||||
|
unpacked.map do |time, token|
|
||||||
[Time.at(time), token]
|
[Time.at(time), token]
|
||||||
end
|
end
|
||||||
rescue EOFError => _
|
rescue EOFError, MessagePack::MalformedFormatError => _
|
||||||
[]
|
[]
|
||||||
end
|
end
|
||||||
|
|
||||||
def clear_expired_locks(locks, options)
|
def clear_expired_locks(locks)
|
||||||
expired = Time.now - options[:stale_lock_expiration]
|
expired = Time.now - options[:stale_lock_expiration]
|
||||||
locks.reject { |time, _| time < expired }
|
locks.reject { |time, _| time < expired }
|
||||||
end
|
end
|
||||||
|
|
||||||
def add_lock(locks, token)
|
def add_lock(locks, token, time = Time.now.to_f)
|
||||||
locks << [Time.now.to_f, token]
|
locks << [time, token]
|
||||||
end
|
end
|
||||||
|
|
||||||
def remove_lock(locks, acquisition_token)
|
def remove_lock(locks, acquisition_token)
|
||||||
@@ -109,8 +182,7 @@ module Suo
|
|||||||
|
|
||||||
def refresh_lock(locks, acquisition_token)
|
def refresh_lock(locks, acquisition_token)
|
||||||
remove_lock(locks, acquisition_token)
|
remove_lock(locks, acquisition_token)
|
||||||
add_lock(locks, token)
|
add_lock(locks, acquisition_token)
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -1,7 +0,0 @@
|
|||||||
module Suo
|
|
||||||
module Client
|
|
||||||
module Errors
|
|
||||||
class FailedToAcquireLock < StandardError; end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
@@ -1,136 +1,33 @@
|
|||||||
module Suo
|
module Suo
|
||||||
module Client
|
module Client
|
||||||
class Memcached < Base
|
class Memcached < Base
|
||||||
def initialize(options = {})
|
def initialize(key, options = {})
|
||||||
options[:client] ||= Dalli::Client.new(options[:connection] || ENV["MEMCACHE_SERVERS"] || "127.0.0.1:11211")
|
options[:client] ||= Dalli::Client.new(options[:connection] || ENV["MEMCACHE_SERVERS"] || "127.0.0.1:11211")
|
||||||
super
|
super
|
||||||
end
|
end
|
||||||
|
|
||||||
class << self
|
def clear
|
||||||
def lock(key, resources = 1, options = {})
|
@client.delete(@key)
|
||||||
options = merge_defaults(options)
|
|
||||||
acquisition_token = nil
|
|
||||||
token = SecureRandom.base64(16)
|
|
||||||
client = options[:client]
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
end
|
||||||
|
|
||||||
val, cas = client.get_cas(key)
|
private
|
||||||
|
|
||||||
# no key has been set yet; we could simply set it, but would lead to race conditions on the initial setting
|
def get
|
||||||
if val.nil?
|
@client.get_cas(@key)
|
||||||
client.set(key, "")
|
|
||||||
next
|
|
||||||
end
|
end
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
|
def set(newval, cas, expire: false)
|
||||||
|
if expire
|
||||||
if locks.size < resources
|
@client.set_cas(@key, newval, cas, @options[:ttl])
|
||||||
add_lock(locks, token)
|
else
|
||||||
|
@client.set_cas(@key, newval, cas)
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
if client.set_cas(key, newval, cas)
|
|
||||||
acquisition_token = token
|
|
||||||
break
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
def initial_set(val = BLANK_STR)
|
||||||
end
|
@client.set(@key, val)
|
||||||
rescue => _
|
_val, cas = @client.get_cas(@key)
|
||||||
raise FailedToAcquireLock
|
cas
|
||||||
end
|
|
||||||
|
|
||||||
acquisition_token
|
|
||||||
end
|
|
||||||
|
|
||||||
def refresh(key, acquisition_token, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
client = options[:client]
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
|
||||||
|
|
||||||
val, cas = client.get_cas(key)
|
|
||||||
|
|
||||||
# much like with initial set - ensure the key is here
|
|
||||||
if val.nil?
|
|
||||||
client.set(key, "")
|
|
||||||
next
|
|
||||||
end
|
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
|
||||||
|
|
||||||
refresh_lock(locks, acquisition_token)
|
|
||||||
|
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
break if client.set_cas(key, newval, cas)
|
|
||||||
|
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
|
||||||
end
|
|
||||||
rescue => _
|
|
||||||
raise FailedToAcquireLock
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def unlock(key, acquisition_token, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
client = options[:client]
|
|
||||||
|
|
||||||
return unless acquisition_token
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
|
||||||
|
|
||||||
val, cas = client.get_cas(key)
|
|
||||||
|
|
||||||
break if val.nil? # lock has expired totally
|
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
|
||||||
|
|
||||||
acquisition_lock = remove_lock(locks, acquisition_token)
|
|
||||||
|
|
||||||
break unless acquisition_lock
|
|
||||||
|
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
break if client.set_cas(key, newval, cas)
|
|
||||||
|
|
||||||
# another client cleared a token in the interim - try again!
|
|
||||||
|
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
|
||||||
end
|
|
||||||
rescue => boom # rubocop:disable Lint/HandleExceptions
|
|
||||||
# since it's optimistic locking - fine if we are unable to release
|
|
||||||
raise boom if ENV["SUO_TEST"]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def clear(key, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
options[:client].delete(key)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -1,165 +1,46 @@
|
|||||||
module Suo
|
module Suo
|
||||||
module Client
|
module Client
|
||||||
class Redis < Base
|
class Redis < Base
|
||||||
def initialize(options = {})
|
OK_STR = "OK".freeze
|
||||||
|
|
||||||
|
def initialize(key, options = {})
|
||||||
options[:client] ||= ::Redis.new(options[:connection] || {})
|
options[:client] ||= ::Redis.new(options[:connection] || {})
|
||||||
super
|
super
|
||||||
end
|
end
|
||||||
|
|
||||||
class << self
|
def clear
|
||||||
def lock(key, resources = 1, options = {})
|
@client.del(@key)
|
||||||
options = merge_defaults(options)
|
|
||||||
acquisition_token = nil
|
|
||||||
token = SecureRandom.base64(16)
|
|
||||||
client = options[:client]
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
end
|
||||||
|
|
||||||
client.watch(key) do
|
private
|
||||||
begin
|
|
||||||
val = client.get(key)
|
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
|
def get
|
||||||
|
[@client.get(@key), nil]
|
||||||
if locks.size < resources
|
|
||||||
add_lock(locks, token)
|
|
||||||
|
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
ret = client.multi do |multi|
|
|
||||||
multi.set(key, newval)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
acquisition_token = token if ret[0] == "OK"
|
def set(newval, _, expire: false)
|
||||||
|
ret = @client.multi do |multi|
|
||||||
|
if expire
|
||||||
|
multi.setex(@key, @options[:ttl], newval)
|
||||||
|
else
|
||||||
|
multi.set(@key, newval)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
ret && ret[0] == OK_STR
|
||||||
|
end
|
||||||
|
|
||||||
|
def synchronize
|
||||||
|
@client.watch(@key) do
|
||||||
|
yield
|
||||||
end
|
end
|
||||||
ensure
|
ensure
|
||||||
client.unwatch
|
@client.unwatch
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
break if acquisition_token
|
def initial_set(val = BLANK_STR)
|
||||||
|
set(val, nil)
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
nil
|
||||||
end
|
|
||||||
rescue => _
|
|
||||||
raise Suo::Client::FailedToAcquireLock
|
|
||||||
end
|
|
||||||
|
|
||||||
acquisition_token
|
|
||||||
end
|
|
||||||
|
|
||||||
def refresh(key, acquisition_token, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
client = options[:client]
|
|
||||||
refreshed = false
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
|
||||||
|
|
||||||
client.watch(key) do
|
|
||||||
begin
|
|
||||||
val = client.get(key)
|
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
|
||||||
|
|
||||||
refresh_lock(locks, acquisition_token)
|
|
||||||
|
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
ret = client.multi do |multi|
|
|
||||||
multi.set(key, newval)
|
|
||||||
end
|
|
||||||
|
|
||||||
refreshed = ret[0] == "OK"
|
|
||||||
ensure
|
|
||||||
client.unwatch
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
break if refreshed
|
|
||||||
|
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
|
||||||
end
|
|
||||||
rescue => _
|
|
||||||
raise Suo::Client::FailedToAcquireLock
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def unlock(key, acquisition_token, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
client = options[:client]
|
|
||||||
|
|
||||||
return unless acquisition_token
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
cleared = false
|
|
||||||
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
|
||||||
|
|
||||||
client.watch(key) do
|
|
||||||
begin
|
|
||||||
val = client.get(key)
|
|
||||||
|
|
||||||
if val.nil?
|
|
||||||
cleared = true
|
|
||||||
break
|
|
||||||
end
|
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
|
||||||
|
|
||||||
acquisition_lock = remove_lock(locks, acquisition_token)
|
|
||||||
|
|
||||||
unless acquisition_lock
|
|
||||||
# token was already cleared
|
|
||||||
cleared = true
|
|
||||||
break
|
|
||||||
end
|
|
||||||
|
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
ret = client.multi do |multi|
|
|
||||||
multi.set(key, newval)
|
|
||||||
end
|
|
||||||
|
|
||||||
cleared = ret[0] == "OK"
|
|
||||||
ensure
|
|
||||||
client.unwatch
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
break if cleared
|
|
||||||
|
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
|
||||||
end
|
|
||||||
rescue => boom # rubocop:disable Lint/HandleExceptions
|
|
||||||
# since it's optimistic locking - fine if we are unable to release
|
|
||||||
raise boom if ENV["SUO_TEST"]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def clear(key, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
options[:client].del(key)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -1,14 +0,0 @@
|
|||||||
require "securerandom"
|
|
||||||
require "monitor"
|
|
||||||
|
|
||||||
require "dalli"
|
|
||||||
require "dalli/cas/client"
|
|
||||||
|
|
||||||
require "redis"
|
|
||||||
|
|
||||||
require "msgpack"
|
|
||||||
|
|
||||||
require "suo/client/errors"
|
|
||||||
require "suo/client/base"
|
|
||||||
require "suo/client/memcached"
|
|
||||||
require "suo/client/redis"
|
|
||||||
3
lib/suo/errors.rb
Normal file
3
lib/suo/errors.rb
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
module Suo
|
||||||
|
class LockClientError < StandardError; end
|
||||||
|
end
|
||||||
@@ -1,3 +1,3 @@
|
|||||||
module Suo
|
module Suo
|
||||||
VERSION = "0.1.1"
|
VERSION = "0.3.3".freeze
|
||||||
end
|
end
|
||||||
|
|||||||
10
suo.gemspec
10
suo.gemspec
@@ -9,23 +9,25 @@ Gem::Specification.new do |spec|
|
|||||||
spec.authors = ["Nick Elser"]
|
spec.authors = ["Nick Elser"]
|
||||||
spec.email = ["nick.elser@gmail.com"]
|
spec.email = ["nick.elser@gmail.com"]
|
||||||
|
|
||||||
spec.summary = %q(Distributed semaphores using Memcached or Redis.)
|
spec.summary = %q(Distributed locks (mutexes & semaphores) using Memcached or Redis.)
|
||||||
spec.description = %q(Distributed semaphores using Memcached or Redis.)
|
spec.description = %q(Distributed locks (mutexes & semaphores) using Memcached or Redis.)
|
||||||
spec.homepage = "https://github.com/nickelser/suo"
|
spec.homepage = "https://github.com/nickelser/suo"
|
||||||
spec.license = "MIT"
|
spec.license = "MIT"
|
||||||
|
|
||||||
spec.files = `git ls-files -z`.split("\x0")
|
spec.files = `git ls-files -z`.split("\x0")
|
||||||
spec.bindir = "bin"
|
spec.bindir = "bin"
|
||||||
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
|
|
||||||
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
|
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
|
||||||
spec.require_paths = ["lib"]
|
spec.require_paths = ["lib"]
|
||||||
|
|
||||||
|
spec.required_ruby_version = "~> 2.0"
|
||||||
|
|
||||||
spec.add_dependency "dalli"
|
spec.add_dependency "dalli"
|
||||||
spec.add_dependency "redis"
|
spec.add_dependency "redis"
|
||||||
spec.add_dependency "msgpack"
|
spec.add_dependency "msgpack"
|
||||||
|
|
||||||
spec.add_development_dependency "bundler", "~> 1.5"
|
spec.add_development_dependency "bundler", "~> 1.5"
|
||||||
spec.add_development_dependency "rake", "~> 10.0"
|
spec.add_development_dependency "rake", "~> 10.0"
|
||||||
spec.add_development_dependency "rubocop", "~> 0.30.0"
|
spec.add_development_dependency "rubocop", "~> 0.49.0"
|
||||||
spec.add_development_dependency "minitest", "~> 5.5.0"
|
spec.add_development_dependency "minitest", "~> 5.5.0"
|
||||||
|
spec.add_development_dependency "codeclimate-test-reporter", "~> 0.4.7"
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -3,118 +3,361 @@ require "test_helper"
|
|||||||
TEST_KEY = "suo_test_key".freeze
|
TEST_KEY = "suo_test_key".freeze
|
||||||
|
|
||||||
module ClientTests
|
module ClientTests
|
||||||
def test_requires_client
|
def client(options = {})
|
||||||
exception = assert_raises(RuntimeError) do
|
@client.class.new(options[:key] || TEST_KEY, options.merge(client: @client.client))
|
||||||
@klass.lock(TEST_KEY, 1)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
assert_equal "Client required", exception.message
|
def test_throws_failed_error_on_bad_client
|
||||||
|
assert_raises(Suo::LockClientError) do
|
||||||
|
client = @client.class.new(TEST_KEY, client: {})
|
||||||
|
client.lock
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_class_single_resource_locking
|
def test_single_resource_locking
|
||||||
lock1 = @klass.lock(TEST_KEY, 1, client: @klass_client)
|
lock1 = @client.lock
|
||||||
refute_nil lock1
|
refute_nil lock1
|
||||||
|
|
||||||
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
locked = @client.locked?
|
||||||
assert_equal true, locked
|
assert_equal true, locked
|
||||||
|
|
||||||
lock2 = @klass.lock(TEST_KEY, 1, client: @klass_client)
|
lock2 = @client.lock
|
||||||
assert_nil lock2
|
assert_nil lock2
|
||||||
|
|
||||||
@klass.unlock(TEST_KEY, lock1, client: @klass_client)
|
@client.unlock(lock1)
|
||||||
|
|
||||||
|
locked = @client.locked?
|
||||||
|
|
||||||
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
|
||||||
assert_equal false, locked
|
assert_equal false, locked
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_class_multiple_resource_locking
|
def test_lock_with_custom_token
|
||||||
lock1 = @klass.lock(TEST_KEY, 2, client: @klass_client)
|
token = 'foo-bar'
|
||||||
|
lock = @client.lock token
|
||||||
|
assert_equal lock, token
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_empty_lock_on_invalid_data
|
||||||
|
@client.send(:initial_set, "bad value")
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_clear
|
||||||
|
lock1 = @client.lock
|
||||||
refute_nil lock1
|
refute_nil lock1
|
||||||
|
|
||||||
locked = @klass.locked?(TEST_KEY, 2, client: @klass_client)
|
@client.clear
|
||||||
assert_equal false, locked
|
|
||||||
|
|
||||||
lock2 = @klass.lock(TEST_KEY, 2, client: @klass_client)
|
assert_equal false, @client.locked?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_multiple_resource_locking
|
||||||
|
@client = client(resources: 2)
|
||||||
|
|
||||||
|
lock1 = @client.lock
|
||||||
|
refute_nil lock1
|
||||||
|
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
|
||||||
|
lock2 = @client.lock
|
||||||
refute_nil lock2
|
refute_nil lock2
|
||||||
|
|
||||||
locked = @klass.locked?(TEST_KEY, 2, client: @klass_client)
|
assert_equal true, @client.locked?
|
||||||
assert_equal true, locked
|
|
||||||
|
|
||||||
@klass.unlock(TEST_KEY, lock1, client: @klass_client)
|
@client.unlock(lock1)
|
||||||
|
|
||||||
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
assert_equal false, @client.locked?
|
||||||
assert_equal true, locked
|
|
||||||
|
|
||||||
@klass.unlock(TEST_KEY, lock2, client: @klass_client)
|
assert_equal 1, @client.locks.size
|
||||||
|
|
||||||
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
@client.unlock(lock2)
|
||||||
assert_equal false, locked
|
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
assert_equal 0, @client.locks.size
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_instance_single_resource_locking
|
def test_block_single_resource_locking
|
||||||
locked = false
|
locked = false
|
||||||
|
|
||||||
@client.lock(TEST_KEY, 1) { locked = true }
|
@client.lock { locked = true }
|
||||||
|
|
||||||
assert_equal true, locked
|
assert_equal true, locked
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_instance_unlocks_on_exception
|
def test_block_unlocks_on_exception
|
||||||
assert_raises(RuntimeError) do
|
assert_raises(RuntimeError) do
|
||||||
@client.lock(TEST_KEY, 1) { fail "Test" }
|
@client.lock{ fail "Test" }
|
||||||
end
|
end
|
||||||
|
|
||||||
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
assert_equal false, @client.locked?
|
||||||
assert_equal false, locked
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_instance_multiple_resource_locking
|
def test_readme_example
|
||||||
|
output = Queue.new
|
||||||
|
@client = client(resources: 2)
|
||||||
|
threads = []
|
||||||
|
|
||||||
|
threads << Thread.new { @client.lock { output << "One"; sleep 0.5 } }
|
||||||
|
threads << Thread.new { @client.lock { output << "Two"; sleep 0.5 } }
|
||||||
|
sleep 0.1
|
||||||
|
threads << Thread.new { @client.lock { output << "Three" } }
|
||||||
|
|
||||||
|
threads.each(&:join)
|
||||||
|
|
||||||
|
ret = []
|
||||||
|
|
||||||
|
ret << (output.size > 0 ? output.pop : nil)
|
||||||
|
ret << (output.size > 0 ? output.pop : nil)
|
||||||
|
|
||||||
|
ret.sort!
|
||||||
|
|
||||||
|
assert_equal 0, output.size
|
||||||
|
assert_equal %w(One Two), ret
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_block_multiple_resource_locking
|
||||||
success_counter = Queue.new
|
success_counter = Queue.new
|
||||||
failure_counter = Queue.new
|
failure_counter = Queue.new
|
||||||
|
|
||||||
|
@client = client(acquisition_timeout: 0.9, resources: 50)
|
||||||
|
|
||||||
100.times.map do |i|
|
100.times.map do |i|
|
||||||
Thread.new do
|
Thread.new do
|
||||||
success = @client.lock(TEST_KEY, 50, retry_timeout: 0.9) do
|
success = @client.lock do
|
||||||
sleep(1)
|
sleep(3)
|
||||||
success_counter << i
|
success_counter << i
|
||||||
end
|
end
|
||||||
|
|
||||||
failure_counter << i unless success
|
failure_counter << i unless success
|
||||||
end
|
end
|
||||||
end.map(&:join)
|
end.each(&:join)
|
||||||
|
|
||||||
assert_equal 50, success_counter.size
|
assert_equal 50, success_counter.size
|
||||||
assert_equal 50, failure_counter.size
|
assert_equal 50, failure_counter.size
|
||||||
|
assert_equal false, @client.locked?
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_instance_multiple_resource_locking_longer_timeout
|
def test_block_multiple_resource_locking_longer_timeout
|
||||||
success_counter = Queue.new
|
success_counter = Queue.new
|
||||||
failure_counter = Queue.new
|
failure_counter = Queue.new
|
||||||
|
|
||||||
|
@client = client(acquisition_timeout: 3, resources: 50)
|
||||||
|
|
||||||
100.times.map do |i|
|
100.times.map do |i|
|
||||||
Thread.new do
|
Thread.new do
|
||||||
success = @client.lock(TEST_KEY, 50, retry_timeout: 2) do
|
success = @client.lock do
|
||||||
sleep(1)
|
sleep(0.5)
|
||||||
success_counter << i
|
success_counter << i
|
||||||
end
|
end
|
||||||
|
|
||||||
failure_counter << i unless success
|
failure_counter << i unless success
|
||||||
end
|
end
|
||||||
end.map(&:join)
|
end.each(&:join)
|
||||||
|
|
||||||
assert_equal 100, success_counter.size
|
assert_equal 100, success_counter.size
|
||||||
assert_equal 0, failure_counter.size
|
assert_equal 0, failure_counter.size
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_unstale_lock_acquisition
|
||||||
|
success_counter = Queue.new
|
||||||
|
failure_counter = Queue.new
|
||||||
|
|
||||||
|
@client = client(stale_lock_expiration: 0.5)
|
||||||
|
|
||||||
|
t1 = Thread.new { @client.lock { sleep 0.6; success_counter << 1 } }
|
||||||
|
sleep 0.3
|
||||||
|
t2 = Thread.new do
|
||||||
|
locked = @client.lock { success_counter << 1 }
|
||||||
|
failure_counter << 1 unless locked
|
||||||
|
end
|
||||||
|
|
||||||
|
[t1, t2].each(&:join)
|
||||||
|
|
||||||
|
assert_equal 1, success_counter.size
|
||||||
|
assert_equal 1, failure_counter.size
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_stale_lock_acquisition
|
||||||
|
success_counter = Queue.new
|
||||||
|
failure_counter = Queue.new
|
||||||
|
|
||||||
|
@client = client(stale_lock_expiration: 0.5)
|
||||||
|
|
||||||
|
t1 = Thread.new { @client.lock { sleep 0.6; success_counter << 1 } }
|
||||||
|
sleep 0.55
|
||||||
|
t2 = Thread.new do
|
||||||
|
locked = @client.lock { success_counter << 1 }
|
||||||
|
failure_counter << 1 unless locked
|
||||||
|
end
|
||||||
|
|
||||||
|
[t1, t2].each(&:join)
|
||||||
|
|
||||||
|
assert_equal 2, success_counter.size
|
||||||
|
assert_equal 0, failure_counter.size
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_refresh
|
||||||
|
@client = client(stale_lock_expiration: 0.5)
|
||||||
|
|
||||||
|
lock1 = @client.lock
|
||||||
|
|
||||||
|
assert_equal true, @client.locked?
|
||||||
|
|
||||||
|
@client.refresh(lock1)
|
||||||
|
|
||||||
|
assert_equal true, @client.locked?
|
||||||
|
|
||||||
|
sleep 0.55
|
||||||
|
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
|
||||||
|
lock2 = @client.lock
|
||||||
|
|
||||||
|
@client.refresh(lock1)
|
||||||
|
|
||||||
|
assert_equal true, @client.locked?
|
||||||
|
|
||||||
|
@client.unlock(lock1)
|
||||||
|
|
||||||
|
# edge case with refresh lock in the middle
|
||||||
|
assert_equal true, @client.locked?
|
||||||
|
|
||||||
|
@client.clear
|
||||||
|
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
|
||||||
|
@client.refresh(lock2)
|
||||||
|
|
||||||
|
assert_equal true, @client.locked?
|
||||||
|
|
||||||
|
@client.unlock(lock2)
|
||||||
|
|
||||||
|
# now finally unlocked
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_block_refresh
|
||||||
|
success_counter = Queue.new
|
||||||
|
failure_counter = Queue.new
|
||||||
|
|
||||||
|
@client = client(stale_lock_expiration: 0.5)
|
||||||
|
|
||||||
|
t1 = Thread.new do
|
||||||
|
@client.lock do |token|
|
||||||
|
sleep 0.6
|
||||||
|
@client.refresh(token)
|
||||||
|
sleep 1
|
||||||
|
success_counter << 1
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
t2 = Thread.new do
|
||||||
|
sleep 0.8
|
||||||
|
locked = @client.lock { success_counter << 1 }
|
||||||
|
failure_counter << 1 unless locked
|
||||||
|
end
|
||||||
|
|
||||||
|
[t1, t2].each(&:join)
|
||||||
|
|
||||||
|
assert_equal 1, success_counter.size
|
||||||
|
assert_equal 1, failure_counter.size
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_refresh_multi
|
||||||
|
success_counter = Queue.new
|
||||||
|
failure_counter = Queue.new
|
||||||
|
|
||||||
|
@client = client(stale_lock_expiration: 0.5, resources: 2)
|
||||||
|
|
||||||
|
t1 = Thread.new do
|
||||||
|
@client.lock do |token|
|
||||||
|
sleep 0.4
|
||||||
|
@client.refresh(token)
|
||||||
|
success_counter << 1
|
||||||
|
sleep 0.5
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
t2 = Thread.new do
|
||||||
|
sleep 0.55
|
||||||
|
locked = @client.lock do
|
||||||
|
success_counter << 1
|
||||||
|
sleep 0.5
|
||||||
|
end
|
||||||
|
|
||||||
|
failure_counter << 1 unless locked
|
||||||
|
end
|
||||||
|
|
||||||
|
t3 = Thread.new do
|
||||||
|
sleep 0.75
|
||||||
|
locked = @client.lock { success_counter << 1 }
|
||||||
|
failure_counter << 1 unless locked
|
||||||
|
end
|
||||||
|
|
||||||
|
[t1, t2, t3].each(&:join)
|
||||||
|
|
||||||
|
assert_equal 2, success_counter.size
|
||||||
|
assert_equal 1, failure_counter.size
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_increment_reused_client
|
||||||
|
i = 0
|
||||||
|
|
||||||
|
threads = 2.times.map do
|
||||||
|
Thread.new do
|
||||||
|
@client.lock { i += 1 }
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
threads.each(&:join)
|
||||||
|
|
||||||
|
assert_equal 2, i
|
||||||
|
assert_equal false, @client.locked?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_increment_new_client
|
||||||
|
i = 0
|
||||||
|
|
||||||
|
threads = 2.times.map do
|
||||||
|
Thread.new do
|
||||||
|
# note this is the method that generates a *new* client
|
||||||
|
client.lock { i += 1 }
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
threads.each(&:join)
|
||||||
|
|
||||||
|
assert_equal 2, i
|
||||||
|
assert_equal false, @client.locked?
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
class TestBaseClient < Minitest::Test
|
class TestBaseClient < Minitest::Test
|
||||||
def setup
|
def setup
|
||||||
@klass = Suo::Client::Base
|
@client = Suo::Client::Base.new(TEST_KEY, client: {})
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_not_implemented
|
def test_not_implemented
|
||||||
assert_raises(NotImplementedError) do
|
assert_raises(NotImplementedError) do
|
||||||
@klass.lock(TEST_KEY, 1)
|
@client.send(:get)
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_raises(NotImplementedError) do
|
||||||
|
@client.send(:set, "", "")
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_raises(NotImplementedError) do
|
||||||
|
@client.send(:initial_set)
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_raises(NotImplementedError) do
|
||||||
|
@client.send(:clear)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@@ -123,13 +366,13 @@ class TestMemcachedClient < Minitest::Test
|
|||||||
include ClientTests
|
include ClientTests
|
||||||
|
|
||||||
def setup
|
def setup
|
||||||
@klass = Suo::Client::Memcached
|
@dalli = Dalli::Client.new("127.0.0.1:11211")
|
||||||
@client = @klass.new
|
@client = Suo::Client::Memcached.new(TEST_KEY)
|
||||||
@klass_client = Dalli::Client.new("127.0.0.1:11211")
|
teardown
|
||||||
end
|
end
|
||||||
|
|
||||||
def teardown
|
def teardown
|
||||||
@klass_client.delete(TEST_KEY)
|
@dalli.delete(TEST_KEY)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -137,13 +380,13 @@ class TestRedisClient < Minitest::Test
|
|||||||
include ClientTests
|
include ClientTests
|
||||||
|
|
||||||
def setup
|
def setup
|
||||||
@klass = Suo::Client::Redis
|
@redis = Redis.new
|
||||||
@client = @klass.new
|
@client = Suo::Client::Redis.new(TEST_KEY)
|
||||||
@klass_client = Redis.new
|
teardown
|
||||||
end
|
end
|
||||||
|
|
||||||
def teardown
|
def teardown
|
||||||
@klass_client.del(TEST_KEY)
|
@redis.del(TEST_KEY)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,13 @@
|
|||||||
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
|
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
|
||||||
|
|
||||||
|
if ENV["CODECLIMATE_REPO_TOKEN"]
|
||||||
|
require "codeclimate-test-reporter"
|
||||||
|
CodeClimate::TestReporter.start
|
||||||
|
end
|
||||||
|
|
||||||
require "suo"
|
require "suo"
|
||||||
require "thread"
|
require "thread"
|
||||||
require "minitest/autorun"
|
require "minitest/autorun"
|
||||||
require "minitest/benchmark"
|
require "minitest/benchmark"
|
||||||
|
|
||||||
ENV["SUO_TEST"] = "true"
|
ENV["SUO_TEST"] = "true"
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user