Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-362] Implement a Wake Transport using HTTP #1341

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
48ec577
[REEF-362] Implement a Wake Transport using HTTP
nhne Jul 4, 2017
56d0650
Added a test for RemoteManager with Http Transport
nhne Jul 25, 2017
4994107
separated NettyChannelFutureListener from NettyLink
nhne Aug 14, 2017
d3f7a46
Refactored Netty Codes for HTTP
nhne Aug 14, 2017
1ecda33
Removed HttpMessagingTransportFactory and Refactored Code
nhne Aug 17, 2017
9510d61
Removed package reef.wake.remote.transport.netty.http
nhne Aug 17, 2017
31e5efa
fixed errors regarding to review
nhne Aug 17, 2017
bc42c3d
fixed errors regarding to review
nhne Aug 17, 2017
5b3f1b5
Merge branch 'REEF-362' of https://github.com/nhne/reef into REEF-362
nhne Aug 17, 2017
1943609
Fixed errors and typos
nhne Aug 21, 2017
dc36180
added a java doc and fixed format error
nhne Aug 22, 2017
abd316d
reverted some interface back
nhne Aug 22, 2017
7ce5b33
Fixed implementation to accept PROTOCOL_HTTPS
nhne Aug 22, 2017
14121e7
fixed TransportHttpTest
nhne Aug 23, 2017
476ebe1
Fixed String into ProtocolTypes
nhne Aug 23, 2017
2befc1a
Fixed reflexing reviews
nhne Aug 23, 2017
df5fea5
implemented NettyLinkFactory and its default class
nhne Aug 23, 2017
985ad1a
added NettyHttpLinkFactory
nhne Aug 24, 2017
8fd67f6
excluded HTTPS related codes
nhne Aug 24, 2017
37a9ada
removed ssl part
nhne Sep 1, 2017
40f335d
Merge branch 'master' of https://github.com/apache/reef into REEF-362
nhne Sep 1, 2017
817fa0e
changed constant String into enum and altered NettyChannelInitializer
nhne Sep 1, 2017
3674ffa
Merge branch 'master' of https://github.com/apache/reef into REEF-362
nhne Sep 4, 2017
cdaadcc
Merge branch 'REEF-362' of https://github.com/nhne/reef into REEF-362
nhne Sep 4, 2017
4990972
changed TransportFactory to use protocol as constructor parameter
nhne Sep 6, 2017
9e9260a
refactored TransportTest and merged TransportHttpTest into TransportTest
nhne Sep 6, 2017
29344b6
merged RemoteMangerTestHttp into RemoteManagerTest
nhne Sep 14, 2017
017154e
changed implementation of ChannelInitializer, Http Event Listener
nhne Nov 23, 2017
f221dbb
fixed NettyChannelInitializer and Event Listener, NettyHttpLink
nhne Nov 24, 2017
455464a
changed implementation in Netty[HttpServer,Client]EventListener
nhne Dec 5, 2017
bd3b205
removed check on lenth of content
nhne Dec 6, 2017
2225679
copiedBuffer to wrappedBuffer
nhne Dec 7, 2017
b20df37
removed sync() in NettyHttpLink
nhne Jan 4, 2018
94fb7bc
removed `sync()` in NettyHttpLink
nhne Jan 4, 2018
046739d
check isLoggable
nhne Jan 5, 2018
645b898
Fixed bug on uri creation
nhne Feb 11, 2018
bd2030a
Added Exception e into throwing exception
nhne Feb 22, 2018
6541d3c
Add `final` on Argument statement
nhne Apr 1, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ public final class RemoteConfiguration {
public static final long REMOTE_CONNECTION_RETRY_TIMEOUT =
WakeParameters.REMOTE_EXECUTOR_SHUTDOWN_TIMEOUT / (REMOTE_CONNECTION_NUMBER_OF_RETRIES + 1);


/**
* Unique protocol numbers for choosing protocols.
*/
public static final String PROTOCOL_NETTY = "__PROTOCOL_NETTY__";
public static final String PROTOCOL_HTTP = "__PROTOCOL_HTTP__";

private RemoteConfiguration() {
// empty
}
Expand Down Expand Up @@ -132,4 +139,12 @@ public static final class RemoteClientStage implements Name<EStage<TransportEven
public static final class RemoteServerStage implements Name<EStage<TransportEvent>> {
// Intentionally empty
}

/**
* Option for use http.
*/
@NamedParameter(doc = "Option for use http.", default_value = PROTOCOL_NETTY)
public static final class Protocol implements Name<String> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simply

  @NamedParameter(doc = "Option for use http.", default_value = "TCP")
  public static final class Protocol implements Name<ProtocolTypes> { }

// Intentionally empty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,25 @@ Transport newInstance(final String hostAddress, int port,
final int numberOfTries,
final int retryTimeout);

/**
* Creates a transport.
*
* @param hostAddress a host address
* @param port a listening port
* @param clientStage a transport client-side stage
* @param serverStage a transport server-side stage
* @param numberOfTries the number of retries for connection
* @param retryTimeout retry timeout
* @param protocol protocol to use
* @return transport
*/
Transport newInstance(final String hostAddress, int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout,
final String protocol);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using enum?


/**
* Creates a transport.
*
Expand All @@ -82,5 +101,27 @@ Transport newInstance(final String hostAddress,
final int retryTimeout,
final TcpPortProvider tcpPortProvider);

/**
* Creates a transport.
*
* @param hostAddress a host address
* @param port a listening port
* @param clientStage a transport client-side stage
* @param serverStage a transport server-side stage
* @param numberOfTries the number of retries for connection
* @param retryTimeout retry timeout
* @param tcpPortProvider tcpPortProvider
* @param protocol protocol to use
* @return transport
*/
Transport newInstance(final String hostAddress,
int port,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why no final here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be refactored during another PR. Thank you!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have to refactor it in another PR because it is a new code. Please just add final here.

final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout,
final TcpPortProvider tcpPortProvider,
final String protocol);


}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public final class MessagingTransportFactory implements TransportFactory {

private final String localAddress;
private static final Tang TANG = Tang.Factory.getTang();

@Inject
private MessagingTransportFactory(final LocalAddressProvider localAddressProvider) {
Expand All @@ -59,7 +60,7 @@ public Transport newInstance(final int port,
final EventHandler<TransportEvent> serverHandler,
final EventHandler<Exception> exHandler) {

final Injector injector = Tang.Factory.getTang().newInjector();
final Injector injector = TANG.newInjector();
injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, this.localAddress);
injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, new SyncStage<>(clientHandler));
Expand Down Expand Up @@ -93,14 +94,41 @@ public Transport newInstance(final String hostAddress,
final int numberOfTries,
final int retryTimeout) {
try {
TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class);
final TcpPortProvider tcpPortProvider = TANG.newInjector().getInstance(TcpPortProvider.class);
return newInstance(hostAddress, port, clientStage,
serverStage, numberOfTries, retryTimeout, tcpPortProvider);
} catch (final InjectionException e) {
throw new RuntimeException(e);
}
}

/**
* Creates a transport.
*
* @param hostAddress a host address
* @param port a listening port
* @param clientStage a client stage
* @param serverStage a server stage
* @param numberOfTries a number of tries
* @param retryTimeout a timeout for retry
*/
@Override
public Transport newInstance(final String hostAddress,
final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout,
final String protocol) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this variable is not used

try {
final TcpPortProvider tcpPortProvider = TANG.newInjector().getInstance(TcpPortProvider.class);
return newInstance(hostAddress, port, clientStage,
serverStage, numberOfTries, retryTimeout, tcpPortProvider);
} catch (final InjectionException e) {
throw new RuntimeException(e);
}
}

/**
* Creates a transport.
*
Expand All @@ -121,14 +149,41 @@ public Transport newInstance(final String hostAddress,
final int retryTimeout,
final TcpPortProvider tcpPortProvider) {

final Injector injector = Tang.Factory.getTang().newInjector();
return newInstance(hostAddress, port, clientStage,
serverStage, numberOfTries, retryTimeout, tcpPortProvider, RemoteConfiguration.PROTOCOL_NETTY);
}

/**
* Creates a transport.
*
* @param hostAddress a host address
* @param port a listening port
* @param clientStage a client stage
* @param serverStage a server stage
* @param numberOfTries a number of tries
* @param retryTimeout a timeout for retry
* @param tcpPortProvider a provider for TCP port
* @param protocol a protocol to use
*/
@Override
public Transport newInstance(final String hostAddress,
final int port,
final EStage<TransportEvent> clientStage,
final EStage<TransportEvent> serverStage,
final int numberOfTries,
final int retryTimeout,
final TcpPortProvider tcpPortProvider,
final String protocol) {

final Injector injector = TANG.newInjector();
injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress);
injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, clientStage);
injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, serverStage);
injector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries);
injector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout);
injector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider);
injector.bindVolatileParameter(RemoteConfiguration.Protocol.class, protocol);
try {
return injector.getInstance(NettyMessagingTransport.class);
} catch (final InjectionException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.wake.remote.transport.netty;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.reef.wake.remote.transport.LinkListener;

/**
* Future Listener used in NettyLink.
*/
public final class NettyChannelFutureListener<T> implements ChannelFutureListener {

private final T message;
private LinkListener<T> listener;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final


NettyChannelFutureListener(final T message, final LinkListener<T> listener) {
this.message = message;
this.listener = listener;
}

@Override
public void operationComplete(final ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
listener.onSuccess(message);
} else {
listener.onException(channelFuture.cause(), channelFuture.channel().remoteAddress(), message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,77 @@
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslContext;

/**
* Netty channel initializer for Transport.
*/
class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {

/* Types for initiating channel */
public enum ChannelType {
NETTY, HTTP_SERVER, HTTP_CLIENT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is NETTY? Is it a tcp or udp socket channel? The naming is unclear to me because HTTP server and client also uses Netty.

}

/**
* the buffer size of the frame decoder.
*/
public static final int MAXFRAMELENGTH = 10 * 1024 * 1024;
private final NettyChannelHandlerFactory handlerFactory;

NettyChannelInitializer(final NettyChannelHandlerFactory handlerFactory) {
/**
* sslContext contains ssl context of the machine. used only for HTTP.
*/
private final SslContext sslContext;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think explanations on these variables are necessary.


/**
* Type of channel whether it is netty or http client or http server.
*/
private final ChannelType type;

NettyChannelInitializer(
final NettyChannelHandlerFactory handlerFactory,
final SslContext sslContext,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this can be null, how about creating another constructor not receiving this variable?

Copy link
Contributor Author

@nhne nhne Aug 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, current implementation of NettyMessagingTransport must be changed into if statement to take advantage of new constructor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for HTTPS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove the whole SSL parts. thank you!

final ChannelType type) {
this.handlerFactory = handlerFactory;
this.sslContext = sslContext;
this.type = type;
}

@Override
protected void initChannel(final SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4))
.addLast("bytesDecoder", new ByteArrayDecoder())
.addLast("frameEncoder", new LengthFieldPrepender(4))
.addLast("bytesEncoder", new ByteArrayEncoder())
.addLast("chunker", new ChunkedReadWriteHandler())
.addLast("handler", handlerFactory.createChannelInboundHandler());
switch (this.type) {
case NETTY:
ch.pipeline()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipeline.addLast ...

.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4))
.addLast("bytesDecoder", new ByteArrayDecoder())
.addLast("frameEncoder", new LengthFieldPrepender(4))
.addLast("bytesEncoder", new ByteArrayEncoder())
.addLast("chunker", new ChunkedReadWriteHandler())
.addLast("handler", handlerFactory.createChannelInboundHandler());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you remove this line and add it to line 92?

break;
case HTTP_SERVER:
if (sslContext != null) {
ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
}
ch.pipeline()
.addLast("codec", new HttpServerCodec())
.addLast("requestDecoder", new HttpRequestDecoder())
.addLast("responseEncoder", new HttpResponseEncoder())
.addLast("handler", handlerFactory.createChannelInboundHandler());
break;
case HTTP_CLIENT:
if (sslContext != null) {
ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
}
ch.pipeline()
.addLast("codec", new HttpClientCodec())
.addLast("decompressor", new HttpContentDecompressor())
.addLast("handler", handlerFactory.createChannelInboundHandler());
break;
default:
throw new IllegalArgumentException("Invalid type of channel");
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using HttpObjectAggregator?
ex) adding pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_MESSAGE_SIZE).
The HttpObjectAggregator aggregates the http content and returns a FullHttpRequest or FullHttpResponse.
Then we don't have to buffer http contents in NettyHttpClientEventListener and NettHttpServerEventListener.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is good idea! I'll implement it and test it.

}
Loading