diff --git a/.gitignore b/.gitignore index eaa6e54..4367087 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .ruby-version +.DS_Store diff --git a/README.md b/README.md index fd7027f..e0b9e55 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,16 @@ To prove this point and to serve as a nice tutorial anyway, some examples ported over from [Go by example](http://gobyexample.com) are included in [examples](examples). +- [X] channels +- [X] channel buffering +- [X] channel synchronization +- [X] channel direction +- [X] select +- [X] timeouts +- [ ] non-block channel operations +- [X] closing channels +- [X] range over channels + ## License MIT diff --git a/examples/channel_directions.rb b/examples/channel_directions.rb index 1df5034..e3eef40 100644 --- a/examples/channel_directions.rb +++ b/examples/channel_directions.rb @@ -3,10 +3,13 @@ require 'channel' def ping(pings, msg) + pings = pings.send_only! pings << msg end def pong(pings, pongs) + pings = pings.receive_only! + pongs = pongs.send_only! msg = pings.recv pongs << msg end diff --git a/examples/channel_synchronization.rb b/examples/channel_synchronization.rb index 61c29e8..6abdc94 100644 --- a/examples/channel_synchronization.rb +++ b/examples/channel_synchronization.rb @@ -3,7 +3,7 @@ require 'channel' def worker(done) - puts 'working...' + $stdout.write 'working...' sleep 1 puts 'done' done << true diff --git a/examples/closing_channels.rb b/examples/closing_channels.rb index ba2668f..ee9263c 100644 --- a/examples/closing_channels.rb +++ b/examples/closing_channels.rb @@ -10,8 +10,6 @@ go lambda { begin j = jobs.recv rescue Channel::Closed - # TODO: wrong! ends before all items recv'd - # j, more := <-jobs; more == True puts 'received all jobs' done << true return @@ -21,7 +19,7 @@ go lambda { end } -3.times do |j| +1.upto 3 do |j| jobs << j puts "sent job #{j}" end diff --git a/examples/range_over_channels.rb b/examples/range_over_channels.rb index 57addf2..2133ddf 100644 --- a/examples/range_over_channels.rb +++ b/examples/range_over_channels.rb @@ -5,6 +5,6 @@ require 'channel' queue = Channel.new(2) queue << 'one' queue << 'two' -close(queue) +queue.close queue.each { |e| puts e } diff --git a/examples/timeout.rb b/examples/timeouts.rb similarity index 68% rename from examples/timeout.rb rename to examples/timeouts.rb index 2a6be27..20ecf2d 100644 --- a/examples/timeout.rb +++ b/examples/timeouts.rb @@ -1,7 +1,7 @@ # https://gobyexample.com/timeouts require 'channel' -require 'channel/timeout' +require 'channel/timer' c1 = Channel.new(1) go lambda { @@ -9,7 +9,7 @@ go lambda { c1 << 'result 1' } -Channel.select(c1, t1 = Channel::Timeout.after(1)) do |res, c| +Channel.select(c1, t1 = Channel::Timer.after(1)) do |res, c| case c when c1 then puts res when t1 then puts 'timeout 1' @@ -22,7 +22,7 @@ go lambda { c2 << 'result 2' } -Channel.select(c2, t2 = Channel::Timeout.after(3)) do |res, c| +Channel.select(c2, t2 = Channel::Timer.after(3)) do |res, c| case c when c2 then puts res when t2 then puts 'timeout 1' diff --git a/lib/channel.rb b/lib/channel.rb index ed8e68e..13aa455 100644 --- a/lib/channel.rb +++ b/lib/channel.rb @@ -1,11 +1,3 @@ -require 'thread' - -module Kernel - def go(prc, *args) - Thread.new { prc.call(*args) } - end -end - class Channel class Closed < StandardError; end @@ -41,7 +33,7 @@ class Channel def recv lock! do loop do - closed! if closed? + closed! if closed? && @q.empty? wait! && next if @q.empty? break @q.pop end @@ -75,6 +67,30 @@ class Channel fail Closed end + def each + return enum_for(:each) unless block_given? + + loop do + begin + e = recv + rescue Channel::Closed + return + else + yield e + end + end + end + + def receive_only! + ReceiveOnly.new(self) + end + alias_method :r!, :receive_only! + + def send_only! + SendOnly.new(self) + end + alias_method :s!, :send_only! + class << self def select(*channels) selector = new @@ -87,4 +103,87 @@ class Channel threads.each(&:kill).each(&:join) end end + + class Direction < StandardError; end + class Conversion < StandardError; end + + class ReceiveOnly + def initialize(channel) + @channel = channel + end + + def recv + @channel.recv + end + alias_method :pop, :recv + + def send(_) + fail Direction, 'receive only' + end + alias_method :push, :send + alias_method :<<, :push + + def close + @channel.close + end + + def closed? + @channel.closed? + end + + def receive_only! + self + end + alias_method :r!, :receive_only! + + def send_only! + fail Conversion, 'receive only' + end + alias_method :s!, :send_only! + + def hash + @channel.hash + end + end + + class SendOnly + def initialize(channel) + @channel = channel + end + + def recv + fail Direction, 'send only' + end + alias_method :pop, :recv + + def send(val) + @channel.send(val) + end + alias_method :push, :send + alias_method :<<, :push + + def close + @channel.close + end + + def closed? + @channel.closed? + end + + def receive_only! + fail Conversion, 'send only' + end + alias_method :r!, :receive_only! + + def send_only! + self + end + alias_method :s!, :send_only! + + def hash + @channel.hash + end + end end + +require 'channel/runtime' diff --git a/lib/channel/runtime.rb b/lib/channel/runtime.rb new file mode 100644 index 0000000..7f4fedb --- /dev/null +++ b/lib/channel/runtime.rb @@ -0,0 +1,15 @@ +require 'thread' + +module Channel::Runtime + module_function + + def go(prc, *args) + Thread.new { prc.call(*args) } + end +end + +module Kernel + def go(prc, *args) + Channel::Runtime.go(prc, *args) + end +end diff --git a/lib/channel/timer.rb b/lib/channel/timer.rb new file mode 100644 index 0000000..b871878 --- /dev/null +++ b/lib/channel/timer.rb @@ -0,0 +1,17 @@ +class Channel::Timer + attr_reader :channel + + def initialize(delay) + @channel = Channel.new(1) + @prc = -> { sleep delay; channel << Time.now } + start + end + + def start + Channel::Runtime.go @prc + end + + def self.after(delay) + new(delay).channel + end +end diff --git a/test/test_channel.rb b/test/test_channel.rb index 6d7ef25..1254789 100644 --- a/test/test_channel.rb +++ b/test/test_channel.rb @@ -4,6 +4,7 @@ require 'channel' # rubocop:disable Metrics/AbcSize # rubocop:disable Metrics/MethodLength +# rubocop:disable Metrics/ClassLength class TestChannel < Test::Unit::TestCase module Util @@ -40,6 +41,16 @@ class TestChannel < Test::Unit::TestCase assert_raise(Channel::Closed) { c.recv } end + def test_buffered_channel + messages = Channel.new(2) + + messages << 'buffered' + messages << 'channel' + + assert_equal('buffered', messages.recv) + assert_equal('channel', messages.recv) + end + # def test_fail_send_to_unbuffered_channel # c = Channel.new # assert_raise_with_message(ThreadError, /No live threads left/) do @@ -73,7 +84,7 @@ class TestChannel < Test::Unit::TestCase assert_raise(Channel::Closed) { c << 'foo' } end - def test_close_blocking_channel + def test_receive_on_closed_blocking_channel c = Channel.new meanwhile(-> { assert_raise(Channel::Closed) { c.recv } }) do sleep(0.1) @@ -82,7 +93,7 @@ class TestChannel < Test::Unit::TestCase assert_equal(true, c.closed?) end - def test_close_blocking_channels + def test_many_receive_on_closed_blocking_channel c = Channel.new meanwhile( -> { assert_raise(Channel::Closed) { c.recv } }, @@ -95,6 +106,41 @@ class TestChannel < Test::Unit::TestCase assert_equal(true, c.closed?) end + def test_receive_and_close_buffered_channel + c = Channel.new(5) + meanwhile( + -> { sleep 0.1; assert_equal(1, c.recv) }, + -> { sleep 0.2; assert_equal(2, c.recv) }, + -> { sleep 0.3; assert_equal(3, c.recv) }, + -> { sleep 0.4; assert_raise(Channel::Closed) { c.recv } }, + ) do + c << 1 + c << 2 + c << 3 + c.close + end + assert_equal(true, c.closed?) + end + + def test_iterate_over_buffered_channel + c = Channel.new(2) + c << 1 + c << 2 + c.close + + assert_equal([1, 2], c.each.to_a) + end + + # def test_iterate_over_unclosed_buffered_channel + # c = Channel.new(2) + # c << 1 + # c << 2 + + # assert_raise_with_message('ThreadError', /No live threads left/) do + # c.each.to_a + # end + # end + def test_select c1 = Channel.new c2 = Channel.new @@ -117,14 +163,4 @@ class TestChannel < Test::Unit::TestCase end end end - - def test_buffered_channel - messages = Channel.new(2) - - messages << 'buffered' - messages << 'channel' - - assert_equal('buffered', messages.recv) - assert_equal('channel', messages.recv) - end end