Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-362] Implement a Wake Transport using HTTP #1341

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
48ec577
[REEF-362] Implement a Wake Transport using HTTP
nhne Jul 4, 2017
56d0650
Added a test for RemoteManager with Http Transport
nhne Jul 25, 2017
4994107
separated NettyChannelFutureListener from NettyLink
nhne Aug 14, 2017
d3f7a46
Refactored Netty Codes for HTTP
nhne Aug 14, 2017
1ecda33
Removed HttpMessagingTransportFactory and Refactored Code
nhne Aug 17, 2017
9510d61
Removed package reef.wake.remote.transport.netty.http
nhne Aug 17, 2017
31e5efa
fixed errors regarding to review
nhne Aug 17, 2017
bc42c3d
fixed errors regarding to review
nhne Aug 17, 2017
5b3f1b5
Merge branch 'REEF-362' of https://github.com/nhne/reef into REEF-362
nhne Aug 17, 2017
1943609
Fixed errors and typos
nhne Aug 21, 2017
dc36180
added a java doc and fixed format error
nhne Aug 22, 2017
abd316d
reverted some interface back
nhne Aug 22, 2017
7ce5b33
Fixed implementation to accept PROTOCOL_HTTPS
nhne Aug 22, 2017
14121e7
fixed TransportHttpTest
nhne Aug 23, 2017
476ebe1
Fixed String into ProtocolTypes
nhne Aug 23, 2017
2befc1a
Fixed reflexing reviews
nhne Aug 23, 2017
df5fea5
implemented NettyLinkFactory and its default class
nhne Aug 23, 2017
985ad1a
added NettyHttpLinkFactory
nhne Aug 24, 2017
8fd67f6
excluded HTTPS related codes
nhne Aug 24, 2017
37a9ada
removed ssl part
nhne Sep 1, 2017
40f335d
Merge branch 'master' of https://github.com/apache/reef into REEF-362
nhne Sep 1, 2017
817fa0e
changed constant String into enum and altered NettyChannelInitializer
nhne Sep 1, 2017
3674ffa
Merge branch 'master' of https://github.com/apache/reef into REEF-362
nhne Sep 4, 2017
cdaadcc
Merge branch 'REEF-362' of https://github.com/nhne/reef into REEF-362
nhne Sep 4, 2017
4990972
changed TransportFactory to use protocol as constructor parameter
nhne Sep 6, 2017
9e9260a
refactored TransportTest and merged TransportHttpTest into TransportTest
nhne Sep 6, 2017
29344b6
merged RemoteMangerTestHttp into RemoteManagerTest
nhne Sep 14, 2017
017154e
changed implementation of ChannelInitializer, Http Event Listener
nhne Nov 23, 2017
f221dbb
fixed NettyChannelInitializer and Event Listener, NettyHttpLink
nhne Nov 24, 2017
455464a
changed implementation in Netty[HttpServer,Client]EventListener
nhne Dec 5, 2017
bd3b205
removed check on lenth of content
nhne Dec 6, 2017
2225679
copiedBuffer to wrappedBuffer
nhne Dec 7, 2017
b20df37
removed sync() in NettyHttpLink
nhne Jan 4, 2018
94fb7bc
removed `sync()` in NettyHttpLink
nhne Jan 4, 2018
046739d
check isLoggable
nhne Jan 5, 2018
645b898
Fixed bug on uri creation
nhne Feb 11, 2018
bd2030a
Added Exception e into throwing exception
nhne Feb 22, 2018
6541d3c
Add `final` on Argument statement
nhne Apr 1, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.reef.wake.remote.impl.DefaultTransportEStage;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
import org.apache.reef.wake.remote.impl.TransportEvent;
import org.apache.reef.wake.remote.transport.TransportFactory.ProtocolType;

/**
* Configuration options and helper methods for Wake remoting.
Expand Down Expand Up @@ -132,4 +133,13 @@ public static final class RemoteClientStage implements Name<EStage<TransportEven
public static final class RemoteServerStage implements Name<EStage<TransportEvent>> {
// Intentionally empty
}

/**
* Option for use http.
* Default value must be ProtocolType.TCP.name().
*/
@NamedParameter(doc = "Option for use http.", default_value = "TCP")
public static final class Protocol implements Name<ProtocolType> {
// Intentionally empty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@
@DefaultImplementation(MessagingTransportFactory.class)
public interface TransportFactory {

/**
* Types of protocol used in Transport.
*/
enum ProtocolType {
TCP, HTTP
}

/**
* Creates a transport.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
Expand All @@ -39,10 +40,14 @@
public final class MessagingTransportFactory implements TransportFactory {

private final String localAddress;
private final ProtocolType protocol;
private static final Tang TANG = Tang.Factory.getTang();

@Inject
private MessagingTransportFactory(final LocalAddressProvider localAddressProvider) {
private MessagingTransportFactory(final LocalAddressProvider localAddressProvider,
@Parameter(RemoteConfiguration.Protocol.class) final ProtocolType protocolType) {
this.localAddress = localAddressProvider.getLocalAddress();
this.protocol = protocolType;
}

/**
Expand All @@ -59,11 +64,12 @@ public Transport newInstance(final int port,
final EventHandler<TransportEvent> serverHandler,
final EventHandler<Exception> exHandler) {

final Injector injector = Tang.Factory.getTang().newInjector();
final Injector injector = TANG.newInjector();
injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, this.localAddress);
injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, new SyncStage<>(clientHandler));
injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, new SyncStage<>(serverHandler));
injector.bindVolatileParameter(RemoteConfiguration.Protocol.class, this.protocol);

final Transport transport;
try {
Expand Down Expand Up @@ -93,7 +99,7 @@ public Transport newInstance(final String hostAddress,
final int numberOfTries,
final int retryTimeout) {
try {
TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class);
final TcpPortProvider tcpPortProvider = TANG.newInjector().getInstance(TcpPortProvider.class);
return newInstance(hostAddress, port, clientStage,
serverStage, numberOfTries, retryTimeout, tcpPortProvider);
} catch (final InjectionException e) {
Expand Down Expand Up @@ -121,14 +127,15 @@ public Transport newInstance(final String hostAddress,
final int retryTimeout,
final TcpPortProvider tcpPortProvider) {

final Injector injector = Tang.Factory.getTang().newInjector();
final Injector injector = TANG.newInjector();
injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress);
injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, clientStage);
injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, serverStage);
injector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries);
injector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout);
injector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider);
injector.bindVolatileParameter(RemoteConfiguration.Protocol.class, this.protocol);
try {
return injector.getInstance(NettyMessagingTransport.class);
} catch (final InjectionException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.wake.remote.transport.netty;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.reef.wake.remote.transport.LinkListener;

/**
* Future Listener used in NettyLink.
*/
public final class NettyChannelFutureListener<T> implements ChannelFutureListener {

private final T message;
private final LinkListener<T> listener;

NettyChannelFutureListener(final T message, final LinkListener<T> listener) {
this.message = message;
this.listener = listener;
}

@Override
public void operationComplete(final ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
listener.onSuccess(message);
} else {
listener.onException(channelFuture.cause(), channelFuture.channel().remoteAddress(), message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,70 @@
package org.apache.reef.wake.remote.transport.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.http.*;
import org.apache.reef.wake.remote.transport.TransportFactory.ProtocolType;


/**
* Netty channel initializer for Transport.
*/
class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {

/**
* the buffer size of the frame decoder.
*/
public static final int MAXFRAMELENGTH = 10 * 1024 * 1024;
private static final int MAX_HTTP_MESSAGE_LENGTH = 10 * 1024 * 1024;
private final NettyChannelHandlerFactory handlerFactory;

NettyChannelInitializer(final NettyChannelHandlerFactory handlerFactory) {
/**
* Type of protocol channel use.
*/
private final ProtocolType protocolType;
private final boolean isServer;

NettyChannelInitializer(
final NettyChannelHandlerFactory handlerFactory,
final ProtocolType protocol) {
this(handlerFactory, protocol, false);
}

NettyChannelInitializer(
final NettyChannelHandlerFactory handlerFactory,
final ProtocolType protocol,
final boolean isServer) {
this.handlerFactory = handlerFactory;
this.protocolType = protocol;
this.isServer = isServer;
}

@Override
protected void initChannel(final SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4))
.addLast("bytesDecoder", new ByteArrayDecoder())
.addLast("frameEncoder", new LengthFieldPrepender(4))
.addLast("bytesEncoder", new ByteArrayEncoder())
.addLast("chunker", new ChunkedReadWriteHandler())
.addLast("handler", handlerFactory.createChannelInboundHandler());
final ChannelPipeline pipeline = ch.pipeline();
switch (this.protocolType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about defining a pipeline variable? final ChannelPipeline pipeline = ch.pipeline();

case TCP:
pipeline
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4))
.addLast("bytesDecoder", new ByteArrayDecoder())
.addLast("frameEncoder", new LengthFieldPrepender(4))
.addLast("bytesEncoder", new ByteArrayEncoder())
.addLast("chunker", new ChunkedReadWriteHandler());
break;
case HTTP:
pipeline
.addLast("codec", isServer ? new HttpServerCodec() : new HttpClientCodec())
.addLast("aggregator", new HttpObjectAggregator(MAX_HTTP_MESSAGE_LENGTH));
break;
default:
throw new IllegalArgumentException("Invalid type of channel");
}
// every channel pipeline has the same inbound handler.
pipeline.addLast("handler", handlerFactory.createChannelInboundHandler());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using HttpObjectAggregator?
ex) adding pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_MESSAGE_SIZE).
The HttpObjectAggregator aggregates the http content and returns a FullHttpRequest or FullHttpResponse.
Then we don't have to buffer http contents in NettyHttpClientEventListener and NettHttpServerEventListener.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is good idea! I'll implement it and test it.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.wake.remote.transport.netty;

import io.netty.channel.Channel;
import org.apache.reef.wake.remote.Encoder;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.LinkListener;

/**
* Factory that creates a NettyLink.
*/
public final class NettyDefaultLinkFactory<T> implements NettyLinkFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you create HttpLinkFactory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created NettyHttpLinkFactory. Thank you!


NettyDefaultLinkFactory() {}

@Override
public Link newInstance(final Channel channel, final Encoder encoder) {
return new NettyLink<T>(channel, encoder);
}

@Override
public Link newInstance(final Channel channel,
final Encoder encoder,
final LinkListener listener) {
return new NettyLink<T>(channel, encoder, listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.wake.remote.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.util.CharsetUtil;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
import org.apache.reef.wake.remote.impl.TransportEvent;

import java.net.SocketAddress;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* A Netty event listener for client.
*/
final class NettyHttpClientEventListener extends AbstractNettyEventListener {

private static final Logger LOG = Logger.getLogger(NettyHttpClientEventListener.class.getName());

NettyHttpClientEventListener(
final ConcurrentMap<SocketAddress, LinkReference> addrToLinkRefMap,
final EStage<TransportEvent> stage) {
super(addrToLinkRefMap, stage);
}

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about the following code?

public void channelRead(final ChannelHandlerContext ctx, final FullHttpResponse response) {

	byte[] content = null;
	ByteBuf byteBuf = response.content();
	if (byteBuf.hasArray()) {
		content = byteBuf.array();
	} else {
		content = new byte[byteBuf.readableBytes()];
		byteBuf.readBytes(content);
	}
        .... 
}

Copy link
Contributor Author

@nhne nhne Dec 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry that as method channelRead is overriding method, I cannot change type of parameter.
But i'll adapt the name change and other things. Thank you!

if (msg instanceof FullHttpResponse) {
if(LOG.isLoggable(Level.FINEST)) {
LOG.log(Level.FINEST, "HttpResponse Received: {0}", msg);
}
final HttpContent httpContent = (HttpContent) msg;
final ByteBuf byteBuf = httpContent.content();
final Channel channel = ctx.channel();
final byte[] content;

if (byteBuf.hasArray()) {
content = byteBuf.array();
} else {
content = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(content);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we have to create a new StringBuilder here? because it is the end of the message and we need a new StringBuilder for another response?

if (LOG.isLoggable(Level.FINEST)) {
final StringBuilder buf = new StringBuilder();
buf.append("CONTENT: ").append(byteBuf.toString(CharsetUtil.UTF_8)).append("\r\n");
LOG.log(Level.FINEST, "MessageEvent: local: {0} remote: {1} :: {2}", new Object[]{
channel.localAddress(), channel.remoteAddress(), buf});
}

// send to the dispatch stage
this.stage.onNext(this.getTransportEvent(content, channel));
} else {
LOG.log(Level.SEVERE, "Unknown type of message received: {0}", msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw exception?

throw new RemoteRuntimeException("Unknown type of message received " + msg);
}
}

@Override
public void channelActive(final ChannelHandlerContext ctx) {
// noop
}

@Override
protected TransportEvent getTransportEvent(final byte[] message, final Channel channel) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why protected is necessary here. It seems that you don't have any extending classes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method should not be used outside of netty implementation since the stage is declared in protected

return new TransportEvent(message, channel.localAddress(), channel.remoteAddress());
}

@Override
protected void exceptionCleanup(final ChannelHandlerContext ctx, final Throwable cause) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Copy link
Contributor Author

@nhne nhne Aug 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replied above. I don't know why It says outdated...

this.closeChannel(ctx.channel());
}
}
Loading