commit d5895d2f6825bef56f7c8a23c191a27f12f3867e Author: Loic Nageleisen Date: Tue Oct 30 15:57:34 2012 +0100 working tee diff --git a/tee.rb b/tee.rb new file mode 100644 index 0000000..0180645 --- /dev/null +++ b/tee.rb @@ -0,0 +1,89 @@ +require 'digest/sha1' +require 'digest/sha2' + +module Enumerable + def tee(*procs) + each { |i| procs.map { |p| p.call i } } + end +end + +class IO + def chunks(chunk_size=1024) + Enumerator.new { |y| y << read(chunk_size) until eof? } + end + + def each_chunk(chunk_size=nil) + chunks.each { |*args| yield *args } + end + + def digest_with(digest, chunk_size=nil) + chunks(chunk_size).each { |chunk| digest << chunk } + digest + end + + def sha256(chunk_size=nil) + digest_with Digest::SHA2.new(256), chunk_size + end + + def fiber_chunks(chunk_size=1024) + Enumerator.new do |y| + until eof? + Fiber.yield + y << read(chunk_size) + end + end + end + + def tee(*procs) + results = procs.map { nil } + ios = procs.map do |proc| + #IO.new + self + end + fibers = procs.map.with_index do |proc, i| + # set up communication + # start each proc, which will yield just before reading + Fiber.new do + results[i] = proc.call ios[i] + end + end.each { |fiber| fiber.resume } + prev_pos = tell + chunks.each do |chunk| + new_pos = tell + # for each proc + # - copy chunk into its dedicated IO stream + # - resume + procs.each.with_index do |proc, i| + ios[i].seek prev_pos + #ios[i].write chunk + fibers[i].resume + ios[i].seek new_pos + end + prev_pos = new_pos + end + results + end +end + + +File.open("tee.rb") do |f| + + sha1_proc = lambda do |f| + digest = Digest::SHA1.new + f.fiber_chunks.each { |chunk| digest << chunk } + digest + end + + sha2_proc = lambda do |f| + digest = Digest::SHA2.new(256) + f.fiber_chunks.each { |chunk| digest << chunk } + digest + end + + puts_proc = lambda do |f| + f.fiber_chunks.each { |chunk| puts chunk } + end + + p f.tee(sha1_proc, sha2_proc, puts_proc) + +end