Skip to content

Commit 80b7531

Browse files
author
Sven Göthel
committed
cool#9833: Implement MaxConnection Limit (2)
Reimplementation of commit 80246f7 (post revert). Adding TCP connection limit to outside facing TCP IPv4/IPv6 Sockets - validated and registered at ServerSocket::accept() (IPv4/IPv6 only) - excluded from LocalServerSocket::accept() for local UDS connections - unregistered at Socket dtor post socket closing. TODO: - revise net::Defaults.maxTCPConnections to match actual system settings Signed-off-by: Sven Göthel <[email protected]> Change-Id: Ib9f0ac17c05ffe65c2370490f68f581fa76730e7
1 parent abac9fa commit 80b7531

File tree

4 files changed

+160
-36
lines changed

4 files changed

+160
-36
lines changed

net/ServerSocket.hpp

+5-3
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,11 @@ class ServerSocket : public Socket
108108
const std::string msg = "Failed to accept. (errno: ";
109109
throw std::runtime_error(msg + std::strerror(errno) + ')');
110110
}
111-
112-
LOG_TRC("Accepted client #" << clientSocket->getFD());
113-
_clientPoller.insertNewSocket(std::move(clientSocket));
111+
if( clientSocket->isOpen() )
112+
{
113+
LOG_TRC("Accepted client #" << clientSocket->getFD());
114+
_clientPoller.insertNewSocket(std::move(clientSocket));
115+
} // else intentionally cancelled accepted connection (e.g. connection limiter)
114116
}
115117
}
116118

net/Socket.cpp

+75-30
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ std::atomic<bool> Socket::InhibitThreadChecks(false);
6565

6666
std::unique_ptr<Watchdog> SocketPoll::PollWatchdog;
6767

68+
std::mutex Socket::statsMutex;
69+
std::atomic<size_t> Socket::statsConnectionCount(0);
70+
6871
net::DefaultValues net::Defaults = { .inactivityTimeout = std::chrono::seconds(3600),
6972
.wsPingAvgTimeout = std::chrono::seconds(12),
7073
.wsPingInterval = std::chrono::seconds(18),
@@ -146,8 +149,10 @@ std::string Socket::getStatsString(const std::chrono::steady_clock::time_point &
146149
std::ostream& Socket::streamImpl(std::ostream& os) const
147150
{
148151
os << "Socket[#" << getFD()
149-
<< ", " << toString(type())
150-
<< " @ ";
152+
<< ", " << toString(type());
153+
if (isCounted())
154+
os << ", counted";
155+
os << " @ ";
151156
if (Type::IPv6 == type())
152157
{
153158
os << "[" << clientAddress() << "]:" << clientPort();
@@ -1159,47 +1164,85 @@ std::shared_ptr<Socket> ServerSocket::accept()
11591164
const int rc = fakeSocketAccept4(getFD());
11601165
#endif
11611166
LOG_TRC("Accepted socket #" << rc << ", creating socket object.");
1162-
try
1167+
if (rc != -1)
11631168
{
1164-
// Create a socket object using the factory.
1165-
if (rc != -1)
1166-
{
11671169
#if !MOBILEAPP
1168-
char addrstr[INET6_ADDRSTRLEN];
1170+
char addrstr[INET6_ADDRSTRLEN];
11691171

1170-
Socket::Type type;
1171-
const void *inAddr;
1172-
if (clientInfo.sin6_family == AF_INET)
1172+
Socket::Type type;
1173+
const void *inAddr;
1174+
if (clientInfo.sin6_family == AF_INET)
1175+
{
1176+
struct sockaddr_in *ipv4 = (struct sockaddr_in *)&clientInfo;
1177+
inAddr = &(ipv4->sin_addr);
1178+
type = Socket::Type::IPv4;
1179+
}
1180+
else
1181+
{
1182+
struct sockaddr_in6 *ipv6 = &clientInfo;
1183+
inAddr = &(ipv6->sin6_addr);
1184+
type = Socket::Type::IPv6;
1185+
}
1186+
::inet_ntop(clientInfo.sin6_family, inAddr, addrstr, sizeof(addrstr));
1187+
1188+
if( !Socket::checkAndIncrConnectionCount(rc) )
1189+
{
1190+
// cancellation branch!
1191+
if constexpr (!Util::isMobileApp())
11731192
{
1174-
struct sockaddr_in *ipv4 = (struct sockaddr_in *)&clientInfo;
1175-
inAddr = &(ipv4->sin_addr);
1176-
type = Socket::Type::IPv4;
1193+
if (::close(rc))
1194+
LOG_SYS("Ignored error closing socket #" << rc);
1195+
} else
1196+
fakeSocketClose(rc);
1197+
1198+
try {
1199+
// return closed dummy socket to avoid handlePoll throw
1200+
std::shared_ptr<Socket> _socket = StreamSocket::create<StreamSocket>(
1201+
std::string(), /*fd=*/-1 /* closed */, type, false, HostType::Other,
1202+
std::make_shared<NoOpSocketHandler>());
1203+
_socket->setClientAddress(addrstr, clientInfo.sin6_port);
1204+
LOG_WRN("TCP Limiter: Rejecting accepted socket #" << rc << ", " << *_socket);
1205+
return _socket;
11771206
}
1178-
else
1207+
catch (const std::exception& ex)
11791208
{
1180-
struct sockaddr_in6 *ipv6 = &clientInfo;
1181-
inAddr = &(ipv6->sin6_addr);
1182-
type = Socket::Type::IPv6;
1209+
LOG_WRN("TCP Limiter: Rejecting accepted socket #" << rc);
1210+
LOG_ERR("Failed to create limited rejected client socket #" << rc << ". Error: " << ex.what());
1211+
return nullptr;
11831212
}
1184-
1185-
std::shared_ptr<Socket> _socket = createSocketFromAccept(rc, type);
1186-
1187-
::inet_ntop(clientInfo.sin6_family, inAddr, addrstr, sizeof(addrstr));
1213+
}
1214+
#endif
1215+
// Create a socket object using the factory.
1216+
bool hasDtor = false;
1217+
try
1218+
{
1219+
#if !MOBILEAPP
1220+
std::shared_ptr<Socket> _socket = createSocketFromAccept(rc, type); // may throw
1221+
_socket->_isCounted = true;
1222+
hasDtor = true;
11881223
_socket->setClientAddress(addrstr, clientInfo.sin6_port);
11891224

11901225
LOG_TRC("Accepted socket #" << _socket->getFD() << " has family "
1191-
<< clientInfo.sin6_family << " address "
1192-
<< _socket->clientAddress());
1226+
<< clientInfo.sin6_family << ", " << *_socket);
11931227
#else
11941228
std::shared_ptr<Socket> _socket = createSocketFromAccept(rc, Socket::Type::Unix);
11951229
#endif
11961230
return _socket;
11971231
}
1198-
return std::shared_ptr<Socket>(nullptr);
1199-
}
1200-
catch (const std::exception& ex)
1201-
{
1202-
LOG_ERR("Failed to create client socket #" << rc << ". Error: " << ex.what());
1232+
catch (const std::exception& ex)
1233+
{
1234+
LOG_ERR("Failed to create client socket #" << rc << " (had socket << " << hasDtor << "). Error: " << ex.what());
1235+
if (!hasDtor)
1236+
{
1237+
if constexpr (!Util::isMobileApp())
1238+
{
1239+
if (::close(rc))
1240+
LOG_SYS("Ignored error closing socket #" << rc);
1241+
} else
1242+
fakeSocketClose(rc);
1243+
decrConnectionCount(rc);
1244+
}
1245+
}
12031246
}
12041247

12051248
return nullptr;
@@ -1392,8 +1435,10 @@ std::ostream& StreamSocket::stream(std::ostream& os) const
13921435
{
13931436
os << "StreamSocket[#" << getFD()
13941437
<< ", " << toStringShort(_wsState)
1395-
<< ", " << Socket::toString(type())
1396-
<< " @ ";
1438+
<< ", " << Socket::toString(type());
1439+
if (isCounted())
1440+
os << ", counted";
1441+
os << " @ ";
13971442
if (Type::IPv6 == type())
13981443
{
13991444
os << "[" << clientAddress() << "]:" << clientPort();

net/Socket.hpp

+74-3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#pragma once
1313

14+
#include <limits>
1415
#include <poll.h>
1516
#include <unistd.h>
1617
#include <sys/types.h>
@@ -33,6 +34,7 @@
3334

3435
#include <common/StateEnum.hpp>
3536
#include "Log.hpp"
37+
#include "NetUtil.hpp"
3638
#include "Util.hpp"
3739
#include "Buffer.hpp"
3840
#include "SigUtil.hpp"
@@ -129,6 +131,8 @@ class SocketDisposition final
129131
std::shared_ptr<Socket> _socket;
130132
};
131133

134+
class ServerSocket; // fwd
135+
132136
/// A non-blocking, streaming socket.
133137
class Socket
134138
{
@@ -151,6 +155,7 @@ class Socket
151155
, _lastSeenTime(_creationTime)
152156
, _bytesSent(0)
153157
, _bytesRcvd(0)
158+
, _isCounted(false)
154159
{
155160
init();
156161
}
@@ -162,19 +167,24 @@ class Socket
162167
// Doesn't block on sockets; no error handling needed.
163168
if constexpr (!Util::isMobileApp())
164169
{
165-
::close(_fd);
170+
if (::close(_fd))
171+
LOG_SYS("Ignored error closing socket #" << _fd);
166172
LOG_DBG("Closed socket " << toStringImpl());
167173
}
168174
else
169-
{
170175
fakeSocketClose(_fd);
176+
if (_isCounted)
177+
{
178+
_isCounted = false;
179+
decrConnectionCount(_fd);
171180
}
172181
}
173182

174183
/// Returns true if this socket is open, i.e. allowed to be polled and not shutdown
175184
bool isOpen() const { return _open; }
176185
/// Returns true if this socket has been closed, i.e. rejected from polling and potentially shutdown
177186
bool isClosed() const { return !_open; }
187+
bool isCounted() const { return _isCounted; }
178188

179189
constexpr Type type() const { return _type; }
180190
constexpr bool isIPType() const { return Type::IPv4 == _type || Type::IPv6 == _type; }
@@ -249,7 +259,7 @@ class Socket
249259
if constexpr (!Util::isMobileApp())
250260
{
251261
const int val = 1;
252-
if (::setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, (char*)&val, sizeof(val)) == -1)
262+
if (isOpen() && ::setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, (char*)&val, sizeof(val)) == -1)
253263
{
254264
static std::once_flag once;
255265
std::call_once(once,
@@ -409,6 +419,10 @@ class Socket
409419
LOG_TRC("Ignore further input on socket.");
410420
_ignoreInput = true;
411421
}
422+
423+
/// Returns connection count, lock-free
424+
static size_t connectionCount() { return statsConnectionCount; }
425+
412426
protected:
413427
/// Construct based on an existing socket fd.
414428
/// Used by accept() only.
@@ -422,6 +436,7 @@ class Socket
422436
, _lastSeenTime(_creationTime)
423437
, _bytesSent(0)
424438
, _bytesRcvd(0)
439+
, _isCounted(false)
425440
{
426441
init();
427442
}
@@ -493,6 +508,50 @@ class Socket
493508

494509
/// We check the owner even in the release builds, needs to be always correct.
495510
std::thread::id _owner;
511+
512+
bool _isCounted; // if true, must call `decrConnectionCount` in connection `shutdown`
513+
514+
/// Decrements global connection count
515+
/// @param fd the related file descriptor for logging
516+
static void decrConnectionCount(int fd)
517+
{
518+
std::lock_guard<std::mutex> lock(statsMutex);
519+
const size_t u = statsConnectionCount;
520+
auto logPrefix = [fd](std::ostream& os) { os << '#' << fd << ": "; };
521+
if (u > 0)
522+
{
523+
const size_t v = --statsConnectionCount;
524+
LOG_TRC("TCP Limiter: Count decremented: " << u << " -> " << v);
525+
}
526+
else
527+
LOG_WRN("TCP Limiter: Count decrement underflow: " << u);
528+
}
529+
530+
/// Increments global connection counter if not exceeding net::DefaultValue::maxTCPConnections.
531+
/// Returns true if free connections were available, otherwise false.
532+
/// No limitation is applied if net::DefaultValue::maxTCPConnections == 0.
533+
/// @param fd the related file descriptor for logging
534+
static bool checkAndIncrConnectionCount(int fd)
535+
{
536+
std::lock_guard<std::mutex> lock(statsMutex);
537+
const size_t u = statsConnectionCount;
538+
auto logPrefix = [fd](std::ostream& os) { os << '#' << fd << ": "; };
539+
if (net::Defaults.maxTCPConnections == 0 || u < net::Defaults.maxTCPConnections)
540+
{
541+
const size_t v = ++statsConnectionCount;
542+
LOG_TRC("TCP Limiter: Count incremented: " << u << " -> " << v);
543+
return true;
544+
}
545+
else
546+
{
547+
LOG_WRN("TCP Limiter: Limit reached: " << u);
548+
return false;
549+
}
550+
}
551+
friend class ServerSocket; // allow `checkAndIncrConnectionCount` for `ServerSocket::accept()`
552+
553+
static std::mutex statsMutex;
554+
static std::atomic<size_t> statsConnectionCount; // accepted TCP IPv4/IPv6 socket count
496555
};
497556

498557
inline std::ostream& operator<<(std::ostream& os, const Socket &s) { return s.stream(os); }
@@ -617,6 +676,18 @@ class SimpleSocketHandler : public ProtocolHandlerInterface
617676
void getIOStats(uint64_t &, uint64_t &) override {}
618677
};
619678

679+
/// A no-operation ProtocolHandlerInterface with dummy API.
680+
class NoOpSocketHandler : public SimpleSocketHandler
681+
{
682+
public:
683+
NoOpSocketHandler() = default;
684+
685+
void onConnect(const std::shared_ptr<StreamSocket>& /*socket*/) override {}
686+
void handleIncomingMessage(SocketDisposition &) override {}
687+
int getPollEvents(std::chrono::steady_clock::time_point /*now*/, int64_t &/*timeoutMaxMicroS*/) override { return 0; }
688+
void performWrites(std::size_t /*capacity*/) override {}
689+
};
690+
620691
/// Interface that receives and sends incoming messages.
621692
class MessageHandlerInterface :
622693
public std::enable_shared_from_this<MessageHandlerInterface>

test/UnitTimeoutBase.hpp

+6
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ inline UnitBase::TestResult UnitTimeoutBase1::testHttp(const size_t connectionLi
204204
sessions.clear();
205205
TST_LOG("Clearing Poller: " << testname);
206206
socketPollers.clear();
207+
// TCP Connection Count: Just an estimation, no locking on server side
208+
TST_LOG("TCP Connection Count: " << testname << ", " << Socket::connectionCount() << " / " << net::Defaults.maxTCPConnections);
207209
TST_LOG("Ending Test: " << testname);
208210
return TestResult::Ok;
209211
}
@@ -302,6 +304,8 @@ inline UnitBase::TestResult UnitTimeoutBase1::testWSPing(const size_t connection
302304
sessions.clear();
303305
TST_LOG("Clearing Poller: " << testname);
304306
socketPollers.clear();
307+
// TCP Connection Count: Just an estimation, no locking on server side
308+
TST_LOG("TCP Connection Count: " << testname << ", " << Socket::connectionCount() << " / " << net::Defaults.maxTCPConnections);
305309
TST_LOG("Ending Test: " << testname);
306310
return TestResult::Ok;
307311
}
@@ -397,6 +401,8 @@ inline UnitBase::TestResult UnitTimeoutBase1::testWSDChatPing(const size_t conne
397401
sessions.clear();
398402
TST_LOG("Clearing Poller: " << testname);
399403
socketPollers.clear();
404+
// TCP Connection Count: Just an estimation, no locking on server side
405+
TST_LOG("TCP Connection Count: " << testname << ", " << Socket::connectionCount() << " / " << net::Defaults.maxTCPConnections);
400406
TST_LOG("Ending Test: " << testname);
401407
return TestResult::Ok;
402408
}

0 commit comments

Comments
 (0)