From 87915c85683375e0a5b0f4e7e74dcd815443a005 Mon Sep 17 00:00:00 2001 From: Loic Nageleisen Date: Mon, 3 Aug 2015 13:49:35 +0200 Subject: [PATCH] first commit --- .gitignore | 1 + .rubocop.yml | 11 ++++ .travis.yml | 8 +++ Gemfile | 3 + Gemfile.lock | 46 +++++++++++++++ Rakefile | 10 ++++ channel.gemspec | 16 ++++++ lib/channel.rb | 90 ++++++++++++++++++++++++++++++ test/test_channel.rb | 130 +++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 315 insertions(+) create mode 100644 .gitignore create mode 100644 .rubocop.yml create mode 100644 .travis.yml create mode 100644 Gemfile create mode 100644 Gemfile.lock create mode 100644 Rakefile create mode 100644 channel.gemspec create mode 100644 lib/channel.rb create mode 100644 test/test_channel.rb diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..eaa6e54 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.ruby-version diff --git a/.rubocop.yml b/.rubocop.yml new file mode 100644 index 0000000..0eaf834 --- /dev/null +++ b/.rubocop.yml @@ -0,0 +1,11 @@ +Style/Documentation: + Enabled: false + +Style/FileName: + Exclude: [lib/zipcode-fr.rb] + +Style/Semicolon: + AllowAsExpressionSeparator: true + +Style/TrailingComma: + EnforcedStyleForMultiline: comma diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..e223d7c --- /dev/null +++ b/.travis.yml @@ -0,0 +1,8 @@ +language: ruby +rvm: + - '2.1' + - '2.2' + - 'rbx-2.5' +script: + - bundle exec rubocop + - bundle exec rake test diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..fa75df1 --- /dev/null +++ b/Gemfile @@ -0,0 +1,3 @@ +source 'https://rubygems.org' + +gemspec diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..a352acc --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,46 @@ +PATH + remote: . + specs: + channel (0.1.0) + +GEM + remote: https://rubygems.org/ + specs: + ast (2.0.0) + astrolabe (1.3.1) + parser (~> 2.2) + coderay (1.1.0) + method_source (0.8.2) + parser (2.2.2.6) + ast (>= 1.1, < 3.0) + power_assert (0.2.4) + powerpack (0.1.1) + pry (0.10.1) + coderay (~> 1.1.0) + method_source (~> 0.8.1) + slop (~> 3.4) + rainbow (2.0.0) + rake (10.4.2) + rubocop (0.32.1) + astrolabe (~> 1.3) + parser (>= 2.2.2.5, < 3.0) + powerpack (~> 0.1) + rainbow (>= 1.99.1, < 3.0) + ruby-progressbar (~> 1.4) + ruby-progressbar (1.7.5) + slop (3.6.0) + test-unit (3.1.3) + power_assert + +PLATFORMS + ruby + +DEPENDENCIES + channel! + pry + rake + rubocop + test-unit + +BUNDLED WITH + 1.10.6 diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..06329ca --- /dev/null +++ b/Rakefile @@ -0,0 +1,10 @@ +require 'rake/testtask' +require 'bundler' + +Bundler::GemHelper.install_tasks + +Rake::TestTask.new do |t| + t.libs << 'test' + t.test_files = FileList['test/test_*.rb'] + t.verbose = true +end diff --git a/channel.gemspec b/channel.gemspec new file mode 100644 index 0000000..598a6b3 --- /dev/null +++ b/channel.gemspec @@ -0,0 +1,16 @@ +Gem::Specification.new do |s| + s.name = 'channel' + s.version = '0.1.0' + s.licenses = ['MIT'] + s.summary = 'Channels' + s.description = 'Share memory by communicating' + s.authors = ['Loic Nageleisen'] + s.email = 'loic.nageleisen@gmail.com' + s.files = ['lib/channel.rb'] + s.homepage = 'https://github.com/lloeki/channel' + + s.add_development_dependency 'pry' + s.add_development_dependency 'rubocop' + s.add_development_dependency 'rake' + s.add_development_dependency 'test-unit' +end diff --git a/lib/channel.rb b/lib/channel.rb new file mode 100644 index 0000000..ed8e68e --- /dev/null +++ b/lib/channel.rb @@ -0,0 +1,90 @@ +require 'thread' + +module Kernel + def go(prc, *args) + Thread.new { prc.call(*args) } + end +end + +class Channel + class Closed < StandardError; end + + def initialize(size = nil) + @q = size ? SizedQueue.new(size) : Queue.new + @closed = false + @mutex = Mutex.new + @waiting = [] + end + + private def lock!(&block) + @mutex.synchronize(&block) + end + + private def wait! + @waiting << Thread.current + @mutex.sleep + end + + private def next! + loop do + thr = @waiting.shift + break if thr.nil? + next unless thr.alive? + break thr.wakeup + end + end + + private def all! + @waiting.dup.each { next! } + end + + def recv + lock! do + loop do + closed! if closed? + wait! && next if @q.empty? + break @q.pop + end + end + end + alias_method :pop, :recv + + def send(val) + lock! do + fail Closed if closed? + @q << val + next! + end + end + alias_method :push, :send + alias_method :<<, :push + + def close + lock! do + return if closed? + @closed = true + all! + end + end + + def closed? + @closed + end + + private def closed! + fail Closed + end + + class << self + def select(*channels) + selector = new + threads = channels.map do |c| + Thread.new { selector << [c.recv, c] } + end + yield selector.recv + ensure + selector.close + threads.each(&:kill).each(&:join) + end + end +end diff --git a/test/test_channel.rb b/test/test_channel.rb new file mode 100644 index 0000000..6d7ef25 --- /dev/null +++ b/test/test_channel.rb @@ -0,0 +1,130 @@ +require 'test/unit' +require 'thread' +require 'channel' + +# rubocop:disable Metrics/AbcSize +# rubocop:disable Metrics/MethodLength + +class TestChannel < Test::Unit::TestCase + module Util + def meanwhile(*procs, &blk) + threads = procs.map { |p| Thread.new(&p) } + blk.call + threads.each(&:join) + end + end + + module Assert + def assert_raise_with_message(exc, msg, &block) + e = assert_raise(exc, &block) + assert_match(msg, e.message) + end + end + + include Util + include Assert + + def test_channel + c = Channel.new + result = nil + meanwhile(-> { result = c.recv }) do + c << 'foo' + end + assert_equal('foo', result) + end + + def test_closed_channel + c = Channel.new + c.close + assert_equal(true, c.closed?) + assert_raise(Channel::Closed) { c.recv } + end + + # def test_fail_send_to_unbuffered_channel + # c = Channel.new + # assert_raise_with_message(ThreadError, /No live threads left/) do + # c.send 'foo' + # end + # end + + def test_send_to_unbuffered_channel + c = Channel.new + go -> { assert_equal('foo', c.recv) } + c.send 'foo' + end + + # def test_fill_buffered_channel + # c = Channel.new(1) + # c.send 'foo' + # assert_raise_with_message('ThreadError', /No live threads left/) do + # c.send 'foo' + # end + # end + + def test_single_thread_send_to_buffered_channel + c = Channel.new(1) + c.send 'foo' + assert_equal('foo', c.recv) + end + + def test_send_on_closed_channel + c = Channel.new + c.close + assert_raise(Channel::Closed) { c << 'foo' } + end + + def test_close_blocking_channel + c = Channel.new + meanwhile(-> { assert_raise(Channel::Closed) { c.recv } }) do + sleep(0.1) + c.close + end + assert_equal(true, c.closed?) + end + + def test_close_blocking_channels + c = Channel.new + meanwhile( + -> { assert_raise(Channel::Closed) { c.recv } }, + -> { assert_raise(Channel::Closed) { c.recv } }, + -> { assert_raise(Channel::Closed) { c.recv } }, + ) do + sleep(0.1) + c.close + end + assert_equal(true, c.closed?) + end + + def test_select + c1 = Channel.new + c2 = Channel.new + c3 = Channel.new + c4 = Channel.new + + go -> { sleep(0.1); c1 << '1' } + go -> { sleep(0.2); c2 << '2' } + go -> { sleep(0.3); c3 << '3' } + go -> { sleep(0.4); c4 << '4' } + + 4.times do + Channel.select(c1, c2, c3, c4) do |msg, c| + case c + when c1 then assert_equal('1', msg) + when c2 then assert_equal('2', msg) + when c3 then assert_equal('3', msg) + when c4 then assert_equal('4', msg) + end + end + end + end + + def test_buffered_channel + messages = Channel.new(2) + + messages << 'buffered' + messages << 'channel' + + assert_equal('buffered', messages.recv) + assert_equal('channel', messages.recv) + end +end