Skip to content
6 changes: 6 additions & 0 deletions vertx-web-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@
<artifactId>vertx-auth-htdigest</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>2.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.internal.http.HttpClientInternal;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.http.HttpClientInternal;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClientOptions;
Expand Down Expand Up @@ -59,7 +58,6 @@ public class HttpContext<T> {
private RequestOptions requestOptions;
private HttpClientRequest clientRequest;
private HttpClientResponse clientResponse;
private Promise<HttpClientRequest> requestPromise;
private HttpResponse<T> response;
private Throwable failure;
private int redirects;
Expand Down Expand Up @@ -120,7 +118,7 @@ public RequestOptions requestOptions() {
return requestOptions;
}

public void setRequestOptions(RequestOptions requestOptions) {
public void requestOptions(RequestOptions requestOptions) {
this.requestOptions = requestOptions;
}

Expand Down Expand Up @@ -178,7 +176,7 @@ public Throwable failure() {
/**
* @return all traced redirects
*/
public List<String> getRedirectedLocations() {
public List<String> redirectedLocations() {
return redirectedLocations;
}

Expand Down Expand Up @@ -402,6 +400,12 @@ private void handleFailure() {
clientRequest = null;
req.reset();
}
if (body != null) {
if (body instanceof Pipe) {
((Pipe<?>)body).close();
}
body = null;
}
promise.tryFail(failure);
}

Expand All @@ -426,145 +430,113 @@ private void handlePrepareRequest() {
contentType = prev;
}
}
if (body instanceof Pipe) {
//
} else if (body instanceof MultipartForm) {
MultipartFormUpload multipartForm;
try {
boolean multipart = "multipart/form-data".equals(contentType);
HttpPostRequestEncoder.EncoderMode encoderMode = this.request.multipartMixed() ? HttpPostRequestEncoder.EncoderMode.RFC1738 : HttpPostRequestEncoder.EncoderMode.HTML5;
multipartForm = new MultipartFormUpload(context, (MultipartForm) this.body, multipart, encoderMode);
this.body = multipartForm.pipe();
} catch (Exception e) {
fail(e);
return;
}
for (Map.Entry<String, String> header : multipartForm.headers()) {
requestOptions.putHeader(header.getKey(), header.getValue());
}
} else if (body == null && "application/json".equals(contentType)) {
body = Buffer.buffer("null");
} else if (body instanceof JsonObject) {
body = ((JsonObject) body).toBuffer();
} else if (body != null && !(body instanceof Buffer)) {
body = Json.encodeToBuffer(body);
}

if (body instanceof Buffer) {
Buffer buffer = (Buffer) body;
requestOptions.putHeader(HttpHeaders.CONTENT_LENGTH, "" + buffer.length());
}

createRequest(requestOptions);
}

private void handleCreateRequest() {
requestPromise = context.promise();
if (body != null || "application/json".equals(contentType)) {
if (body instanceof MultipartForm) {
MultipartFormUpload multipartForm;
try {
boolean multipart = "multipart/form-data".equals(contentType);
HttpPostRequestEncoder.EncoderMode encoderMode = this.request.multipartMixed() ? HttpPostRequestEncoder.EncoderMode.RFC1738 : HttpPostRequestEncoder.EncoderMode.HTML5;
multipartForm = new MultipartFormUpload(context, (MultipartForm) this.body, multipart, encoderMode);
this.body = multipartForm;
} catch (Exception e) {
fail(e);
return;
}
for (Map.Entry<String, String> header : multipartForm.headers()) {
requestOptions.putHeader(header.getKey(), header.getValue());
}
}
if (body instanceof ReadStream<?>) {
ReadStream<Buffer> stream = (ReadStream<Buffer>) body;
Pipe<Buffer> pipe = stream.pipe(); // Shouldn't this be called in an earlier phase ?
requestPromise.future().onComplete(ar -> {
if (ar.succeeded()) {
HttpClientRequest req = ar.result();
if (this.request.headers == null || !this.request.headers.contains(HttpHeaders.CONTENT_LENGTH)) {
req.setChunked(true);
}
pipe.endOnFailure(false);
pipe.to(req).onComplete(ar2 -> {
clientRequest = null;
if (ar2.failed()) {
req.reset(0L, ar2.cause());
}
});
if (body instanceof MultipartFormUpload) {
((MultipartFormUpload) body).pump();
}
} else {
// Test this
clientRequest = null;
pipe.close();
}
});
} else {
Buffer buffer;
if (body instanceof Buffer) {
buffer = (Buffer) body;
} else if (body instanceof JsonObject) {
buffer = ((JsonObject) body).toBuffer();
} else {
buffer = Json.encodeToBuffer(body);
}
requestOptions.putHeader(HttpHeaders.CONTENT_LENGTH, "" + buffer.length());
requestPromise.future().onSuccess(request -> {
clientRequest = null;
request.end(buffer);
});
}
} else {
requestPromise.future().onSuccess(request -> {
clientRequest = null;
request.end();
});
}
client.request(requestOptions)
.onComplete(ar1 -> {
if (ar1.succeeded()) {
sendRequest(ar1.result());
} else {
fail(ar1.cause());
requestPromise.fail(ar1.cause());
}
});
}

private void handleReceiveResponse() {
BodyStream<T> stream;
try {
stream = request.bodyCodec().stream();
} catch (Exception e) {
fail(e);
return;
}
HttpClientResponse resp = clientResponse;
Context context = Vertx.currentContext();
Promise<HttpResponse<T>> promise = Promise.promise();
promise.future().onComplete(r -> {
// We are running on a context (the HTTP client mandates it)
context.runOnContext(v -> {
if (r.succeeded()) {
dispatchResponse(r.result());
resp
.pipeTo(stream)
.compose(v -> stream.result())
.map(result -> new HttpResponseImpl<>(
resp.version(),
resp.statusCode(),
resp.statusMessage(),
resp.headers(),
resp.trailers(),
resp.cookies(),
result,
redirectedLocations
)).onComplete(ar -> {
if (ar.succeeded()) {
dispatchResponse(ar.result());
} else {
fail(r.cause());
fail(ar.cause());
}
});
});
resp.exceptionHandler(err -> {
if (!promise.future().isComplete()) {
promise.fail(err);
}
});
Pipe<Buffer> pipe = resp.pipe();
request.bodyCodec().create(ar1 -> {
if (ar1.succeeded()) {
BodyStream<T> stream = ar1.result();
pipe.to(stream).onComplete(ar2 -> {
if (ar2.succeeded()) {
stream.result().onComplete(ar3 -> {
if (ar3.succeeded()) {
promise.complete(new HttpResponseImpl<>(
resp.version(),
resp.statusCode(),
resp.statusMessage(),
resp.headers(),
resp.trailers(),
resp.cookies(),
stream.result().result(),
redirectedLocations
));
} else {
promise.fail(ar3.cause());
}
});
} else {
promise.fail(ar2.cause());
}
});
} else {
pipe.close();
fail(ar1.cause());
}
});
}

private void handleSendRequest() {
clientRequest.response().onComplete(ar -> {
clientRequest = null;
if (ar.succeeded()) {
receiveResponse(ar.result().pause());
} else {
fail(ar.cause());
}
});
requestPromise.complete(clientRequest);
doSendRequest(clientRequest);
}

private void doSendRequest(HttpClientRequest request) {
Object bodyToSend = body;
if (bodyToSend != null) {
body = null;
if (bodyToSend instanceof Pipe) {
Pipe<Buffer> pipe = (Pipe<Buffer>) bodyToSend;
if (this.request.headers == null || !this.request.headers.contains(HttpHeaders.CONTENT_LENGTH)) {
request.setChunked(true);
}
pipe.endOnFailure(false);
pipe.to(request).onComplete(ar2 -> {
if (ar2.failed()) {
request.reset(0L, ar2.cause());
}
});
} else {
Buffer buffer = (Buffer) bodyToSend;
request.send(buffer);
}
} else {
request.send();
}
}

public <T> T get(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ public boolean multipartMixed() {

@Override
public Future<HttpResponse<T>> sendStream(ReadStream<Buffer> body) {
return send(null, body);
return send(null, body.pipe());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.multipart.*;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.buffer.BufferInternal;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.core.internal.http.HttpHeadersInternal;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.web.multipart.FormDataPart;
import io.vertx.ext.web.multipart.MultipartForm;

Expand Down Expand Up @@ -230,4 +233,36 @@ public synchronized MultipartFormUpload endHandler(Handler<Void> handler) {
endHandler = handler;
return this;
}

@Override
public Pipe<Buffer> pipe() {
Pipe<Buffer> pipe = ReadStream.super.pipe();
return new Pipe<>() {
@Override
public Pipe<Buffer> endOnFailure(boolean end) {
pipe.endOnFailure(end);
return this;
}
@Override
public Pipe<Buffer> endOnSuccess(boolean end) {
pipe.endOnSuccess(end);
return this;
}
@Override
public Pipe<Buffer> endOnComplete(boolean end) {
pipe.endOnComplete(end);
return this;
}
@Override
public Future<Void> to(WriteStream<Buffer> dst) {
Future<Void> f = pipe.to(dst);
pump();
return f;
}
@Override
public void close() {
pipe.close();
}
};
}
}
Loading
Loading