From 81e4a3e143413c019209499a1697d2c5328911f6 Mon Sep 17 00:00:00 2001 From: Nick Elser Date: Wed, 15 Apr 2015 23:10:21 -0700 Subject: [PATCH] dramatically simpify interface by forcing key at initialization --- lib/suo/client/base.rb | 85 ++++++++++++++++++++----------------- lib/suo/client/memcached.rb | 18 ++++---- lib/suo/client/redis.rb | 22 +++++----- 3 files changed, 65 insertions(+), 60 deletions(-) diff --git a/lib/suo/client/base.rb b/lib/suo/client/base.rb index 17bf6f5..7d74810 100644 --- a/lib/suo/client/base.rb +++ b/lib/suo/client/base.rb @@ -4,96 +4,101 @@ module Suo DEFAULT_OPTIONS = { acquisition_timeout: 0.1, acquisition_delay: 0.01, - stale_lock_expiration: 3600 + stale_lock_expiration: 3600, + resources: 1 }.freeze - attr_accessor :client + attr_accessor :client, :key, :resources, :options include MonitorMixin - def initialize(options = {}) + def initialize(key, options = {}) fail "Client required" unless options[:client] @options = DEFAULT_OPTIONS.merge(options) @retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil @client = @options[:client] - super() + @resources = @options[:resources].to_i + @key = key + super() # initialize Monitor mixin for thread safety end - def lock(key, resources = 1) - token = acquire_lock(key, resources) + def lock + token = acquire_lock if block_given? && token begin - yield(token) + yield ensure - unlock(key, token) + unlock(token) end else token end end - def locked?(key, resources = 1) - locks(key).size >= resources + def locked? + locks.size >= resources end - def locks(key) - val, _ = get(key) + def locks + val, _ = get cleared_locks = deserialize_and_clear_locks(val) cleared_locks end - def refresh(key, acquisition_token) - retry_with_timeout(key) do - val, cas = get(key) + def refresh(token) + retry_with_timeout do + val, cas = get if val.nil? - initial_set(key) + initial_set next end cleared_locks = deserialize_and_clear_locks(val) - refresh_lock(cleared_locks, acquisition_token) + refresh_lock(cleared_locks, token) - break if set(key, serialize_locks(cleared_locks), cas) + break if set(serialize_locks(cleared_locks), cas) end end - def unlock(key, acquisition_token) - return unless acquisition_token + def unlock(token) + return unless token - retry_with_timeout(key) do - val, cas = get(key) + retry_with_timeout do + val, cas = get break if val.nil? cleared_locks = deserialize_and_clear_locks(val) - acquisition_lock = remove_lock(cleared_locks, acquisition_token) + acquisition_lock = remove_lock(cleared_locks, token) break unless acquisition_lock - break if set(key, serialize_locks(cleared_locks), cas) + break if set(serialize_locks(cleared_locks), cas) end rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions # ignore - assume success due to optimistic locking end - def clear(key) # rubocop:disable Lint/UnusedMethodArgument + def clear fail NotImplementedError end private - def acquire_lock(key, resources = 1) + attr_accessor :retry_count + + def acquire_lock token = SecureRandom.base64(16) - retry_with_timeout(key) do - val, cas = get(key) + retry_with_timeout do + val, cas = get if val.nil? - initial_set(key) + initial_set next end @@ -104,41 +109,41 @@ module Suo newval = serialize_locks(cleared_locks) - return token if set(key, newval, cas) + return token if set(newval, cas) end end nil end - def get(key) # rubocop:disable Lint/UnusedMethodArgument + def get fail NotImplementedError end - def set(key, newval, cas) # rubocop:disable Lint/UnusedMethodArgument + def set(newval, cas) # rubocop:disable Lint/UnusedMethodArgument fail NotImplementedError end - def initial_set(key, val = "") # rubocop:disable Lint/UnusedMethodArgument + def initial_set(val = "") # rubocop:disable Lint/UnusedMethodArgument fail NotImplementedError end - def synchronize(key) # rubocop:disable Lint/UnusedMethodArgument + def synchronize mon_synchronize { yield } end - def retry_with_timeout(key) + def retry_with_timeout start = Time.now.to_f - @retry_count.times do + retry_count.times do elapsed = Time.now.to_f - start - break if elapsed >= @options[:acquisition_timeout] + break if elapsed >= options[:acquisition_timeout] - synchronize(key) do + synchronize do yield end - sleep(rand(@options[:acquisition_delay] * 1000).to_f / 1000) + sleep(rand(options[:acquisition_delay] * 1000).to_f / 1000) end rescue => _ raise LockClientError @@ -163,7 +168,7 @@ module Suo end def clear_expired_locks(locks) - expired = Time.now - @options[:stale_lock_expiration] + expired = Time.now - options[:stale_lock_expiration] locks.reject { |time, _| time < expired } end diff --git a/lib/suo/client/memcached.rb b/lib/suo/client/memcached.rb index dea8e4a..7f87020 100644 --- a/lib/suo/client/memcached.rb +++ b/lib/suo/client/memcached.rb @@ -1,27 +1,27 @@ module Suo module Client class Memcached < Base - def initialize(options = {}) + def initialize(key, options = {}) options[:client] ||= Dalli::Client.new(options[:connection] || ENV["MEMCACHE_SERVERS"] || "127.0.0.1:11211") super end - def clear(key) - @client.delete(key) + def clear + @client.delete(@key) end private - def get(key) - @client.get_cas(key) + def get + @client.get_cas(@key) end - def set(key, newval, cas) - @client.set_cas(key, newval, cas) + def set(newval, cas) + @client.set_cas(@key, newval, cas) end - def initial_set(key, val = "") - @client.set(key, val) + def initial_set(val = "") + @client.set(@key, val) end end end diff --git a/lib/suo/client/redis.rb b/lib/suo/client/redis.rb index 97d67c3..439e868 100644 --- a/lib/suo/client/redis.rb +++ b/lib/suo/client/redis.rb @@ -1,39 +1,39 @@ module Suo module Client class Redis < Base - def initialize(options = {}) + def initialize(key, options = {}) options[:client] ||= ::Redis.new(options[:connection] || {}) super end - def clear(key) - @client.del(key) + def clear + @client.del(@key) end private - def get(key) - [@client.get(key), nil] + def get + [@client.get(@key), nil] end - def set(key, newval, _) + def set(newval, _) ret = @client.multi do |multi| - multi.set(key, newval) + multi.set(@key, newval) end ret && ret[0] == "OK" end - def synchronize(key) - @client.watch(key) do + def synchronize + @client.watch(@key) do yield end ensure @client.unwatch end - def initial_set(key, val = "") - @client.set(key, val) + def initial_set(val = "") + @client.set(@key, val) end end end