diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ceeb05b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/tmp diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..5f16476 --- /dev/null +++ b/.rspec @@ -0,0 +1,2 @@ +--color +--format progress diff --git a/.rubocop.yml b/.rubocop.yml new file mode 100644 index 0000000..1e9db3b --- /dev/null +++ b/.rubocop.yml @@ -0,0 +1,15 @@ +AsciiComments: + Enabled: false +MethodLength: + Max: 30 +ParameterLists: + CountKeywordArgs: false +Documentation: + Enabled: false +TrivialAccessors: + ExactNameMatch: true + AllowPredicates: true +AndOr: + Enabled: false +PerlBackrefs: + Enabled: false diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..fa75df1 --- /dev/null +++ b/Gemfile @@ -0,0 +1,3 @@ +source 'https://rubygems.org' + +gemspec diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..9d14e13 --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,57 @@ +PATH + remote: . + specs: + tee (0.5) + +GEM + remote: https://rubygems.org/ + specs: + coderay (1.0.9) + diff-lcs (1.2.4) + ffi (1.9.0) + formatador (0.2.4) + guard (1.8.3) + formatador (>= 0.2.4) + listen (~> 1.3) + lumberjack (>= 1.0.2) + pry (>= 0.9.10) + thor (>= 0.14.6) + guard-rspec (3.1.0) + guard (>= 1.8) + rspec (~> 2.13) + listen (1.3.1) + rb-fsevent (>= 0.9.3) + rb-inotify (>= 0.9) + rb-kqueue (>= 0.2) + lumberjack (1.0.4) + method_source (0.8.2) + pry (0.9.12.2) + coderay (~> 1.0.5) + method_source (~> 0.8) + slop (~> 3.4) + rake (10.1.0) + rb-fsevent (0.9.3) + rb-inotify (0.9.2) + ffi (>= 0.5.0) + rb-kqueue (0.2.0) + ffi (>= 0.5.0) + rspec (2.14.1) + rspec-core (~> 2.14.0) + rspec-expectations (~> 2.14.0) + rspec-mocks (~> 2.14.0) + rspec-core (2.14.5) + rspec-expectations (2.14.3) + diff-lcs (>= 1.1.3, < 2.0) + rspec-mocks (2.14.3) + slop (3.4.6) + thor (0.18.1) + +PLATFORMS + ruby + +DEPENDENCIES + guard-rspec + pry + rake + rspec (~> 2.14) + tee! 
diff --git a/Guardfile b/Guardfile new file mode 100644 index 0000000..ddf888f --- /dev/null +++ b/Guardfile @@ -0,0 +1,6 @@ +guard :rspec do + watch(%r{^spec/.+_spec\.rb$}) + watch(%r{^lib/(.+)\.rb$}) { |m| "spec/lib/#{m[1]}_spec.rb" } + watch('spec/spec_helper.rb') { "spec" } +end + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c772868 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2012 Loic Nageleisen + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..f3bbf05 --- /dev/null +++ b/README.md @@ -0,0 +1,31 @@ +# Tee + +Allows Enumerables and IO objects to be teed via fibers. + +# Examples + +Teeing enumerables makes each proc receive its own enumerator. `#tee` returns +an array with each proc's return values. 
+
+```ruby
+> require 'tee/core_ext'
+> [1, 2, 3].tee -> (e) { e.reduce(&:+) },
+                -> (e) { e.map { |i| i**2 } }
+=> [6, [1, 4, 9]]
+```
+
+Teeing IOs makes each proc receive its own IO. Those IOs can read the incoming
+data in chunks.
+
+```ruby
+> require 'tee/core_ext'
+> StringIO.new("foo").tee -> (io) { io.chunks.each { |c| puts c } }
+foo
+=> [nil]
+```
+
+Data can currently only be read in whole, uniformly sized chunks. Concurrent
+execution is achieved via fibers (no threads needed).
+
+One can skip requiring `core_ext` and get `#tee` on a case by case basis by
+including or extending `Enumerable::Tee` and `IO::Tee` modules.
diff --git a/Rakefile b/Rakefile
new file mode 100644
index 0000000..0bd96b6
--- /dev/null
+++ b/Rakefile
@@ -0,0 +1,5 @@
+require 'bundler/gem_tasks'
+require 'rspec/core/rake_task'
+
+RSpec::Core::RakeTask.new
+task :default => :spec
diff --git a/lib/tee.rb b/lib/tee.rb
new file mode 100644
index 0000000..1cde1ae
--- /dev/null
+++ b/lib/tee.rb
@@ -0,0 +1,133 @@
+require 'digest/sha1'
+require 'digest/sha2'
+require 'fiber'
+
+module Enumerable
+  # Opt-in mixin providing #tee for any Enumerable.
+  module Tee
+    # Calls each proc with a fresh enumerator over the receiver (the result of
+    # a block-less #each) and returns the procs' return values, in order.
+    def tee(*procs)
+      procs.map { |p| p.call(each) }
+    end
+  end
+end
+
+class IO
+  # Chunked reading for any object that responds to #read(size) and #eof?.
+  module Chunkable
+    CHUNK_SIZE = 1024
+
+    # Returns an Enumerator yielding successive #read(chunk_size) results
+    # until #eof? is true. chunk_size defaults to CHUNK_SIZE.
+    def chunks(chunk_size = nil)
+      chunk_size ||= CHUNK_SIZE
+
+      Enumerator.new { |y| y << read(chunk_size) until eof? }
+    end
+
+    # Yields each chunk to the block, or returns the chunk Enumerator when no
+    # block is given. chunk_size is forwarded to #chunks.
+    def each_chunk(chunk_size = nil, &block)
+      chunks(chunk_size).each(&block)
+    end
+  end
+
+  # Digest helpers built on Chunkable#chunks.
+  module Digestable
+    # Feeds every chunk to digest via #<< and returns digest.
+    def digest_with(digest, chunk_size = nil)
+      chunks(chunk_size).each { |chunk| digest << chunk }
+
+      digest
+    end
+
+    # Returns a Digest::SHA2 (256 bits) fed with the whole stream.
+    def sha256(chunk_size = nil)
+      digest_with(Digest::SHA2.new(256), chunk_size)
+    end
+  end
+
+  # Fans a chunked stream out to several consumers, each run in its own fiber.
+  module Tee
+    # Wraps each proc in a FiberChunkedIO, writes every chunk of the receiver
+    # to all of them, closes them, and returns the procs' results in order.
+    def fiber_tee(*procs)
+      ios = procs.map { |proc| FiberChunkedIO.new(&proc) }
+
+      chunks.each do |chunk|
+        ios.each { |io| io.write chunk }
+      end
+      ios.each(&:close)
+
+      ios.map(&:result)
+    end
+    alias_method :tee, :fiber_tee
+  end
+end
+
+# Write end of an in-memory chunk stream whose consumer block runs in a fiber.
+# The block receives this object and can read from it through #chunks.
+class FiberChunkedIO
+  # Starts the consumer fiber right away; it runs until its first #eof? call
+  # (or until the block returns).
+  def initialize(chunk_size = 1024, &block)
+    @chunk_size = chunk_size
+    @chunks = []
+    @eof = false
+    @fiber = Fiber.new do
+      @result = block.call self
+    end
+    @fiber.resume
+  end
+
+  # Being a stream, it behaves like IO#eof? and blocks until the other end
+  # sends some data or closes it.
+  def eof?
+    Fiber.yield
+
+    @eof
+  end
+
+  # Signals end of stream and resumes the consumer so it can observe it.
+  # Resuming a finished fiber raises FiberError, hence the alive? guard.
+  def close
+    @eof = true
+    @fiber.resume if @fiber.alive?
+  end
+
+  # Return value of the consumer block, available once it has finished.
+  attr_reader :result
+
+  # Hands one chunk to the consumer and returns the chunk's size. Raises
+  # ArgumentError when the chunk is larger than the configured chunk size.
+  def write(chunk)
+    if chunk.size > @chunk_size
+      raise ArgumentError, 'chunk size mismatch: ' \
+                           "expected #{@chunk_size}, got #{chunk.size}"
+    end
+
+    # A consumer that already returned will never read again: drop the chunk
+    # instead of buffering it or resuming a dead fiber.
+    if @fiber.alive?
+      @chunks << chunk
+      @eof = false
+      @fiber.resume
+    end
+
+    chunk.size
+  end
+
+  # Returns the next buffered chunk. Raises ArgumentError unless chunk_size
+  # equals the size this stream was created with.
+  def read(chunk_size)
+    unless chunk_size == @chunk_size
+      raise ArgumentError, 'chunk size mismatch:' \
+                           " expected #{@chunk_size}, got #{chunk_size}"
+    end
+
+    @chunks.shift
+  end
+
+  include IO::Chunkable
+end
diff --git a/lib/tee/core_ext.rb b/lib/tee/core_ext.rb
new file mode 100644
index 0000000..000625f
--- /dev/null
+++ b/lib/tee/core_ext.rb
@@ -0,0 +1,22 @@
+# Load StringIO first so the `class StringIO` below reopens the real class
+# instead of defining an unrelated empty one.
+require 'stringio'
+require 'tee'
+
+class IO
+  include Chunkable
+  include Digestable
+  include Tee
+end
+
+class StringIO
+  include IO::Chunkable
+  include IO::Digestable
+  include IO::Tee
+end
+
+module Enumerable
+  def tee(*procs)
+    Enumerable::Tee.instance_method(:tee).bind(self).call(*procs)
+  end
+end
diff --git a/lib/tee/version.rb b/lib/tee/version.rb
new file mode 100644
index 0000000..e17b923
---
/dev/null +++ b/lib/tee/version.rb @@ -0,0 +1,3 @@ +module Tee + VERSION = '0.5' +end diff --git a/test b/spec/fixtures/lorem similarity index 100% rename from test rename to spec/fixtures/lorem diff --git a/sums b/spec/fixtures/sums similarity index 53% rename from sums rename to spec/fixtures/sums index 2b4665e..1c0c96d 100644 --- a/sums +++ b/spec/fixtures/sums @@ -1,2 +1,2 @@ -c88da884d035813d6f0c87b7345d26debf65d522 test -1b546c2b32aa7532b69fdd1502fae4b9839d22e3a436d33a16691cde2d9167ba test +c88da884d035813d6f0c87b7345d26debf65d522 lorem +1b546c2b32aa7532b69fdd1502fae4b9839d22e3a436d33a16691cde2d9167ba lorem diff --git a/spec/lib/tee_spec.rb b/spec/lib/tee_spec.rb new file mode 100644 index 0000000..8bf8bdf --- /dev/null +++ b/spec/lib/tee_spec.rb @@ -0,0 +1,121 @@ +require 'spec_helper' +require 'tee' +require 'tee/core_ext' + +describe IO::Chunkable do + let(:io) do + Class.new do + def initialize + @count = -1 + end + + def read(bytes) + @count += 1 + @count.to_s[0] * bytes + end + + def eof? + @count > 10 + end + end.new + .extend(IO::Chunkable) + end + + it 'should allow enumeration of chunks' do + io.chunks.should be_a Enumerator + end + + it 'should allow iteration on chunks' do + io.each_chunk.with_index do |chunk, i| + chunk.length.should eq 1024 + chunk[0].should eq i.to_s[0] + end + + io.each_chunk do |chunk| + chunk.length.should eq 1024 + end + end +end + +describe IO::Digestable do + let(:io) do + Class.new do + def initialize + @count = -1 + end + + def read(bytes) + @count += 1 + @count.to_s[0] * bytes + end + + def eof? 
+ @count > 10 + end + end.new + .extend(IO::Chunkable) + .extend(IO::Digestable) + end + + let(:digest) do + Class.new do + def initialize + @digest = 0 + end + + attr_reader :digest + + def <<(value) + @digest += value.each_byte.reduce(0) { |a, e| a + e } + end + end.new + end + + it 'should digest the whole IO with the provided hash functions' do + io.digest_with(digest).digest.should eq 637_952 + end + + it 'should digest the whole IO with sha256' do + io.sha256.hexdigest.should eq '56e2d8a90ae93b2637ab8e005243580d'\ + 'a87b03d8dc32d0b9a5aaaeb39ae6bd48' + end + + it 'should digest the whole IO with typical hash functions' + + it 'should do a rolling digest with the provided digest' + it 'should do a rolling digest with typical hash functions' +end + +describe FiberChunkedIO do + it 'should tee a file in chunks' do + File.open(fixture 'lorem') do |lorem| + + sha1_proc = lambda do |f| + f.chunks.each.with_object(Digest::SHA1.new) do |chunk, digest| + digest << chunk + end + end + + sha2_proc = lambda do |f| + f.chunks.each.with_object(Digest::SHA2.new(256)) do |chunk, digest| + digest << chunk + end + end + + chunk_sizes = [] + chunk_sizes_proc = lambda do |f| + f.chunks.each { |chunk| chunk_sizes << chunk.length } + end + + results = lorem.tee(sha1_proc, sha2_proc, chunk_sizes_proc) + + results.size.should eq 3 + chunk_sizes.should eq [1024, 1024, 1024, 1024, 918] + + File.read(fixture 'sums').lines + .map.with_index do |l, i| + results[i].should eq l.split(' ')[0] + end + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..b9f57c6 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,15 @@ +RSpec.configure do |config| + config.treat_symbols_as_metadata_keys_with_true_values = true + config.run_all_when_everything_filtered = true + config.filter_run :focus + + # Run specs in random order to surface order dependencies. 
If you find an + # order dependency and want to debug it, you can fix the order by providing + # the seed, which is printed after each run. + # --seed 1234 + config.order = 'random' +end + +def fixture(name) + File.join(File.dirname(__FILE__), 'fixtures', name) +end diff --git a/tee.gemspec b/tee.gemspec new file mode 100644 index 0000000..f9008e0 --- /dev/null +++ b/tee.gemspec @@ -0,0 +1,24 @@ +# -*- encoding: utf-8 -*- +$LOAD_PATH.push File.expand_path('../lib', __FILE__) +require 'tee/version' + +Gem::Specification.new do |s| + s.name = 'tee' + s.version = Tee::VERSION + s.authors = ['Loic Nageleisen'] + s.email = ['loic.nageleisen@gmail.com'] + s.homepage = 'http://github.com/lloeki/ruby-tee' + s.summary = %q{Teeing enumerables} + s.description = %q{Allows enumerables to be teed via fibers} + s.license = 'MIT' + s.files = `git ls-files`.split("\n") + s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n") + s.executables = `git ls-files -- bin/*`.split("\n") + .map { |f| File.basename(f) } + s.require_paths = ['lib'] + + s.add_development_dependency 'rspec', '~> 2.14' + s.add_development_dependency 'rake' + s.add_development_dependency 'guard-rspec' + s.add_development_dependency 'pry' +end diff --git a/tee.rb b/tee.rb deleted file mode 100644 index 75cb3ea..0000000 --- a/tee.rb +++ /dev/null @@ -1,122 +0,0 @@ -require 'digest/sha1' -require 'digest/sha2' - -module Enumerable - def tee(*procs) - each { |i| procs.map { |p| p.call i } } - end -end - -class IO - module Chunkable - def chunks(chunk_size=1024) - Enumerator.new { |y| y << read(chunk_size) until eof? 
} - end - - def each_chunk(chunk_size=nil) - chunks.each { |*args| yield *args } - end - end - - module Digestable - def digest_with(digest, chunk_size=nil) - chunks(chunk_size).each { |chunk| digest << chunk } - digest - end - - def sha256(chunk_size=nil) - digest_with Digest::SHA2.new(256), chunk_size - end - end - - module Utils - def fiber_tee(*procs) - ios = procs.map { |proc| FiberChunkedIO.new &proc } - - chunks.each do |chunk| - ios.each do |io| - io.write chunk - end - end - ios.each { |io| io.close } - ios.map { |io| io.result } - end - alias_method :tee, :fiber_tee - end - - include Chunkable - include Digestable - include Utils -end - -class FiberChunkedIO - def initialize(chunk_size=1024, &block) - @chunk_size = chunk_size - @chunks = [] - @eof = false - @fiber = Fiber.new do - @result = block.call self - end - @fiber.resume - end - - # Being a stream, it behaves like IO#eof? and blocks until the other end sends some data or closes it. - def eof? - Fiber.yield - @eof - end - - def close - @eof = true - @fiber.resume - end - - def result - @result - end - - def write(chunk) - if chunk.size > @chunk_size - raise ArgumentError.new("chunk size mismatch: expected #{@chunk_size}, got #{chunk.size}") - end - - @chunks << chunk - @eof = false - @fiber.resume - chunk.size - end - - def read(chunk_size) - unless chunk_size == @chunk_size - raise ArgumentError.new("chunk size mismatch: expected #{@chunk_size}, got #{chunk_size}") - end - - @chunks.shift - end - - include IO::Chunkable -end - -File.open("test") do |f| - - sha1_proc = lambda do |f| - digest = Digest::SHA1.new - f.chunks.each { |chunk| digest << chunk } - digest - end - - sha2_proc = lambda do |f| - digest = Digest::SHA2.new(256) - f.chunks.each { |chunk| digest << chunk } - digest - end - - puts_proc = lambda do |f| - f.chunks.each { |chunk| puts chunk.length } - end - - results = f.tee(sha1_proc, sha2_proc, puts_proc) - p results - p File.read('sums').lines.map.with_index { |l, i| results[i] == 
l.split(' ')[0] } - -end