From 37a0ba2c4a83b4c368073c9121b507f075092138 Mon Sep 17 00:00:00 2001 From: Kevin Leung Date: Mon, 8 Oct 2018 16:41:14 +0800 Subject: [PATCH] Add passthrough --- src/tink/io/PassThrough.hx | 42 +++++++++++++++++++++++++++ tests/PassThroughTest.hx | 59 ++++++++++++++++++++++++++++++++++++++ tests/RunTests.hx | 3 ++ 3 files changed, 104 insertions(+) create mode 100644 src/tink/io/PassThrough.hx create mode 100644 tests/PassThroughTest.hx diff --git a/src/tink/io/PassThrough.hx b/src/tink/io/PassThrough.hx new file mode 100644 index 0000000..2aed643 --- /dev/null +++ b/src/tink/io/PassThrough.hx @@ -0,0 +1,42 @@ +package tink.io; + +import tink.streams.Stream; +import tink.io.Sink; +import tink.io.Source; + +using tink.io.PipeResult; +using tink.CoreApi; + +class PassThrough extends SignalStream implements SinkObject implements SourceObject { + public var sealed(get, never):Bool; + function get_sealed() return trigger == null; + + var trigger:SignalTrigger>; + + public function consume(source:Stream, options:PipeOptions):Future> { + var ret = source.forEach(function(c:Chunk) { + trigger.trigger(Data(c)); + return Resume; + }); + + if (options.end) + ret.handle(function (end) { + trigger.trigger(End); + trigger = null; + }); + + return ret.map(function (c) { + switch c { + case Failed(e): + trigger.trigger(cast Fail(e)); + trigger = null; + case _: + } + return cast c.toResult(Noise); + }); + } + + public function new() { + super(trigger = Signal.trigger()); + } +} \ No newline at end of file diff --git a/tests/PassThroughTest.hx b/tests/PassThroughTest.hx new file mode 100644 index 0000000..e9ed3c0 --- /dev/null +++ b/tests/PassThroughTest.hx @@ -0,0 +1,59 @@ +package; + +import tink.Chunk; +import tink.io.PassThrough; +import tink.streams.Stream; + +using tink.io.Source; +using tink.CoreApi; + +@:asserts +class PassThroughTest { + public function new() {} + + public function ideal() { + var pass = new PassThrough(); + + var data = [for(i in 0...4) (Std.string(i):Chunk)]; + var src:IdealSource = Stream.ofIterator(data.iterator()); + + var total = 0; + (pass:IdealSource).chunked() + .forEach(function(chunk) { + asserts.assert(chunk.length == 1); + total += chunk.length; + return Resume; + }) + .map(function(_) { + asserts.assert(total == 4); + return Success(Noise); + }) + .handle(asserts.handle); + + src.pipeTo(pass, {end: true}).eager(); + return asserts; + } + + public function real() { + var pass = new PassThrough(); + + var data = [for(i in 0...4) (Std.string(i):Chunk)]; + var src:RealSource = Stream.ofIterator(data.iterator()); + + var total = 0; + (pass:RealSource).chunked() + .forEach(function(chunk) { + asserts.assert(chunk.length == 1); + total += chunk.length; + return Resume; + }) + .map(function(_) { + asserts.assert(total == 4); + return Success(Noise); + }) + .handle(asserts.handle); + + src.pipeTo(pass, {end: true}).eager(); + return asserts; + } +} \ No newline at end of file diff --git a/tests/RunTests.hx b/tests/RunTests.hx index 3206354..a6c3cad 100644 --- a/tests/RunTests.hx +++ b/tests/RunTests.hx @@ -3,6 +3,8 @@ package; import tink.unit.*; import tink.testrunner.*; +using tink.CoreApi; + class RunTests { static function main() { @@ -10,6 +12,7 @@ class RunTests { #if sys new PipeTest(),#end new SourceTest(), new ParserTest(), + new PassThroughTest(), #if (js && !nodejs) new JsTest(), #end ])).handle(Runner.exit); }