Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dev.hxml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
tests.hxml
-lib travix
-lib tink_io
--each
-lib hxnodejs
-js bin/node/tests.js
-js bin/node/tests.js
--next
-lib hxjs-http2
-js bin/js/tests.js
3 changes: 3 additions & 0 deletions haxe_libraries/hxjs-http2.hxml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# @install: lix --silent download "gh://github.com/brave-pi/hxjs-http2#b86cc5c4f80e04b1f895da733a01b53179adc37b" into hxjs-http2/0.0.0/github/b86cc5c4f80e04b1f895da733a01b53179adc37b
-cp ${HAXE_LIBCACHE}/hxjs-http2/0.0.0/github/b86cc5c4f80e04b1f895da733a01b53179adc37b/.
-D hxjs-http2=0.0.0
11 changes: 11 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

241 changes: 145 additions & 96 deletions src/tink/io/Sink.hx
Original file line number Diff line number Diff line change
Expand Up @@ -42,107 +42,156 @@ abstract SinkYielding<FailingWith, Result>(SinkObject<FailingWith, Result>)
static public inline function ofNodeStream(name, r:js.node.stream.Writable.IWritable):RealSink
return tink.io.nodejs.NodejsSink.wrap(name, r);
#end

#if (js && !nodejs && !macro && hxjs_http2)
static public inline function ofJsStream(name, r:js.Stream.WritableStream):RealSink
return tink.io.js.JsSink.wrap(name, r);

public inline function toJsStream():js.Stream.WritableStream {
var incoming = Signal.trigger();
var stream = new SignalStream(incoming);
var consumption:Promise<Noise> = null;
return new js.Stream.WritableStream({
start: function(controller) {
consumption = this.consume(stream, {end: true}).next(function(e) {
return switch e {
case SinkFailed(e, _):
controller.error('$e');
Failure(e);
case SourceFailed(e):
controller.error('$e');
Failure(e);
default: Success(Noise);
}
});
return null;
},
write: function(_chunk:js.lib.ArrayBufferView, ?_):js.lib.Promise<Noise> {
var ret = incoming.asSignal().nextTime().next(function(_) return Success(Noise));
var chunk = tink.Chunk.ofBytes(haxe.io.Bytes.ofData(_chunk.buffer));
incoming.trigger(Data(chunk));
return ret;
},
abort: function(reason):js.lib.Promise<Noise> {
incoming.trigger(Fail(new Error(reason, null)));
return consumption.next(function(_) {
return Noise;
}).tryRecover(function(e) {
return e;
});
},
close: function(controller):js.lib.Promise<Noise> {
incoming.trigger(End);
return consumption.next(function(r) {
return Noise;
}).tryRecover(function(e) {
return e;
});
}
});
}
#end

#if cs
static public inline function ofCsStream(name, r:cs.system.io.Stream):RealSink
return tink.io.cs.CsSink.wrap(name, r);
#end

#if java
static public inline function ofJavaFileChannel(name, channel:java.nio.channels.AsynchronousFileChannel):RealSink
return tink.io.java.JavaFileSink.wrap(name, channel);
static public inline function ofJavaSocketChannel(name, channel:java.nio.channels.AsynchronousSocketChannel):RealSink
return tink.io.java.JavaSocketSink.wrap(name, channel);
#end

#if cs
static public inline function ofCsStream(name, r:cs.system.io.Stream):RealSink
return tink.io.cs.CsSink.wrap(name, r);
#end
static public function ofOutput(name:String, target:haxe.io.Output, ?options:{ ?worker:Worker }):RealSink
return new tink.io.std.OutputSink(name, target, switch options {
case null | { worker: null }: Worker.get();
case { worker: w }: w;
});

#if java
static public inline function ofJavaFileChannel(name, channel:java.nio.channels.AsynchronousFileChannel):RealSink
return tink.io.java.JavaFileSink.wrap(name, channel);
static public inline function ofJavaSocketChannel(name, channel:java.nio.channels.AsynchronousSocketChannel):RealSink
return tink.io.java.JavaSocketSink.wrap(name, channel);
#end

static public function ofOutput(name:String, target:haxe.io.Output, ?options:{ ?worker:Worker }):RealSink
return new tink.io.std.OutputSink(name, target, switch options {
case null | { worker: null }: Worker.get();
case { worker: w }: w;
});


}

private class Blackhole extends SinkBase<Noise, Noise> {
public static var inst(default, null):Blackhole = new Blackhole();

function new() {}

override public function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, Noise, Noise>>
return source.forEach(function(_) return Resume).map(function(o):PipeResult<EIn, Noise, Noise> return switch o {
case Depleted: AllWritten;
case Halted(_): throw 'unreachable';
case Failed(e): SourceFailed(e);
});
}

private class FutureSink<FailingWith, Result> extends SinkBase<FailingWith, Result> {
var f:Future<SinkYielding<FailingWith, Result>>;
public function new(f)
this.f = f;

override public function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, FailingWith, Result>>
return f.flatMap(function (sink) return sink.consume(source, options));
}

private class ErrorSink<Result> extends SinkBase<Error, Result> {
}

var error:Error;

public function new(error)
this.error = error;

override function get_sealed()
return false;

override public function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, Error, Result>>
return Future.sync(cast PipeResult.SinkFailed(error, source));//TODO: there's something rotten here - the cast should be unnecessary
}

interface SinkObject<FailingWith, Result> {
var sealed(get, never):Bool;
function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, FailingWith, Result>>;
private class Blackhole extends SinkBase<Noise, Noise> {
public static var inst(default, null):Blackhole = new Blackhole();

function new() {}

//function idealize(recover:Error->SinkObject<FailingWith>):IdealSink;
}

class SinkBase<FailingWith, Result> implements SinkObject<FailingWith, Result> {
override public function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, Noise, Noise>>
return source.forEach(function(_) return Resume).map(function(o):PipeResult<EIn, Noise, Noise> return switch o {
case Depleted: AllWritten;
case Halted(_): throw 'unreachable';
case Failed(e): SourceFailed(e);
});
}

public var sealed(get, never):Bool;
function get_sealed() return true;
private class FutureSink<FailingWith, Result> extends SinkBase<FailingWith, Result> {
var f:Future<SinkYielding<FailingWith, Result>>;
public function new(f)
this.f = f;

override public function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, FailingWith, Result>>
return f.flatMap(function (sink) return sink.consume(source, options));
}

private class ErrorSink<Result> extends SinkBase<Error, Result> {

var error:Error;

public function new(error)
this.error = error;

override function get_sealed()
return false;

override public function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, Error, Result>>
return Future.sync(cast PipeResult.SinkFailed(error, source));//TODO: there's something rotten here - the cast should be unnecessary
}

interface SinkObject<FailingWith, Result> {
var sealed(get, never):Bool;
function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, FailingWith, Result>>;

//function idealize(recover:Error->SinkObject<FailingWith>):IdealSink;
}

class SinkBase<FailingWith, Result> implements SinkObject<FailingWith, Result> {

public var sealed(get, never):Bool;
function get_sealed() return true;

public function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, FailingWith, Result>>
return throw 'not implemented';

//public function idealize(onError:Callback<Error>):IdealSink;

//public function idealize(onError:Callback<Error>):IdealSink
//return new IdealizedSink(this, onError);
}

public function consume<EIn>(source:Stream<Chunk, EIn>, options:PipeOptions):Future<PipeResult<EIn, FailingWith, Result>>
return throw 'not implemented';

//public function idealize(onError:Callback<Error>):IdealSink;

//public function idealize(onError:Callback<Error>):IdealSink
//return new IdealizedSink(this, onError);
}

//
//class IdealizedSink extends IdealSinkBase {
//var target:Sink;
//var onError:Callback<Error>;
//
//public function new(target, onError) {
//this.target = target;
//this.onError = onError;
//}
//
//override public function consumeSafely(source:IdealSource, options:PipeOptions):Future<IdealSource>
//return Future.async(function (cb)
//target.consume(source, options).handle(function (c) {
//switch c.error {
//case Some(e): onError.invoke(e);
//default:
//}
//cb(c.rest);
//})
//);
//
//override public function endSafely():Future<Bool> {
//return target.end().recover(function (_) return Future.sync(false));
//}
//}
//class IdealizedSink extends IdealSinkBase {
//var target:Sink;
//var onError:Callback<Error>;
//
//public function new(target, onError) {
//this.target = target;
//this.onError = onError;
//}
//
//override public function consumeSafely(source:IdealSource, options:PipeOptions):Future<IdealSource>
//return Future.async(function (cb)
//target.consume(source, options).handle(function (c) {
//switch c.error {
//case Some(e): onError.invoke(e);
//default:
//}
//cb(c.rest);
//})
//);
//
//override public function endSafely():Future<Bool> {
//return target.end().recover(function (_) return Future.sync(false));
//}
//}
69 changes: 56 additions & 13 deletions src/tink/io/Source.hx
Original file line number Diff line number Diff line change
@@ -1,31 +1,74 @@
package tink.io;

#if hxjs_http2
import js.lib.Uint8Array;
import js.Stream;
#end
import haxe.io.Bytes;
import tink.io.Sink;
import tink.io.StreamParser;
import tink.streams.IdealStream;
import tink.streams.RealStream;
import tink.streams.Stream;

import tink.io.Transformer;
#if (nodejs && !macro)
import #if haxe4 js.lib.Error #else js.Error #end as JsError;
#end

using tink.io.Source;
using tink.CoreApi;

@:forward(reduce)
abstract Source<E>(SourceObject<E>) from SourceObject<E> to SourceObject<E> to Stream<Chunk, E> from Stream<Chunk, E> {

public static var EMPTY(default, null):IdealSource = Empty.make();

@:to inline function dirty():Source<Error>
return cast this;

public var depleted(get, never):Bool;
inline function get_depleted() return this.depleted;
abstract Source<E>(SourceObject<E>) from SourceObject<E> to SourceObject<E> to Stream<Chunk, E> from Stream<Chunk, E> {
public static var EMPTY(default, null):IdealSource = Empty.make();

@:to inline function dirty():Source<Error>
return cast this;

public var depleted(get, never):Bool;

inline function get_depleted()
return this.depleted;

#if (js && !nodejs && !macro && hxjs_http2)
@:noUsing static public inline function ofJsStream(name:String, r:js.Stream.ReadableStream, ?options:{?chunkSize:Int, ?onEnd:Void->Void}):RealSource {
if (options == null)
options = {};
return tink.io.js.JsSource.wrap(name, r, options.chunkSize, options.onEnd);
}

public function toJsStream():js.Stream.ReadableStream {
var source = chunked();
function write(controller:ReadableByteStreamController):js.lib.Promise<Dynamic> {
return source.forEach(function(chunk:Chunk) {
controller.enqueue(new Uint8Array(chunk.toBytes().getData()));
return Resume;
}).next(function(o) return switch o {
case Depleted:
controller.close();
Success(Noise);
case Halted(rest):
source = rest;
Success(Noise);
case Failed(e):
controller.error(e.message);
Failure(e);
});
}
var native = new js.Stream.ReadableStream(cast {
start: write,
pull: function(controller) {
return null;
},
cancel: function(reason) {
return null;
},
type: "bytes"
});
return native;
}
#end

#if (nodejs && !macro)
#if (nodejs && !macro)
@:noUsing static public inline function ofNodeStream(name:String, r:js.node.stream.Readable.IReadable, ?options:{ ?chunkSize: Int, ?onEnd:Void->Void }):RealSource {
if (options == null)
options = {};
Expand Down
Loading