-
Notifications
You must be signed in to change notification settings - Fork 97
[REEF-362] Implement a Wake Transport using HTTP #1341
base: master
Are you sure you want to change the base?
Changes from 17 commits
48ec577
56d0650
4994107
d3f7a46
1ecda33
9510d61
31e5efa
bc42c3d
5b3f1b5
1943609
dc36180
abd316d
7ce5b33
14121e7
476ebe1
2befc1a
df5fea5
985ad1a
8fd67f6
37a9ada
40f335d
817fa0e
3674ffa
cdaadcc
4990972
9e9260a
29344b6
017154e
f221dbb
455464a
bd3b205
2225679
b20df37
94fb7bc
046739d
645b898
bd2030a
6541d3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
import org.apache.reef.wake.remote.impl.DefaultTransportEStage; | ||
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; | ||
import org.apache.reef.wake.remote.impl.TransportEvent; | ||
import org.apache.reef.wake.remote.transport.TransportFactory.ProtocolTypes; | ||
|
||
/** | ||
* Configuration options and helper methods for Wake remoting. | ||
|
@@ -47,6 +48,14 @@ public final class RemoteConfiguration { | |
public static final long REMOTE_CONNECTION_RETRY_TIMEOUT = | ||
WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT / (REMOTE_CONNECTION_NUMBER_OF_RETRIES + 1); | ||
|
||
|
||
/** | ||
* Unique protocol String for choosing protocols. | ||
*/ | ||
public static final String PROTOCOL_TCP = ProtocolTypes.TCP.name(); | ||
public static final String PROTOCOL_HTTP = ProtocolTypes.HTTP.name(); | ||
public static final String PROTOCOL_HTTPS = ProtocolTypes.HTTPS.name(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we handle supporting https in this PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I Think features about HTTPS should be handled in another PR. Thank you! |
||
|
||
private RemoteConfiguration() { | ||
// empty | ||
} | ||
|
@@ -132,4 +141,13 @@ public static final class RemoteClientStage implements Name<EStage<TransportEven | |
public static final class RemoteServerStage implements Name<EStage<TransportEvent>> { | ||
// Intentionally empty | ||
} | ||
|
||
/** | ||
* Option for use http. | ||
* Default value must be ProtocolTypes.TCP.name(). | ||
*/ | ||
@NamedParameter(doc = "Option for use http.", default_value = "TCP") | ||
public static final class Protocol implements Name<String> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Simply @NamedParameter(doc = "Option for use http.", default_value = "TCP")
public static final class Protocol implements Name<ProtocolTypes> { } |
||
// Intentionally empty | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,13 @@ | |
@DefaultImplementation(MessagingTransportFactory.class) | ||
public interface TransportFactory { | ||
|
||
/** | ||
* Types of protocol used in Transport. | ||
*/ | ||
enum ProtocolTypes { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't use the plural for enum name. Simply |
||
TCP, HTTP, HTTPS | ||
} | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
|
@@ -62,6 +69,26 @@ Transport newInstance(final String hostAddress, int port, | |
final int numberOfTries, | ||
final int retryTimeout); | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
* @param hostAddress a host address | ||
* @param port a listening port | ||
* @param clientStage a transport client-side stage | ||
* @param serverStage a transport server-side stage | ||
* @param numberOfTries the number of retries for connection | ||
* @param retryTimeout retry timeout | ||
* @param protocol protocol to use | ||
* @return transport | ||
*/ | ||
Transport newInstance(final String hostAddress, | ||
final int port, | ||
final EStage<TransportEvent> clientStage, | ||
final EStage<TransportEvent> serverStage, | ||
final int numberOfTries, | ||
final int retryTimeout, | ||
final ProtocolTypes protocol); | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
|
@@ -82,5 +109,27 @@ Transport newInstance(final String hostAddress, | |
final int retryTimeout, | ||
final TcpPortProvider tcpPortProvider); | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
* @param hostAddress a host address | ||
* @param port a listening port | ||
* @param clientStage a transport client-side stage | ||
* @param serverStage a transport server-side stage | ||
* @param numberOfTries the number of retries for connection | ||
* @param retryTimeout retry timeout | ||
* @param tcpPortProvider tcpPortProvider | ||
* @param protocol protocol to use | ||
* @return transport | ||
*/ | ||
Transport newInstance(final String hostAddress, | ||
final int port, | ||
final EStage<TransportEvent> clientStage, | ||
final EStage<TransportEvent> serverStage, | ||
final int numberOfTries, | ||
final int retryTimeout, | ||
final TcpPortProvider tcpPortProvider, | ||
final ProtocolTypes protocol); | ||
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,6 +39,7 @@ | |
public final class MessagingTransportFactory implements TransportFactory { | ||
|
||
private final String localAddress; | ||
private static final Tang TANG = Tang.Factory.getTang(); | ||
|
||
@Inject | ||
private MessagingTransportFactory(final LocalAddressProvider localAddressProvider) { | ||
|
@@ -59,7 +60,7 @@ public Transport newInstance(final int port, | |
final EventHandler<TransportEvent> serverHandler, | ||
final EventHandler<Exception> exHandler) { | ||
|
||
final Injector injector = Tang.Factory.getTang().newInjector(); | ||
final Injector injector = TANG.newInjector(); | ||
injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, this.localAddress); | ||
injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); | ||
injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, new SyncStage<>(clientHandler)); | ||
|
@@ -93,14 +94,41 @@ public Transport newInstance(final String hostAddress, | |
final int numberOfTries, | ||
final int retryTimeout) { | ||
try { | ||
TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class); | ||
final TcpPortProvider tcpPortProvider = TANG.newInjector().getInstance(TcpPortProvider.class); | ||
return newInstance(hostAddress, port, clientStage, | ||
serverStage, numberOfTries, retryTimeout, tcpPortProvider); | ||
} catch (final InjectionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
* @param hostAddress a host address | ||
* @param port a listening port | ||
* @param clientStage a client stage | ||
* @param serverStage a server stage | ||
* @param numberOfTries a number of tries | ||
* @param retryTimeout a timeout for retry | ||
*/ | ||
@Override | ||
public Transport newInstance(final String hostAddress, | ||
final int port, | ||
final EStage<TransportEvent> clientStage, | ||
final EStage<TransportEvent> serverStage, | ||
final int numberOfTries, | ||
final int retryTimeout, | ||
final ProtocolTypes protocol) { | ||
try { | ||
final TcpPortProvider tcpPortProvider = TANG.newInjector().getInstance(TcpPortProvider.class); | ||
return newInstance(hostAddress, port, clientStage, | ||
serverStage, numberOfTries, retryTimeout, tcpPortProvider, protocol); | ||
} catch (final InjectionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
|
@@ -121,14 +149,41 @@ public Transport newInstance(final String hostAddress, | |
final int retryTimeout, | ||
final TcpPortProvider tcpPortProvider) { | ||
|
||
final Injector injector = Tang.Factory.getTang().newInjector(); | ||
return newInstance(hostAddress, port, clientStage, | ||
serverStage, numberOfTries, retryTimeout, tcpPortProvider, ProtocolTypes.TCP); | ||
} | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
* @param hostAddress a host address | ||
* @param port a listening port | ||
* @param clientStage a client stage | ||
* @param serverStage a server stage | ||
* @param numberOfTries a number of tries | ||
* @param retryTimeout a timeout for retry | ||
* @param tcpPortProvider a provider for TCP port | ||
* @param protocol a protocol to use | ||
*/ | ||
@Override | ||
public Transport newInstance(final String hostAddress, | ||
final int port, | ||
final EStage<TransportEvent> clientStage, | ||
final EStage<TransportEvent> serverStage, | ||
final int numberOfTries, | ||
final int retryTimeout, | ||
final TcpPortProvider tcpPortProvider, | ||
final ProtocolTypes protocol) { | ||
|
||
final Injector injector = TANG.newInjector(); | ||
injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress); | ||
injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); | ||
injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, clientStage); | ||
injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, serverStage); | ||
injector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries); | ||
injector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout); | ||
injector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider); | ||
injector.bindVolatileParameter(RemoteConfiguration.Protocol.class, protocol.name()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if |
||
try { | ||
return injector.getInstance(NettyMessagingTransport.class); | ||
} catch (final InjectionException e) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.reef.wake.remote.transport.netty; | ||
|
||
import io.netty.channel.ChannelFuture; | ||
import io.netty.channel.ChannelFutureListener; | ||
import org.apache.reef.wake.remote.transport.LinkListener; | ||
|
||
/** | ||
* Future Listener used in NettyLink. | ||
*/ | ||
public final class NettyChannelFutureListener<T> implements ChannelFutureListener { | ||
|
||
private final T message; | ||
private final LinkListener<T> listener; | ||
|
||
NettyChannelFutureListener(final T message, final LinkListener<T> listener) { | ||
this.message = message; | ||
this.listener = listener; | ||
} | ||
|
||
@Override | ||
public void operationComplete(final ChannelFuture channelFuture) throws Exception { | ||
if (channelFuture.isSuccess()) { | ||
listener.onSuccess(message); | ||
} else { | ||
listener.onException(channelFuture.cause(), channelFuture.channel().remoteAddress(), message); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,29 +24,77 @@ | |
import io.netty.handler.codec.LengthFieldPrepender; | ||
import io.netty.handler.codec.bytes.ByteArrayDecoder; | ||
import io.netty.handler.codec.bytes.ByteArrayEncoder; | ||
import io.netty.handler.codec.http.*; | ||
import io.netty.handler.ssl.SslContext; | ||
|
||
/** | ||
* Netty channel initializer for Transport. | ||
*/ | ||
class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { | ||
|
||
/* Types for initiating channel */ | ||
public enum ChannelType { | ||
TCP, HTTP_SERVER, HTTP_CLIENT | ||
} | ||
|
||
/** | ||
* the buffer size of the frame decoder. | ||
*/ | ||
public static final int MAXFRAMELENGTH = 10 * 1024 * 1024; | ||
private final NettyChannelHandlerFactory handlerFactory; | ||
|
||
NettyChannelInitializer(final NettyChannelHandlerFactory handlerFactory) { | ||
/** | ||
* sslContext contains ssl context of the machine. used only for HTTP. | ||
*/ | ||
private final SslContext sslContext; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think explanations on these variables are necessary. |
||
|
||
/** | ||
* Type of channel whether it is netty or http client or http server. | ||
*/ | ||
private final ChannelType type; | ||
|
||
NettyChannelInitializer( | ||
final NettyChannelHandlerFactory handlerFactory, | ||
final SslContext sslContext, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this can be null, how about creating another constructor not receiving this variable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately, current implementation of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is for HTTPS? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll remove the whole SSL parts. thank you! |
||
final ChannelType type) { | ||
this.handlerFactory = handlerFactory; | ||
this.sslContext = sslContext; | ||
this.type = type; | ||
} | ||
|
||
@Override | ||
protected void initChannel(final SocketChannel ch) throws Exception { | ||
ch.pipeline() | ||
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4)) | ||
.addLast("bytesDecoder", new ByteArrayDecoder()) | ||
.addLast("frameEncoder", new LengthFieldPrepender(4)) | ||
.addLast("bytesEncoder", new ByteArrayEncoder()) | ||
.addLast("chunker", new ChunkedReadWriteHandler()) | ||
.addLast("handler", handlerFactory.createChannelInboundHandler()); | ||
switch (this.type) { | ||
case TCP: | ||
ch.pipeline() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pipeline.addLast ... |
||
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4)) | ||
.addLast("bytesDecoder", new ByteArrayDecoder()) | ||
.addLast("frameEncoder", new LengthFieldPrepender(4)) | ||
.addLast("bytesEncoder", new ByteArrayEncoder()) | ||
.addLast("chunker", new ChunkedReadWriteHandler()) | ||
.addLast("handler", handlerFactory.createChannelInboundHandler()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't you remove this line and add it to line 92? |
||
break; | ||
case HTTP_SERVER: | ||
if (sslContext != null) { | ||
ch.pipeline().addLast(sslContext.newHandler(ch.alloc())); | ||
} | ||
ch.pipeline() | ||
.addLast("codec", new HttpServerCodec()) | ||
.addLast("requestDecoder", new HttpRequestDecoder()) | ||
.addLast("responseEncoder", new HttpResponseEncoder()) | ||
.addLast("handler", handlerFactory.createChannelInboundHandler()); | ||
break; | ||
case HTTP_CLIENT: | ||
if (sslContext != null) { | ||
ch.pipeline().addLast(sslContext.newHandler(ch.alloc())); | ||
} | ||
ch.pipeline() | ||
.addLast("codec", new HttpClientCodec()) | ||
.addLast("decompressor", new HttpContentDecompressor()) | ||
.addLast("handler", handlerFactory.createChannelInboundHandler()); | ||
break; | ||
default: | ||
throw new IllegalArgumentException("Invalid type of channel"); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is good idea! I'll implement it and test it. |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove these constants. We can use
ProtocolTypes
enum directly where needed.