Skip to content

Commit

Permalink
fixup: Move system_api usage from tcp_socket_utils
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Nov 13, 2024
1 parent b18ffbe commit 82d62cb
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 97 deletions.
7 changes: 7 additions & 0 deletions src/core/lib/event_engine/extensions/system_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/port_platform.h>

#include "absl/status/status.h"

namespace grpc_event_engine {
namespace experimental {

Expand Down Expand Up @@ -60,6 +62,11 @@ class SystemApi {
int flags) const = 0;
virtual int SetSockOpt(FileDescriptor fd, int level, int optname,
const void* optval, socklen_t optlen) const = 0;

// Tries to set SO_NOSIGPIPE if available on this platform.
// If SO_NO_SIGPIPE is not available, returns not OK status.
virtual absl::Status SetSocketNoSigpipeIfPossible(
FileDescriptor fd) const = 0;
};

} // namespace experimental
Expand Down
10 changes: 5 additions & 5 deletions src/core/lib/event_engine/posix_engine/posix_endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,11 +502,12 @@ void PosixEndpointImpl::UpdateRcvLowat() {
if (set_rcvlowat_ == remaining) {
return;
}
auto result = sock_.SetSocketRcvLowat(*get_system_api(), remaining);
if (result.ok()) {
set_rcvlowat_ = *result;
int result = get_system_api()->SetSockOpt(fd_, SOL_SOCKET, SO_RCVLOWAT,
&remaining, sizeof(remaining));
if (result == 0) {
set_rcvlowat_ = result;
} else {
LOG(ERROR) << "ERROR in SO_RCVLOWAT: " << result.status().message();
PLOG(ERROR) << "ERROR in SO_RCVLOWAT";
}
}

Expand Down Expand Up @@ -1264,7 +1265,6 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle,
engine_(engine) {
fd_ =
handle_->Poller()->GetSystemApi()->AdoptExternalFd(handle_->WrappedFd());
sock_ = PosixSocketWrapper(fd_);
PosixSocketWrapper sock(fd_);
CHECK(options.resource_quota != nullptr);
auto peer_addr_string = sock.PeerAddressString(*get_system_api());
Expand Down
1 change: 0 additions & 1 deletion src/core/lib/event_engine/posix_engine/posix_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
struct cmsghdr* ProcessTimestamp(msghdr* msg, struct cmsghdr* cmsg);
#endif // GRPC_LINUX_ERRQUEUE
grpc_core::Mutex read_mu_;
PosixSocketWrapper sock_;
FileDescriptor fd_;
bool is_first_read_ = true;
bool has_posted_reclaimer_ ABSL_GUARDED_BY(read_mu_) = false;
Expand Down
11 changes: 6 additions & 5 deletions src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,11 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
addr = EventEngine::ResolvedAddress(addr.address(), len);
}
SystemApi* system_api = handle_->Poller()->GetSystemApi();
PosixSocketWrapper sock(system_api->AdoptExternalFd(fd));
(void)sock.SetSocketNoSigpipeIfPossible(*system_api);
auto result = sock.ApplySocketMutatorInOptions(
GRPC_FD_SERVER_CONNECTION_USAGE, listener_->options_);
(void)system_api->SetSocketNoSigpipeIfPossible(
system_api->AdoptExternalFd(fd));
auto result = ApplySocketMutatorInOptions(system_api->AdoptExternalFd(fd),
GRPC_FD_SERVER_CONNECTION_USAGE,
listener_->options_);
if (!result.ok()) {
LOG(ERROR) << "Closing acceptor. Failed to apply socket mutator: "
<< result;
Expand Down Expand Up @@ -262,7 +263,7 @@ absl::Status PosixEngineListenerImpl::HandleExternalConnection(
}
SystemApi* system_api = poller_->GetSystemApi();
PosixSocketWrapper sock(system_api->AdoptExternalFd(fd));
(void)sock.SetSocketNoSigpipeIfPossible(*system_api);
(void)system_api->SetSocketNoSigpipeIfPossible(sock.Fd());
auto peer_name = sock.PeerAddressString(*system_api);
if (!peer_name.ok()) {
return absl::UnknownError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,10 @@ absl::Status PrepareSocket(const SystemApi& system_api,
GRPC_RETURN_IF_ERROR(socket.sock.SetSocketDscp(system_api, options.dscp));
socket.sock.TrySetSocketTcpUserTimeout(system_api, options, false);
}
GRPC_RETURN_IF_ERROR(socket.sock.SetSocketNoSigpipeIfPossible(system_api));
GRPC_RETURN_IF_ERROR(socket.sock.ApplySocketMutatorInOptions(
GRPC_FD_SERVER_LISTENER_USAGE, options));
GRPC_RETURN_IF_ERROR(
system_api.SetSocketNoSigpipeIfPossible(socket.sock.Fd()));
GRPC_RETURN_IF_ERROR(ApplySocketMutatorInOptions(
socket.sock.Fd(), GRPC_FD_SERVER_LISTENER_USAGE, options));

if (system_api.Bind(fd, socket.addr.address(), socket.addr.size()) < 0) {
auto sockaddr_str = ResolvedAddressToString(socket.addr);
Expand Down
30 changes: 30 additions & 0 deletions src/core/lib/event_engine/posix_engine/posix_system_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,32 @@ int PosixSystemApi::SetSockOpt(FileDescriptor fd, int level, int optname,
return setsockopt(fd.fd(), level, optname, optval, optlen);
}

absl::Status PosixSystemApi::SetSocketNoSigpipeIfPossible(
FileDescriptor fd) const {
#ifdef GRPC_HAVE_SO_NOSIGPIPE
int val = 1;
int newval;
socklen_t intlen = sizeof(newval);
if (0 != SetSockOpt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno)));
}
if (0 != GetSockOpt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("getsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno)));
}
if ((newval != 0) != (val != 0)) {
return absl::Status(absl::StatusCode::kInternal,
"Failed to set SO_NOSIGPIPE");
}
#else
(void)fd; // Makes the compiler error go away
#endif
return absl::OkStatus();
}

} // namespace experimental
} // namespace grpc_event_engine

Expand Down Expand Up @@ -144,6 +170,10 @@ int PosixSystemApi::SetSockOpt(FileDescriptor fd, int level, int optname,
grpc_core::Crash("unimplemented");
}

absl::Status PosixSystemApi::SetSocketNoSigpipeIfPossible(FileDescriptor fd) {
grpc_core::Crash("unimplemented");
}

} // namespace experimental
} // namespace grpc_event_engine

Expand Down
2 changes: 2 additions & 0 deletions src/core/lib/event_engine/posix_engine/posix_system_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class PosixSystemApi : public SystemApi {
int flags) const override;
int SetSockOpt(FileDescriptor fd, int level, int optname, const void* optval,
socklen_t optlen) const override;

absl::Status SetSocketNoSigpipeIfPossible(FileDescriptor fd) const override;
};

} // namespace experimental
Expand Down
62 changes: 9 additions & 53 deletions src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "src/core/lib/event_engine/extensions/supports_fd.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/util/status_helper.h"
#include "src/core/util/strerror.h"
Expand Down Expand Up @@ -144,9 +143,9 @@ absl::Status PrepareTcpClientSocket(const SystemApi& system_api,
GRPC_RETURN_IF_ERROR(sock.SetSocketDscp(system_api, options.dscp));
sock.TrySetSocketTcpUserTimeout(system_api, options, true);
}
GRPC_RETURN_IF_ERROR(sock.SetSocketNoSigpipeIfPossible(system_api));
GRPC_RETURN_IF_ERROR(sock.ApplySocketMutatorInOptions(
GRPC_FD_CLIENT_CONNECTION_USAGE, options));
GRPC_RETURN_IF_ERROR(system_api.SetSocketNoSigpipeIfPossible(sock.Fd()));
GRPC_RETURN_IF_ERROR(ApplySocketMutatorInOptions(
sock.Fd(), GRPC_FD_CLIENT_CONNECTION_USAGE, options));
// No errors. Set close_fd to false to ensure the socket is not closed.
close_fd = false;
return absl::OkStatus();
Expand Down Expand Up @@ -311,21 +310,6 @@ void UnlinkIfUnixDomainSocket(
#endif
}

// Instruct the kernel to wait for specified number of bytes to be received on
// the socket before generating an interrupt for packet receive. If the call
// succeeds, it returns the number of bytes (wait threshold) that was actually
// set.
absl::StatusOr<int> PosixSocketWrapper::SetSocketRcvLowat(
const SystemApi& system_api, int bytes) {
if (system_api.SetSockOpt(fd_, SOL_SOCKET, SO_RCVLOWAT, &bytes,
sizeof(bytes)) != 0) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(SO_RCVLOWAT): ", grpc_core::StrError(errno)));
}
return bytes;
}

// Set a socket to use zerocopy
absl::Status PosixSocketWrapper::SetSocketZeroCopy(
const SystemApi& system_api) {
Expand Down Expand Up @@ -366,34 +350,6 @@ absl::Status PosixSocketWrapper::SetSocketNonBlocking(
return absl::OkStatus();
}

absl::Status PosixSocketWrapper::SetSocketNoSigpipeIfPossible(
const SystemApi& system_api) {
#ifdef GRPC_HAVE_SO_NOSIGPIPE
int val = 1;
int newval;
socklen_t intlen = sizeof(newval);
if (0 !=
system_api.SetSockOpt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno)));
}
if (0 !=
system_api.GetSockOpt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("getsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno)));
}
if ((newval != 0) != (val != 0)) {
return absl::Status(absl::StatusCode::kInternal,
"Failed to set SO_NOSIGPIPE");
}
#else
(void)system_api; // Makes the compiler error go away
#endif
return absl::OkStatus();
}

absl::Status PosixSocketWrapper::SetSocketIpPktInfoIfPossible(
const SystemApi& system_api) {
#ifdef GRPC_HAVE_IP_PKTINFO
Expand Down Expand Up @@ -705,22 +661,22 @@ void PosixSocketWrapper::TrySetSocketTcpUserTimeout(
}

// Set a socket using a grpc_socket_mutator
absl::Status PosixSocketWrapper::SetSocketMutator(
grpc_fd_usage usage, grpc_socket_mutator* mutator) {
absl::Status SetSocketMutator(FileDescriptor fd, grpc_fd_usage usage,
grpc_socket_mutator* mutator) {
CHECK(mutator);
if (!grpc_socket_mutator_mutate_fd(mutator, fd_.fd(), usage)) {
if (!grpc_socket_mutator_mutate_fd(mutator, fd.fd(), usage)) {
return absl::Status(absl::StatusCode::kInternal,
"grpc_socket_mutator failed.");
}
return absl::OkStatus();
}

absl::Status PosixSocketWrapper::ApplySocketMutatorInOptions(
grpc_fd_usage usage, const PosixTcpOptions& options) {
absl::Status ApplySocketMutatorInOptions(FileDescriptor fd, grpc_fd_usage usage,
const PosixTcpOptions& options) {
if (options.socket_mutator == nullptr) {
return absl::OkStatus();
}
return SetSocketMutator(usage, options.socket_mutator);
return SetSocketMutator(fd, usage, options.socket_mutator);
}

bool PosixSocketWrapper::IsIpv6LoopbackAvailable() {
Expand Down
26 changes: 8 additions & 18 deletions src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,6 @@ class PosixSocketWrapper {

~PosixSocketWrapper() = default;

// Instruct the kernel to wait for specified number of bytes to be received on
// the socket before generating an interrupt for packet receive. If the call
// succeeds, it returns the number of bytes (wait threshold) that was actually
// set.
absl::StatusOr<int> SetSocketRcvLowat(const SystemApi& system_api, int bytes);

// Set socket to use zerocopy
absl::Status SetSocketZeroCopy(const SystemApi& system_api);

Expand Down Expand Up @@ -203,10 +197,6 @@ class PosixSocketWrapper {
const PosixTcpOptions& options,
bool is_client);

// Tries to set SO_NOSIGPIPE if available on this platform.
// If SO_NO_SIGPIPE is not available, returns not OK status.
absl::Status SetSocketNoSigpipeIfPossible(const SystemApi& system_api);

// Tries to set IP_PKTINFO if available on this platform. If IP_PKTINFO is not
// available, returns not OK status.
absl::Status SetSocketIpPktInfoIfPossible(const SystemApi& system_api);
Expand All @@ -223,14 +213,6 @@ class PosixSocketWrapper {
absl::Status SetSocketRcvBuf(const SystemApi& system_api,
int buffer_size_bytes);

// Tries to set the socket using a grpc_socket_mutator
absl::Status SetSocketMutator(grpc_fd_usage usage,
grpc_socket_mutator* mutator);

// Extracts the first socket mutator from config if any and applies on the fd.
absl::Status ApplySocketMutatorInOptions(grpc_fd_usage usage,
const PosixTcpOptions& options);

// Return LocalAddress as EventEngine::ResolvedAddress
absl::StatusOr<EventEngine::ResolvedAddress> LocalAddress(
const SystemApi& system_api);
Expand Down Expand Up @@ -335,6 +317,14 @@ struct PosixSocketWrapper::PosixSocketCreateResult {

bool SetSocketDualStack(int fd);

// Tries to set the socket using a grpc_socket_mutator
absl::Status SetSocketMutator(FileDescriptor fd, grpc_fd_usage usage,
grpc_socket_mutator* mutator);

// Extracts the first socket mutator from config if any and applies on the fd.
absl::Status ApplySocketMutatorInOptions(FileDescriptor fd, grpc_fd_usage usage,
const PosixTcpOptions& options);

} // namespace experimental
} // namespace grpc_event_engine

Expand Down
20 changes: 8 additions & 12 deletions test/core/event_engine/posix/tcp_posix_socket_utils_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,29 +120,25 @@ TEST(TcpPosixSocketUtilsTest, SocketMutatorTest) {

mutator.option_value = IPTOS_LOWDELAY;
EXPECT_TRUE(
posix_sock
.SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
SetSocketMutator(posix_sock.Fd(), GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
.ok());
mutator.option_value = IPTOS_THROUGHPUT;
EXPECT_TRUE(
posix_sock
.SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
SetSocketMutator(posix_sock.Fd(), GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
.ok());

mutator.option_value = IPTOS_RELIABILITY;
EXPECT_TRUE(
posix_sock
.SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
SetSocketMutator(posix_sock.Fd(), GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
.ok());

mutator.option_value = -1;
EXPECT_FALSE(
posix_sock
.SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
SetSocketMutator(posix_sock.Fd(), GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
.ok());
system_api.Close(sock);
};
Expand Down

0 comments on commit 82d62cb

Please sign in to comment.