Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-362] Implement a Wake Transport using HTTP #1341

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
48ec577
[REEF-362] Implement a Wake Transport using HTTP
nhne Jul 4, 2017
56d0650
Added a test for RemoteManager with Http Transport
nhne Jul 25, 2017
4994107
separated NettyChannelFutureListener from NettyLink
nhne Aug 14, 2017
d3f7a46
Refactored Netty Codes for HTTP
nhne Aug 14, 2017
1ecda33
Removed HttpMessagingTransportFactory and Refactored Code
nhne Aug 17, 2017
9510d61
Removed package reef.wake.remote.transport.netty.http
nhne Aug 17, 2017
31e5efa
fixed errors regarding to review
nhne Aug 17, 2017
bc42c3d
fixed errors regarding to review
nhne Aug 17, 2017
5b3f1b5
Merge branch 'REEF-362' of https://github.com/nhne/reef into REEF-362
nhne Aug 17, 2017
1943609
Fixed errors and typos
nhne Aug 21, 2017
dc36180
added a java doc and fixed format error
nhne Aug 22, 2017
abd316d
reverted some interface back
nhne Aug 22, 2017
7ce5b33
Fixed implementation to accept PROTOCOL_HTTPS
nhne Aug 22, 2017
14121e7
fixed TransportHttpTest
nhne Aug 23, 2017
476ebe1
Fixed String into ProtocolTypes
nhne Aug 23, 2017
2befc1a
Fixed reflexing reviews
nhne Aug 23, 2017
df5fea5
implemented NettyLinkFactory and its default class
nhne Aug 23, 2017
985ad1a
added NettyHttpLinkFactory
nhne Aug 24, 2017
8fd67f6
excluded HTTPS related codes
nhne Aug 24, 2017
37a9ada
removed ssl part
nhne Sep 1, 2017
40f335d
Merge branch 'master' of https://github.com/apache/reef into REEF-362
nhne Sep 1, 2017
817fa0e
changed constant String into enum and altered NettyChannelInitializer
nhne Sep 1, 2017
3674ffa
Merge branch 'master' of https://github.com/apache/reef into REEF-362
nhne Sep 4, 2017
cdaadcc
Merge branch 'REEF-362' of https://github.com/nhne/reef into REEF-362
nhne Sep 4, 2017
4990972
changed TransportFactory to use protocol as constructor parameter
nhne Sep 6, 2017
9e9260a
refactored TransportTest and merged TransportHttpTest into TransportTest
nhne Sep 6, 2017
29344b6
merged RemoteMangerTestHttp into RemoteManagerTest
nhne Sep 14, 2017
017154e
changed implementation of ChannelInitializer, Http Event Listener
nhne Nov 23, 2017
f221dbb
fixed NettyChannelInitializer and Event Listener, NettyHttpLink
nhne Nov 24, 2017
455464a
changed implementation in Netty[HttpServer,Client]EventListener
nhne Dec 5, 2017
bd3b205
removed check on lenth of content
nhne Dec 6, 2017
2225679
copiedBuffer to wrappedBuffer
nhne Dec 7, 2017
b20df37
removed sync() in NettyHttpLink
nhne Jan 4, 2018
94fb7bc
removed `sync()` in NettyHttpLink
nhne Jan 4, 2018
046739d
check isLoggable
nhne Jan 5, 2018
645b898
Fixed bug on uri creation
nhne Feb 11, 2018
bd2030a
Added Exception e into throwing exception
nhne Feb 22, 2018
6541d3c
Add `final` on Argument statement
nhne Apr 1, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ public final class RemoteConfiguration {
public static final long REMOTE_CONNECTION_RETRY_TIMEOUT =
WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT / (REMOTE_CONNECTION_NUMBER_OF_RETRIES + 1);


/**
* Unique protocol numbers for choosing protocols.
*/
public static final int PROTOCOL_NETTY = 100;
public static final int PROTOCOL_HTTP = 101;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably use enum here.

Copy link
Contributor Author

@nhne nhne Aug 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, @DifferentSC said that using enum which is related to Tang Configuration might be not supported for now. Instead, I suggest it could be changed to static final String.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


private RemoteConfiguration() {
// empty
}
Expand Down Expand Up @@ -132,4 +139,12 @@ public static final class RemoteClientStage implements Name<EStage<TransportEven
public static final class RemoteServerStage implements Name<EStage<TransportEvent>> {
// Intentionally empty
}

/**
* Option for use http.
*/
@NamedParameter(doc = "Option for use http.", default_value = "" + PROTOCOL_NETTY)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converting String using blank string and + is a bad practice. Use String.valueOf instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was used across the original source. Is it better to update to String.valueOf every use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to make another issue for refactoring the whole project.

public static final class Protocol implements Name<Integer> {
// Intentionally empty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public interface TransportFactory {
* @param exHandler an exception handler
* @return transport
*/
Transport newInstance(int port,
EventHandler<TransportEvent> clientHandler,
EventHandler<TransportEvent> serverHandler,
EventHandler<Exception> exHandler);
Transport newInstance(final int port,
final EventHandler<TransportEvent> clientHandler,
final EventHandler<TransportEvent> serverHandler,
final EventHandler<Exception> exHandler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love my variables final, but we probably should not mix functional and non-functional updates in one PR. Next time please submit your stylistic fixes in a separate pull request (and yes, we welcome such updates!) :)


/**
* Creates a transport.
Expand All @@ -56,12 +56,33 @@ Transport newInstance(int port,
* @param retryTimeout retry timeout
* @return transport
*/
Transport newInstance(final String hostAddress, int port,
Transport newInstance(final String hostAddress,
final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout);

/**
* Creates a transport.
*
* @param hostAddress a host address
* @param port a listening port
* @param clientStage a transport client-side stage
* @param serverStage a transport server-side stage
* @param numberOfTries the number of retries for connection
* @param retryTimeout retry timeout
* @param protocol protocol to use
* @return transport
*/
Transport newInstance(final String hostAddress,
final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout,
final int protocol);

/**
* Creates a transport.
*
Expand All @@ -75,12 +96,34 @@ Transport newInstance(final String hostAddress, int port,
* @return transport
*/
Transport newInstance(final String hostAddress,
int port,
final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout,
final TcpPortProvider tcpPortProvider);

/**
* Creates a transport.
*
* @param hostAddress a host address
* @param port a listening port
* @param clientStage a transport client-side stage
* @param serverStage a transport server-side stage
* @param numberOfTries the number of retries for connection
* @param retryTimeout retry timeout
* @param tcpPortProvider tcpPortProvider
* @param protocol protocol to use
* @return transport
*/
Transport newInstance(final String hostAddress,
final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout,
final TcpPortProvider tcpPortProvider,
final int protocol);


}
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@
* Generic functionality for the Netty event listener.
* This is a base class for client and server versions.
*/
abstract class AbstractNettyEventListener implements NettyEventListener {
public abstract class AbstractNettyEventListener implements NettyEventListener {

protected static final Logger LOG = Logger.getLogger(AbstractNettyEventListener.class.getName());

protected final ConcurrentMap<SocketAddress, LinkReference> addrToLinkRefMap;
protected final EStage<TransportEvent> stage;
protected EventHandler<Exception> exceptionHandler;

AbstractNettyEventListener(
final ConcurrentMap<SocketAddress, LinkReference> addrToLinkRefMap,
final EStage<TransportEvent> stage) {
public AbstractNettyEventListener(
final ConcurrentMap<SocketAddress, LinkReference> addrToLinkRefMap,
final EStage<TransportEvent> stage) {
this.addrToLinkRefMap = addrToLinkRefMap;
this.stage = stage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,27 @@
* A reference for a link.
* When channel became active, LinkReference is created and mapped with remote address.
*/
final class LinkReference {
public final class LinkReference {

private final AtomicInteger connectInProgress = new AtomicInteger(0);
private Link<?> link;

LinkReference() {
public LinkReference() {
}

LinkReference(final Link<?> link) {
public LinkReference(final Link<?> link) {
this.link = link;
}

synchronized Link<?> getLink() {
public synchronized Link<?> getLink() {
return this.link;
}

synchronized void setLink(final Link<?> link) {
public synchronized void setLink(final Link<?> link) {
this.link = link;
}

AtomicInteger getConnectInProgress() {
public AtomicInteger getConnectInProgress() {
return this.connectInProgress;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,41 @@ public Transport newInstance(final String hostAddress,
final int numberOfTries,
final int retryTimeout) {
try {
TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class);
final TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tang.Factory.getTang() called many times in this class. Can declare

private static final Tang TANG = Tang.Factory.getTang();

return newInstance(hostAddress, port, clientStage,
serverStage, numberOfTries, retryTimeout, tcpPortProvider);
} catch (final InjectionException e) {
throw new RuntimeException(e);
}
}

/**
* Creates a transport.
*
* @param hostAddress a host address
* @param port a listening port
* @param clientStage a client stage
* @param serverStage a server stage
* @param numberOfTries a number of tries
* @param retryTimeout a timeout for retry
*/
@Override
public Transport newInstance(final String hostAddress,
final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout,
final int protocol) {
try {
final TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class);
return newInstance(hostAddress, port, clientStage,
serverStage, numberOfTries, retryTimeout, tcpPortProvider);
} catch (final InjectionException e) {
throw new RuntimeException(e);
}
}

/**
* Creates a transport.
*
Expand All @@ -121,6 +148,32 @@ public Transport newInstance(final String hostAddress,
final int retryTimeout,
final TcpPortProvider tcpPortProvider) {

return newInstance(hostAddress, port, clientStage,
serverStage, numberOfTries, retryTimeout, tcpPortProvider, RemoteConfiguration.PROTOCOL_NETTY);
}

/**
* Creates a transport.
*
* @param hostAddress a host address
* @param port a listening port
* @param clientStage a client stage
* @param serverStage a server stage
* @param numberOfTries a number of tries
* @param retryTimeout a timeout for retry
* @param tcpPortProvider a provider for TCP port
* @param protocol a protocol to use
*/
@Override
public Transport newInstance(final String hostAddress,
final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout,
final TcpPortProvider tcpPortProvider,
final int protocol) {

final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress);
injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
Expand All @@ -129,6 +182,7 @@ public Transport newInstance(final String hostAddress,
injector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries);
injector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout);
injector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider);
injector.bindVolatileParameter(RemoteConfiguration.Protocol.class, protocol);
try {
return injector.getInstance(NettyMessagingTransport.class);
} catch (final InjectionException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.wake.remote.transport.netty;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.reef.wake.remote.transport.LinkListener;

public final class NettyChannelFutureListener<T> implements ChannelFutureListener {

private final T message;
private LinkListener<T> listener;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final


NettyChannelFutureListener(final T message, final LinkListener<T> listener) {
this.message = message;
this.listener = listener;
}

@Override
public void operationComplete(final ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
listener.onSuccess(message);
} else {
listener.onException(channelFuture.cause(), channelFuture.channel().remoteAddress(), message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/**
* Factory that creates a Netty channel handler.
*/
interface NettyChannelHandlerFactory {
public interface NettyChannelHandlerFactory {

/**
* Creates a channel inbound handler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,76 @@
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslContext;

/**
* Netty channel initializer for Transport.
*/
class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {

private static final int HANDLER_NETTY = 100;
private static final int HANDLER_HTTP_SERVER = 101;
private static final int HANDLER_HTTP_CLIENT = 102;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enum?


/**
* the buffer size of the frame decoder.
*/
public static final int MAXFRAMELENGTH = 10 * 1024 * 1024;
private final NettyChannelHandlerFactory handlerFactory;

NettyChannelInitializer(final NettyChannelHandlerFactory handlerFactory) {
/**
* sslContext contains ssl context of the machine. used only for HTTP.
*/
private final SslContext sslContext;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think explanations on these variables are necessary.


/**
* Type of channel whether it is netty or http client or http server.
*/
private final int type;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments.


NettyChannelInitializer(
final NettyChannelHandlerFactory handlerFactory,
final SslContext sslContext,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this can be null, how about creating another constructor not receiving this variable?

Copy link
Contributor Author

@nhne nhne Aug 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, current implementation of NettyMessagingTransport must be changed into if statement to take advantage of new constructor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for HTTPS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove the whole SSL parts. thank you!

final int type) {
this.handlerFactory = handlerFactory;
this.sslContext = sslContext;
this.type = type;
}

@Override
protected void initChannel(final SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4))
.addLast("bytesDecoder", new ByteArrayDecoder())
.addLast("frameEncoder", new LengthFieldPrepender(4))
.addLast("bytesEncoder", new ByteArrayEncoder())
.addLast("chunker", new ChunkedReadWriteHandler())
.addLast("handler", handlerFactory.createChannelInboundHandler());
switch (this.type) {
case HANDLER_NETTY:
ch.pipeline()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipeline.addLast ...

.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4))
.addLast("bytesDecoder", new ByteArrayDecoder())
.addLast("frameEncoder", new LengthFieldPrepender(4))
.addLast("bytesEncoder", new ByteArrayEncoder())
.addLast("chunker", new ChunkedReadWriteHandler())
.addLast("handler", handlerFactory.createChannelInboundHandler());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you remove this line and add it to line 92?

break;
case HANDLER_HTTP_SERVER:
if (sslContext != null) {
ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
}
ch.pipeline()
.addLast("codec", new HttpServerCodec())
.addLast("requestDecoder", new HttpRequestDecoder())
.addLast("responseEncoder", new HttpResponseEncoder())
.addLast("handler", handlerFactory.createChannelInboundHandler());
break;
case HANDLER_HTTP_CLIENT:
if (sslContext != null) {
ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
}
ch.pipeline()
.addLast("codec", new HttpClientCodec())
.addLast("decompressor", new HttpContentDecompressor())
.addLast("handler", handlerFactory.createChannelInboundHandler());
break;
default:
throw new IllegalArgumentException("Invalid type of channel");
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using HttpObjectAggregator?
ex) adding pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_MESSAGE_SIZE).
The HttpObjectAggregator aggregates the http content and returns a FullHttpRequest or FullHttpResponse.
Then we don't have to buffer http contents in NettyHttpClientEventListener and NettHttpServerEventListener.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is good idea! I'll implement it and test it.

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* Netty event listener.
*/
interface NettyEventListener {
public interface NettyEventListener {

/**
* Handles the message.
Expand Down
Loading