Skip to content
Open
39 changes: 14 additions & 25 deletions bt-core/src/main/java/bt/net/ConnectionSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import bt.metainfo.TorrentId;
import bt.runtime.Config;
import bt.service.IRuntimeLifecycleBinder;
import bt.torrent.PeerTimeoutRegistry;
import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,9 +43,10 @@ public class ConnectionSource implements IConnectionSource {
private final Object lock = new Object();

private final ConcurrentMap<ConnectionKey, CompletableFuture<ConnectionResult>> pendingConnections;
// TODO: weak map
private final ConcurrentMap<Peer, Long> unreachablePeers;

private final PeerTimeoutRegistry peerTimeoutRegistry;

@Inject
public ConnectionSource(Set<PeerConnectionAcceptor> connectionAcceptors,
IPeerConnectionFactory connectionFactory,
Expand All @@ -64,6 +66,7 @@ public ConnectionSource(Set<PeerConnectionAcceptor> connectionAcceptors,

this.pendingConnections = new ConcurrentHashMap<>();
this.unreachablePeers = new ConcurrentHashMap<>();
this.peerTimeoutRegistry = new PeerTimeoutRegistry(1, java.util.concurrent.TimeUnit.MINUTES);

String incomingThreadName = String.format("%d.bt.net.pool.incoming-connection-worker", config.getAcceptorPort());
ExecutorService incomingConnectionExecutor = Executors.newFixedThreadPool(
Expand Down Expand Up @@ -91,6 +94,13 @@ public ConnectionResult getConnection(Peer peer, TorrentId torrentId) {
@Override
public CompletableFuture<ConnectionResult> getConnectionAsync(Peer peer, TorrentId torrentId) {
ConnectionKey key = new ConnectionKey(peer, peer.getPort(), torrentId);
String peerId = peer.toString();
if (!peerTimeoutRegistry.isAllowed(peerId)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Skipping banned peer: {}", peerId);
}
return CompletableFuture.completedFuture(ConnectionResult.failure("Peer is temporarily banned"));
}

CompletableFuture<ConnectionResult> result = validateNewConnPossible(peer, torrentId, key);
if (result != null) {
Expand All @@ -114,7 +124,6 @@ public CompletableFuture<ConnectionResult> getConnectionAsync(Peer peer, Torrent
}

synchronized (lock) {
// synchronized double checking.
result = validateNewConnPossible(peer, torrentId, key);
if (result != null) {
return result;
Expand All @@ -125,7 +134,6 @@ public CompletableFuture<ConnectionResult> getConnectionAsync(Peer peer, Torrent
pendingConnections.put(key, result);
return result;
} finally {
// If an exception happens, make sure that a thread isn't deadlocked waiting for this to complete
addedToPendingConnections.complete(null);
}
}
Expand All @@ -150,23 +158,17 @@ private CompletableFuture<ConnectionResult> createPendingConnFuture(Peer peer, T
return connectionResult;
}
} finally {
// ensure that we don't remove this key from the map before it is added.
addedToPendingConnections.join();

// The synchronize ensures a memory barrier that ensures the effects of connectionPool.addConnectionIfAbsent(established)
// are visible to any other thread that sees the removal.
// Unfortunately ConcurrentMap.remove() does not guarantee a happens before relationship. See:
// https://stackoverflow.com/questions/39341742/does-concurrentmap-remove-provide-a-happens-before-edge-before-get-returns-n
// When Java 11 features are enabled, this synchronize can be replaced with VarHandle.storeStoreFence().
synchronized (pendingConnections) {
pendingConnections.remove(key);
}
}
}, outgoingConnectionExecutor).whenComplete((acquiredConnection, throwable) -> {
if (acquiredConnection == null || throwable != null) {
peerTimeoutRegistry.markTimedOut(peer.toString());

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Peer is unreachable: {}. Will prevent further attempts to establish connection.",
peer);
LOGGER.debug("Peer is unreachable: {}. Will prevent further attempts to establish connection.", peer);
}
unreachablePeers.putIfAbsent(peer, System.currentTimeMillis());
}
Expand All @@ -178,16 +180,6 @@ private CompletableFuture<ConnectionResult> createPendingConnFuture(Peer peer, T
});
}

/**
* Checks whether a connection to this peer on the specified torrent id is possible. Returns a result if a new
* connection is not possible. This can happen if there is an existing pending connection, or we have reached
* {@link Config#getMaxPeerConnections()}
*
* @param peer the peer to connect to
* @param torrentId the torrent for the connection
* @param key the connection key
* @return a result if a new connection is not possible, null otherwise
*/
private CompletableFuture<ConnectionResult> validateNewConnPossible(Peer peer, TorrentId torrentId,
ConnectionKey key) {
CompletableFuture<ConnectionResult> connection = getExistingOrPendingConnection(key);
Expand Down Expand Up @@ -216,9 +208,6 @@ private boolean checkPeerConnectionsLimit(Peer peer, TorrentId torrentId) {
}

private CompletableFuture<ConnectionResult> getExistingOrPendingConnection(ConnectionKey key) {
// When Java 11 features are enabled, this synchronize can be replaced with VarHandle.loadLoadFence() below the
// end of the synchronized block.
// See comment in createPendingConnFuture()
synchronized (pendingConnections) {
CompletableFuture<ConnectionResult> pendingConnection = pendingConnections.get(key);
if (pendingConnection != null) {
Expand Down
31 changes: 18 additions & 13 deletions bt-core/src/main/java/bt/net/PeerConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import bt.metainfo.TorrentId;
import bt.runtime.Config;
import bt.service.IRuntimeLifecycleBinder;
import bt.torrent.PeerTimeoutRegistry;

import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,19 +41,20 @@
import java.util.stream.Collectors;

/**
*<p><b>Note that this class implements a service.
* <p><b>Note that this class implements a service.
* Hence, is not a part of the public API and is a subject to change.</b></p>
*/
public class PeerConnectionPool implements IPeerConnectionPool {

private static final Logger LOGGER = LoggerFactory.getLogger(PeerConnectionPool.class);

private Config config;
private EventSink eventSink;
private ScheduledExecutorService cleaner;
private Connections connections;
private ReentrantLock connectionLock;
private Duration peerConnectionInactivityThreshold;
private final Config config;
private final EventSink eventSink;
private final ScheduledExecutorService cleaner;
private final Connections connections;
private final ReentrantLock connectionLock;
private final Duration peerConnectionInactivityThreshold;
private final PeerTimeoutRegistry peerTimeoutRegistry;

@Inject
public PeerConnectionPool(
Expand All @@ -64,6 +67,7 @@ public PeerConnectionPool(
this.peerConnectionInactivityThreshold = config.getPeerConnectionInactivityThreshold();
this.connections = new Connections();
this.connectionLock = new ReentrantLock();
this.peerTimeoutRegistry = new PeerTimeoutRegistry(5, TimeUnit.MINUTES);

String cleanerThreadName = String.format("%d.bt.net.pool.cleaner", config.getAcceptorPort());
this.cleaner = Executors.newScheduledThreadPool(1, r -> new Thread(r, cleanerThreadName));
Expand Down Expand Up @@ -113,8 +117,8 @@ public PeerConnection addConnectionIfAbsent(PeerConnection newConnection) {
getConnectionsForAddress(newConnection.getTorrentId(), newConnection.getRemotePeer());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Checking duplicate connections for newly established" +
" connection with peer: {} (established remote port: {})." +
" All connections:\n{}", newConnection.getRemotePeer(), newConnection.getRemotePort(),
" connection with peer: {} (established remote port: {})." +
" All connections:\n{}", newConnection.getRemotePeer(), newConnection.getRemotePort(),
connectionsWithSameAddress.stream()
.map(Object::toString)
.collect(Collectors.joining("\n")));
Expand Down Expand Up @@ -171,7 +175,6 @@ public void checkDuplicateConnections(TorrentId torrentId, Peer peer) {
}
}
if (outgoingConnection != null && incomingConnection != null) {
// always prefer to keep outgoing connections
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Closing duplicate incoming connection with {}:{}" +
" (established remote port {})",
Expand Down Expand Up @@ -220,7 +223,6 @@ public void run() {
}
purgeConnection(connection);
}
// can send keep-alives here based on lastActiveTime
});

} finally {
Expand All @@ -232,6 +234,9 @@ public void run() {
private void purgeConnection(PeerConnection connection) {
ConnectionKey connectionKey = new ConnectionKey(connection.getRemotePeer(),
connection.getRemotePort(), connection.getTorrentId());

peerTimeoutRegistry.markTimedOut(connectionKey.toString());

connections.remove(connectionKey, connection);
connection.closeQuietly();
eventSink.firePeerDisconnected(connectionKey);
Expand All @@ -256,8 +261,8 @@ private void shutdownCleaner() {
}

class Connections {
private ConcurrentMap<ConnectionKey, PeerConnection> connections;
private ConcurrentMap<TorrentId, Collection<PeerConnection>> connectionsByTorrent;
private final ConcurrentMap<ConnectionKey, PeerConnection> connections;
private final ConcurrentMap<TorrentId, Collection<PeerConnection>> connectionsByTorrent;

Connections() {
this.connections = new ConcurrentHashMap<>();
Expand Down
34 changes: 17 additions & 17 deletions bt-core/src/main/java/bt/peer/PeerRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
* Copyright (c) 2016—2021 Andrei Tomashpolskiy and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
Expand All @@ -27,20 +25,14 @@
import bt.service.IdentityService;
import bt.torrent.TorrentDescriptor;
import bt.torrent.TorrentRegistry;
import bt.torrent.PeerTimeoutRegistry;
import bt.tracker.AnnounceKey;
import bt.tracker.ITrackerService;
import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
Expand All @@ -67,6 +59,7 @@ public class PeerRegistry implements IPeerRegistry {

private final ConcurrentMap<TorrentId, Set<AnnounceKey>> extraAnnounceKeys;
private final ReentrantLock extraAnnounceKeysLock;
private final PeerTimeoutRegistry peerTimeoutRegistry;

@Inject
public PeerRegistry(IRuntimeLifecycleBinder lifecycleBinder,
Expand All @@ -91,9 +84,13 @@ public PeerRegistry(IRuntimeLifecycleBinder lifecycleBinder,

this.extraAnnounceKeys = new ConcurrentHashMap<>();
this.extraAnnounceKeysLock = new ReentrantLock();

this.peerTimeoutRegistry = new PeerTimeoutRegistry(5, TimeUnit.MINUTES);
this.scheduledExecutorService = createExecutor(lifecycleBinder, config);
eventSource.onTorrentStopped(null, e -> this.extraAnnounceKeys.remove(e.getTorrentId()));

lifecycleBinder.onStartup("Schedule peer timeout cleanup", () ->
scheduledExecutorService.scheduleAtFixedRate(
peerTimeoutRegistry::cleanup, 1, 1, TimeUnit.MINUTES));
}

private ScheduledExecutorService createExecutor(IRuntimeLifecycleBinder lifecycleBinder, Config config) {
Expand Down Expand Up @@ -131,9 +128,6 @@ private void collectPeersForTorrent(TorrentId torrentId) {
LOGGER.warn("Will not query extra trackers for a private torrent, id: {}", torrentId);
}
} else {
// more announce keys might be added at the same time;
// querying all trackers can be time-consuming, so we make a copy of the collection
// to prevent blocking callers of addPeerSource(TorrentId, AnnounceKey) for too long
Collection<AnnounceKey> extraTorrentAnnounceKeysCopy;
extraAnnounceKeysLock.lock();
try {
Expand All @@ -144,7 +138,6 @@ private void collectPeersForTorrent(TorrentId torrentId) {
queryTrackers(torrentId, torrentAnnounceKey, extraTorrentAnnounceKeysCopy);
}

// disallow querying peer sources other than the tracker for private torrents
if ((!torrentOptional.isPresent() || !torrentOptional.get().isPrivate()) && !extraPeerSourceFactories.isEmpty()) {
extraPeerSourceFactories.forEach(factory ->
queryPeerSource(torrentId, factory.getPeerSource(torrentId)));
Expand Down Expand Up @@ -180,7 +173,6 @@ private void queryTracker(TorrentId torrentId, AnnounceKey announceKey) {

private boolean mightCreateTracker(AnnounceKey announceKey) {
if (announceKey.isMultiKey()) {
// TODO: need some more sophisticated solution because some of the trackers might be supported
for (List<String> tier : announceKey.getTrackerUrls()) {
for (String trackerUrl : tier) {
if (!trackerService.isSupportedProtocol(trackerUrl)) {
Expand Down Expand Up @@ -215,6 +207,15 @@ private void queryPeerSource(TorrentId torrentId, PeerSource peerSource) {

@Override
public void addPeer(TorrentId torrentId, Peer peer) {
// --------------------- Skip banned peers ---------------------
String peerId = peer.toString();
if (!peerTimeoutRegistry.isAllowed(peerId)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Skipping banned peer: {}", peerId);
}
return;
}

if (peer.isPortUnknown()) {
throw new IllegalArgumentException("Peer's port is unknown: " + peer);
} else if (peer.getPort() < 0 || peer.getPort() > 65535) {
Expand Down Expand Up @@ -256,5 +257,4 @@ private boolean isLocal(Peer peer) {
public Peer getLocalPeer() {
return localPeer;
}

}
Loading