commit 87915c85683375e0a5b0f4e7e74dcd815443a005 Author: Loic Nageleisen Date: Mon Aug 3 13:49:35 2015 +0200 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..eaa6e54 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.ruby-version diff --git a/.rubocop.yml b/.rubocop.yml new file mode 100644 index 0000000..0eaf834 --- /dev/null +++ b/.rubocop.yml @@ -0,0 +1,11 @@ +Style/Documentation: + Enabled: false + +Style/FileName: + Exclude: [lib/zipcode-fr.rb] + +Style/Semicolon: + AllowAsExpressionSeparator: true + +Style/TrailingComma: + EnforcedStyleForMultiline: comma diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..e223d7c --- /dev/null +++ b/.travis.yml @@ -0,0 +1,8 @@ +language: ruby +rvm: + - '2.1' + - '2.2' + - 'rbx-2.5' +script: + - bundle exec rubocop + - bundle exec rake test diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..fa75df1 --- /dev/null +++ b/Gemfile @@ -0,0 +1,3 @@ +source 'https://rubygems.org' + +gemspec diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..a352acc --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,46 @@ +PATH + remote: . + specs: + channel (0.1.0) + +GEM + remote: https://rubygems.org/ + specs: + ast (2.0.0) + astrolabe (1.3.1) + parser (~> 2.2) + coderay (1.1.0) + method_source (0.8.2) + parser (2.2.2.6) + ast (>= 1.1, < 3.0) + power_assert (0.2.4) + powerpack (0.1.1) + pry (0.10.1) + coderay (~> 1.1.0) + method_source (~> 0.8.1) + slop (~> 3.4) + rainbow (2.0.0) + rake (10.4.2) + rubocop (0.32.1) + astrolabe (~> 1.3) + parser (>= 2.2.2.5, < 3.0) + powerpack (~> 0.1) + rainbow (>= 1.99.1, < 3.0) + ruby-progressbar (~> 1.4) + ruby-progressbar (1.7.5) + slop (3.6.0) + test-unit (3.1.3) + power_assert + +PLATFORMS + ruby + +DEPENDENCIES + channel! + pry + rake + rubocop + test-unit + +BUNDLED WITH + 1.10.6 diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..06329ca --- /dev/null +++ b/Rakefile @@ -0,0 +1,10 @@ +require 'rake/testtask' +require 'bundler' + +Bundler::GemHelper.install_tasks + +Rake::TestTask.new do |t| + t.libs << 'test' + t.test_files = FileList['test/test_*.rb'] + t.verbose = true +end diff --git a/channel.gemspec b/channel.gemspec new file mode 100644 index 0000000..598a6b3 --- /dev/null +++ b/channel.gemspec @@ -0,0 +1,16 @@ +Gem::Specification.new do |s| + s.name = 'channel' + s.version = '0.1.0' + s.licenses = ['MIT'] + s.summary = 'Channels' + s.description = 'Share memory by communicating' + s.authors = ['Loic Nageleisen'] + s.email = 'loic.nageleisen@gmail.com' + s.files = ['lib/channel.rb'] + s.homepage = 'https://github.com/lloeki/channel' + + s.add_development_dependency 'pry' + s.add_development_dependency 'rubocop' + s.add_development_dependency 'rake' + s.add_development_dependency 'test-unit' +end diff --git a/lib/channel.rb b/lib/channel.rb new file mode 100644 index 0000000..ed8e68e --- /dev/null +++ b/lib/channel.rb @@ -0,0 +1,90 @@ +require 'thread' + +module Kernel + def go(prc, *args) + Thread.new { prc.call(*args) } + end +end + +class Channel + class Closed < StandardError; end + + def initialize(size = nil) + @q = size ? SizedQueue.new(size) : Queue.new + @closed = false + @mutex = Mutex.new + @waiting = [] + end + + private def lock!(&block) + @mutex.synchronize(&block) + end + + private def wait! + @waiting << Thread.current + @mutex.sleep + end + + private def next! + loop do + thr = @waiting.shift + break if thr.nil? + next unless thr.alive? + break thr.wakeup + end + end + + private def all! + @waiting.dup.each { next! } + end + + def recv + lock! do + loop do + closed! if closed? + wait! && next if @q.empty? + break @q.pop + end + end + end + alias_method :pop, :recv + + def send(val) + lock! do + fail Closed if closed? + @q << val + next! + end + end + alias_method :push, :send + alias_method :<<, :push + + def close + lock! do + return if closed? + @closed = true + all! + end + end + + def closed? + @closed + end + + private def closed! + fail Closed + end + + class << self + def select(*channels) + selector = new + threads = channels.map do |c| + Thread.new { selector << [c.recv, c] } + end + yield selector.recv + ensure + selector.close + threads.each(&:kill).each(&:join) + end + end +end diff --git a/test/test_channel.rb b/test/test_channel.rb new file mode 100644 index 0000000..6d7ef25 --- /dev/null +++ b/test/test_channel.rb @@ -0,0 +1,130 @@ +require 'test/unit' +require 'thread' +require 'channel' + +# rubocop:disable Metrics/AbcSize +# rubocop:disable Metrics/MethodLength + +class TestChannel < Test::Unit::TestCase + module Util + def meanwhile(*procs, &blk) + threads = procs.map { |p| Thread.new(&p) } + blk.call + threads.each(&:join) + end + end + + module Assert + def assert_raise_with_message(exc, msg, &block) + e = assert_raise(exc, &block) + assert_match(msg, e.message) + end + end + + include Util + include Assert + + def test_channel + c = Channel.new + result = nil + meanwhile(-> { result = c.recv }) do + c << 'foo' + end + assert_equal('foo', result) + end + + def test_closed_channel + c = Channel.new + c.close + assert_equal(true, c.closed?) + assert_raise(Channel::Closed) { c.recv } + end + + # def test_fail_send_to_unbuffered_channel + # c = Channel.new + # assert_raise_with_message(ThreadError, /No live threads left/) do + # c.send 'foo' + # end + # end + + def test_send_to_unbuffered_channel + c = Channel.new + go -> { assert_equal('foo', c.recv) } + c.send 'foo' + end + + # def test_fill_buffered_channel + # c = Channel.new(1) + # c.send 'foo' + # assert_raise_with_message('ThreadError', /No live threads left/) do + # c.send 'foo' + # end + # end + + def test_single_thread_send_to_buffered_channel + c = Channel.new(1) + c.send 'foo' + assert_equal('foo', c.recv) + end + + def test_send_on_closed_channel + c = Channel.new + c.close + assert_raise(Channel::Closed) { c << 'foo' } + end + + def test_close_blocking_channel + c = Channel.new + meanwhile(-> { assert_raise(Channel::Closed) { c.recv } }) do + sleep(0.1) + c.close + end + assert_equal(true, c.closed?) + end + + def test_close_blocking_channels + c = Channel.new + meanwhile( + -> { assert_raise(Channel::Closed) { c.recv } }, + -> { assert_raise(Channel::Closed) { c.recv } }, + -> { assert_raise(Channel::Closed) { c.recv } }, + ) do + sleep(0.1) + c.close + end + assert_equal(true, c.closed?) + end + + def test_select + c1 = Channel.new + c2 = Channel.new + c3 = Channel.new + c4 = Channel.new + + go -> { sleep(0.1); c1 << '1' } + go -> { sleep(0.2); c2 << '2' } + go -> { sleep(0.3); c3 << '3' } + go -> { sleep(0.4); c4 << '4' } + + 4.times do + Channel.select(c1, c2, c3, c4) do |msg, c| + case c + when c1 then assert_equal('1', msg) + when c2 then assert_equal('2', msg) + when c3 then assert_equal('3', msg) + when c4 then assert_equal('4', msg) + end + end + end + end + + def test_buffered_channel + messages = Channel.new(2) + + messages << 'buffered' + messages << 'channel' + + assert_equal('buffered', messages.recv) + assert_equal('channel', messages.recv) + end +end