This commit is contained in:
Loic Nageleisen 2015-12-15 13:50:47 +01:00
parent 0cdf0a490a
commit 06273c3525
11 changed files with 208 additions and 29 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
.ruby-version
.DS_Store

View file

@ -25,6 +25,16 @@ To prove this point and to serve as a nice tutorial anyway, some examples
ported over from [Go by example](http://gobyexample.com) are included in
[examples](examples).
- [X] channels
- [X] channel buffering
- [X] channel synchronization
- [X] channel direction
- [X] select
- [X] timeouts
- [ ] non-block channel operations
- [X] closing channels
- [X] range over channels
## License
MIT

View file

@ -3,10 +3,13 @@
require 'channel'
def ping(pings, msg)
pings = pings.send_only!
pings << msg
end
def pong(pings, pongs)
pings = pings.receive_only!
pongs = pongs.send_only!
msg = pings.recv
pongs << msg
end

View file

@ -3,7 +3,7 @@
require 'channel'
def worker(done)
puts 'working...'
$stdout.write 'working...'
sleep 1
puts 'done'
done << true

View file

@ -10,8 +10,6 @@ go lambda {
begin
j = jobs.recv
rescue Channel::Closed
# TODO: wrong! ends before all items recv'd
# j, more := <-jobs; more == True
puts 'received all jobs'
done << true
return
@ -21,7 +19,7 @@ go lambda {
end
}
3.times do |j|
1.upto 3 do |j|
jobs << j
puts "sent job #{j}"
end

View file

@ -5,6 +5,6 @@ require 'channel'
queue = Channel.new(2)
queue << 'one'
queue << 'two'
close(queue)
queue.close
queue.each { |e| puts e }

View file

@ -1,7 +1,7 @@
# https://gobyexample.com/timeouts
require 'channel'
require 'channel/timeout'
require 'channel/timer'
c1 = Channel.new(1)
go lambda {
@ -9,7 +9,7 @@ go lambda {
c1 << 'result 1'
}
Channel.select(c1, t1 = Channel::Timeout.after(1)) do |res, c|
Channel.select(c1, t1 = Channel::Timer.after(1)) do |res, c|
case c
when c1 then puts res
when t1 then puts 'timeout 1'
@ -22,7 +22,7 @@ go lambda {
c2 << 'result 2'
}
Channel.select(c2, t2 = Channel::Timeout.after(3)) do |res, c|
Channel.select(c2, t2 = Channel::Timer.after(3)) do |res, c|
case c
when c2 then puts res
when t2 then puts 'timeout 1'

View file

@ -1,11 +1,3 @@
require 'thread'
module Kernel
def go(prc, *args)
Thread.new { prc.call(*args) }
end
end
class Channel
class Closed < StandardError; end
@ -41,7 +33,7 @@ class Channel
def recv
lock! do
loop do
closed! if closed?
closed! if closed? && @q.empty?
wait! && next if @q.empty?
break @q.pop
end
@ -75,6 +67,30 @@ class Channel
fail Closed
end
def each
return enum_for(:each) unless block_given?
loop do
begin
e = recv
rescue Channel::Closed
return
else
yield e
end
end
end
def receive_only!
ReceiveOnly.new(self)
end
alias_method :r!, :receive_only!
def send_only!
SendOnly.new(self)
end
alias_method :s!, :send_only!
class << self
def select(*channels)
selector = new
@ -87,4 +103,87 @@ class Channel
threads.each(&:kill).each(&:join)
end
end
class Direction < StandardError; end
class Conversion < StandardError; end
class ReceiveOnly
def initialize(channel)
@channel = channel
end
def recv
@channel.recv
end
alias_method :pop, :recv
def send(_)
fail Direction, 'receive only'
end
alias_method :push, :send
alias_method :<<, :push
def close
@channel.close
end
def closed?
@channel.closed?
end
def receive_only!
self
end
alias_method :r!, :receive_only!
def send_only!
fail Conversion, 'receive only'
end
alias_method :s!, :send_only!
def hash
@channel.hash
end
end
class SendOnly
def initialize(channel)
@channel = channel
end
def recv
fail Direction, 'send only'
end
alias_method :pop, :recv
def send(val)
@channel.send(val)
end
alias_method :push, :send
alias_method :<<, :push
def close
@channel.close
end
def closed?
@channel.closed?
end
def receive_only!
fail Conversion, 'send only'
end
alias_method :r!, :receive_only!
def send_only!
self
end
alias_method :s!, :send_only!
def hash
@channel.hash
end
end
end
require 'channel/runtime'

15
lib/channel/runtime.rb Normal file
View file

@ -0,0 +1,15 @@
require 'thread'
module Channel::Runtime
module_function
def go(prc, *args)
Thread.new { prc.call(*args) }
end
end
module Kernel
def go(prc, *args)
Channel::Runtime.go(prc, *args)
end
end

17
lib/channel/timer.rb Normal file
View file

@ -0,0 +1,17 @@
class Channel::Timer
attr_reader :channel
def initialize(delay)
@channel = Channel.new(1)
@prc = -> { sleep delay; channel << Time.now }
start
end
def start
Channel::Runtime.go @prc
end
def self.after(delay)
new(delay).channel
end
end

View file

@ -4,6 +4,7 @@ require 'channel'
# rubocop:disable Metrics/AbcSize
# rubocop:disable Metrics/MethodLength
# rubocop:disable Metrics/ClassLength
class TestChannel < Test::Unit::TestCase
module Util
@ -40,6 +41,16 @@ class TestChannel < Test::Unit::TestCase
assert_raise(Channel::Closed) { c.recv }
end
def test_buffered_channel
messages = Channel.new(2)
messages << 'buffered'
messages << 'channel'
assert_equal('buffered', messages.recv)
assert_equal('channel', messages.recv)
end
# def test_fail_send_to_unbuffered_channel
# c = Channel.new
# assert_raise_with_message(ThreadError, /No live threads left/) do
@ -73,7 +84,7 @@ class TestChannel < Test::Unit::TestCase
assert_raise(Channel::Closed) { c << 'foo' }
end
def test_close_blocking_channel
def test_receive_on_closed_blocking_channel
c = Channel.new
meanwhile(-> { assert_raise(Channel::Closed) { c.recv } }) do
sleep(0.1)
@ -82,7 +93,7 @@ class TestChannel < Test::Unit::TestCase
assert_equal(true, c.closed?)
end
def test_close_blocking_channels
def test_many_receive_on_closed_blocking_channel
c = Channel.new
meanwhile(
-> { assert_raise(Channel::Closed) { c.recv } },
@ -95,6 +106,41 @@ class TestChannel < Test::Unit::TestCase
assert_equal(true, c.closed?)
end
def test_receive_and_close_buffered_channel
c = Channel.new(5)
meanwhile(
-> { sleep 0.1; assert_equal(1, c.recv) },
-> { sleep 0.2; assert_equal(2, c.recv) },
-> { sleep 0.3; assert_equal(3, c.recv) },
-> { sleep 0.4; assert_raise(Channel::Closed) { c.recv } },
) do
c << 1
c << 2
c << 3
c.close
end
assert_equal(true, c.closed?)
end
def test_iterate_over_buffered_channel
c = Channel.new(2)
c << 1
c << 2
c.close
assert_equal([1, 2], c.each.to_a)
end
# def test_iterate_over_unclosed_buffered_channel
# c = Channel.new(2)
# c << 1
# c << 2
# assert_raise_with_message('ThreadError', /No live threads left/) do
# c.each.to_a
# end
# end
def test_select
c1 = Channel.new
c2 = Channel.new
@ -117,14 +163,4 @@ class TestChannel < Test::Unit::TestCase
end
end
end
def test_buffered_channel
messages = Channel.new(2)
messages << 'buffered'
messages << 'channel'
assert_equal('buffered', messages.recv)
assert_equal('channel', messages.recv)
end
end