65 Commits

Author SHA1 Message Date
Nick Elser
b9d3f1b7a1 Bump version and update changelog. 2019-09-04 14:36:40 -07:00
Nick Elser
270c05b80e Merge pull request #10 from levkk/levkk/support-pooled-clients
Add support for pooled memcached clients by using #with
2019-09-04 14:33:26 -07:00
Lev Kokotov
60e167e146 relax bundler dependency version 2019-08-22 16:16:57 -07:00
Lev Kokotov
ad08c8b5ea bump Bundler to 2 2019-08-22 16:14:01 -07:00
Lev Kokotov
9b8ef6c244 Add support for pooled memcached clients by using #with 2019-08-22 16:04:14 -07:00
Nick Elser
b8a1d7d9ac Merge pull request #8 from nickelser/update_changelog
Update changelog
2018-10-05 14:06:42 -07:00
Nick Elser
c58a247156 Run on more modern rubies, as well. 2018-10-05 16:40:25 -04:00
Nick Elser
8c37c24ee6 Merge branch 'master' into update_changelog 2018-10-05 16:31:49 -04:00
Nick Elser
29da8cf090 Add changelog entry, remove spurious gemspec entry. 2018-10-05 16:31:33 -04:00
Nick Elser
8ed488f071 Merge pull request #5 from keylimetoolbox/double-initial-set
Fix #initial_set which is causing a double attempt and delay on lock acquisition and incorrect drop on short acquisition_timeout
2018-10-05 13:28:16 -07:00
Nick Elser
152b6acf9c Merge pull request #7 from nickelser/update_rubocop
Update Rubocop, and bump the version.
2018-10-05 13:25:44 -07:00
Nick Elser
5e10afe534 Update Rubocop, and bump the version. 2018-10-05 16:14:49 -04:00
Nick Elser
0423eb9e12 Merge pull request #6 from GandalftheGUI/ian_remillard/remove_keys_after_last_lock_released
Add 'Time To Live' to mitigate potential memory leak
2018-10-05 13:06:29 -07:00
Ian Remillard
ca46f5f369 add default for expire on set 2018-10-01 10:48:50 -07:00
Ian Remillard
1022a6f9d3 move to expire only when all locks are released 2018-10-01 10:35:30 -07:00
Ian Remillard
6be3a5bdda edits to docs 2018-09-28 16:43:44 -07:00
Ian Remillard
aa4da5d739 update docs for new options 2018-09-28 13:19:01 -07:00
Ian Remillard
fdb0b7f9d5 adds lock ttl and lock_release_removes_key 2018-09-28 12:43:52 -07:00
Jeremy Wadsack
a13edcf7d1 Fix #initial_set which is causing a double attempt and delay on lock acquisition
The call to `#initial_set` in `#retry` and `#acquire_lock` is followed by `next` which leads to a second pass through the `#retry_with_timeout` loop and a sleep call for up to `:acquisition_delay`. This delay isn't necessary if the value can be set without a race condition.

Removing the `next` call causes the client to continue to retry because the transaction has been changed outside the transaction boundary:

In Redis, calling `SET` within a `WATCH`/`UNWATCH` block but not inside a `MULTI`/`EXEC` block will [cause the EXEC to fail the transaction](https://github.com/antirez/redis-doc/issues/734), so the first `#set` call fails and it requires a second pass. To resolve this I changed `#initial_set` to call `#set` within a `MULTI` block so that it would be inside the transaction.

In Memcache the call to `SET` without the `CAS` during `#initial_set` is going to cause the `SET` with `CAS` to fail (return `EXISTS`), and resulting in a second pass. To resolve this I changed `#initial_set` to use `SET` with `CAS` and return the CAS value to be used in the subsequent `#set` call that stores the lock token.
2018-08-30 13:06:16 -07:00
Nick Elser
af1c476f08 Bump version. 2016-10-06 10:22:36 -07:00
Nick Elser
58fae54022 Minor style fixes. 2016-10-06 10:22:29 -07:00
Nick Elser
2088fd90b3 Merge pull request #1 from Shuttlerock/master
Allow to use custom token for lock
2016-10-06 10:20:35 -07:00
Vokhmin Alexey V
05661e143c Allow to use custom token for lock 2016-10-05 13:47:10 +03:00
Nick Elser
a23282dcc6 don't go around allocating empty strings willy-nilly 2015-05-07 00:16:28 -07:00
Nick Elser
323caaee9b update readme 2015-04-15 23:14:06 -07:00
Nick Elser
745d49466f release v0.3.0 2015-04-15 23:11:06 -07:00
Nick Elser
161d50deb9 update tests for new interface 2015-04-15 23:10:34 -07:00
Nick Elser
81e4a3e143 dramatically simpify interface by forcing key at initialization 2015-04-15 23:10:21 -07:00
Nick Elser
2960c14a4d use same language for summary + description 2015-04-13 22:28:42 -07:00
Nick Elser
308e918e60 on second thought, remove confusing language 2015-04-13 22:20:44 -07:00
Nick Elser
aaee69a2df release v0.2.3 2015-04-13 22:15:14 -07:00
Nick Elser
c6d1c29ada add additional deadlock tests 2015-04-13 22:13:15 -07:00
Nick Elser
14e442e99d remove semaphore language and clarify language 2015-04-13 22:13:07 -07:00
Nick Elser
498073b92e tiny code style improvements 2015-04-13 21:55:34 -07:00
Nick Elser
155a3ac40c release v0.2.2 2015-04-13 21:45:47 -07:00
Nick Elser
10d3ab09cf handle invalid lock data 2015-04-13 21:40:54 -07:00
Nick Elser
2724ec6d9d add another test case for the nil refresh case 2015-04-13 21:32:01 -07:00
Nick Elser
185327f59c fix documentation and add another test for refresh 2015-04-13 21:21:45 -07:00
Nick Elser
c8a972da31 more tests for (still not great) refresh method 2015-04-13 20:29:23 -07:00
Nick Elser
7591c08a28 avoid name collision for locks method 2015-04-13 20:29:03 -07:00
Nick Elser
1dee338e16 slightly more coverage 2015-04-13 19:42:50 -07:00
Nick Elser
900c723043 only report to codeclimate when credentials passed in 2015-04-13 19:37:58 -07:00
Nick Elser
6e2afdf80a add tests for refresh and slight refactor 2015-04-13 19:37:49 -07:00
Nick Elser
49a9757d44 fix refresh token 2015-04-13 19:37:34 -07:00
Nick Elser
754d7d8faf report test coverage 2015-04-13 10:12:26 -07:00
Nick Elser
857fc63378 release 0.2.1 2015-04-12 23:45:09 -07:00
Nick Elser
bb6762bbc6 ret is not always an array 2015-04-12 23:45:00 -07:00
Nick Elser
f0977c89f2 remove redundant require file 2015-04-12 23:03:54 -07:00
Nick Elser
4d5c96309f some style fixes 2015-04-12 22:54:53 -07:00
Nick Elser
8d6061b137 rename some tests, fix rare edge condition 2015-04-12 22:49:02 -07:00
Nick Elser
ce0a4d8d86 release 0.2.0 2015-04-12 22:33:19 -07:00
Nick Elser
1fd769eec2 refactor class methods into instance methods 2015-04-12 22:32:51 -07:00
Nick Elser
37be5ae27b bump version, update changelog 2015-04-12 20:58:18 -07:00
Nick Elser
a1a226fb59 account for variabilities in test memcached 2015-04-12 20:57:52 -07:00
Nick Elser
8d7ddaf35a move logic around loop counts to a more reasonable location 2015-04-12 20:57:29 -07:00
Nick Elser
1aacc0c1a1 properly throw lock LockClientError 2015-04-12 20:47:35 -07:00
Nick Elser
8166c6b51d specify ruby version 2015-04-12 19:53:47 -07:00
Nick Elser
7662743123 update changelog 2015-04-12 19:47:35 -07:00
Nick Elser
a2fa281b86 fix test cases when memcached is delayed 2015-04-12 19:47:23 -07:00
Nick Elser
887219b63d release 0.1.2 2015-04-12 19:43:59 -07:00
Nick Elser
c0905fef91 faster unpack without exceptions 2015-04-12 19:43:07 -07:00
Nick Elser
a54b795e20 simplify defaults logic and fix retry timeouts 2015-04-12 19:42:59 -07:00
Nick Elser
0828cd546b add gem info 2015-04-12 17:54:38 -07:00
Nick Elser
c2b9de4cf3 refactor shared logic into base class 2015-04-12 17:54:30 -07:00
Nick Elser
d4860423fa whitespace 2015-04-12 16:03:53 -07:00
15 changed files with 631 additions and 439 deletions

View File

@@ -74,7 +74,7 @@ Style/SpaceInsideBrackets:
Style/AndOr:
Enabled: false
Style/TrailingComma:
Style/TrailingCommaInLiteral:
Enabled: true
Style/SpaceBeforeComma:
@@ -98,7 +98,7 @@ Style/SpaceAfterColon:
Style/SpaceAfterComma:
Enabled: true
Style/SpaceAfterControlKeyword:
Style/SpaceAroundKeyword:
Enabled: true
Style/SpaceAfterNot:
@@ -163,7 +163,7 @@ Style/StringLiterals:
EnforcedStyle: double_quotes
Metrics/CyclomaticComplexity:
Max: 8
Max: 10
Metrics/LineLength:
Max: 128
@@ -214,3 +214,6 @@ Metrics/ParameterLists:
Metrics/PerceivedComplexity:
Enabled: false
Style/Documentation:
Enabled: false

View File

@@ -1,6 +1,9 @@
language: ruby
rvm:
- 2.2.0
- 2.2.6
- 2.3.7
- 2.4.4
- 2.5.1
services:
- memcached
- redis-server

View File

@@ -1,7 +1,56 @@
## 0.3.4
- Support for connection pooling when using memcached locks, via `with` blocks using Dalli (thanks to Lev).
## 0.3.3
- Default TTL for keys to allow for short-lived locking keys (thanks to Ian Remillard) without leaking memory.
- Vastly improve initial lock acquisition, especially on Redis (thanks to Jeremy Wadscak).
## 0.3.2
- Custom lock tokens (thanks to avokhmin).
## 0.3.1
- Slight memory leak fix.
## 0.3.0
- Dramatically simplify the interface by forcing clients to specify the key & resources at lock initialization instead of every method call.
## 0.2.3
- Clarify documentation further with respect to semaphores.
## 0.2.2
- Fix bug with refresh - typo would've prevented real use.
- Clean up code.
- Improve documentation a bit.
- 100% test coverage.
## 0.2.1
- Fix bug when dealing with real-world Redis error conditions.
## 0.2.0
- Refactor class methods into instance methods to simplify implementation.
- Increase thread safety with Memcached implementation.
## 0.1.3
- Properly throw Suo::LockClientError when the connection itself fails (Memcache server not reachable, etc.)
## 0.1.2
- Fix retry_timeout to properly use the full time (was being calculated incorrectly).
- Refactor client implementations to re-use more code.
## 0.1.1
- Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for semaphore serialization.
- Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for lock serialization.
## 0.1.0

View File

@@ -1,8 +1,8 @@
# Suo [![Build Status](https://travis-ci.org/nickelser/suo.png?branch=master)](https://travis-ci.org/nickelser/suo)
# Suo [![Build Status](https://travis-ci.org/nickelser/suo.svg?branch=master)](https://travis-ci.org/nickelser/suo) [![Code Climate](https://codeclimate.com/github/nickelser/suo/badges/gpa.svg)](https://codeclimate.com/github/nickelser/suo) [![Test Coverage](https://codeclimate.com/github/nickelser/suo/badges/coverage.svg)](https://codeclimate.com/github/nickelser/suo) [![Gem Version](https://badge.fury.io/rb/suo.svg)](http://badge.fury.io/rb/suo)
:lock: Distributed semaphores using Memcached or Redis in Ruby.
Suo provides a very performant distributed lock solution using Compare-And-Set (`CAS`) commands in Memcached, and `WATCH/MULTI` in Redis.
Suo provides a very performant distributed lock solution using Compare-And-Set (`CAS`) commands in Memcached, and `WATCH/MULTI` in Redis. It allows locking both single exclusion (like a mutex - sharing one resource), as well as multiple resources.
## Installation
@@ -18,35 +18,75 @@ gem 'suo'
```ruby
# Memcached
suo = Suo::Client::Memcached.new(connection: "127.0.0.1:11211")
suo = Suo::Client::Memcached.new("foo_resource", connection: "127.0.0.1:11211")
# Redis
suo = Suo::Client::Redis.new(connection: {host: "10.0.1.1"})
suo = Suo::Client::Redis.new("baz_resource", connection: {host: "10.0.1.1"})
# Pre-existing client
suo = Suo::Client::Memcached.new(client: some_dalli_client)
suo = Suo::Client::Memcached.new("bar_resource", client: some_dalli_client)
suo.lock("some_key") do
suo.lock do
# critical code here
@puppies.pet!
end
2.times do
Thread.new do
# second argument is the number of resources - so this will run twice
suo.lock("other_key", 2, timeout: 0.5) { puts "Will run twice!" }
# The resources argument is the number of resources the semaphore will allow to lock (defaulting to one - a mutex)
suo = Suo::Client::Memcached.new("bar_resource", client: some_dalli_client, resources: 2)
Thread.new { suo.lock { puts "One"; sleep 2 } }
Thread.new { suo.lock { puts "Two"; sleep 2 } }
Thread.new { suo.lock { puts "Three" } }
# will print "One" "Two", but not "Three", as there are only 2 resources
# custom acquisition timeouts (time to acquire)
suo = Suo::Client::Memcached.new("protected_key", client: some_dalli_client, acquisition_timeout: 1) # in seconds
# manually locking/unlocking
# the return value from lock without a block is a unique token valid only for the current lock
# which must be unlocked manually
token = suo.lock
foo.baz!
suo.unlock(token)
# custom stale lock expiration (cleaning of dead locks)
suo = Suo::Client::Redis.new("other_key", client: some_redis_client, stale_lock_expiration: 60*5)
```
### Stale locks
"Stale locks" - those acquired more than `stale_lock_expiration` (defaulting to 3600 or one hour) ago - are automatically cleared during any operation on the key (`lock`, `unlock`, `refresh`). The `locked?` method will not return true if only stale locks exist, but will not modify the key itself.
To re-acquire a lock in the middle of a block, you can use the refresh method on client.
```ruby
suo = Suo::Client::Redis.new("foo")
# lock is the same token as seen in the manual example, above
suo.lock do |token|
5.times do
baz.bar!
suo.refresh(token)
end
end
```
### Time To Live
```ruby
Suo::Client::Redis.new("bar_resource", ttl: 60) #ttl in seconds
```
A key representing a set of lockable resources is removed once the last resource lock is released and the `ttl` time runs out. When another lock is acquired and the key has been removed the key has to be recreated.
## TODO
- better stale key handling (refresh blocks)
- more race condition tests
- refactor clients to re-use more code
## History
View the [changelog](https://github.com/nickelser/suo/blob/master/CHANGELOG.md)
View the [changelog](https://github.com/nickelser/suo/blob/master/CHANGELOG.md).
## Contributing

View File

@@ -1,2 +1,16 @@
require "securerandom"
require "monitor"
require "dalli"
require "dalli/cas/client"
require "redis"
require "msgpack"
require "suo/version"
require "suo/clients"
require "suo/errors"
require "suo/client/base"
require "suo/client/memcached"
require "suo/client/redis"

View File

@@ -2,104 +2,177 @@ module Suo
module Client
class Base
DEFAULT_OPTIONS = {
retry_count: 3,
retry_delay: 0.01,
stale_lock_expiration: 3600
acquisition_timeout: 0.1,
acquisition_delay: 0.01,
stale_lock_expiration: 3600,
resources: 1,
ttl: 60,
}.freeze
def initialize(options = {})
@options = self.class.merge_defaults(options).merge(_initialized: true)
BLANK_STR = "".freeze
attr_accessor :client, :key, :resources, :options
include MonitorMixin
def initialize(key, options = {})
fail "Client required" unless options[:client]
@options = DEFAULT_OPTIONS.merge(options)
@retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil
@client = @options[:client]
@resources = @options[:resources].to_i
@key = key
super() # initialize Monitor mixin for thread safety
end
def lock(key, resources = 1, options = {})
options = self.class.merge_defaults(@options.merge(options))
token = self.class.lock(key, resources, options)
def lock(custom_token = nil)
token = acquire_lock(custom_token)
if token
if block_given? && token
begin
yield if block_given?
yield
ensure
self.class.unlock(key, token, options)
unlock(token)
end
true
else
false
token
end
end
def locked?(key, resources = 1)
self.class.locked?(key, resources, @options)
end
class << self
def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def locked?(key, resources = 1, options = {})
options = merge_defaults(options)
client = options[:client]
locks = deserialize_locks(client.get(key))
def locked?
locks.size >= resources
end
def locks(key, options)
options = merge_defaults(options)
client = options[:client]
locks = deserialize_locks(client.get(key))
def locks
val, _ = get
cleared_locks = deserialize_and_clear_locks(val)
locks.size
cleared_locks
end
def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
def refresh(token)
retry_with_timeout do
val, cas = get
cas = initial_set if val.nil?
cleared_locks = deserialize_and_clear_locks(val)
refresh_lock(cleared_locks, token)
break if set(serialize_locks(cleared_locks), cas, expire: cleared_locks.empty?)
end
end
def unlock(token)
return unless token
retry_with_timeout do
val, cas = get
break if val.nil?
cleared_locks = deserialize_and_clear_locks(val)
acquisition_lock = remove_lock(cleared_locks, token)
break unless acquisition_lock
break if set(serialize_locks(cleared_locks), cas, expire: cleared_locks.empty?)
end
rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
# ignore - assume success due to optimistic locking
end
def clear
fail NotImplementedError
end
def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def merge_defaults(options = {})
unless options[:_initialized]
options = self::DEFAULT_OPTIONS.merge(options)
fail "Client required" unless options[:client]
end
if options[:retry_timeout]
options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).floor
end
options
end
private
attr_accessor :retry_count
def acquire_lock(token = nil)
token ||= SecureRandom.base64(16)
retry_with_timeout do
val, cas = get
cas = initial_set if val.nil?
cleared_locks = deserialize_and_clear_locks(val)
if cleared_locks.size < resources
add_lock(cleared_locks, token)
newval = serialize_locks(cleared_locks)
return token if set(newval, cas)
end
end
nil
end
def get
fail NotImplementedError
end
def set(newval, cas) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def initial_set(val = BLANK_STR) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def synchronize
mon_synchronize { yield }
end
def retry_with_timeout
start = Time.now.to_f
retry_count.times do
elapsed = Time.now.to_f - start
break if elapsed >= options[:acquisition_timeout]
synchronize do
yield
end
sleep(rand(options[:acquisition_delay] * 1000).to_f / 1000)
end
rescue => _
raise LockClientError
end
def serialize_locks(locks)
MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
end
def deserialize_and_clear_locks(val)
clear_expired_locks(deserialize_locks(val))
end
def deserialize_locks(val)
MessagePack.unpack(val).map do |time, token|
unpacked = (val.nil? || val == BLANK_STR) ? [] : MessagePack.unpack(val)
unpacked.map do |time, token|
[Time.at(time), token]
end
rescue EOFError => _
rescue EOFError, MessagePack::MalformedFormatError => _
[]
end
def clear_expired_locks(locks, options)
def clear_expired_locks(locks)
expired = Time.now - options[:stale_lock_expiration]
locks.reject { |time, _| time < expired }
end
def add_lock(locks, token)
locks << [Time.now.to_f, token]
def add_lock(locks, token, time = Time.now.to_f)
locks << [time, token]
end
def remove_lock(locks, acquisition_token)
@@ -109,8 +182,7 @@ module Suo
def refresh_lock(locks, acquisition_token)
remove_lock(locks, acquisition_token)
add_lock(locks, token)
end
add_lock(locks, acquisition_token)
end
end
end

View File

@@ -1,7 +0,0 @@
module Suo
module Client
module Errors
class FailedToAcquireLock < StandardError; end
end
end
end

View File

@@ -1,135 +1,34 @@
module Suo
module Client
class Memcached < Base
def initialize(options = {})
def initialize(key, options = {})
options[:client] ||= Dalli::Client.new(options[:connection] || ENV["MEMCACHE_SERVERS"] || "127.0.0.1:11211")
super
end
class << self
def lock(key, resources = 1, options = {})
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
def clear
@client.with { |client| client.delete(@key) }
end
val, cas = client.get_cas(key)
private
# no key has been set yet; we could simply set it, but would lead to race conditions on the initial setting
if val.nil?
client.set(key, "")
next
def get
@client.with { |client| client.get_cas(@key) }
end
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
if client.set_cas(key, newval, cas)
acquisition_token = token
break
def set(newval, cas, expire: false)
if expire
@client.with { |client| client.set_cas(@key, newval, cas, @options[:ttl]) }
else
@client.with { |client| client.set_cas(@key, newval, cas) }
end
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
acquisition_token
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
val, cas = client.get_cas(key)
# much like with initial set - ensure the key is here
if val.nil?
client.set(key, "")
next
end
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
return unless acquisition_token
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
val, cas = client.get_cas(key)
break if val.nil? # lock has expired totally
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
break unless acquisition_lock
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
# another client cleared a token in the interim - try again!
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {})
options = merge_defaults(options)
options[:client].delete(key)
def initial_set(val = BLANK_STR)
@client.with do |client|
client.set(@key, val)
_val, cas = client.get_cas(@key)
cas
end
end
end

View File

@@ -1,165 +1,46 @@
module Suo
module Client
class Redis < Base
def initialize(options = {})
OK_STR = "OK".freeze
def initialize(key, options = {})
options[:client] ||= ::Redis.new(options[:connection] || {})
super
end
class << self
def lock(key, resources = 1, options = {})
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
def clear
@client.del(@key)
end
client.watch(key) do
begin
val = client.get(key)
private
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
def get
[@client.get(@key), nil]
end
acquisition_token = token if ret[0] == "OK"
def set(newval, _, expire: false)
ret = @client.multi do |multi|
if expire
multi.setex(@key, @options[:ttl], newval)
else
multi.set(@key, newval)
end
end
ret && ret[0] == OK_STR
end
def synchronize
@client.watch(@key) do
yield
end
ensure
client.unwatch
end
@client.unwatch
end
break if acquisition_token
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise Suo::Client::FailedToAcquireLock
end
acquisition_token
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
refreshed = false
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
client.watch(key) do
begin
val = client.get(key)
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
refreshed = ret[0] == "OK"
ensure
client.unwatch
end
end
break if refreshed
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise Suo::Client::FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
return unless acquisition_token
begin
start = Time.now.to_f
options[:retry_count].times do
cleared = false
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
client.watch(key) do
begin
val = client.get(key)
if val.nil?
cleared = true
break
end
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
unless acquisition_lock
# token was already cleared
cleared = true
break
end
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
cleared = ret[0] == "OK"
ensure
client.unwatch
end
end
break if cleared
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {})
options = merge_defaults(options)
options[:client].del(key)
end
def initial_set(val = BLANK_STR)
set(val, nil)
nil
end
end
end

View File

@@ -1,14 +0,0 @@
require "securerandom"
require "monitor"
require "dalli"
require "dalli/cas/client"
require "redis"
require "msgpack"
require "suo/client/errors"
require "suo/client/base"
require "suo/client/memcached"
require "suo/client/redis"

3
lib/suo/errors.rb Normal file
View File

@@ -0,0 +1,3 @@
module Suo
class LockClientError < StandardError; end
end

View File

@@ -1,3 +1,3 @@
module Suo
VERSION = "0.1.1"
VERSION = "0.3.4".freeze
end

View File

@@ -9,23 +9,25 @@ Gem::Specification.new do |spec|
spec.authors = ["Nick Elser"]
spec.email = ["nick.elser@gmail.com"]
spec.summary = %q(Distributed semaphores using Memcached or Redis.)
spec.description = %q(Distributed semaphores using Memcached or Redis.)
spec.summary = %q(Distributed locks (mutexes & semaphores) using Memcached or Redis.)
spec.description = %q(Distributed locks (mutexes & semaphores) using Memcached or Redis.)
spec.homepage = "https://github.com/nickelser/suo"
spec.license = "MIT"
spec.files = `git ls-files -z`.split("\x0")
spec.bindir = "bin"
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]
spec.required_ruby_version = "~> 2.0"
spec.add_dependency "dalli"
spec.add_dependency "redis"
spec.add_dependency "msgpack"
spec.add_development_dependency "bundler", "~> 1.5"
spec.add_development_dependency "bundler"
spec.add_development_dependency "rake", "~> 10.0"
spec.add_development_dependency "rubocop", "~> 0.30.0"
spec.add_development_dependency "rubocop", "~> 0.49.0"
spec.add_development_dependency "minitest", "~> 5.5.0"
spec.add_development_dependency "codeclimate-test-reporter", "~> 0.4.7"
end

View File

@@ -3,118 +3,361 @@ require "test_helper"
TEST_KEY = "suo_test_key".freeze
module ClientTests
def test_requires_client
exception = assert_raises(RuntimeError) do
@klass.lock(TEST_KEY, 1)
def client(options = {})
@client.class.new(options[:key] || TEST_KEY, options.merge(client: @client.client))
end
assert_equal "Client required", exception.message
def test_throws_failed_error_on_bad_client
assert_raises(Suo::LockClientError) do
client = @client.class.new(TEST_KEY, client: {})
client.lock
end
end
def test_class_single_resource_locking
lock1 = @klass.lock(TEST_KEY, 1, client: @klass_client)
def test_single_resource_locking
lock1 = @client.lock
refute_nil lock1
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
locked = @client.locked?
assert_equal true, locked
lock2 = @klass.lock(TEST_KEY, 1, client: @klass_client)
lock2 = @client.lock
assert_nil lock2
@klass.unlock(TEST_KEY, lock1, client: @klass_client)
@client.unlock(lock1)
locked = @client.locked?
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
assert_equal false, locked
end
def test_class_multiple_resource_locking
lock1 = @klass.lock(TEST_KEY, 2, client: @klass_client)
def test_lock_with_custom_token
token = 'foo-bar'
lock = @client.lock token
assert_equal lock, token
end
def test_empty_lock_on_invalid_data
@client.send(:initial_set, "bad value")
assert_equal false, @client.locked?
end
def test_clear
lock1 = @client.lock
refute_nil lock1
locked = @klass.locked?(TEST_KEY, 2, client: @klass_client)
assert_equal false, locked
@client.clear
lock2 = @klass.lock(TEST_KEY, 2, client: @klass_client)
assert_equal false, @client.locked?
end
def test_multiple_resource_locking
@client = client(resources: 2)
lock1 = @client.lock
refute_nil lock1
assert_equal false, @client.locked?
lock2 = @client.lock
refute_nil lock2
locked = @klass.locked?(TEST_KEY, 2, client: @klass_client)
assert_equal true, locked
assert_equal true, @client.locked?
@klass.unlock(TEST_KEY, lock1, client: @klass_client)
@client.unlock(lock1)
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
assert_equal true, locked
assert_equal false, @client.locked?
@klass.unlock(TEST_KEY, lock2, client: @klass_client)
assert_equal 1, @client.locks.size
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
assert_equal false, locked
@client.unlock(lock2)
assert_equal false, @client.locked?
assert_equal 0, @client.locks.size
end
def test_instance_single_resource_locking
def test_block_single_resource_locking
locked = false
@client.lock(TEST_KEY, 1) { locked = true }
@client.lock { locked = true }
assert_equal true, locked
end
def test_instance_unlocks_on_exception
def test_block_unlocks_on_exception
assert_raises(RuntimeError) do
@client.lock(TEST_KEY, 1) { fail "Test" }
@client.lock{ fail "Test" }
end
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
assert_equal false, locked
assert_equal false, @client.locked?
end
def test_instance_multiple_resource_locking
def test_readme_example
output = Queue.new
@client = client(resources: 2)
threads = []
threads << Thread.new { @client.lock { output << "One"; sleep 0.5 } }
threads << Thread.new { @client.lock { output << "Two"; sleep 0.5 } }
sleep 0.1
threads << Thread.new { @client.lock { output << "Three" } }
threads.each(&:join)
ret = []
ret << (output.size > 0 ? output.pop : nil)
ret << (output.size > 0 ? output.pop : nil)
ret.sort!
assert_equal 0, output.size
assert_equal %w(One Two), ret
assert_equal false, @client.locked?
end
def test_block_multiple_resource_locking
success_counter = Queue.new
failure_counter = Queue.new
@client = client(acquisition_timeout: 0.9, resources: 50)
100.times.map do |i|
Thread.new do
success = @client.lock(TEST_KEY, 50, retry_timeout: 0.9) do
sleep(1)
success = @client.lock do
sleep(3)
success_counter << i
end
failure_counter << i unless success
end
end.map(&:join)
end.each(&:join)
assert_equal 50, success_counter.size
assert_equal 50, failure_counter.size
assert_equal false, @client.locked?
end
def test_instance_multiple_resource_locking_longer_timeout
def test_block_multiple_resource_locking_longer_timeout
success_counter = Queue.new
failure_counter = Queue.new
@client = client(acquisition_timeout: 3, resources: 50)
100.times.map do |i|
Thread.new do
success = @client.lock(TEST_KEY, 50, retry_timeout: 2) do
sleep(1)
success = @client.lock do
sleep(0.5)
success_counter << i
end
failure_counter << i unless success
end
end.map(&:join)
end.each(&:join)
assert_equal 100, success_counter.size
assert_equal 0, failure_counter.size
assert_equal false, @client.locked?
end
def test_unstale_lock_acquisition
success_counter = Queue.new
failure_counter = Queue.new
@client = client(stale_lock_expiration: 0.5)
t1 = Thread.new { @client.lock { sleep 0.6; success_counter << 1 } }
sleep 0.3
t2 = Thread.new do
locked = @client.lock { success_counter << 1 }
failure_counter << 1 unless locked
end
[t1, t2].each(&:join)
assert_equal 1, success_counter.size
assert_equal 1, failure_counter.size
assert_equal false, @client.locked?
end
def test_stale_lock_acquisition
success_counter = Queue.new
failure_counter = Queue.new
@client = client(stale_lock_expiration: 0.5)
t1 = Thread.new { @client.lock { sleep 0.6; success_counter << 1 } }
sleep 0.55
t2 = Thread.new do
locked = @client.lock { success_counter << 1 }
failure_counter << 1 unless locked
end
[t1, t2].each(&:join)
assert_equal 2, success_counter.size
assert_equal 0, failure_counter.size
assert_equal false, @client.locked?
end
def test_refresh
@client = client(stale_lock_expiration: 0.5)
lock1 = @client.lock
assert_equal true, @client.locked?
@client.refresh(lock1)
assert_equal true, @client.locked?
sleep 0.55
assert_equal false, @client.locked?
lock2 = @client.lock
@client.refresh(lock1)
assert_equal true, @client.locked?
@client.unlock(lock1)
# edge case with refresh lock in the middle
assert_equal true, @client.locked?
@client.clear
assert_equal false, @client.locked?
@client.refresh(lock2)
assert_equal true, @client.locked?
@client.unlock(lock2)
# now finally unlocked
assert_equal false, @client.locked?
end
def test_block_refresh
success_counter = Queue.new
failure_counter = Queue.new
@client = client(stale_lock_expiration: 0.5)
t1 = Thread.new do
@client.lock do |token|
sleep 0.6
@client.refresh(token)
sleep 1
success_counter << 1
end
end
t2 = Thread.new do
sleep 0.8
locked = @client.lock { success_counter << 1 }
failure_counter << 1 unless locked
end
[t1, t2].each(&:join)
assert_equal 1, success_counter.size
assert_equal 1, failure_counter.size
assert_equal false, @client.locked?
end
def test_refresh_multi
success_counter = Queue.new
failure_counter = Queue.new
@client = client(stale_lock_expiration: 0.5, resources: 2)
t1 = Thread.new do
@client.lock do |token|
sleep 0.4
@client.refresh(token)
success_counter << 1
sleep 0.5
end
end
t2 = Thread.new do
sleep 0.55
locked = @client.lock do
success_counter << 1
sleep 0.5
end
failure_counter << 1 unless locked
end
t3 = Thread.new do
sleep 0.75
locked = @client.lock { success_counter << 1 }
failure_counter << 1 unless locked
end
[t1, t2, t3].each(&:join)
assert_equal 2, success_counter.size
assert_equal 1, failure_counter.size
assert_equal false, @client.locked?
end
def test_increment_reused_client
i = 0
threads = 2.times.map do
Thread.new do
@client.lock { i += 1 }
end
end
threads.each(&:join)
assert_equal 2, i
assert_equal false, @client.locked?
end
def test_increment_new_client
i = 0
threads = 2.times.map do
Thread.new do
# note this is the method that generates a *new* client
client.lock { i += 1 }
end
end
threads.each(&:join)
assert_equal 2, i
assert_equal false, @client.locked?
end
end
class TestBaseClient < Minitest::Test
def setup
@klass = Suo::Client::Base
@client = Suo::Client::Base.new(TEST_KEY, client: {})
end
def test_not_implemented
assert_raises(NotImplementedError) do
@klass.lock(TEST_KEY, 1)
@client.send(:get)
end
assert_raises(NotImplementedError) do
@client.send(:set, "", "")
end
assert_raises(NotImplementedError) do
@client.send(:initial_set)
end
assert_raises(NotImplementedError) do
@client.send(:clear)
end
end
end
@@ -123,13 +366,13 @@ class TestMemcachedClient < Minitest::Test
include ClientTests
def setup
@klass = Suo::Client::Memcached
@client = @klass.new
@klass_client = Dalli::Client.new("127.0.0.1:11211")
@dalli = Dalli::Client.new("127.0.0.1:11211")
@client = Suo::Client::Memcached.new(TEST_KEY)
teardown
end
def teardown
@klass_client.delete(TEST_KEY)
@dalli.delete(TEST_KEY)
end
end
@@ -137,13 +380,13 @@ class TestRedisClient < Minitest::Test
include ClientTests
def setup
@klass = Suo::Client::Redis
@client = @klass.new
@klass_client = Redis.new
@redis = Redis.new
@client = Suo::Client::Redis.new(TEST_KEY)
teardown
end
def teardown
@klass_client.del(TEST_KEY)
@redis.del(TEST_KEY)
end
end

View File

@@ -1,9 +1,13 @@
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
if ENV["CODECLIMATE_REPO_TOKEN"]
require "codeclimate-test-reporter"
CodeClimate::TestReporter.start
end
require "suo"
require "thread"
require "minitest/autorun"
require "minitest/benchmark"
ENV["SUO_TEST"] = "true"