-
Notifications
You must be signed in to change notification settings - Fork 97
[REEF-362] Implement a Wake Transport using HTTP #1341
base: master
Are you sure you want to change the base?
Changes from 5 commits
48ec577
56d0650
4994107
d3f7a46
1ecda33
9510d61
31e5efa
bc42c3d
5b3f1b5
1943609
dc36180
abd316d
7ce5b33
14121e7
476ebe1
2befc1a
df5fea5
985ad1a
8fd67f6
37a9ada
40f335d
817fa0e
3674ffa
cdaadcc
4990972
9e9260a
29344b6
017154e
f221dbb
455464a
bd3b205
2225679
b20df37
94fb7bc
046739d
645b898
bd2030a
6541d3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,13 @@ public final class RemoteConfiguration { | |
public static final long REMOTE_CONNECTION_RETRY_TIMEOUT = | ||
WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT / (REMOTE_CONNECTION_NUMBER_OF_RETRIES + 1); | ||
|
||
|
||
/** | ||
* Unique protocol numbers for choosing protocols. | ||
*/ | ||
public static final int PROTOCOL_NETTY = 100; | ||
public static final int PROTOCOL_HTTP = 101; | ||
|
||
private RemoteConfiguration() { | ||
// empty | ||
} | ||
|
@@ -132,4 +139,12 @@ public static final class RemoteClientStage implements Name<EStage<TransportEven | |
public static final class RemoteServerStage implements Name<EStage<TransportEvent>> { | ||
// Intentionally empty | ||
} | ||
|
||
/** | ||
* Option for use http. | ||
*/ | ||
@NamedParameter(doc = "Option for use http.", default_value = "" + PROTOCOL_NETTY) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Converting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was used across the original source. Is it better to update to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be better to make another issue for refactoring the whole project. |
||
public static final class Protocol implements Name<Integer> { | ||
// Intentionally empty | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,6 +62,25 @@ Transport newInstance(final String hostAddress, int port, | |
final int numberOfTries, | ||
final int retryTimeout); | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
* @param hostAddress a host address | ||
* @param port a listening port | ||
* @param clientStage a transport client-side stage | ||
* @param serverStage a transport server-side stage | ||
* @param numberOfTries the number of retries for connection | ||
* @param retryTimeout retry timeout | ||
* @param protocol protocol to use | ||
* @return transport | ||
*/ | ||
Transport newInstance(final String hostAddress, int port, | ||
final EStage<TransportEvent> clientStage, | ||
final EStage<TransportEvent> serverStage, | ||
final int numberOfTries, | ||
final int retryTimeout, | ||
final int protocol); | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
|
@@ -82,5 +101,27 @@ Transport newInstance(final String hostAddress, | |
final int retryTimeout, | ||
final TcpPortProvider tcpPortProvider); | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
* @param hostAddress a host address | ||
* @param port a listening port | ||
* @param clientStage a transport client-side stage | ||
* @param serverStage a transport server-side stage | ||
* @param numberOfTries the number of retries for connection | ||
* @param retryTimeout retry timeout | ||
* @param tcpPortProvider tcpPortProvider | ||
* @param protocol protocol to use | ||
* @return transport | ||
*/ | ||
Transport newInstance(final String hostAddress, | ||
int port, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why no final here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be refactored during another PR. Thank you! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't have to refactor it in another PR because it is a new code. Please just add |
||
final EStage<TransportEvent> clientStage, | ||
final EStage<TransportEvent> serverStage, | ||
final int numberOfTries, | ||
final int retryTimeout, | ||
final TcpPortProvider tcpPortProvider, | ||
final int protocol); | ||
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -101,6 +101,33 @@ public Transport newInstance(final String hostAddress, | |
} | ||
} | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
* @param hostAddress a host address | ||
* @param port a listening port | ||
* @param clientStage a client stage | ||
* @param serverStage a server stage | ||
* @param numberOfTries a number of tries | ||
* @param retryTimeout a timeout for retry | ||
*/ | ||
@Override | ||
public Transport newInstance(final String hostAddress, | ||
final int port, | ||
final EStage<TransportEvent> clientStage, | ||
final EStage<TransportEvent> serverStage, | ||
final int numberOfTries, | ||
final int retryTimeout, | ||
final int protocol) { | ||
try { | ||
TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. final |
||
return newInstance(hostAddress, port, clientStage, | ||
serverStage, numberOfTries, retryTimeout, tcpPortProvider); | ||
} catch (final InjectionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
|
@@ -121,6 +148,32 @@ public Transport newInstance(final String hostAddress, | |
final int retryTimeout, | ||
final TcpPortProvider tcpPortProvider) { | ||
|
||
return newInstance(hostAddress, port, clientStage, | ||
serverStage, numberOfTries, retryTimeout, tcpPortProvider, RemoteConfiguration.PROTOCOL_NETTY); | ||
} | ||
|
||
/** | ||
* Creates a transport. | ||
* | ||
* @param hostAddress a host address | ||
* @param port a listening port | ||
* @param clientStage a client stage | ||
* @param serverStage a server stage | ||
* @param numberOfTries a number of tries | ||
* @param retryTimeout a timeout for retry | ||
* @param tcpPortProvider a provider for TCP port | ||
* @param protocol a protocol to use | ||
*/ | ||
@Override | ||
public Transport newInstance(final String hostAddress, | ||
final int port, | ||
final EStage<TransportEvent> clientStage, | ||
final EStage<TransportEvent> serverStage, | ||
final int numberOfTries, | ||
final int retryTimeout, | ||
final TcpPortProvider tcpPortProvider, | ||
final int protocol) { | ||
|
||
final Injector injector = Tang.Factory.getTang().newInjector(); | ||
injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress); | ||
injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); | ||
|
@@ -129,6 +182,7 @@ public Transport newInstance(final String hostAddress, | |
injector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries); | ||
injector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout); | ||
injector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider); | ||
injector.bindVolatileParameter(RemoteConfiguration.Protocol.class, protocol); | ||
try { | ||
return injector.getInstance(NettyMessagingTransport.class); | ||
} catch (final InjectionException e) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.reef.wake.remote.transport.netty; | ||
|
||
import io.netty.channel.ChannelFuture; | ||
import io.netty.channel.ChannelFutureListener; | ||
import org.apache.reef.wake.remote.transport.LinkListener; | ||
|
||
class NettyChannelFutureListener<T> implements ChannelFutureListener { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class should be public & final if there is no class extending it. |
||
|
||
private final T message; | ||
private LinkListener<T> listener; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. final |
||
|
||
NettyChannelFutureListener(final T message, final LinkListener<T> listener) { | ||
this.message = message; | ||
this.listener = listener; | ||
} | ||
|
||
@Override | ||
public void operationComplete(final ChannelFuture channelFuture) throws Exception { | ||
if (channelFuture.isSuccess()) { | ||
listener.onSuccess(message); | ||
} else { | ||
listener.onException(channelFuture.cause(), channelFuture.channel().remoteAddress(), message); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,29 +24,68 @@ | |
import io.netty.handler.codec.LengthFieldPrepender; | ||
import io.netty.handler.codec.bytes.ByteArrayDecoder; | ||
import io.netty.handler.codec.bytes.ByteArrayEncoder; | ||
import io.netty.handler.codec.http.*; | ||
import io.netty.handler.ssl.SslContext; | ||
|
||
/** | ||
* Netty channel initializer for Transport. | ||
*/ | ||
class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { | ||
|
||
private static final int HANDLER_NETTY = 100; | ||
private static final int HANDLER_HTTP_SERVER = 101; | ||
private static final int HANDLER_HTTP_CLIENT = 102; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
/** | ||
* the buffer size of the frame decoder. | ||
*/ | ||
public static final int MAXFRAMELENGTH = 10 * 1024 * 1024; | ||
private final NettyChannelHandlerFactory handlerFactory; | ||
private final SslContext sslContext; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think explanations on these variables are necessary. |
||
private final int type; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add comments. |
||
|
||
NettyChannelInitializer(final NettyChannelHandlerFactory handlerFactory) { | ||
NettyChannelInitializer( | ||
final NettyChannelHandlerFactory handlerFactory, | ||
final SslContext sslContext, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this can be null, how about creating another constructor not receiving this variable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately, current implementation of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is for HTTPS? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll remove the whole SSL parts. thank you! |
||
final int type) { | ||
this.handlerFactory = handlerFactory; | ||
this.sslContext = sslContext; | ||
this.type = type; | ||
} | ||
|
||
@Override | ||
protected void initChannel(final SocketChannel ch) throws Exception { | ||
ch.pipeline() | ||
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4)) | ||
.addLast("bytesDecoder", new ByteArrayDecoder()) | ||
.addLast("frameEncoder", new LengthFieldPrepender(4)) | ||
.addLast("bytesEncoder", new ByteArrayEncoder()) | ||
.addLast("chunker", new ChunkedReadWriteHandler()) | ||
.addLast("handler", handlerFactory.createChannelInboundHandler()); | ||
switch (this.type) { | ||
case HANDLER_NETTY: | ||
ch.pipeline() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pipeline.addLast ... |
||
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4)) | ||
.addLast("bytesDecoder", new ByteArrayDecoder()) | ||
.addLast("frameEncoder", new LengthFieldPrepender(4)) | ||
.addLast("bytesEncoder", new ByteArrayEncoder()) | ||
.addLast("chunker", new ChunkedReadWriteHandler()) | ||
.addLast("handler", handlerFactory.createChannelInboundHandler()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't you remove this line and add it to line 92? |
||
break; | ||
case HANDLER_HTTP_SERVER: | ||
if (sslContext != null) { | ||
ch.pipeline().addLast(sslContext.newHandler(ch.alloc())); | ||
} | ||
ch.pipeline() | ||
.addLast("codec", new HttpServerCodec()) | ||
.addLast("requestDecoder", new HttpRequestDecoder()) | ||
.addLast("responseEncoder", new HttpResponseEncoder()) | ||
.addLast("handler", handlerFactory.createChannelInboundHandler()); | ||
break; | ||
case HANDLER_HTTP_CLIENT: | ||
if (sslContext != null) { | ||
ch.pipeline().addLast(sslContext.newHandler(ch.alloc())); | ||
} | ||
ch.pipeline() | ||
.addLast("codec", new HttpClientCodec()) | ||
.addLast("decompressor", new HttpContentDecompressor()) | ||
.addLast("handler", handlerFactory.createChannelInboundHandler()); | ||
break; | ||
default: | ||
throw new IllegalArgumentException("Invalid type of channel"); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is good idea! I'll implement it and test it. |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably use
enum
here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, @DifferentSC said that using
enum
which is related to Tang Configuration might be not supported for now. Instead, I suggest it could be changed tostatic final String
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nhne You can convert enum to string. Please see https://stackoverflow.com/questions/6667243/using-enum-values-as-string-literals