Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion src/tink/io/Sink.hx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import tink.Chunk;
import tink.io.PipeOptions;
import tink.streams.Stream;

using tink.io.PipeResult;
using tink.io.Source;
using tink.CoreApi;

Expand Down Expand Up @@ -138,4 +139,49 @@ class SinkBase<FailingWith, Result> implements SinkObject<FailingWith, Result> {
//override public function endSafely():Future<Bool> {
//return target.end().recover(function (_) return Future.sync(false));
//}
//}
//}

class CollectSink<Result> extends SinkBase<Error, Result> {
var ended = false;
var result:PromiseTrigger<Result> = Promise.trigger();
var collected:Chunk = Chunk.EMPTY;
var process:Chunk->Promise<Result>;

public function new(process) {
this.process = process;
}

override function get_sealed() return ended;

override function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, Error, Result>> {
return
if(ended)
result.asPromise().map(function(o) return switch o {
case Success(result): SinkEnded(result, (cast source:Source<EIn>));
case Failure(e): SinkFailed(e, (cast source:Source<EIn>));
});
else
source.forEach(function(chunk) {
collected = collected & chunk;
return Resume;
}).flatMap(function(o):Future<PipeResult<EIn, Error, Result>> return switch o {
case Depleted:
if(options.end) {
ended = true;
process(collected).map(function(o) {
result.trigger(o);
return switch o {
case Success(result): SinkEnded(result, (cast Source.EMPTY:Source<EIn>));
case Failure(e): SinkFailed(e, (cast Source.EMPTY:Source<EIn>));
}
});
} else {
Future.sync(AllWritten);
}
case Failed(e):
Future.sync(SourceFailed(e));
case Halted(rest):
throw 'unreachable';
});
}
}