first commit

This commit is contained in:
Nick Elser
2015-04-12 13:40:53 -07:00
commit 06d296c8d9
19 changed files with 955 additions and 0 deletions

2
lib/suo.rb Normal file
View File

@@ -0,0 +1,2 @@
require "suo/version"
require "suo/clients"

116
lib/suo/client/base.rb Normal file
View File

@@ -0,0 +1,116 @@
module Suo
module Client
class Base
DEFAULT_OPTIONS = {
retry_count: 3,
retry_delay: 0.01,
stale_lock_expiration: 3600
}.freeze
def initialize(options = {})
@options = self.class.merge_defaults(options).merge(_initialized: true)
end
def lock(key, resources = 1, options = {})
options = self.class.merge_defaults(@options.merge(options))
token = self.class.lock(key, resources, options)
if token
begin
yield if block_given?
ensure
self.class.unlock(key, token, options)
end
true
else
false
end
end
def locked?(key, resources = 1)
self.class.locked?(key, resources, @options)
end
class << self
def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def locked?(key, resources = 1, options = {})
options = merge_defaults(options)
client = options[:client]
locks = deserialize_locks(client.get(key))
locks.size >= resources
end
def locks(key, options)
options = merge_defaults(options)
client = options[:client]
locks = deserialize_locks(client.get(key))
locks.size
end
def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def merge_defaults(options = {})
unless options[:_initialized]
options = self::DEFAULT_OPTIONS.merge(options)
fail "Client required" unless options[:client]
end
if options[:retry_timeout]
options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).floor
end
options
end
private
def serialize_locks(locks)
locks.map { |time, token| [time.to_f, token].join(":") }.join(",")
end
def deserialize_locks(str)
str.split(",").map do |s|
time, token = s.split(":", 2)
[Time.at(time.to_f), token]
end
end
def clear_expired_locks(locks, options)
expired = Time.now - options[:stale_lock_expiration]
locks.reject { |time, _| time < expired }
end
def add_lock(locks, token)
locks << [Time.now.to_f, token]
end
def remove_lock(locks, acquisition_token)
lock = locks.find { |_, token| token == acquisition_token }
locks.delete(lock)
end
def refresh_lock(locks, acquisition_token)
remove_lock(locks, acquisition_token)
add_lock(locks, token)
end
end
end
end
end

7
lib/suo/client/errors.rb Normal file
View File

@@ -0,0 +1,7 @@
module Suo
module Client
module Errors
class FailedToAcquireLock < StandardError; end
end
end
end

137
lib/suo/client/memcached.rb Normal file
View File

@@ -0,0 +1,137 @@
module Suo
module Client
class Memcached < Base
def initialize(options = {})
options[:client] ||= Dalli::Client.new(options[:connection] || ENV["MEMCACHE_SERVERS"] || "127.0.0.1:11211")
super
end
class << self
def lock(key, resources = 1, options = {})
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do |i|
val, cas = client.get_cas(key)
# no key has been set yet; we could simply set it, but would lead to race conditions on the initial setting
if val.nil?
client.set(key, "")
next
end
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
if client.set_cas(key, newval, cas)
acquisition_token = token
break
end
end
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
acquisition_token
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
val, cas = client.get_cas(key)
# much like with initial set - ensure the key is here
if val.nil?
client.set(key, "")
next
end
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
return unless acquisition_token
begin
start = Time.now.to_f
options[:retry_count].times do
val, cas = client.get_cas(key)
break if val.nil? # lock has expired totally
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
break unless acquisition_lock
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
# another client cleared a token in the interim - try again!
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {})
options = merge_defaults(options)
options[:client].delete(key)
end
end
end
end
end

167
lib/suo/client/redis.rb Normal file
View File

@@ -0,0 +1,167 @@
module Suo
module Client
class Redis < Base
def initialize(options = {})
options[:client] ||= ::Redis.new(options[:connection] || {})
super
end
class << self
def lock(key, resources = 1, options = {})
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
client.watch(key) do
begin
val = client.get(key)
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
acquisition_token = token if ret[0] == "OK"
end
ensure
client.unwatch
end
end
break if acquisition_token
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom
raise boom
raise Suo::Client::FailedToAcquireLock
end
acquisition_token
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
refreshed = false
begin
start = Time.now.to_f
options[:retry_count].times do
client.watch(key) do
begin
val = client.get(key)
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
refreshed = ret[0] == "OK"
ensure
client.unwatch
end
end
break if refreshed
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise Suo::Client::FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
return unless acquisition_token
begin
start = Time.now.to_f
options[:retry_count].times do
cleared = false
client.watch(key) do
begin
val = client.get(key)
if val.nil?
cleared = true
break
end
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
unless acquisition_lock
# token was already cleared
cleared = true
break
end
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
cleared = ret[0] == "OK"
ensure
client.unwatch
end
end
break if cleared
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {})
options = merge_defaults(options)
options[:client].del(key)
end
end
end
end
end

12
lib/suo/clients.rb Normal file
View File

@@ -0,0 +1,12 @@
require "securerandom"
require "monitor"
require "dalli"
require "dalli/cas/client"
require "redis"
require "suo/client/errors"
require "suo/client/base"
require "suo/client/memcached"
require "suo/client/redis"

3
lib/suo/version.rb Normal file
View File

@@ -0,0 +1,3 @@
module Suo
VERSION = "0.1.0"
end