From 1fd769eec2d6e9be201212a6711fab84d1c689df Mon Sep 17 00:00:00 2001 From: Nick Elser Date: Sun, 12 Apr 2015 22:32:51 -0700 Subject: [PATCH] refactor class methods into instance methods --- lib/suo/client/base.rb | 290 +++++++++++++++++------------------- lib/suo/client/memcached.rb | 27 ++-- lib/suo/client/redis.rb | 49 +++--- test/client_test.rb | 100 +++++++------ 4 files changed, 229 insertions(+), 237 deletions(-) diff --git a/lib/suo/client/base.rb b/lib/suo/client/base.rb index 459a7ba..289f3d9 100644 --- a/lib/suo/client/base.rb +++ b/lib/suo/client/base.rb @@ -1,206 +1,188 @@ module Suo module Client class Base - DEFAULT_OPTIONS = { - retry_timeout: 0.1, - retry_delay: 0.01, + acquisition_timeout: 0.1, + acquisition_delay: 0.01, stale_lock_expiration: 3600 }.freeze + attr_accessor :client + + include MonitorMixin + def initialize(options = {}) - @options = self.class.merge_defaults(options) + fail "Client required" unless options[:client] + @options = DEFAULT_OPTIONS.merge(options) + @retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil + @client = @options[:client] + super() end - def lock(key, resources = 1, options = {}) - options = self.class.merge_defaults(@options.merge(options)) - token = self.class.lock(key, resources, options) + def lock(key, resources = 1) + token = acquire_lock(key, resources) - if token + if block_given? && token begin - yield if block_given? + yield ensure - self.class.unlock(key, token, options) + unlock(key, token) end - - true else - false + token end end def locked?(key, resources = 1) - self.class.locked?(key, resources, @options) + locks(key).size >= resources end - class << self - def lock(key, resources = 1, options = {}) - options = merge_defaults(options) - acquisition_token = nil - token = SecureRandom.base64(16) + def locks(key) + val, _ = get(key) + locks = deserialize_locks(val) - retry_with_timeout(key, options) do - val, cas = get(key, options) + locks + end - if val.nil? - set_initial(key, options) - next - end + def refresh(key, acquisition_token) + retry_with_timeout(key) do + val, cas = get(key) - locks = deserialize_and_clear_locks(val, options) + if val.nil? + set_initial(key) + next + end - if locks.size < resources - add_lock(locks, token) + locks = deserialize_and_clear_locks(val) - newval = serialize_locks(locks) + refresh_lock(locks, acquisition_token) - if set(key, newval, cas, options) - acquisition_token = token - break - end + break if set(key, serialize_locks(locks), cas) + end + end + + def unlock(key, acquisition_token) + return unless acquisition_token + + retry_with_timeout(key) do + val, cas = get(key) + + break if val.nil? + + locks = deserialize_and_clear_locks(val) + + acquisition_lock = remove_lock(locks, acquisition_token) + + break unless acquisition_lock + break if set(key, serialize_locks(locks), cas) + end + rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions + # ignore - assume success due to optimistic locking + end + + def clear(key) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end + + private + + def acquire_lock(key, resources = 1) + acquisition_token = nil + token = SecureRandom.base64(16) + + retry_with_timeout(key) do + val, cas = get(key) + + if val.nil? + set_initial(key) + next + end + + locks = deserialize_and_clear_locks(val) + + if locks.size < resources + add_lock(locks, token) + + newval = serialize_locks(locks) + + if set(key, newval, cas) + acquisition_token = token + break end end - - acquisition_token end - def locked?(key, resources = 1, options = {}) - locks(key, options).size >= resources - end + acquisition_token + end - def locks(key, options) - options = merge_defaults(options) - val, _ = get(key, options) - locks = deserialize_locks(val) + def get(key) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end - locks - end + def set(key, newval, oldval) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end - def refresh(key, acquisition_token, options = {}) - options = merge_defaults(options) + def set_initial(key) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end - retry_with_timeout(key, options) do - val, cas = get(key, options) + def synchronize(key) # rubocop:disable Lint/UnusedMethodArgument + mon_synchronize { yield } + end - if val.nil? - set_initial(key, options) - next - end + def retry_with_timeout(key) + start = Time.now.to_f - locks = deserialize_and_clear_locks(val, options) + @retry_count.times do + now = Time.now.to_f + break if now - start > @options[:acquisition_timeout] - refresh_lock(locks, acquisition_token) - - break if set(key, serialize_locks(locks), cas, options) + synchronize(key) do + yield end + + sleep(rand(@options[:acquisition_delay] * 1000).to_f / 1000) end + rescue => _ + raise LockClientError + end - def unlock(key, acquisition_token, options = {}) - options = merge_defaults(options) + def serialize_locks(locks) + MessagePack.pack(locks.map { |time, token| [time.to_f, token] }) + end - return unless acquisition_token + def deserialize_and_clear_locks(val) + clear_expired_locks(deserialize_locks(val)) + end - retry_with_timeout(key, options) do - val, cas = get(key, options) + def deserialize_locks(val) + unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val) - break if val.nil? - - locks = deserialize_and_clear_locks(val, options) - - acquisition_lock = remove_lock(locks, acquisition_token) - - break unless acquisition_lock - break if set(key, serialize_locks(locks), cas, options) - end - rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions - # ignore - assume success due to optimistic locking + unpacked.map do |time, token| + [Time.at(time), token] end + rescue EOFError => _ + [] + end - def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError - end + def clear_expired_locks(locks) + expired = Time.now - @options[:stale_lock_expiration] + locks.reject { |time, _| time < expired } + end - def merge_defaults(options = {}) - options = self::DEFAULT_OPTIONS.merge(options) + def add_lock(locks, token) + locks << [Time.now.to_f, token] + end - fail "Client required" unless options[:client] + def remove_lock(locks, acquisition_token) + lock = locks.find { |_, token| token == acquisition_token } + locks.delete(lock) + end - options - end - - private - - def get(key, options) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError - end - - def set(key, newval, oldval, options) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError - end - - def set_initial(key, options) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError - end - - def synchronize(key, options) - yield(key, options) - end - - def retry_with_timeout(key, options) - count = (options[:retry_timeout] / options[:retry_delay].to_f).ceil - - start = Time.now.to_f - - count.times do - now = Time.now.to_f - break if now - start > options[:retry_timeout] - - synchronize(key, options) do - yield - end - - sleep(rand(options[:retry_delay] * 1000).to_f / 1000) - end - rescue => _ - raise LockClientError - end - - def serialize_locks(locks) - MessagePack.pack(locks.map { |time, token| [time.to_f, token] }) - end - - def deserialize_and_clear_locks(val, options) - clear_expired_locks(deserialize_locks(val), options) - end - - def deserialize_locks(val) - unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val) - - unpacked.map do |time, token| - [Time.at(time), token] - end - rescue EOFError => _ - [] - end - - def clear_expired_locks(locks, options) - expired = Time.now - options[:stale_lock_expiration] - locks.reject { |time, _| time < expired } - end - - def add_lock(locks, token) - locks << [Time.now.to_f, token] - end - - def remove_lock(locks, acquisition_token) - lock = locks.find { |_, token| token == acquisition_token } - locks.delete(lock) - end - - def refresh_lock(locks, acquisition_token) - remove_lock(locks, acquisition_token) - add_lock(locks, token) - end + def refresh_lock(locks, acquisition_token) + remove_lock(locks, acquisition_token) + add_lock(locks, token) end end end diff --git a/lib/suo/client/memcached.rb b/lib/suo/client/memcached.rb index 01767ed..7112200 100644 --- a/lib/suo/client/memcached.rb +++ b/lib/suo/client/memcached.rb @@ -6,25 +6,22 @@ module Suo super end - class << self - def clear(key, options = {}) - options = merge_defaults(options) - options[:client].delete(key) - end + def clear(key) + @client.delete(key) + end - private + private - def get(key, options) - options[:client].get_cas(key) - end + def get(key) + @client.get_cas(key) + end - def set(key, newval, cas, options) - options[:client].set_cas(key, newval, cas) - end + def set(key, newval, cas) + @client.set_cas(key, newval, cas) + end - def set_initial(key, options) - options[:client].set(key, "") - end + def set_initial(key) + @client.set(key, "") end end end diff --git a/lib/suo/client/redis.rb b/lib/suo/client/redis.rb index 0b57d06..8996bca 100644 --- a/lib/suo/client/redis.rb +++ b/lib/suo/client/redis.rb @@ -6,37 +6,34 @@ module Suo super end - class << self - def clear(key, options = {}) - options = merge_defaults(options) - options[:client].del(key) + def clear(key) + @client.del(key) + end + + private + + def get(key) + [@client.get(key), nil] + end + + def set(key, newval, _) + ret = @client.multi do |multi| + multi.set(key, newval) end - private + ret[0] == "OK" + end - def get(key, options) - [options[:client].get(key), nil] + def synchronize(key) + @client.watch(key) do + yield end + ensure + @client.unwatch + end - def set(key, newval, _, options) - ret = options[:client].multi do |multi| - multi.set(key, newval) - end - - ret[0] == "OK" - end - - def synchronize(key, options) - options[:client].watch(key) do - yield - end - ensure - options[:client].unwatch - end - - def set_initial(key, options) - options[:client].set(key, "") - end + def set_initial(key) + @client.set(key, "") end end end diff --git a/test/client_test.rb b/test/client_test.rb index 43931e1..ae65f71 100644 --- a/test/client_test.rb +++ b/test/client_test.rb @@ -3,62 +3,55 @@ require "test_helper" TEST_KEY = "suo_test_key".freeze module ClientTests - def test_requires_client - exception = assert_raises(RuntimeError) do - @klass.lock(TEST_KEY, 1) - end - - assert_equal "Client required", exception.message - end - def test_throws_failed_error_on_bad_client assert_raises(Suo::LockClientError) do - @klass.lock(TEST_KEY, 1, client: {}) + client = @client.class.new(client: {}) + client.lock(TEST_KEY, 1) end end def test_class_single_resource_locking - lock1 = @klass.lock(TEST_KEY, 1, client: @klass_client) + lock1 = @client.lock(TEST_KEY, 1) refute_nil lock1 - locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) + locked = @client.locked?(TEST_KEY, 1) assert_equal true, locked - lock2 = @klass.lock(TEST_KEY, 1, client: @klass_client) + lock2 = @client.lock(TEST_KEY, 1) assert_nil lock2 - @klass.unlock(TEST_KEY, lock1, client: @klass_client) + @client.unlock(TEST_KEY, lock1) - locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) + locked = @client.locked?(TEST_KEY, 1) assert_equal false, locked end def test_class_multiple_resource_locking - lock1 = @klass.lock(TEST_KEY, 2, client: @klass_client) + lock1 = @client.lock(TEST_KEY, 2) refute_nil lock1 - locked = @klass.locked?(TEST_KEY, 2, client: @klass_client) + locked = @client.locked?(TEST_KEY, 2) assert_equal false, locked - lock2 = @klass.lock(TEST_KEY, 2, client: @klass_client) + lock2 = @client.lock(TEST_KEY, 2) refute_nil lock2 - locked = @klass.locked?(TEST_KEY, 2, client: @klass_client) + locked = @client.locked?(TEST_KEY, 2) assert_equal true, locked - @klass.unlock(TEST_KEY, lock1, client: @klass_client) + @client.unlock(TEST_KEY, lock1) - locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) + locked = @client.locked?(TEST_KEY, 1) assert_equal true, locked - @klass.unlock(TEST_KEY, lock2, client: @klass_client) + @client.unlock(TEST_KEY, lock2) - locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) + locked = @client.locked?(TEST_KEY, 1) assert_equal false, locked end - def test_instance_single_resource_locking + def test_block_single_resource_locking locked = false @client.lock(TEST_KEY, 1) { locked = true } @@ -66,22 +59,45 @@ module ClientTests assert_equal true, locked end - def test_instance_unlocks_on_exception + def test_block_unlocks_on_exception assert_raises(RuntimeError) do @client.lock(TEST_KEY, 1) { fail "Test" } end - locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) + locked = @client.locked?(TEST_KEY, 1) assert_equal false, locked end + def test_readme_example + output = Queue.new + threads = [] + + threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "One"; sleep 2 } } + threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "Two"; sleep 2 } } + threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "Three" } } + + threads.map(&:join) + + ret = [] + + ret << output.pop + ret << output.pop + + ret.sort! + + assert_equal 0, output.size + assert_equal ["One", "Two"], ret + end + def test_instance_multiple_resource_locking success_counter = Queue.new failure_counter = Queue.new - 50.times.map do |i| + client = @client.class.new(acquisition_timeout: 0.9, client: @client.client) + + 100.times.map do |i| Thread.new do - success = @client.lock(TEST_KEY, 25, retry_timeout: 0.9) do + success = @client.lock(TEST_KEY, 50) do sleep(3) success_counter << i end @@ -90,17 +106,19 @@ module ClientTests end end.map(&:join) - assert_equal 25, success_counter.size - assert_equal 25, failure_counter.size + assert_equal 50, success_counter.size + assert_equal 50, failure_counter.size end def test_instance_multiple_resource_locking_longer_timeout success_counter = Queue.new failure_counter = Queue.new - 50.times.map do |i| + client = @client.class.new(acquisition_timeout: 3, client: @client.client) + + 100.times.map do |i| Thread.new do - success = @client.lock(TEST_KEY, 25, retry_timeout: 2) do + success = client.lock(TEST_KEY, 50) do sleep(0.5) success_counter << i end @@ -109,19 +127,19 @@ module ClientTests end end.map(&:join) - assert_equal 50, success_counter.size + assert_equal 100, success_counter.size assert_equal 0, failure_counter.size end end class TestBaseClient < Minitest::Test def setup - @klass = Suo::Client::Base + @client = Suo::Client::Base.new(client: {}) end def test_not_implemented assert_raises(NotImplementedError) do - @klass.send(:get, TEST_KEY, {}) + @client.send(:get, TEST_KEY) end end end @@ -130,13 +148,12 @@ class TestMemcachedClient < Minitest::Test include ClientTests def setup - @klass = Suo::Client::Memcached - @client = @klass.new - @klass_client = Dalli::Client.new("127.0.0.1:11211") + @dalli = Dalli::Client.new("127.0.0.1:11211") + @client = Suo::Client::Memcached.new end def teardown - @klass_client.delete(TEST_KEY) + @dalli.delete(TEST_KEY) end end @@ -144,13 +161,12 @@ class TestRedisClient < Minitest::Test include ClientTests def setup - @klass = Suo::Client::Redis - @client = @klass.new - @klass_client = Redis.new + @redis = Redis.new + @client = Suo::Client::Redis.new end def teardown - @klass_client.del(TEST_KEY) + @redis.del(TEST_KEY) end end