Skip to content

Commit 74069a1

Browse files
authored
CXF-8911:Allow wrapping AsyncHTTPConduit response processing (using new AsyncHttpResponseWrapperFactory bus extension) (#1510)
1 parent 3048785 commit 74069a1

File tree

6 files changed

+227
-12
lines changed

6 files changed

+227
-12
lines changed

rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.cxf.transport.http.Headers;
6767
import org.apache.cxf.transport.http.HttpClientHTTPConduit;
6868
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory.UseAsyncPolicy;
69+
import org.apache.cxf.transport.http.asyncclient.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper;
6970
import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
7071
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
7172
import org.apache.cxf.version.Version;
@@ -99,13 +100,15 @@
99100
public class AsyncHTTPConduit extends HttpClientHTTPConduit {
100101
public static final String USE_ASYNC = "use.async.http.conduit";
101102

102-
final AsyncHTTPConduitFactory factory;
103-
volatile int lastTlsHash = -1;
104-
volatile Object sslState;
105-
volatile URI sslURL;
106-
volatile SSLContext sslContext;
107-
volatile SSLSession session;
108-
volatile CloseableHttpAsyncClient client;
103+
private final AsyncHTTPConduitFactory factory;
104+
private final AsyncHttpResponseWrapperFactory asyncHttpResponseWrapperFactory;
105+
106+
private volatile int lastTlsHash = -1;
107+
private volatile Object sslState;
108+
private volatile URI sslURL;
109+
private volatile SSLContext sslContext;
110+
private volatile SSLSession session;
111+
private volatile CloseableHttpAsyncClient client;
109112

110113

111114
public AsyncHTTPConduit(Bus b,
@@ -114,6 +117,7 @@ public AsyncHTTPConduit(Bus b,
114117
AsyncHTTPConduitFactory factory) throws IOException {
115118
super(b, ei, t);
116119
this.factory = factory;
120+
this.asyncHttpResponseWrapperFactory = bus.getExtension(AsyncHttpResponseWrapperFactory.class);
117121
}
118122

119123
public synchronized CloseableHttpAsyncClient getHttpAsyncClient() throws IOException {
@@ -478,14 +482,26 @@ protected void connect(boolean output) throws IOException {
478482
return;
479483
}
480484

481-
CXFResponseCallback responseCallback = new CXFResponseCallback() {
485+
CXFResponseCallback delegate = new CXFResponseCallback() {
482486
@Override
483487
public void responseReceived(HttpResponse response) {
484488
setHttpResponse(response);
485489
}
486490

487491
};
488492

493+
CXFResponseCallback responseCallback = delegate;
494+
if (asyncHttpResponseWrapperFactory != null) {
495+
final AsyncHttpResponseWrapper wrapper = asyncHttpResponseWrapperFactory.create();
496+
if (wrapper != null) {
497+
responseCallback = new CXFResponseCallback() {
498+
@Override
499+
public void responseReceived(HttpResponse response) {
500+
wrapper.responseReceived(response, delegate::responseReceived);
501+
}
502+
};
503+
}
504+
}
489505
FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
490506

491507
public void completed(Boolean result) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cxf.transport.http.asyncclient;
21+
22+
import java.util.function.Consumer;
23+
24+
import org.apache.cxf.Bus;
25+
import org.apache.http.HttpResponse;
26+
27+
/**
28+
* The {@link Bus} extension to allow wrapping up the response processing of
29+
* the {@link AsyncHTTPConduit} instance.
30+
*/
31+
@FunctionalInterface
32+
public interface AsyncHttpResponseWrapperFactory {
33+
/**
34+
* Creates new instance of the {@link AsyncHttpResponseWrapper}
35+
* @return new instance of the {@link AsyncHttpResponseWrapper} (or null)
36+
*/
37+
AsyncHttpResponseWrapper create();
38+
39+
/**
40+
* The wrapper around the response that will be called by the {@link AsyncHTTPConduit}
41+
* instance once the response is received.
42+
*/
43+
interface AsyncHttpResponseWrapper {
44+
/**
45+
* The callback which is called by the {@link AsyncHTTPConduit} instance once
46+
* the response is received. The delegating response handler is passed as the
47+
* an argument and has to be called.
48+
* @param response the response received
49+
* @param delegate delegating response handler
50+
*/
51+
default void responseReceived(HttpResponse response, Consumer<HttpResponse> delegate) {
52+
delegate.accept(response);
53+
}
54+
}
55+
}

rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.ExecutionException;
2727
import java.util.concurrent.TimeUnit;
2828
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.function.Consumer;
2930

3031
import jakarta.xml.ws.AsyncHandler;
3132
import jakarta.xml.ws.Endpoint;
@@ -43,19 +44,23 @@
4344
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
4445
import org.apache.cxf.transport.http.HTTPConduit;
4546
import org.apache.cxf.transport.http.HTTPConduitFactory;
47+
import org.apache.cxf.transport.http.asyncclient.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper;
4648
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
4749
import org.apache.cxf.workqueue.AutomaticWorkQueueImpl;
4850
import org.apache.cxf.workqueue.WorkQueueManager;
4951
import org.apache.hello_world_soap_http.Greeter;
5052
import org.apache.hello_world_soap_http.SOAPService;
5153
import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
5254
import org.apache.hello_world_soap_http.types.GreetMeResponse;
55+
import org.apache.http.HttpResponse;
5356

5457
import org.junit.AfterClass;
5558
import org.junit.BeforeClass;
5659
import org.junit.Ignore;
5760
import org.junit.Test;
5861

62+
import static org.hamcrest.CoreMatchers.is;
63+
import static org.hamcrest.MatcherAssert.assertThat;
5964
import static org.junit.Assert.assertEquals;
6065
import static org.junit.Assert.assertNotNull;
6166
import static org.junit.Assert.fail;
@@ -242,7 +247,7 @@ public void testConnectIssue() throws Exception {
242247
}
243248

244249
@Test
245-
public void testInovationWithHCAddress() throws Exception {
250+
public void testInvocationWithHCAddress() throws Exception {
246251
String address = "hc://http://localhost:" + PORT + "/SoapContext/SoapPort";
247252
JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
248253
factory.setServiceClass(Greeter.class);
@@ -263,6 +268,7 @@ public void testInvocationWithTransportId() throws Exception {
263268
String response = greeter.greetMe("test");
264269
assertEquals("Get a wrong response", "Hello test", response);
265270
}
271+
266272
@Test
267273
public void testCall() throws Exception {
268274
updateAddressPort(g, PORT);
@@ -273,6 +279,7 @@ public void testCall() throws Exception {
273279
c.setClient(cp);
274280
assertEquals("Hello " + request, g.greetMe(request));
275281
}
282+
276283
@Test
277284
public void testCallAsync() throws Exception {
278285
updateAddressPort(g, PORT);
@@ -293,6 +300,35 @@ public void handleResponse(Response<GreetMeLaterResponse> res) {
293300
}).get();
294301
}
295302

303+
@Test
304+
public void testCallAsyncWithResponseWrapper() throws Exception {
305+
try {
306+
final CountDownLatch latch = new CountDownLatch(1);
307+
final AsyncHttpResponseWrapper wrapper = new AsyncHttpResponseWrapper() {
308+
@Override
309+
public void responseReceived(HttpResponse response, Consumer<HttpResponse> delegate) {
310+
delegate.accept(response);
311+
latch.countDown();
312+
}
313+
};
314+
315+
getStaticBus().setExtension(() -> wrapper, AsyncHttpResponseWrapperFactory.class);
316+
317+
final String address = "hc://http://localhost:" + PORT + "/SoapContext/SoapPort";
318+
final Greeter greeter = new SOAPService().getSoapPort();
319+
setAddress(greeter, address);
320+
321+
greeter.greetMeLaterAsync(1000, new AsyncHandler<GreetMeLaterResponse>() {
322+
public void handleResponse(Response<GreetMeLaterResponse> res) {
323+
}
324+
}).get();
325+
326+
assertThat(latch.await(5, TimeUnit.SECONDS), is(true));
327+
} finally {
328+
getStaticBus().setExtension(null, AsyncHttpResponseWrapperFactory.class);
329+
}
330+
}
331+
296332
@Test
297333
public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception {
298334
// This test is especially targeted for RHEL 6.8

rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.cxf.transport.http.Headers;
6767
import org.apache.cxf.transport.http.HttpClientHTTPConduit;
6868
import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduitFactory.UseAsyncPolicy;
69+
import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper;
6970
import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
7071
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
7172
import org.apache.cxf.version.Version;
@@ -105,6 +106,7 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit {
105106
public static final String USE_ASYNC = "use.async.http.conduit";
106107

107108
private final AsyncHTTPConduitFactory factory;
109+
private final AsyncHttpResponseWrapperFactory asyncHttpResponseWrapperFactory;
108110
private volatile int lastTlsHash = -1;
109111
private volatile Object sslState;
110112
private volatile URI sslURL;
@@ -116,6 +118,7 @@ public AsyncHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t, AsyncHT
116118
throws IOException {
117119
super(b, ei, t);
118120
this.factory = factory;
121+
this.asyncHttpResponseWrapperFactory = bus.getExtension(AsyncHttpResponseWrapperFactory.class);
119122
}
120123

121124
public synchronized CloseableHttpAsyncClient getHttpAsyncClient(final TlsStrategy tlsStrategy)
@@ -491,15 +494,29 @@ protected void connect(boolean output) throws IOException {
491494
if (connectionFuture != null) {
492495
return;
493496
}
494-
495-
CXFResponseCallback responseCallback = new CXFResponseCallback() {
497+
498+
final CXFResponseCallback delegate = new CXFResponseCallback() {
496499
@Override
497500
public void responseReceived(HttpResponse response) {
498501
setHttpResponse(response);
499502
}
500503

501504
};
502505

506+
CXFResponseCallback responseCallback = delegate;
507+
if (asyncHttpResponseWrapperFactory != null) {
508+
final AsyncHttpResponseWrapper wrapper = asyncHttpResponseWrapperFactory.create();
509+
if (wrapper != null) {
510+
responseCallback = new CXFResponseCallback() {
511+
@Override
512+
public void responseReceived(HttpResponse response) {
513+
wrapper.responseReceived(response, delegate::responseReceived);
514+
}
515+
};
516+
}
517+
}
518+
519+
503520
FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
504521

505522
public void completed(Boolean result) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cxf.transport.http.asyncclient.hc5;
21+
22+
import java.util.function.Consumer;
23+
24+
import org.apache.cxf.Bus;
25+
import org.apache.hc.core5.http.HttpResponse;
26+
27+
/**
28+
* The {@link Bus} extension to allow wrapping up the response processing of
29+
* the {@link AsyncHTTPConduit} instance.
30+
*/
31+
@FunctionalInterface
32+
public interface AsyncHttpResponseWrapperFactory {
33+
/**
34+
* Creates new instance of the {@link AsyncHttpResponseWrapper}
35+
* @return new instance of the {@link AsyncHttpResponseWrapper} (or null)
36+
*/
37+
AsyncHttpResponseWrapper create();
38+
39+
/**
40+
* The wrapper around the response that will be called by the {@link AsyncHTTPConduit}
41+
* instance once the response is received.
42+
*/
43+
interface AsyncHttpResponseWrapper {
44+
/**
45+
* The callback which is called by the {@link AsyncHTTPConduit} instance once
46+
* the response is received. The delegating response handler is passed as the
47+
* an argument and has to be called.
48+
* @param response the response received
49+
* @param delegate delegating response handler
50+
*/
51+
default void responseReceived(HttpResponse response, Consumer<HttpResponse> delegate) {
52+
delegate.accept(response);
53+
}
54+
}
55+
}

0 commit comments

Comments
 (0)