20 Commits

Author SHA1 Message Date
Nick Elser
857fc63378 release 0.2.1 2015-04-12 23:45:09 -07:00
Nick Elser
bb6762bbc6 ret is not always an array 2015-04-12 23:45:00 -07:00
Nick Elser
f0977c89f2 remove redundant require file 2015-04-12 23:03:54 -07:00
Nick Elser
4d5c96309f some style fixes 2015-04-12 22:54:53 -07:00
Nick Elser
8d6061b137 rename some tests, fix rare edge condition 2015-04-12 22:49:02 -07:00
Nick Elser
ce0a4d8d86 release 0.2.0 2015-04-12 22:33:19 -07:00
Nick Elser
1fd769eec2 refactor class methods into instance methods 2015-04-12 22:32:51 -07:00
Nick Elser
37be5ae27b bump version, update changelog 2015-04-12 20:58:18 -07:00
Nick Elser
a1a226fb59 account for variabilities in test memcached 2015-04-12 20:57:52 -07:00
Nick Elser
8d7ddaf35a move logic around loop counts to a more reasonable location 2015-04-12 20:57:29 -07:00
Nick Elser
1aacc0c1a1 properly throw lock LockClientError 2015-04-12 20:47:35 -07:00
Nick Elser
8166c6b51d specify ruby version 2015-04-12 19:53:47 -07:00
Nick Elser
7662743123 update changelog 2015-04-12 19:47:35 -07:00
Nick Elser
a2fa281b86 fix test cases when memcached is delayed 2015-04-12 19:47:23 -07:00
Nick Elser
887219b63d release 0.1.2 2015-04-12 19:43:59 -07:00
Nick Elser
c0905fef91 faster unpack without exceptions 2015-04-12 19:43:07 -07:00
Nick Elser
a54b795e20 simplify defaults logic and fix retry timeouts 2015-04-12 19:42:59 -07:00
Nick Elser
0828cd546b add gem info 2015-04-12 17:54:38 -07:00
Nick Elser
c2b9de4cf3 refactor shared logic into base class 2015-04-12 17:54:30 -07:00
Nick Elser
d4860423fa whitespace 2015-04-12 16:03:53 -07:00
13 changed files with 298 additions and 414 deletions

View File

@@ -1,8 +1,25 @@
## 0.2.1
- Fix bug when dealing with real-world Redis error conditions.
## 0.2.0
- Refactor class methods into instance methods to simplify implementation.
- Increase thread safety with Memcached implementation.
## 0.1.3
- Properly throw Suo::LockClientError when the connection itself fails (Memcache server not reachable, etc.)
## 0.1.2
- Fix retry_timeout to properly use the full time (was being calculated incorrectly).
- Refactor client implementations to re-use more code.
## 0.1.1 ## 0.1.1
- Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for semaphore serialization. - Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for semaphore serialization.
## 0.1.0 ## 0.1.0
- First release. - First release.

View File

@@ -1,4 +1,4 @@
# Suo [![Build Status](https://travis-ci.org/nickelser/suo.png?branch=master)](https://travis-ci.org/nickelser/suo) # Suo [![Build Status](https://travis-ci.org/nickelser/suo.svg?branch=master)](https://travis-ci.org/nickelser/suo) [![Gem Version](https://badge.fury.io/rb/suo.svg)](http://badge.fury.io/rb/suo)
:lock: Distributed semaphores using Memcached or Redis in Ruby. :lock: Distributed semaphores using Memcached or Redis in Ruby.
@@ -31,18 +31,27 @@ suo.lock("some_key") do
@puppies.pet! @puppies.pet!
end end
2.times do Thread.new { suo.lock("other_key", 2) { puts "One"; sleep 2 } }
Thread.new do Thread.new { suo.lock("other_key", 2) { puts "Two"; sleep 2 } }
# second argument is the number of resources - so this will run twice Thread.new { suo.lock("other_key", 2) { puts "Three" } }
suo.lock("other_key", 2, timeout: 0.5) { puts "Will run twice!" }
end # will print "One" "Two", but not "Three", as there are only 2 resources
end
# custom acquisition timeouts (time to acquire)
suo = Suo::Client::Memcached.new(client: some_dalli_client, acquisition_timeout: 1) # in seconds
# manually locking/unlocking
suo.lock("a_key")
foo.baz!
suo.unlock("a_key")
# custom stale lock cleanup (cleaning of dead clients)
suo = Suo::Client::Redis.new(client: some_redis_client, stale_lock_expiration: 60*5)
``` ```
## TODO ## TODO
- better stale key handling (refresh blocks) - better stale key handling (refresh blocks)
- more race condition tests - more race condition tests
- refactor clients to re-use more code
## History ## History

View File

@@ -1,2 +1,16 @@
require "securerandom"
require "monitor"
require "dalli"
require "dalli/cas/client"
require "redis"
require "msgpack"
require "suo/version" require "suo/version"
require "suo/clients"
require "suo/errors"
require "suo/client/base"
require "suo/client/memcached"
require "suo/client/redis"

View File

@@ -2,99 +2,172 @@ module Suo
module Client module Client
class Base class Base
DEFAULT_OPTIONS = { DEFAULT_OPTIONS = {
retry_count: 3, acquisition_timeout: 0.1,
retry_delay: 0.01, acquisition_delay: 0.01,
stale_lock_expiration: 3600 stale_lock_expiration: 3600
}.freeze }.freeze
attr_accessor :client
include MonitorMixin
def initialize(options = {}) def initialize(options = {})
@options = self.class.merge_defaults(options).merge(_initialized: true) fail "Client required" unless options[:client]
@options = DEFAULT_OPTIONS.merge(options)
@retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil
@client = @options[:client]
super()
end end
def lock(key, resources = 1, options = {}) def lock(key, resources = 1)
options = self.class.merge_defaults(@options.merge(options)) token = acquire_lock(key, resources)
token = self.class.lock(key, resources, options)
if token if block_given? && token
begin begin
yield if block_given? yield
ensure ensure
self.class.unlock(key, token, options) unlock(key, token)
end end
true
else else
false token
end end
end end
def locked?(key, resources = 1) def locked?(key, resources = 1)
self.class.locked?(key, resources, @options) locks(key).size >= resources
end end
class << self def locks(key)
def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument val, _ = get(key)
locks = deserialize_locks(val)
locks
end
def refresh(key, acquisition_token)
retry_with_timeout(key) do
val, cas = get(key)
if val.nil?
initial_set(key)
next
end
locks = deserialize_and_clear_locks(val)
refresh_lock(locks, acquisition_token)
break if set(key, serialize_locks(locks), cas)
end
end
def unlock(key, acquisition_token)
return unless acquisition_token
retry_with_timeout(key) do
val, cas = get(key)
break if val.nil?
locks = deserialize_and_clear_locks(val)
acquisition_lock = remove_lock(locks, acquisition_token)
break unless acquisition_lock
break if set(key, serialize_locks(locks), cas)
end
rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
# ignore - assume success due to optimistic locking
end
def clear(key) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError fail NotImplementedError
end end
def locked?(key, resources = 1, options = {})
options = merge_defaults(options)
client = options[:client]
locks = deserialize_locks(client.get(key))
locks.size >= resources
end
def locks(key, options)
options = merge_defaults(options)
client = options[:client]
locks = deserialize_locks(client.get(key))
locks.size
end
def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def merge_defaults(options = {})
unless options[:_initialized]
options = self::DEFAULT_OPTIONS.merge(options)
fail "Client required" unless options[:client]
end
if options[:retry_timeout]
options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).floor
end
options
end
private private
def acquire_lock(key, resources = 1)
acquisition_token = nil
token = SecureRandom.base64(16)
retry_with_timeout(key) do
val, cas = get(key)
if val.nil?
initial_set(key)
next
end
locks = deserialize_and_clear_locks(val)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
if set(key, newval, cas)
acquisition_token = token
break
end
end
end
acquisition_token
end
def get(key) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def set(key, newval, oldval) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def initial_set(key) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def synchronize(key) # rubocop:disable Lint/UnusedMethodArgument
mon_synchronize { yield }
end
def retry_with_timeout(key)
start = Time.now.to_f
@retry_count.times do
now = Time.now.to_f
break if now - start > @options[:acquisition_timeout]
synchronize(key) do
yield
end
sleep(rand(@options[:acquisition_delay] * 1000).to_f / 1000)
end
rescue => _
raise LockClientError
end
def serialize_locks(locks) def serialize_locks(locks)
MessagePack.pack(locks.map { |time, token| [time.to_f, token] }) MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
end end
def deserialize_and_clear_locks(val)
clear_expired_locks(deserialize_locks(val))
end
def deserialize_locks(val) def deserialize_locks(val)
MessagePack.unpack(val).map do |time, token| unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val)
unpacked.map do |time, token|
[Time.at(time), token] [Time.at(time), token]
end end
rescue EOFError => _ rescue EOFError => _
[] []
end end
def clear_expired_locks(locks, options) def clear_expired_locks(locks)
expired = Time.now - options[:stale_lock_expiration] expired = Time.now - @options[:stale_lock_expiration]
locks.reject { |time, _| time < expired } locks.reject { |time, _| time < expired }
end end
@@ -114,4 +187,3 @@ module Suo
end end
end end
end end
end

View File

@@ -1,7 +0,0 @@
module Suo
module Client
module Errors
class FailedToAcquireLock < StandardError; end
end
end
end

View File

@@ -6,131 +6,22 @@ module Suo
super super
end end
class << self def clear(key)
def lock(key, resources = 1, options = {}) @client.delete(key)
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end end
val, cas = client.get_cas(key) private
# no key has been set yet; we could simply set it, but would lead to race conditions on the initial setting def get(key)
if val.nil? @client.get_cas(key)
client.set(key, "")
next
end end
locks = clear_expired_locks(deserialize_locks(val.to_s), options) def set(key, newval, cas)
@client.set_cas(key, newval, cas)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
if client.set_cas(key, newval, cas)
acquisition_token = token
break
end
end end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000) def initial_set(key)
end @client.set(key, "")
rescue => _
raise FailedToAcquireLock
end
acquisition_token
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
val, cas = client.get_cas(key)
# much like with initial set - ensure the key is here
if val.nil?
client.set(key, "")
next
end
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
return unless acquisition_token
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
val, cas = client.get_cas(key)
break if val.nil? # lock has expired totally
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
break unless acquisition_lock
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
# another client cleared a token in the interim - try again!
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {})
options = merge_defaults(options)
options[:client].delete(key)
end
end end
end end
end end

View File

@@ -6,160 +6,34 @@ module Suo
super super
end end
class << self def clear(key)
def lock(key, resources = 1, options = {}) @client.del(key)
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end end
client.watch(key) do private
begin
val = client.get(key)
locks = clear_expired_locks(deserialize_locks(val.to_s), options) def get(key)
[@client.get(key), nil]
end
if locks.size < resources def set(key, newval, _)
add_lock(locks, token) ret = @client.multi do |multi|
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval) multi.set(key, newval)
end end
acquisition_token = token if ret[0] == "OK" ret && ret[0] == "OK"
end
def synchronize(key)
@client.watch(key) do
yield
end end
ensure ensure
client.unwatch @client.unwatch
end
end end
break if acquisition_token def initial_set(key)
@client.set(key, "")
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise Suo::Client::FailedToAcquireLock
end
acquisition_token
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
refreshed = false
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
client.watch(key) do
begin
val = client.get(key)
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
refreshed = ret[0] == "OK"
ensure
client.unwatch
end
end
break if refreshed
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise Suo::Client::FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
return unless acquisition_token
begin
start = Time.now.to_f
options[:retry_count].times do
cleared = false
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
client.watch(key) do
begin
val = client.get(key)
if val.nil?
cleared = true
break
end
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
unless acquisition_lock
# token was already cleared
cleared = true
break
end
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
cleared = ret[0] == "OK"
ensure
client.unwatch
end
end
break if cleared
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {})
options = merge_defaults(options)
options[:client].del(key)
end
end end
end end
end end

View File

@@ -1,14 +0,0 @@
require "securerandom"
require "monitor"
require "dalli"
require "dalli/cas/client"
require "redis"
require "msgpack"
require "suo/client/errors"
require "suo/client/base"
require "suo/client/memcached"
require "suo/client/redis"

3
lib/suo/errors.rb Normal file
View File

@@ -0,0 +1,3 @@
module Suo
class LockClientError < StandardError; end
end

View File

@@ -1,3 +1,3 @@
module Suo module Suo
VERSION = "0.1.1" VERSION = "0.2.1"
end end

View File

@@ -20,6 +20,8 @@ Gem::Specification.new do |spec|
spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"] spec.require_paths = ["lib"]
spec.required_ruby_version = "~> 2.0"
spec.add_dependency "dalli" spec.add_dependency "dalli"
spec.add_dependency "redis" spec.add_dependency "redis"
spec.add_dependency "msgpack" spec.add_dependency "msgpack"

View File

@@ -3,55 +3,55 @@ require "test_helper"
TEST_KEY = "suo_test_key".freeze TEST_KEY = "suo_test_key".freeze
module ClientTests module ClientTests
def test_requires_client def test_throws_failed_error_on_bad_client
exception = assert_raises(RuntimeError) do assert_raises(Suo::LockClientError) do
@klass.lock(TEST_KEY, 1) client = @client.class.new(client: {})
client.lock(TEST_KEY, 1)
end
end end
assert_equal "Client required", exception.message def test_single_resource_locking
end lock1 = @client.lock(TEST_KEY, 1)
def test_class_single_resource_locking
lock1 = @klass.lock(TEST_KEY, 1, client: @klass_client)
refute_nil lock1 refute_nil lock1
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) locked = @client.locked?(TEST_KEY, 1)
assert_equal true, locked assert_equal true, locked
lock2 = @klass.lock(TEST_KEY, 1, client: @klass_client) lock2 = @client.lock(TEST_KEY, 1)
assert_nil lock2 assert_nil lock2
@klass.unlock(TEST_KEY, lock1, client: @klass_client) @client.unlock(TEST_KEY, lock1)
locked = @client.locked?(TEST_KEY, 1)
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
assert_equal false, locked assert_equal false, locked
end end
def test_class_multiple_resource_locking def test_multiple_resource_locking
lock1 = @klass.lock(TEST_KEY, 2, client: @klass_client) lock1 = @client.lock(TEST_KEY, 2)
refute_nil lock1 refute_nil lock1
locked = @klass.locked?(TEST_KEY, 2, client: @klass_client) locked = @client.locked?(TEST_KEY, 2)
assert_equal false, locked assert_equal false, locked
lock2 = @klass.lock(TEST_KEY, 2, client: @klass_client) lock2 = @client.lock(TEST_KEY, 2)
refute_nil lock2 refute_nil lock2
locked = @klass.locked?(TEST_KEY, 2, client: @klass_client) locked = @client.locked?(TEST_KEY, 2)
assert_equal true, locked assert_equal true, locked
@klass.unlock(TEST_KEY, lock1, client: @klass_client) @client.unlock(TEST_KEY, lock1)
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) locked = @client.locked?(TEST_KEY, 1)
assert_equal true, locked assert_equal true, locked
@klass.unlock(TEST_KEY, lock2, client: @klass_client) @client.unlock(TEST_KEY, lock2)
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) locked = @client.locked?(TEST_KEY, 1)
assert_equal false, locked assert_equal false, locked
end end
def test_instance_single_resource_locking def test_block_single_resource_locking
locked = false locked = false
@client.lock(TEST_KEY, 1) { locked = true } @client.lock(TEST_KEY, 1) { locked = true }
@@ -59,23 +59,47 @@ module ClientTests
assert_equal true, locked assert_equal true, locked
end end
def test_instance_unlocks_on_exception def test_block_unlocks_on_exception
assert_raises(RuntimeError) do assert_raises(RuntimeError) do
@client.lock(TEST_KEY, 1) { fail "Test" } @client.lock(TEST_KEY, 1) { fail "Test" }
end end
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) locked = @client.locked?(TEST_KEY, 1)
assert_equal false, locked assert_equal false, locked
end end
def test_instance_multiple_resource_locking def test_readme_example
output = Queue.new
threads = []
threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "One"; sleep 0.5 } }
threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "Two"; sleep 0.5 } }
sleep 0.1
threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "Three" } }
threads.map(&:join)
ret = []
ret << output.pop
ret << output.pop
ret.sort!
assert_equal 0, output.size
assert_equal %w(One Two), ret
end
def test_block_multiple_resource_locking
success_counter = Queue.new success_counter = Queue.new
failure_counter = Queue.new failure_counter = Queue.new
client = @client.class.new(acquisition_timeout: 0.9, client: @client.client)
100.times.map do |i| 100.times.map do |i|
Thread.new do Thread.new do
success = @client.lock(TEST_KEY, 50, retry_timeout: 0.9) do success = client.lock(TEST_KEY, 50) do
sleep(1) sleep(3)
success_counter << i success_counter << i
end end
@@ -87,14 +111,16 @@ module ClientTests
assert_equal 50, failure_counter.size assert_equal 50, failure_counter.size
end end
def test_instance_multiple_resource_locking_longer_timeout def test_block_multiple_resource_locking_longer_timeout
success_counter = Queue.new success_counter = Queue.new
failure_counter = Queue.new failure_counter = Queue.new
client = @client.class.new(acquisition_timeout: 3, client: @client.client)
100.times.map do |i| 100.times.map do |i|
Thread.new do Thread.new do
success = @client.lock(TEST_KEY, 50, retry_timeout: 2) do success = client.lock(TEST_KEY, 50) do
sleep(1) sleep(0.5)
success_counter << i success_counter << i
end end
@@ -109,12 +135,12 @@ end
class TestBaseClient < Minitest::Test class TestBaseClient < Minitest::Test
def setup def setup
@klass = Suo::Client::Base @client = Suo::Client::Base.new(client: {})
end end
def test_not_implemented def test_not_implemented
assert_raises(NotImplementedError) do assert_raises(NotImplementedError) do
@klass.lock(TEST_KEY, 1) @client.send(:get, TEST_KEY)
end end
end end
end end
@@ -123,13 +149,12 @@ class TestMemcachedClient < Minitest::Test
include ClientTests include ClientTests
def setup def setup
@klass = Suo::Client::Memcached @dalli = Dalli::Client.new("127.0.0.1:11211")
@client = @klass.new @client = Suo::Client::Memcached.new
@klass_client = Dalli::Client.new("127.0.0.1:11211")
end end
def teardown def teardown
@klass_client.delete(TEST_KEY) @dalli.delete(TEST_KEY)
end end
end end
@@ -137,13 +162,12 @@ class TestRedisClient < Minitest::Test
include ClientTests include ClientTests
def setup def setup
@klass = Suo::Client::Redis @redis = Redis.new
@client = @klass.new @client = Suo::Client::Redis.new
@klass_client = Redis.new
end end
def teardown def teardown
@klass_client.del(TEST_KEY) @redis.del(TEST_KEY)
end end
end end

View File

@@ -6,4 +6,3 @@ require "minitest/autorun"
require "minitest/benchmark" require "minitest/benchmark"
ENV["SUO_TEST"] = "true" ENV["SUO_TEST"] = "true"