diff --git a/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/reactive/result/view/FluxWriter.java b/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/reactive/result/view/FluxWriter.java index c7c2ed12be95..dac477b1a558 100644 --- a/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/reactive/result/view/FluxWriter.java +++ b/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/reactive/result/view/FluxWriter.java @@ -21,13 +21,13 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; -import java.util.function.Supplier; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; /** * A {@link Writer} that can write a {@link Flux} (or {@link Publisher}) to a data buffer. @@ -37,47 +37,46 @@ */ class FluxWriter extends Writer { - private final Supplier factory; + private final DataBufferFactory factory; private final Charset charset; - private List current = new ArrayList<>(); - private List accumulated = new ArrayList<>(); - FluxWriter(Supplier factory) { - this(factory, Charset.defaultCharset()); - } - - FluxWriter(Supplier factory, Charset charset) { + FluxWriter(DataBufferFactory factory, Charset charset) { this.factory = factory; this.charset = charset; } - public Publisher> getBuffers() { + @SuppressWarnings("unchecked") + public Flux> getBuffers() { Flux buffers = Flux.empty(); - if (!this.current.isEmpty()) { - this.accumulated.add(new ArrayList<>(this.current)); - this.current.clear(); - } + List chunks = new ArrayList<>(); for (Object thing : this.accumulated) { if (thing instanceof Publisher) { - @SuppressWarnings("unchecked") - Publisher publisher = (Publisher) thing; - buffers = buffers.concatWith(publisher); + buffers = concatValues(chunks, buffers); + buffers = buffers.concatWith((Publisher) thing); } else { - @SuppressWarnings("unchecked") - List list = (List) thing; - buffers = buffers.concatWithValues(list.toArray(new String[0])); + chunks.add((String) thing); } } - return buffers.map((string) -> Mono.just(buffer().write(string, this.charset))); + buffers = concatValues(chunks, buffers); + return buffers.map((string) -> Mono.fromCallable( + () -> this.factory.allocateBuffer().write(string, this.charset))); + } + + private Flux concatValues(List chunks, Flux buffers) { + if (!chunks.isEmpty()) { + buffers = buffers.concatWithValues(chunks.toArray(new String[0])); + chunks.clear(); + } + return buffers; } @Override public void write(char[] cbuf, int off, int len) throws IOException { - this.current.add(new String(cbuf, off, len)); + this.accumulated.add(new String(cbuf, off, len)); } @Override @@ -92,23 +91,8 @@ public void release() { // TODO: maybe implement this and call it on error } - private DataBuffer buffer() { - return this.factory.get(); - } - public void write(Object thing) { - if (thing instanceof Publisher) { - if (!this.current.isEmpty()) { - this.accumulated.add(new ArrayList<>(this.current)); - this.current.clear(); - } - this.accumulated.add(thing); - } - else { - if (thing instanceof String) { - this.current.add((String) thing); - } - } + this.accumulated.add(thing); } } diff --git a/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/reactive/result/view/MustacheView.java b/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/reactive/result/view/MustacheView.java index e6f2df67f618..9f1f93fdad29 100644 --- a/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/reactive/result/view/MustacheView.java +++ b/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/web/reactive/result/view/MustacheView.java @@ -40,6 +40,7 @@ import org.springframework.core.io.Resource; import org.springframework.http.MediaType; +import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.web.reactive.result.view.AbstractUrlBasedView; import org.springframework.web.reactive.result.view.View; import org.springframework.web.server.ServerWebExchange; @@ -101,8 +102,8 @@ protected Mono renderInternal(Map model, MediaType content } boolean sse = MediaType.TEXT_EVENT_STREAM.isCompatibleWith(contentType); Charset charset = getCharset(contentType).orElse(getDefaultCharset()); - FluxWriter writer = new FluxWriter( - () -> exchange.getResponse().bufferFactory().allocateBuffer(), charset); + ServerHttpResponse response = exchange.getResponse(); + FluxWriter writer = new FluxWriter(response.bufferFactory(), charset); Mono