Skip to content

Commit

Permalink
Implement more rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
Alemiz112 committed Mar 30, 2024
1 parent 9a9ab52 commit 2f77498
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,18 @@ public class RakConstants {
* Time after {@link RakSessionCodec} is refreshed due to no activity.
*/
public static final int SESSION_STALE_MS = 5000;

/**
* A number of datagram packets each address can send within one RakNet tick (10ms)
*/
public static final int DEFAULT_PACKET_LIMIT = 120;
/**
* A number of "unconnected" datagram packets each address can send within one second.
*/
public static final int DEFAULT_OFFLINE_PACKET_LIMIT = 10;
/**
* A number of all datagrams that will be handled within one RakNet tick before server starts dropping any incoming data.
*/
public static final int DEFAULT_GLOBAL_PACKET_LIMIT = 100000;
/*
* Flags
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import org.cloudburstmc.netty.channel.raknet.config.RakServerChannelConfig;
import org.cloudburstmc.netty.handler.codec.raknet.common.UnconnectedPongEncoder;
import org.cloudburstmc.netty.handler.codec.raknet.server.RakServerOfflineHandler;
import org.cloudburstmc.netty.handler.codec.raknet.server.RakServerRateLimiter;
import org.cloudburstmc.netty.handler.codec.raknet.server.RakServerRouteHandler;
import org.cloudburstmc.netty.handler.codec.raknet.server.RakServerTailHandler;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class RakServerChannel extends ProxyChannel<DatagramChannel> implements ServerChannel {

Expand All @@ -44,8 +47,9 @@ public RakServerChannel(DatagramChannel channel) {
super(channel);
this.config = new DefaultRakServerConfig(this);
// Default common handler of offline phase. Handles only raknet packets, forwards rest.
this.pipeline.addLast(UnconnectedPongEncoder.NAME, UnconnectedPongEncoder.INSTANCE);
this.pipeline.addLast(RakServerOfflineHandler.NAME, new RakServerOfflineHandler());
this.pipeline().addLast(UnconnectedPongEncoder.NAME, UnconnectedPongEncoder.INSTANCE);
this.pipeline().addLast(RakServerRateLimiter.NAME, new RakServerRateLimiter(this));
this.pipeline().addLast(RakServerOfflineHandler.NAME, new RakServerOfflineHandler(this));
this.pipeline().addLast(RakServerRouteHandler.NAME, new RakServerRouteHandler(this));
this.pipeline().addLast(RakServerTailHandler.NAME, RakServerTailHandler.INSTANCE);
}
Expand Down Expand Up @@ -92,6 +96,15 @@ public void onCloseTriggered(ChannelPromise promise) {
combiner.finish(combinedPromise);
}

public boolean tryBlockAddress(InetAddress address, long time, TimeUnit unit) {
RakServerRateLimiter rateLimiter = this.pipeline().get(RakServerRateLimiter.class);
if (rateLimiter != null) {
rateLimiter.blockAddress(address, time, unit);
return true;
}
return false;
}

@Override
public RakServerChannelConfig config() {
return this.config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class DefaultRakServerConfig extends DefaultChannelConfig implements RakS
private volatile boolean handlePing;
private volatile int maxMtu = RakConstants.MAXIMUM_MTU_SIZE;
private volatile int minMtu = RakConstants.MINIMUM_MTU_SIZE;
private volatile int packetLimit = RakConstants.DEFAULT_PACKET_LIMIT;
private volatile int globalPacketLimit = RakConstants.DEFAULT_GLOBAL_PACKET_LIMIT;
private volatile int unconnectedPacketLimit = RakConstants.DEFAULT_OFFLINE_PACKET_LIMIT;

public DefaultRakServerConfig(RakServerChannel channel) {
super(channel);
Expand Down Expand Up @@ -86,6 +89,15 @@ public <T> T getOption(ChannelOption<T> option) {
if (option == RakChannelOption.RAK_HANDLE_PING) {
return (T) Boolean.valueOf(this.getHandlePing());
}
if (option == RakChannelOption.RAK_PACKET_LIMIT) {
return (T) Integer.valueOf(this.getPacketLimit());
}
if (option == RakChannelOption.RAK_GLOBAL_PACKET_LIMIT) {
return (T) Integer.valueOf(this.getGlobalPacketLimit());
}
if (option == RakChannelOption.RAK_OFFLINE_PACKET_LIMIT) {
return (T) Integer.valueOf(this.getUnconnectedPacketLimit());
}
return this.channel.parent().config().getOption(option);
}

Expand All @@ -111,6 +123,12 @@ public <T> boolean setOption(ChannelOption<T> option, T value) {
this.setMaxMtu((Integer) value);
} else if (option == RakChannelOption.RAK_MIN_MTU) {
this.setMinMtu((Integer) value);
} else if (option == RakChannelOption.RAK_PACKET_LIMIT) {
this.setPacketLimit((Integer) value);
} else if (option == RakChannelOption.RAK_OFFLINE_PACKET_LIMIT) {
this.setUnconnectedPacketLimit((Integer) value);
} else if (option == RakChannelOption.RAK_GLOBAL_PACKET_LIMIT) {
this.setGlobalPacketLimit((Integer) value);
} else {
return this.channel.parent().config().setOption(option, value);
}
Expand Down Expand Up @@ -226,4 +244,34 @@ public RakServerChannelConfig setMinMtu(int minMtu) {
public int getMinMtu() {
return this.minMtu;
}

@Override
public void setPacketLimit(int limit) {
this.packetLimit = limit;
}

@Override
public int getPacketLimit() {
return this.packetLimit;
}

@Override
public int getUnconnectedPacketLimit() {
return unconnectedPacketLimit;
}

@Override
public void setUnconnectedPacketLimit(int unconnectedPacketLimit) {
this.unconnectedPacketLimit = unconnectedPacketLimit;
}

@Override
public int getGlobalPacketLimit() {
return globalPacketLimit;
}

@Override
public void setGlobalPacketLimit(int globalPacketLimit) {
this.globalPacketLimit = globalPacketLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,28 @@ public class RakChannelOption<T> extends ChannelOption<T> {
public static final ChannelOption<Integer> RAK_FLUSH_INTERVAL =
valueOf(RakChannelOption.class, "RAK_FLUSH_INTERVAL");

/**
* A number of datagram packets each address can send within one RakNet tick (10ms).
* Default is 120 packets.
*/
public static final ChannelOption<Integer> RAK_PACKET_LIMIT =
valueOf(RakChannelOption.class, "RAK_PACKET_LIMIT");

/**
* A number of "unconnected" datagram packets each address can send within one second.
* This includes packets such as UNCONNECTED_PING and OPEN_CONNECTION_REQUEST packets.
* Default is 10 packets.
*/
public static final ChannelOption<Integer> RAK_OFFLINE_PACKET_LIMIT =
valueOf(RakChannelOption.class, "RAK_OFFLINE_PACKET_LIMIT");

/**
* A number of all datagrams that will be handled within one RakNet tick before server starts dropping any incoming data.
* Default is 100_000 (RAK_PACKET_LIMIT * 0.56 * 1500 different connections).
*/
public static final ChannelOption<Integer> RAK_GLOBAL_PACKET_LIMIT =
valueOf(RakChannelOption.class, "RAK_GLOBAL_PACKET_LIMIT");

@SuppressWarnings("deprecation")
protected RakChannelOption() {
super(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,16 @@ public interface RakServerChannelConfig extends ChannelConfig {
int getMinMtu();

RakServerChannelConfig setMinMtu(int mtu);

int getPacketLimit();

void setPacketLimit(int limit);

int getGlobalPacketLimit();

void setGlobalPacketLimit(int limit);

int getUnconnectedPacketLimit();

void setUnconnectedPacketLimit(int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,15 @@ private void tryTick() {
}

private void onTick() {
long curTime = System.currentTimeMillis();

if (this.state == RakState.UNCONNECTED) {
if (this.isTimedOut(curTime)) {
this.close(RakDisconnectReason.TIMED_OUT);
}
return;
}

long curTime = System.currentTimeMillis();
if (this.isTimedOut(curTime)) {
this.disconnect(RakDisconnectReason.TIMED_OUT);
return;
Expand Down Expand Up @@ -760,6 +764,15 @@ private ChannelPromise disconnect0(RakDisconnectReason reason) {
}

public void close(RakDisconnectReason reason) {
if (this.state == RakState.DISCONNECTING) {
return;
}
this.state = RakState.DISCONNECTING;

if (log.isDebugEnabled()) {
log.debug("Closing RakNet Session ({} => {}) due to {}", this.channel.localAddress(), this.getRemoteAddress(), reason);
}

this.channel.pipeline().fireUserEventTriggered(reason).close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@

public class RakServerOfflineHandler extends AdvancedChannelInboundHandler<DatagramPacket> {
public static final String NAME = "rak-offline-handler";
private static final int MAX_PACKETS_PER_SECOND = 10;

private static final InternalLogger log = InternalLoggerFactory.getInstance(RakServerOfflineHandler.class);

Expand All @@ -59,6 +58,12 @@ public class RakServerOfflineHandler extends AdvancedChannelInboundHandler<Datag
.expirationPolicy(ExpirationPolicy.CREATED)
.build();

private final RakServerChannel channel;

public RakServerOfflineHandler(RakServerChannel channel) {
this.channel = channel;
}

@Override
protected boolean acceptInboundMessage(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!super.acceptInboundMessage(ctx, msg)) {
Expand Down Expand Up @@ -100,8 +105,9 @@ protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) th
long guid = ctx.channel().config().getOption(RakChannelOption.RAK_GUID);

AtomicInteger counter = this.packetsCounter.computeIfAbsent(packet.sender().getAddress(), s -> new AtomicInteger());
if (counter.incrementAndGet() > MAX_PACKETS_PER_SECOND) {
if (counter.incrementAndGet() > this.channel.config().getUnconnectedPacketLimit()) {
log.warn("[{}] Sent too many packets per second", packet.sender());
this.channel.tryBlockAddress(packet.sender().getAddress(), 10, TimeUnit.SECONDS);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.cloudburstmc.netty.channel.raknet.RakChildChannel;
import org.cloudburstmc.netty.channel.raknet.RakDisconnectReason;
import org.cloudburstmc.netty.channel.raknet.RakPriority;
import org.cloudburstmc.netty.channel.raknet.RakReliability;
import org.cloudburstmc.netty.channel.raknet.config.RakChannelConfig;
import org.cloudburstmc.netty.channel.raknet.config.RakServerChannelConfig;
import org.cloudburstmc.netty.channel.raknet.packet.EncapsulatedPacket;
import org.cloudburstmc.netty.channel.raknet.packet.RakMessage;
Expand All @@ -33,15 +34,17 @@

import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

import static org.cloudburstmc.netty.channel.raknet.RakConstants.*;

@Sharable
public class RakServerOnlineInitialHandler extends SimpleChannelInboundHandler<EncapsulatedPacket> {

public static final String NAME = "rak-server-online-initial-handler";
private static final InternalLogger log = InternalLoggerFactory.getInstance(RakServerOnlineInitialHandler.class);

private final RakChildChannel channel;
private final AtomicInteger retriesCounter = new AtomicInteger(5);

public RakServerOnlineInitialHandler(RakChildChannel channel) {
this.channel = channel;
Expand Down Expand Up @@ -73,20 +76,23 @@ protected void channelRead0(ChannelHandlerContext ctx, EncapsulatedPacket messag

private void onConnectionRequest(ChannelHandlerContext ctx, ByteBuf buffer) {
buffer.skipBytes(1);
long guid = ((RakChannelConfig) this.channel.config()).getGuid();
long guid = this.channel.config().getGuid();
long serverGuid = buffer.readLong();
long timestamp = buffer.readLong();
boolean security = buffer.readBoolean();

if (serverGuid != guid || security) {
if (this.retriesCounter.decrementAndGet() < 0) {
this.sendConnectionRequestFailed(ctx, guid);
log.warn("[{}] Connection request failed due to too many retries", this.channel.remoteAddress());
} else if (serverGuid != guid || security) {
this.sendConnectionRequestFailed(ctx, guid);
} else {
this.sendConnectionRequestAccepted(ctx, timestamp);
}
}

private void sendConnectionRequestAccepted(ChannelHandlerContext ctx, long time) {
InetSocketAddress address = ((InetSocketAddress) this.channel.remoteAddress());
InetSocketAddress address = this.channel.remoteAddress();
boolean ipv6 = address.getAddress() instanceof Inet6Address;
ByteBuf outBuf = ctx.alloc().ioBuffer(ipv6 ? 628 : 166);

Expand All @@ -99,7 +105,7 @@ private void sendConnectionRequestAccepted(ChannelHandlerContext ctx, long time)
outBuf.writeLong(time);
outBuf.writeLong(System.currentTimeMillis());

ctx.writeAndFlush(new RakMessage(outBuf, RakReliability.RELIABLE, RakPriority.IMMEDIATE));
ctx.writeAndFlush(new RakMessage(outBuf, RakReliability.UNRELIABLE, RakPriority.IMMEDIATE));
}

private void sendConnectionRequestFailed(ChannelHandlerContext ctx, long guid) {
Expand Down
Loading

0 comments on commit 2f77498

Please sign in to comment.