dramatically simpify interface by forcing key at initialization

This commit is contained in:
Nick Elser
2015-04-15 23:10:21 -07:00
parent 2960c14a4d
commit 81e4a3e143
3 changed files with 65 additions and 60 deletions

View File

@@ -4,96 +4,101 @@ module Suo
DEFAULT_OPTIONS = { DEFAULT_OPTIONS = {
acquisition_timeout: 0.1, acquisition_timeout: 0.1,
acquisition_delay: 0.01, acquisition_delay: 0.01,
stale_lock_expiration: 3600 stale_lock_expiration: 3600,
resources: 1
}.freeze }.freeze
attr_accessor :client attr_accessor :client, :key, :resources, :options
include MonitorMixin include MonitorMixin
def initialize(options = {}) def initialize(key, options = {})
fail "Client required" unless options[:client] fail "Client required" unless options[:client]
@options = DEFAULT_OPTIONS.merge(options) @options = DEFAULT_OPTIONS.merge(options)
@retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil @retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil
@client = @options[:client] @client = @options[:client]
super() @resources = @options[:resources].to_i
@key = key
super() # initialize Monitor mixin for thread safety
end end
def lock(key, resources = 1) def lock
token = acquire_lock(key, resources) token = acquire_lock
if block_given? && token if block_given? && token
begin begin
yield(token) yield
ensure ensure
unlock(key, token) unlock(token)
end end
else else
token token
end end
end end
def locked?(key, resources = 1) def locked?
locks(key).size >= resources locks.size >= resources
end end
def locks(key) def locks
val, _ = get(key) val, _ = get
cleared_locks = deserialize_and_clear_locks(val) cleared_locks = deserialize_and_clear_locks(val)
cleared_locks cleared_locks
end end
def refresh(key, acquisition_token) def refresh(token)
retry_with_timeout(key) do retry_with_timeout do
val, cas = get(key) val, cas = get
if val.nil? if val.nil?
initial_set(key) initial_set
next next
end end
cleared_locks = deserialize_and_clear_locks(val) cleared_locks = deserialize_and_clear_locks(val)
refresh_lock(cleared_locks, acquisition_token) refresh_lock(cleared_locks, token)
break if set(key, serialize_locks(cleared_locks), cas) break if set(serialize_locks(cleared_locks), cas)
end end
end end
def unlock(key, acquisition_token) def unlock(token)
return unless acquisition_token return unless token
retry_with_timeout(key) do retry_with_timeout do
val, cas = get(key) val, cas = get
break if val.nil? break if val.nil?
cleared_locks = deserialize_and_clear_locks(val) cleared_locks = deserialize_and_clear_locks(val)
acquisition_lock = remove_lock(cleared_locks, acquisition_token) acquisition_lock = remove_lock(cleared_locks, token)
break unless acquisition_lock break unless acquisition_lock
break if set(key, serialize_locks(cleared_locks), cas) break if set(serialize_locks(cleared_locks), cas)
end end
rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
# ignore - assume success due to optimistic locking # ignore - assume success due to optimistic locking
end end
def clear(key) # rubocop:disable Lint/UnusedMethodArgument def clear
fail NotImplementedError fail NotImplementedError
end end
private private
def acquire_lock(key, resources = 1) attr_accessor :retry_count
def acquire_lock
token = SecureRandom.base64(16) token = SecureRandom.base64(16)
retry_with_timeout(key) do retry_with_timeout do
val, cas = get(key) val, cas = get
if val.nil? if val.nil?
initial_set(key) initial_set
next next
end end
@@ -104,41 +109,41 @@ module Suo
newval = serialize_locks(cleared_locks) newval = serialize_locks(cleared_locks)
return token if set(key, newval, cas) return token if set(newval, cas)
end end
end end
nil nil
end end
def get(key) # rubocop:disable Lint/UnusedMethodArgument def get
fail NotImplementedError fail NotImplementedError
end end
def set(key, newval, cas) # rubocop:disable Lint/UnusedMethodArgument def set(newval, cas) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError fail NotImplementedError
end end
def initial_set(key, val = "") # rubocop:disable Lint/UnusedMethodArgument def initial_set(val = "") # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError fail NotImplementedError
end end
def synchronize(key) # rubocop:disable Lint/UnusedMethodArgument def synchronize
mon_synchronize { yield } mon_synchronize { yield }
end end
def retry_with_timeout(key) def retry_with_timeout
start = Time.now.to_f start = Time.now.to_f
@retry_count.times do retry_count.times do
elapsed = Time.now.to_f - start elapsed = Time.now.to_f - start
break if elapsed >= @options[:acquisition_timeout] break if elapsed >= options[:acquisition_timeout]
synchronize(key) do synchronize do
yield yield
end end
sleep(rand(@options[:acquisition_delay] * 1000).to_f / 1000) sleep(rand(options[:acquisition_delay] * 1000).to_f / 1000)
end end
rescue => _ rescue => _
raise LockClientError raise LockClientError
@@ -163,7 +168,7 @@ module Suo
end end
def clear_expired_locks(locks) def clear_expired_locks(locks)
expired = Time.now - @options[:stale_lock_expiration] expired = Time.now - options[:stale_lock_expiration]
locks.reject { |time, _| time < expired } locks.reject { |time, _| time < expired }
end end

View File

@@ -1,27 +1,27 @@
module Suo module Suo
module Client module Client
class Memcached < Base class Memcached < Base
def initialize(options = {}) def initialize(key, options = {})
options[:client] ||= Dalli::Client.new(options[:connection] || ENV["MEMCACHE_SERVERS"] || "127.0.0.1:11211") options[:client] ||= Dalli::Client.new(options[:connection] || ENV["MEMCACHE_SERVERS"] || "127.0.0.1:11211")
super super
end end
def clear(key) def clear
@client.delete(key) @client.delete(@key)
end end
private private
def get(key) def get
@client.get_cas(key) @client.get_cas(@key)
end end
def set(key, newval, cas) def set(newval, cas)
@client.set_cas(key, newval, cas) @client.set_cas(@key, newval, cas)
end end
def initial_set(key, val = "") def initial_set(val = "")
@client.set(key, val) @client.set(@key, val)
end end
end end
end end

View File

@@ -1,39 +1,39 @@
module Suo module Suo
module Client module Client
class Redis < Base class Redis < Base
def initialize(options = {}) def initialize(key, options = {})
options[:client] ||= ::Redis.new(options[:connection] || {}) options[:client] ||= ::Redis.new(options[:connection] || {})
super super
end end
def clear(key) def clear
@client.del(key) @client.del(@key)
end end
private private
def get(key) def get
[@client.get(key), nil] [@client.get(@key), nil]
end end
def set(key, newval, _) def set(newval, _)
ret = @client.multi do |multi| ret = @client.multi do |multi|
multi.set(key, newval) multi.set(@key, newval)
end end
ret && ret[0] == "OK" ret && ret[0] == "OK"
end end
def synchronize(key) def synchronize
@client.watch(key) do @client.watch(@key) do
yield yield
end end
ensure ensure
@client.unwatch @client.unwatch
end end
def initial_set(key, val = "") def initial_set(val = "")
@client.set(key, val) @client.set(@key, val)
end end
end end
end end