normandy/lib/channel.rb
2015-12-15 13:50:47 +01:00

189 lines
2.9 KiB
Ruby

class Channel
class Closed < StandardError; end
def initialize(size = nil)
@q = size ? SizedQueue.new(size) : Queue.new
@closed = false
@mutex = Mutex.new
@waiting = []
end
private def lock!(&block)
@mutex.synchronize(&block)
end
private def wait!
@waiting << Thread.current
@mutex.sleep
end
private def next!
loop do
thr = @waiting.shift
break if thr.nil?
next unless thr.alive?
break thr.wakeup
end
end
private def all!
@waiting.dup.each { next! }
end
def recv
lock! do
loop do
closed! if closed? && @q.empty?
wait! && next if @q.empty?
break @q.pop
end
end
end
alias_method :pop, :recv
def send(val)
lock! do
fail Closed if closed?
@q << val
next!
end
end
alias_method :push, :send
alias_method :<<, :push
def close
lock! do
return if closed?
@closed = true
all!
end
end
def closed?
@closed
end
private def closed!
fail Closed
end
def each
return enum_for(:each) unless block_given?
loop do
begin
e = recv
rescue Channel::Closed
return
else
yield e
end
end
end
def receive_only!
ReceiveOnly.new(self)
end
alias_method :r!, :receive_only!
def send_only!
SendOnly.new(self)
end
alias_method :s!, :send_only!
class << self
def select(*channels)
selector = new
threads = channels.map do |c|
Thread.new { selector << [c.recv, c] }
end
yield selector.recv
ensure
selector.close
threads.each(&:kill).each(&:join)
end
end
class Direction < StandardError; end
class Conversion < StandardError; end
class ReceiveOnly
def initialize(channel)
@channel = channel
end
def recv
@channel.recv
end
alias_method :pop, :recv
def send(_)
fail Direction, 'receive only'
end
alias_method :push, :send
alias_method :<<, :push
def close
@channel.close
end
def closed?
@channel.closed?
end
def receive_only!
self
end
alias_method :r!, :receive_only!
def send_only!
fail Conversion, 'receive only'
end
alias_method :s!, :send_only!
def hash
@channel.hash
end
end
class SendOnly
def initialize(channel)
@channel = channel
end
def recv
fail Direction, 'send only'
end
alias_method :pop, :recv
def send(val)
@channel.send(val)
end
alias_method :push, :send
alias_method :<<, :push
def close
@channel.close
end
def closed?
@channel.closed?
end
def receive_only!
fail Conversion, 'send only'
end
alias_method :r!, :receive_only!
def send_only!
self
end
alias_method :s!, :send_only!
def hash
@channel.hash
end
end
end
require 'channel/runtime'