Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/tink/io/PassThrough.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package tink.io;

import tink.streams.Stream;
import tink.io.Sink;
import tink.io.Source;

using tink.io.PipeResult;
using tink.CoreApi;

class PassThrough<Quality> extends SignalStream<Chunk, Quality> implements SinkObject<Quality, Noise> implements SourceObject<Quality> {
public var sealed(get, never):Bool;
function get_sealed() return trigger == null;

var trigger:SignalTrigger<Yield<Chunk, Quality>>;

public function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, Quality, Noise>> {
var ret = source.forEach(function(c:Chunk) {
trigger.trigger(Data(c));
return Resume;
});

if (options.end)
ret.handle(function (end) {
trigger.trigger(End);
trigger = null;
});

return ret.map(function (c) {
switch c {
case Failed(e):
trigger.trigger(cast Fail(e));
trigger = null;
case _:
}
return cast c.toResult(Noise);
});
}

public function new() {
super(trigger = Signal.trigger());
}
}
59 changes: 59 additions & 0 deletions tests/PassThroughTest.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package;

import tink.Chunk;
import tink.io.PassThrough;
import tink.streams.Stream;

using tink.io.Source;
using tink.CoreApi;

@:asserts
class PassThroughTest {
public function new() {}

public function ideal() {
var pass = new PassThrough<Noise>();

var data = [for(i in 0...4) (Std.string(i):Chunk)];
var src:IdealSource = Stream.ofIterator(data.iterator());

var total = 0;
(pass:IdealSource).chunked()
.forEach(function(chunk) {
asserts.assert(chunk.length == 1);
total += chunk.length;
return Resume;
})
.map(function(_) {
asserts.assert(total == 4);
return Success(Noise);
})
.handle(asserts.handle);

src.pipeTo(pass, {end: true}).eager();
return asserts;
}

public function real() {
var pass = new PassThrough<Error>();

var data = [for(i in 0...4) (Std.string(i):Chunk)];
var src:RealSource = Stream.ofIterator(data.iterator());

var total = 0;
(pass:RealSource).chunked()
.forEach(function(chunk) {
asserts.assert(chunk.length == 1);
total += chunk.length;
return Resume;
})
.map(function(_) {
asserts.assert(total == 4);
return Success(Noise);
})
.handle(asserts.handle);

src.pipeTo(pass, {end: true}).eager();
return asserts;
}
}
3 changes: 3 additions & 0 deletions tests/RunTests.hx
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package;
import tink.unit.*;
import tink.testrunner.*;

using tink.CoreApi;

class RunTests {

static function main() {
Runner.run(TestBatch.make([
#if sys new PipeTest(),#end
new SourceTest(),
new ParserTest(),
new PassThroughTest(),
#if (js && !nodejs) new JsTest(), #end
])).handle(Runner.exit);
}
Expand Down