Skip to content

Commit

Permalink
Refactor FluxWriter to use more straight through processing
Browse files Browse the repository at this point in the history
  • Loading branch information
dsyer committed May 14, 2019
1 parent 8b89d1d commit 35a609d
Showing 1 changed file with 30 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

import org.reactivestreams.Publisher;
Expand All @@ -39,7 +41,9 @@ class FluxWriter extends Writer {

private final Charset charset;

private Flux<String> buffers;
private List<String> current = new ArrayList<>();

private List<Object> accumulated = new ArrayList<>();

public FluxWriter(Supplier<DataBuffer> factory) {
this(factory, Charset.defaultCharset());
Expand All @@ -48,17 +52,32 @@ public FluxWriter(Supplier<DataBuffer> factory) {
public FluxWriter(Supplier<DataBuffer> factory, Charset charset) {
this.factory = factory;
this.charset = charset;
this.buffers = Flux.empty();
}

public Publisher<? extends Publisher<? extends DataBuffer>> getBuffers() {
return this.buffers
.map(string -> Mono.just(buffer().write(string, this.charset)));
Flux<String> buffers = Flux.empty();
if (!this.current.isEmpty()) {
this.accumulated.add(new ArrayList<>(this.current));
this.current.clear();
}
for (Object thing : this.accumulated) {
if (thing instanceof Publisher) {
@SuppressWarnings("unchecked")
Publisher<String> publisher = (Publisher<String>) thing;
buffers = buffers.concatWith(publisher);
}
else {
@SuppressWarnings("unchecked")
List<String> list = (List<String>) thing;
buffers = buffers.concatWithValues(list.toArray(new String[0]));
}
}
return buffers.map(string -> Mono.just(buffer().write(string, this.charset)));
}

@Override
public void write(char[] cbuf, int off, int len) throws IOException {
this.buffers = this.buffers.concatWith(Mono.just(new String(cbuf, off, len)));
this.current.add(new String(cbuf, off, len));
}

@Override
Expand All @@ -79,13 +98,15 @@ private DataBuffer buffer() {

public void write(Object thing) {
if (thing instanceof Publisher) {
@SuppressWarnings("unchecked")
Publisher<String> publisher = (Publisher<String>) thing;
this.buffers = this.buffers.concatWith(Flux.from(publisher));
if (!this.current.isEmpty()) {
this.accumulated.add(new ArrayList<>(this.current));
this.current.clear();
}
this.accumulated.add(thing);
}
else {
if (thing instanceof String) {
this.buffers = this.buffers.concatWith(Mono.just((String) thing));
this.current.add((String) thing);
}
}
}
Expand Down

0 comments on commit 35a609d

Please sign in to comment.