mirror of
https://github.com/lloeki/normandy.git
synced 2025-12-06 01:54:40 +01:00
189 lines
2.9 KiB
Ruby
189 lines
2.9 KiB
Ruby
class Channel
|
|
class Closed < StandardError; end
|
|
|
|
def initialize(size = nil)
|
|
@q = size ? SizedQueue.new(size) : Queue.new
|
|
@closed = false
|
|
@mutex = Mutex.new
|
|
@waiting = []
|
|
end
|
|
|
|
private def lock!(&block)
|
|
@mutex.synchronize(&block)
|
|
end
|
|
|
|
private def wait!
|
|
@waiting << Thread.current
|
|
@mutex.sleep
|
|
end
|
|
|
|
private def next!
|
|
loop do
|
|
thr = @waiting.shift
|
|
break if thr.nil?
|
|
next unless thr.alive?
|
|
break thr.wakeup
|
|
end
|
|
end
|
|
|
|
private def all!
|
|
@waiting.dup.each { next! }
|
|
end
|
|
|
|
def recv
|
|
lock! do
|
|
loop do
|
|
closed! if closed? && @q.empty?
|
|
wait! && next if @q.empty?
|
|
break @q.pop
|
|
end
|
|
end
|
|
end
|
|
alias_method :pop, :recv
|
|
|
|
def send(val)
|
|
lock! do
|
|
fail Closed if closed?
|
|
@q << val
|
|
next!
|
|
end
|
|
end
|
|
alias_method :push, :send
|
|
alias_method :<<, :push
|
|
|
|
def close
|
|
lock! do
|
|
return if closed?
|
|
@closed = true
|
|
all!
|
|
end
|
|
end
|
|
|
|
def closed?
|
|
@closed
|
|
end
|
|
|
|
private def closed!
|
|
fail Closed
|
|
end
|
|
|
|
def each
|
|
return enum_for(:each) unless block_given?
|
|
|
|
loop do
|
|
begin
|
|
e = recv
|
|
rescue Channel::Closed
|
|
return
|
|
else
|
|
yield e
|
|
end
|
|
end
|
|
end
|
|
|
|
def receive_only!
|
|
ReceiveOnly.new(self)
|
|
end
|
|
alias_method :r!, :receive_only!
|
|
|
|
def send_only!
|
|
SendOnly.new(self)
|
|
end
|
|
alias_method :s!, :send_only!
|
|
|
|
class << self
|
|
def select(*channels)
|
|
selector = new
|
|
threads = channels.map do |c|
|
|
Thread.new { selector << [c.recv, c] }
|
|
end
|
|
yield selector.recv
|
|
ensure
|
|
selector.close
|
|
threads.each(&:kill).each(&:join)
|
|
end
|
|
end
|
|
|
|
class Direction < StandardError; end
|
|
class Conversion < StandardError; end
|
|
|
|
class ReceiveOnly
|
|
def initialize(channel)
|
|
@channel = channel
|
|
end
|
|
|
|
def recv
|
|
@channel.recv
|
|
end
|
|
alias_method :pop, :recv
|
|
|
|
def send(_)
|
|
fail Direction, 'receive only'
|
|
end
|
|
alias_method :push, :send
|
|
alias_method :<<, :push
|
|
|
|
def close
|
|
@channel.close
|
|
end
|
|
|
|
def closed?
|
|
@channel.closed?
|
|
end
|
|
|
|
def receive_only!
|
|
self
|
|
end
|
|
alias_method :r!, :receive_only!
|
|
|
|
def send_only!
|
|
fail Conversion, 'receive only'
|
|
end
|
|
alias_method :s!, :send_only!
|
|
|
|
def hash
|
|
@channel.hash
|
|
end
|
|
end
|
|
|
|
class SendOnly
|
|
def initialize(channel)
|
|
@channel = channel
|
|
end
|
|
|
|
def recv
|
|
fail Direction, 'send only'
|
|
end
|
|
alias_method :pop, :recv
|
|
|
|
def send(val)
|
|
@channel.send(val)
|
|
end
|
|
alias_method :push, :send
|
|
alias_method :<<, :push
|
|
|
|
def close
|
|
@channel.close
|
|
end
|
|
|
|
def closed?
|
|
@channel.closed?
|
|
end
|
|
|
|
def receive_only!
|
|
fail Conversion, 'send only'
|
|
end
|
|
alias_method :r!, :receive_only!
|
|
|
|
def send_only!
|
|
self
|
|
end
|
|
alias_method :s!, :send_only!
|
|
|
|
def hash
|
|
@channel.hash
|
|
end
|
|
end
|
|
end
|
|
|
|
require 'channel/runtime'
|