commit 06d296c8d9f50006d9f9ef39811c61fac0713892 Author: Nick Elser Date: Sun Apr 12 13:40:53 2015 -0700 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c1ecbde --- /dev/null +++ b/.gitignore @@ -0,0 +1,18 @@ +.DS_Store +*.gem +*.rbc +.bundle +.config +.yardoc +Gemfile.lock +InstalledFiles +_yardoc +coverage +doc/ +lib/bundler/man +pkg +rdoc +spec/reports +test/tmp +test/version_tmp +tmp diff --git a/.rubocop.yml b/.rubocop.yml new file mode 100644 index 0000000..967bcd7 --- /dev/null +++ b/.rubocop.yml @@ -0,0 +1,216 @@ +AllCops: + Exclude: + - .git/**/* + - tmp/**/* + - suo.gemspec + +Lint/DuplicateMethods: + Enabled: true + +Lint/DeprecatedClassMethods: + Enabled: true + +Style/TrailingWhitespace: + Enabled: true + +Style/Tab: + Enabled: true + +Style/TrailingBlankLines: + Enabled: true + +Style/NilComparison: + Enabled: true + +Style/NonNilCheck: + Enabled: true + +Style/Not: + Enabled: true + +Style/RedundantReturn: + Enabled: true + +Style/ClassCheck: + Enabled: true + +Style/EmptyLines: + Enabled: true + +Style/EmptyLiteral: + Enabled: true + +Style/Alias: + Enabled: true + +Style/MethodCallParentheses: + Enabled: true + +Style/MethodDefParentheses: + Enabled: true + +Style/SpaceBeforeBlockBraces: + Enabled: true + +Style/SpaceInsideBlockBraces: + Enabled: true + +Style/SpaceInsideParens: + Enabled: true + +Style/DeprecatedHashMethods: + Enabled: true + +Style/HashSyntax: + Enabled: true + +Style/SpaceInsideHashLiteralBraces: + Enabled: true + EnforcedStyle: no_space + +Style/SpaceInsideBrackets: + Enabled: true + +Style/AndOr: + Enabled: false + +Style/TrailingComma: + Enabled: true + +Style/SpaceBeforeComma: + Enabled: true + +Style/SpaceBeforeComment: + Enabled: true + +Style/SpaceBeforeSemicolon: + Enabled: true + +Style/SpaceAroundBlockParameters: + Enabled: true + +Style/SpaceAroundOperators: + Enabled: true + +Style/SpaceAfterColon: + Enabled: true + +Style/SpaceAfterComma: + Enabled: true + +Style/SpaceAfterControlKeyword: + Enabled: true + +Style/SpaceAfterNot: + Enabled: true + +Style/SpaceAfterSemicolon: + Enabled: true + +Lint/UselessComparison: + Enabled: true + +Lint/InvalidCharacterLiteral: + Enabled: true + +Lint/LiteralInInterpolation: + Enabled: true + +Lint/LiteralInCondition: + Enabled: true + +Lint/UnusedBlockArgument: + Enabled: true + +Style/VariableInterpolation: + Enabled: true + +Style/RedundantSelf: + Enabled: true + +Style/ParenthesesAroundCondition: + Enabled: true + +Style/WhileUntilDo: + Enabled: true + +Style/EmptyLineBetweenDefs: + Enabled: true + +Style/EmptyLinesAroundAccessModifier: + Enabled: true + +Style/EmptyLinesAroundMethodBody: + Enabled: true + +Style/ColonMethodCall: + Enabled: true + +Lint/SpaceBeforeFirstArg: + Enabled: true + +Lint/UnreachableCode: + Enabled: true + +Style/UnlessElse: + Enabled: true + +Style/ClassVars: + Enabled: true + +Style/StringLiterals: + Enabled: true + EnforcedStyle: double_quotes + +Metrics/CyclomaticComplexity: + Max: 8 + +Metrics/LineLength: + Max: 128 + +Metrics/MethodLength: + Max: 32 + +Metrics/PerceivedComplexity: + Max: 8 + +# Disabled + +Style/EvenOdd: + Enabled: false + +Style/AsciiComments: + Enabled: false + +Style/NumericLiterals: + Enabled: false + +Style/UnneededPercentQ: + Enabled: false + +Style/SpecialGlobalVars: + Enabled: false + +Style/TrivialAccessors: + Enabled: false + +Style/PerlBackrefs: + Enabled: false + +Metrics/AbcSize: + Enabled: false + +Metrics/BlockNesting: + Enabled: false + +Metrics/ClassLength: + Enabled: false + +Metrics/MethodLength: + Enabled: false + +Metrics/ParameterLists: + Enabled: false + +Metrics/PerceivedComplexity: + Enabled: false diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..457cdc9 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,3 @@ +language: ruby +rvm: + - 2.2.0 diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..bcf08a5 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,3 @@ +## 0.1.0 + +- First release diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..b4e2a20 --- /dev/null +++ b/Gemfile @@ -0,0 +1,3 @@ +source "https://rubygems.org" + +gemspec diff --git a/README.md b/README.md new file mode 100644 index 0000000..5ed666a --- /dev/null +++ b/README.md @@ -0,0 +1,58 @@ +# Suo + +:lock: Distributed semaphores using Memcached or Redis in Ruby. + +Suo provides a very performant distributed lock solution using Compare-And-Set (`CAS`) commands in Memcached, and `WATCH/MULTI` in Redis. + +## Installation + +Add this line to your application’s Gemfile: + +```ruby +gem 'suo' +``` + +## Usage + +### Basic + +```ruby +# Memcached +suo = Suo::Client::Memcached.new(connection: "127.0.0.1:11211") + +# Redis +suo = Suo::Client::Redis.new(connection: {host: "10.0.1.1"}) + +# Pre-existing client +suo = Suo::Client::Memcached.new(client: some_dalli_client) + +suo.lock("some_key") do + # critical code here + @puppies.pet! +end + +2.times do + Thread.new do + # second argument is the number of resources - so this will run twice + suo.lock("other_key", 2, timeout: 0.5) { puts "Will run twice!" } + end +end +``` + +## TODO + - better stale key handling (refresh blocks) + - more race condition tests + - refactor clients to re-use more code + +## History + +View the [changelog](https://github.com/nickelser/suo/blob/master/CHANGELOG.md) + +## Contributing + +Everyone is encouraged to help improve this project. Here are a few ways you can help: + +- [Report bugs](https://github.com/nickelser/suo/issues) +- Fix bugs and [submit pull requests](https://github.com/nickelser/suo/pulls) +- Write, clarify, or fix documentation +- Suggest or add new features diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..c52127f --- /dev/null +++ b/Rakefile @@ -0,0 +1,7 @@ +require "bundler/gem_tasks" +require "rake/testtask" + +Rake::TestTask.new do |t| + t.libs << "test" + t.pattern = "test/*_test.rb" +end diff --git a/bin/console b/bin/console new file mode 100755 index 0000000..eba769f --- /dev/null +++ b/bin/console @@ -0,0 +1,7 @@ +#!/usr/bin/env ruby + +require "bundler/setup" +require "suo" +require "irb" + +IRB.start diff --git a/bin/setup b/bin/setup new file mode 100755 index 0000000..7913e05 --- /dev/null +++ b/bin/setup @@ -0,0 +1,5 @@ +#!/bin/bash +set -euo pipefail +IFS=$'\n\t' + +bundle install diff --git a/lib/suo.rb b/lib/suo.rb new file mode 100644 index 0000000..993924a --- /dev/null +++ b/lib/suo.rb @@ -0,0 +1,2 @@ +require "suo/version" +require "suo/clients" diff --git a/lib/suo/client/base.rb b/lib/suo/client/base.rb new file mode 100644 index 0000000..99db651 --- /dev/null +++ b/lib/suo/client/base.rb @@ -0,0 +1,116 @@ +module Suo + module Client + class Base + DEFAULT_OPTIONS = { + retry_count: 3, + retry_delay: 0.01, + stale_lock_expiration: 3600 + }.freeze + + def initialize(options = {}) + @options = self.class.merge_defaults(options).merge(_initialized: true) + end + + def lock(key, resources = 1, options = {}) + options = self.class.merge_defaults(@options.merge(options)) + token = self.class.lock(key, resources, options) + + if token + begin + yield if block_given? + ensure + self.class.unlock(key, token, options) + end + + true + else + false + end + end + + def locked?(key, resources = 1) + self.class.locked?(key, resources, @options) + end + + class << self + def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end + + def locked?(key, resources = 1, options = {}) + options = merge_defaults(options) + client = options[:client] + locks = deserialize_locks(client.get(key)) + + locks.size >= resources + end + + def locks(key, options) + options = merge_defaults(options) + client = options[:client] + locks = deserialize_locks(client.get(key)) + + locks.size + end + + def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end + + def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end + + def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end + + def merge_defaults(options = {}) + unless options[:_initialized] + options = self::DEFAULT_OPTIONS.merge(options) + + fail "Client required" unless options[:client] + end + + if options[:retry_timeout] + options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).floor + end + + options + end + + private + + def serialize_locks(locks) + locks.map { |time, token| [time.to_f, token].join(":") }.join(",") + end + + def deserialize_locks(str) + str.split(",").map do |s| + time, token = s.split(":", 2) + [Time.at(time.to_f), token] + end + end + + def clear_expired_locks(locks, options) + expired = Time.now - options[:stale_lock_expiration] + locks.reject { |time, _| time < expired } + end + + def add_lock(locks, token) + locks << [Time.now.to_f, token] + end + + def remove_lock(locks, acquisition_token) + lock = locks.find { |_, token| token == acquisition_token } + locks.delete(lock) + end + + def refresh_lock(locks, acquisition_token) + remove_lock(locks, acquisition_token) + add_lock(locks, token) + end + end + end + end +end diff --git a/lib/suo/client/errors.rb b/lib/suo/client/errors.rb new file mode 100644 index 0000000..15fee88 --- /dev/null +++ b/lib/suo/client/errors.rb @@ -0,0 +1,7 @@ +module Suo + module Client + module Errors + class FailedToAcquireLock < StandardError; end + end + end +end diff --git a/lib/suo/client/memcached.rb b/lib/suo/client/memcached.rb new file mode 100644 index 0000000..08a1d6d --- /dev/null +++ b/lib/suo/client/memcached.rb @@ -0,0 +1,137 @@ +module Suo + module Client + class Memcached < Base + def initialize(options = {}) + options[:client] ||= Dalli::Client.new(options[:connection] || ENV["MEMCACHE_SERVERS"] || "127.0.0.1:11211") + super + end + + class << self + def lock(key, resources = 1, options = {}) + options = merge_defaults(options) + acquisition_token = nil + token = SecureRandom.base64(16) + client = options[:client] + + begin + start = Time.now.to_f + + options[:retry_count].times do |i| + val, cas = client.get_cas(key) + + # no key has been set yet; we could simply set it, but would lead to race conditions on the initial setting + if val.nil? + client.set(key, "") + next + end + + locks = clear_expired_locks(deserialize_locks(val.to_s), options) + + if locks.size < resources + add_lock(locks, token) + + newval = serialize_locks(locks) + + if client.set_cas(key, newval, cas) + acquisition_token = token + break + end + end + + if options[:retry_timeout] + now = Time.now.to_f + break if now - start > options[:retry_timeout] + end + + sleep(rand(options[:retry_delay] * 1000).to_f / 1000) + end + rescue => _ + raise FailedToAcquireLock + end + + acquisition_token + end + + def refresh(key, acquisition_token, options = {}) + options = merge_defaults(options) + client = options[:client] + + begin + start = Time.now.to_f + + options[:retry_count].times do + val, cas = client.get_cas(key) + + # much like with initial set - ensure the key is here + if val.nil? + client.set(key, "") + next + end + + locks = clear_expired_locks(deserialize_locks(val), options) + + refresh_lock(locks, acquisition_token) + + newval = serialize_locks(locks) + + break if client.set_cas(key, newval, cas) + + if options[:retry_timeout] + now = Time.now.to_f + break if now - start > options[:retry_timeout] + end + + sleep(rand(options[:retry_delay] * 1000).to_f / 1000) + end + rescue => _ + raise FailedToAcquireLock + end + end + + def unlock(key, acquisition_token, options = {}) + options = merge_defaults(options) + client = options[:client] + + return unless acquisition_token + + begin + start = Time.now.to_f + + options[:retry_count].times do + val, cas = client.get_cas(key) + + break if val.nil? # lock has expired totally + + locks = clear_expired_locks(deserialize_locks(val), options) + + acquisition_lock = remove_lock(locks, acquisition_token) + + break unless acquisition_lock + + newval = serialize_locks(locks) + + break if client.set_cas(key, newval, cas) + + # another client cleared a token in the interim - try again! + + if options[:retry_timeout] + now = Time.now.to_f + break if now - start > options[:retry_timeout] + end + + sleep(rand(options[:retry_delay] * 1000).to_f / 1000) + end + rescue => boom # rubocop:disable Lint/HandleExceptions + # since it's optimistic locking - fine if we are unable to release + raise boom if ENV["SUO_TEST"] + end + end + + def clear(key, options = {}) + options = merge_defaults(options) + options[:client].delete(key) + end + end + end + end +end diff --git a/lib/suo/client/redis.rb b/lib/suo/client/redis.rb new file mode 100644 index 0000000..113072f --- /dev/null +++ b/lib/suo/client/redis.rb @@ -0,0 +1,167 @@ +module Suo + module Client + class Redis < Base + def initialize(options = {}) + options[:client] ||= ::Redis.new(options[:connection] || {}) + super + end + + class << self + def lock(key, resources = 1, options = {}) + options = merge_defaults(options) + acquisition_token = nil + token = SecureRandom.base64(16) + client = options[:client] + + begin + start = Time.now.to_f + + options[:retry_count].times do + client.watch(key) do + begin + val = client.get(key) + + locks = clear_expired_locks(deserialize_locks(val.to_s), options) + + if locks.size < resources + add_lock(locks, token) + + newval = serialize_locks(locks) + + ret = client.multi do |multi| + multi.set(key, newval) + end + + acquisition_token = token if ret[0] == "OK" + end + ensure + client.unwatch + end + end + + break if acquisition_token + + if options[:retry_timeout] + now = Time.now.to_f + break if now - start > options[:retry_timeout] + end + + sleep(rand(options[:retry_delay] * 1000).to_f / 1000) + end + rescue => boom + raise boom + raise Suo::Client::FailedToAcquireLock + end + + acquisition_token + end + + def refresh(key, acquisition_token, options = {}) + options = merge_defaults(options) + client = options[:client] + refreshed = false + + begin + start = Time.now.to_f + + options[:retry_count].times do + client.watch(key) do + begin + val = client.get(key) + + locks = clear_expired_locks(deserialize_locks(val), options) + + refresh_lock(locks, acquisition_token) + + newval = serialize_locks(locks) + + ret = client.multi do |multi| + multi.set(key, newval) + end + + refreshed = ret[0] == "OK" + ensure + client.unwatch + end + end + + break if refreshed + + if options[:retry_timeout] + now = Time.now.to_f + break if now - start > options[:retry_timeout] + end + + sleep(rand(options[:retry_delay] * 1000).to_f / 1000) + end + rescue => _ + raise Suo::Client::FailedToAcquireLock + end + end + + def unlock(key, acquisition_token, options = {}) + options = merge_defaults(options) + client = options[:client] + + return unless acquisition_token + + begin + start = Time.now.to_f + + options[:retry_count].times do + cleared = false + + client.watch(key) do + begin + val = client.get(key) + + if val.nil? + cleared = true + break + end + + locks = clear_expired_locks(deserialize_locks(val), options) + + acquisition_lock = remove_lock(locks, acquisition_token) + + unless acquisition_lock + # token was already cleared + cleared = true + break + end + + newval = serialize_locks(locks) + + ret = client.multi do |multi| + multi.set(key, newval) + end + + cleared = ret[0] == "OK" + ensure + client.unwatch + end + end + + break if cleared + + if options[:retry_timeout] + now = Time.now.to_f + break if now - start > options[:retry_timeout] + end + + sleep(rand(options[:retry_delay] * 1000).to_f / 1000) + end + rescue => boom # rubocop:disable Lint/HandleExceptions + # since it's optimistic locking - fine if we are unable to release + raise boom if ENV["SUO_TEST"] + end + end + + def clear(key, options = {}) + options = merge_defaults(options) + options[:client].del(key) + end + end + end + end +end diff --git a/lib/suo/clients.rb b/lib/suo/clients.rb new file mode 100644 index 0000000..5d8ba46 --- /dev/null +++ b/lib/suo/clients.rb @@ -0,0 +1,12 @@ +require "securerandom" +require "monitor" + +require "dalli" +require "dalli/cas/client" + +require "redis" + +require "suo/client/errors" +require "suo/client/base" +require "suo/client/memcached" +require "suo/client/redis" diff --git a/lib/suo/version.rb b/lib/suo/version.rb new file mode 100644 index 0000000..4e804ef --- /dev/null +++ b/lib/suo/version.rb @@ -0,0 +1,3 @@ +module Suo + VERSION = "0.1.0" +end diff --git a/suo.gemspec b/suo.gemspec new file mode 100644 index 0000000..6d84025 --- /dev/null +++ b/suo.gemspec @@ -0,0 +1,28 @@ +# coding: utf-8 +lib = File.expand_path('../lib', __FILE__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) +require 'suo/version' + +Gem::Specification.new do |spec| + spec.name = "suo" + spec.version = Suo::VERSION + spec.authors = ["Nick Elser"] + spec.email = ["nick.elser@gmail.com"] + + spec.summary = %q(TODO: Write a short summary, because Rubygems requires one.) + spec.description = %q{TODO: Write a longer description or delete this line.} + spec.homepage = "TODO: Put your gem's website or public repo URL here." + + spec.files = `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) } + spec.bindir = "exe" + spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } + spec.require_paths = ["lib"] + + spec.add_dependency "dalli" + spec.add_dependency "redis" + spec.add_dependency "msgpack" + + spec.add_development_dependency "bundler", "~> 1.5" + spec.add_development_dependency "rake", "~> 10.0" + spec.add_development_dependency "rubocop", "~> 0.30.0" +end diff --git a/test/client_test.rb b/test/client_test.rb new file mode 100644 index 0000000..8ad233b --- /dev/null +++ b/test/client_test.rb @@ -0,0 +1,154 @@ +require "test_helper" + +TEST_KEY = "suo_test_key".freeze + +module ClientTests + def test_requires_client + exception = assert_raises(RuntimeError) do + @klass.lock(TEST_KEY, 1) + end + + assert_equal "Client required", exception.message + end + + def test_class_single_resource_locking + lock1 = @klass.lock(TEST_KEY, 1, client: @klass_client) + refute_nil lock1 + + locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) + assert_equal true, locked + + lock2 = @klass.lock(TEST_KEY, 1, client: @klass_client) + assert_nil lock2 + + @klass.unlock(TEST_KEY, lock1, client: @klass_client) + + locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) + assert_equal false, locked + end + + def test_class_multiple_resource_locking + lock1 = @klass.lock(TEST_KEY, 2, client: @klass_client) + refute_nil lock1 + + locked = @klass.locked?(TEST_KEY, 2, client: @klass_client) + assert_equal false, locked + + lock2 = @klass.lock(TEST_KEY, 2, client: @klass_client) + refute_nil lock2 + + locked = @klass.locked?(TEST_KEY, 2, client: @klass_client) + assert_equal true, locked + + @klass.unlock(TEST_KEY, lock1, client: @klass_client) + + locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) + assert_equal true, locked + + @klass.unlock(TEST_KEY, lock2, client: @klass_client) + + locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) + assert_equal false, locked + end + + def test_instance_single_resource_locking + locked = false + + @client.lock(TEST_KEY, 1) { locked = true } + + assert_equal true, locked + end + + def test_instance_unlocks_on_exception + assert_raises(RuntimeError) do + @client.lock(TEST_KEY, 1) { fail "Test" } + end + + locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) + assert_equal false, locked + end + + def test_instance_multiple_resource_locking + success_counter = Queue.new + failure_counter = Queue.new + + 100.times.map do |i| + Thread.new do + success = @client.lock(TEST_KEY, 50, retry_timeout: 0.9) do + sleep(1) + success_counter << i + end + + failure_counter << i unless success + end + end.map(&:join) + + assert_equal 50, success_counter.size + assert_equal 50, failure_counter.size + end + + def test_instance_multiple_resource_locking_longer_timeout + success_counter = Queue.new + failure_counter = Queue.new + + 100.times.map do |i| + Thread.new do + success = @client.lock(TEST_KEY, 50, retry_timeout: 2) do + sleep(1) + success_counter << i + end + + failure_counter << i unless success + end + end.map(&:join) + + assert_equal 100, success_counter.size + assert_equal 0, failure_counter.size + end +end + +class TestBaseClient < Minitest::Test + def setup + @klass = Suo::Client::Base + end + + def test_not_implemented + assert_raises(NotImplementedError) do + @klass.lock(TEST_KEY, 1) + end + end +end + +class TestMemcachedClient < Minitest::Test + include ClientTests + + def setup + @klass = Suo::Client::Memcached + @client = @klass.new + @klass_client = Dalli::Client.new("127.0.0.1:11211") + end + + def teardown + @klass_client.delete(TEST_KEY) + end +end + +class TestRedisClient < Minitest::Test + include ClientTests + + def setup + @klass = Suo::Client::Redis + @client = @klass.new + @klass_client = Redis.new + end + + def teardown + @klass_client.del(TEST_KEY) + end +end + +class TestLibrary < Minitest::Test + def test_that_it_has_a_version_number + refute_nil ::Suo::VERSION + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb new file mode 100644 index 0000000..5a3867f --- /dev/null +++ b/test/test_helper.rb @@ -0,0 +1,9 @@ +$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__) + +require "suo" +require "thread" +require "minitest/autorun" +require "minitest/benchmark" + +ENV["SUO_TEST"] = "true" +