8 Commits

Author SHA1 Message Date
Nick Elser
7662743123 update changelog 2015-04-12 19:47:35 -07:00
Nick Elser
a2fa281b86 fix test cases when memcached is delayed 2015-04-12 19:47:23 -07:00
Nick Elser
887219b63d release 0.1.2 2015-04-12 19:43:59 -07:00
Nick Elser
c0905fef91 faster unpack without exceptions 2015-04-12 19:43:07 -07:00
Nick Elser
a54b795e20 simplify defaults logic and fix retry timeouts 2015-04-12 19:42:59 -07:00
Nick Elser
0828cd546b add gem info 2015-04-12 17:54:38 -07:00
Nick Elser
c2b9de4cf3 refactor shared logic into base class 2015-04-12 17:54:30 -07:00
Nick Elser
d4860423fa whitespace 2015-04-12 16:03:53 -07:00
7 changed files with 167 additions and 301 deletions

View File

@@ -1,8 +1,12 @@
## 0.1.2
- Fix retry_timeout to properly use the full time (was being calculated incorrectly).
- Refactor client implementations to re-use more code.
## 0.1.1 ## 0.1.1
- Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for semaphore serialization. - Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for semaphore serialization.
## 0.1.0 ## 0.1.0
- First release. - First release.

View File

@@ -1,4 +1,4 @@
# Suo [![Build Status](https://travis-ci.org/nickelser/suo.png?branch=master)](https://travis-ci.org/nickelser/suo) # Suo [![Build Status](https://travis-ci.org/nickelser/suo.svg?branch=master)](https://travis-ci.org/nickelser/suo) [![Gem Version](https://badge.fury.io/rb/suo.svg)](http://badge.fury.io/rb/suo)
:lock: Distributed semaphores using Memcached or Redis in Ruby. :lock: Distributed semaphores using Memcached or Redis in Ruby.
@@ -42,7 +42,6 @@ end
## TODO ## TODO
- better stale key handling (refresh blocks) - better stale key handling (refresh blocks)
- more race condition tests - more race condition tests
- refactor clients to re-use more code
## History ## History

View File

@@ -2,13 +2,13 @@ module Suo
module Client module Client
class Base class Base
DEFAULT_OPTIONS = { DEFAULT_OPTIONS = {
retry_count: 3, retry_timeout: 0.1,
retry_delay: 0.01, retry_delay: 0.01,
stale_lock_expiration: 3600 stale_lock_expiration: 3600
}.freeze }.freeze
def initialize(options = {}) def initialize(options = {})
@options = self.class.merge_defaults(options).merge(_initialized: true) @options = self.class.merge_defaults(options)
end end
def lock(key, resources = 1, options = {}) def lock(key, resources = 1, options = {})
@@ -33,32 +33,86 @@ module Suo
end end
class << self class << self
def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument def lock(key, resources = 1, options = {})
fail NotImplementedError options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
retry_with_timeout(key, options) do
val, cas = get(key, options)
if val.nil?
set_initial(key, options)
next
end
locks = deserialize_and_clear_locks(val, options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
if set(key, newval, cas, options)
acquisition_token = token
break
end
end
end
acquisition_token
end end
def locked?(key, resources = 1, options = {}) def locked?(key, resources = 1, options = {})
options = merge_defaults(options) locks(key, options).size >= resources
client = options[:client]
locks = deserialize_locks(client.get(key))
locks.size >= resources
end end
def locks(key, options) def locks(key, options)
options = merge_defaults(options) options = merge_defaults(options)
client = options[:client] val, _ = get(key, options)
locks = deserialize_locks(client.get(key)) locks = deserialize_locks(val)
locks.size locks
end end
def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument def refresh(key, acquisition_token, options = {})
fail NotImplementedError options = merge_defaults(options)
retry_with_timeout(key, options) do
val, cas = get(key, options)
if val.nil?
set_initial(key, options)
next
end
locks = deserialize_and_clear_locks(val, options)
refresh_lock(locks, acquisition_token)
break if set(key, serialize_locks(locks), cas, options)
end
end end
def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument def unlock(key, acquisition_token, options = {})
fail NotImplementedError options = merge_defaults(options)
return unless acquisition_token
retry_with_timeout(key, options) do
val, cas = get(key, options)
break if val.nil?
locks = deserialize_and_clear_locks(val, options)
acquisition_lock = remove_lock(locks, acquisition_token)
break unless acquisition_lock
break if set(key, serialize_locks(locks), cas, options)
end
rescue FailedToAcquireLock => _ # rubocop:disable Lint/HandleExceptions
# ignore - assume success due to optimistic locking
end end
def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument
@@ -66,27 +120,64 @@ module Suo
end end
def merge_defaults(options = {}) def merge_defaults(options = {})
unless options[:_initialized] options = self::DEFAULT_OPTIONS.merge(options)
options = self::DEFAULT_OPTIONS.merge(options)
fail "Client required" unless options[:client] fail "Client required" unless options[:client]
end
if options[:retry_timeout] options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).ceil
options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).floor
end
options options
end end
private private
def get(key, options) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def set(key, newval, oldval, options) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def set_initial(key, options) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def synchronize(key, options)
yield(key, options)
end
def retry_with_timeout(key, options)
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
synchronize(key, options) do
yield
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
def serialize_locks(locks) def serialize_locks(locks)
MessagePack.pack(locks.map { |time, token| [time.to_f, token] }) MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
end end
def deserialize_and_clear_locks(val, options)
clear_expired_locks(deserialize_locks(val), options)
end
def deserialize_locks(val) def deserialize_locks(val)
MessagePack.unpack(val).map do |time, token| unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val)
unpacked.map do |time, token|
[Time.at(time), token] [Time.at(time), token]
end end
rescue EOFError => _ rescue EOFError => _

View File

@@ -7,130 +7,24 @@ module Suo
end end
class << self class << self
def lock(key, resources = 1, options = {})
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
val, cas = client.get_cas(key)
# no key has been set yet; we could simply set it, but would lead to race conditions on the initial setting
if val.nil?
client.set(key, "")
next
end
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
if client.set_cas(key, newval, cas)
acquisition_token = token
break
end
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
acquisition_token
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
val, cas = client.get_cas(key)
# much like with initial set - ensure the key is here
if val.nil?
client.set(key, "")
next
end
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
return unless acquisition_token
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
val, cas = client.get_cas(key)
break if val.nil? # lock has expired totally
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
break unless acquisition_lock
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
# another client cleared a token in the interim - try again!
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {}) def clear(key, options = {})
options = merge_defaults(options) options = merge_defaults(options)
options[:client].delete(key) options[:client].delete(key)
end end
private
def get(key, options)
options[:client].get_cas(key)
end
def set(key, newval, cas, options)
options[:client].set_cas(key, newval, cas)
end
def set_initial(key, options)
options[:client].set(key, "")
end
end end
end end
end end

View File

@@ -7,159 +7,36 @@ module Suo
end end
class << self class << self
def lock(key, resources = 1, options = {})
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
client.watch(key) do
begin
val = client.get(key)
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
acquisition_token = token if ret[0] == "OK"
end
ensure
client.unwatch
end
end
break if acquisition_token
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise Suo::Client::FailedToAcquireLock
end
acquisition_token
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
refreshed = false
begin
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
client.watch(key) do
begin
val = client.get(key)
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
refreshed = ret[0] == "OK"
ensure
client.unwatch
end
end
break if refreshed
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise Suo::Client::FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
return unless acquisition_token
begin
start = Time.now.to_f
options[:retry_count].times do
cleared = false
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
client.watch(key) do
begin
val = client.get(key)
if val.nil?
cleared = true
break
end
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
unless acquisition_lock
# token was already cleared
cleared = true
break
end
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
cleared = ret[0] == "OK"
ensure
client.unwatch
end
end
break if cleared
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {}) def clear(key, options = {})
options = merge_defaults(options) options = merge_defaults(options)
options[:client].del(key) options[:client].del(key)
end end
private
def get(key, options)
[options[:client].get(key), nil]
end
def set(key, newval, _, options)
ret = options[:client].multi do |multi|
multi.set(key, newval)
end
ret[0] == "OK"
end
def synchronize(key, options)
options[:client].watch(key) do
yield
end
ensure
options[:client].unwatch
end
def set_initial(key, options)
options[:client].set(key, "")
end
end end
end end
end end

View File

@@ -1,3 +1,3 @@
module Suo module Suo
VERSION = "0.1.1" VERSION = "0.1.2"
end end

View File

@@ -24,6 +24,7 @@ module ClientTests
@klass.unlock(TEST_KEY, lock1, client: @klass_client) @klass.unlock(TEST_KEY, lock1, client: @klass_client)
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
assert_equal false, locked assert_equal false, locked
end end
@@ -74,8 +75,8 @@ module ClientTests
100.times.map do |i| 100.times.map do |i|
Thread.new do Thread.new do
success = @client.lock(TEST_KEY, 50, retry_timeout: 0.9) do success = @client.lock(TEST_KEY, 50, retry_timeout: 0.5) do
sleep(1) sleep(2)
success_counter << i success_counter << i
end end
@@ -94,7 +95,7 @@ module ClientTests
100.times.map do |i| 100.times.map do |i|
Thread.new do Thread.new do
success = @client.lock(TEST_KEY, 50, retry_timeout: 2) do success = @client.lock(TEST_KEY, 50, retry_timeout: 2) do
sleep(1) sleep(0.5)
success_counter << i success_counter << i
end end
@@ -114,7 +115,7 @@ class TestBaseClient < Minitest::Test
def test_not_implemented def test_not_implemented
assert_raises(NotImplementedError) do assert_raises(NotImplementedError) do
@klass.lock(TEST_KEY, 1) @klass.send(:get, TEST_KEY, {})
end end
end end
end end