From 82d62cb9f7d380d2eba5451854ff92e3be17181d Mon Sep 17 00:00:00 2001 From: Eugene Ostroukhov Date: Wed, 13 Nov 2024 13:25:42 -0800 Subject: [PATCH] fixup: Move system_api usage from tcp_socket_utils --- .../lib/event_engine/extensions/system_api.h | 7 +++ .../posix_engine/posix_endpoint.cc | 10 +-- .../posix_engine/posix_endpoint.h | 1 - .../posix_engine/posix_engine_listener.cc | 11 ++-- .../posix_engine_listener_utils.cc | 7 ++- .../posix_engine/posix_system_api.cc | 30 +++++++++ .../posix_engine/posix_system_api.h | 2 + .../posix_engine/tcp_socket_utils.cc | 62 +++---------------- .../posix_engine/tcp_socket_utils.h | 26 +++----- .../posix/tcp_posix_socket_utils_test.cc | 20 +++--- 10 files changed, 79 insertions(+), 97 deletions(-) diff --git a/src/core/lib/event_engine/extensions/system_api.h b/src/core/lib/event_engine/extensions/system_api.h index 1f1792256b214..d04747a8444dd 100644 --- a/src/core/lib/event_engine/extensions/system_api.h +++ b/src/core/lib/event_engine/extensions/system_api.h @@ -18,6 +18,8 @@ #include #include +#include "absl/status/status.h" + namespace grpc_event_engine { namespace experimental { @@ -60,6 +62,11 @@ class SystemApi { int flags) const = 0; virtual int SetSockOpt(FileDescriptor fd, int level, int optname, const void* optval, socklen_t optlen) const = 0; + + // Tries to set SO_NOSIGPIPE if available on this platform. + // If SO_NO_SIGPIPE is not available, returns not OK status. + virtual absl::Status SetSocketNoSigpipeIfPossible( + FileDescriptor fd) const = 0; }; } // namespace experimental diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc index 568eabdb02b1a..24180535f25bd 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc @@ -502,11 +502,12 @@ void PosixEndpointImpl::UpdateRcvLowat() { if (set_rcvlowat_ == remaining) { return; } - auto result = sock_.SetSocketRcvLowat(*get_system_api(), remaining); - if (result.ok()) { - set_rcvlowat_ = *result; + int result = get_system_api()->SetSockOpt(fd_, SOL_SOCKET, SO_RCVLOWAT, + &remaining, sizeof(remaining)); + if (result == 0) { + set_rcvlowat_ = result; } else { - LOG(ERROR) << "ERROR in SO_RCVLOWAT: " << result.status().message(); + PLOG(ERROR) << "ERROR in SO_RCVLOWAT"; } } @@ -1264,7 +1265,6 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle, engine_(engine) { fd_ = handle_->Poller()->GetSystemApi()->AdoptExternalFd(handle_->WrappedFd()); - sock_ = PosixSocketWrapper(fd_); PosixSocketWrapper sock(fd_); CHECK(options.resource_quota != nullptr); auto peer_addr_string = sock.PeerAddressString(*get_system_api()); diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.h b/src/core/lib/event_engine/posix_engine/posix_endpoint.h index f0786926f6971..1a50b79b9656b 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.h +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.h @@ -532,7 +532,6 @@ class PosixEndpointImpl : public grpc_core::RefCounted { struct cmsghdr* ProcessTimestamp(msghdr* msg, struct cmsghdr* cmsg); #endif // GRPC_LINUX_ERRQUEUE grpc_core::Mutex read_mu_; - PosixSocketWrapper sock_; FileDescriptor fd_; bool is_first_read_ = true; bool has_posted_reclaimer_ ABSL_GUARDED_BY(read_mu_) = false; diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc index 6c576d51fc84e..f207f0e7531f3 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc @@ -202,10 +202,11 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept( addr = EventEngine::ResolvedAddress(addr.address(), len); } SystemApi* system_api = handle_->Poller()->GetSystemApi(); - PosixSocketWrapper sock(system_api->AdoptExternalFd(fd)); - (void)sock.SetSocketNoSigpipeIfPossible(*system_api); - auto result = sock.ApplySocketMutatorInOptions( - GRPC_FD_SERVER_CONNECTION_USAGE, listener_->options_); + (void)system_api->SetSocketNoSigpipeIfPossible( + system_api->AdoptExternalFd(fd)); + auto result = ApplySocketMutatorInOptions(system_api->AdoptExternalFd(fd), + GRPC_FD_SERVER_CONNECTION_USAGE, + listener_->options_); if (!result.ok()) { LOG(ERROR) << "Closing acceptor. Failed to apply socket mutator: " << result; @@ -262,7 +263,7 @@ absl::Status PosixEngineListenerImpl::HandleExternalConnection( } SystemApi* system_api = poller_->GetSystemApi(); PosixSocketWrapper sock(system_api->AdoptExternalFd(fd)); - (void)sock.SetSocketNoSigpipeIfPossible(*system_api); + (void)system_api->SetSocketNoSigpipeIfPossible(sock.Fd()); auto peer_name = sock.PeerAddressString(*system_api); if (!peer_name.ok()) { return absl::UnknownError( diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc index 85cd51d3a96e1..7a39b48209e38 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc @@ -169,9 +169,10 @@ absl::Status PrepareSocket(const SystemApi& system_api, GRPC_RETURN_IF_ERROR(socket.sock.SetSocketDscp(system_api, options.dscp)); socket.sock.TrySetSocketTcpUserTimeout(system_api, options, false); } - GRPC_RETURN_IF_ERROR(socket.sock.SetSocketNoSigpipeIfPossible(system_api)); - GRPC_RETURN_IF_ERROR(socket.sock.ApplySocketMutatorInOptions( - GRPC_FD_SERVER_LISTENER_USAGE, options)); + GRPC_RETURN_IF_ERROR( + system_api.SetSocketNoSigpipeIfPossible(socket.sock.Fd())); + GRPC_RETURN_IF_ERROR(ApplySocketMutatorInOptions( + socket.sock.Fd(), GRPC_FD_SERVER_LISTENER_USAGE, options)); if (system_api.Bind(fd, socket.addr.address(), socket.addr.size()) < 0) { auto sockaddr_str = ResolvedAddressToString(socket.addr); diff --git a/src/core/lib/event_engine/posix_engine/posix_system_api.cc b/src/core/lib/event_engine/posix_engine/posix_system_api.cc index d244dd1c45dd7..9ef5a2a94c061 100644 --- a/src/core/lib/event_engine/posix_engine/posix_system_api.cc +++ b/src/core/lib/event_engine/posix_engine/posix_system_api.cc @@ -78,6 +78,32 @@ int PosixSystemApi::SetSockOpt(FileDescriptor fd, int level, int optname, return setsockopt(fd.fd(), level, optname, optval, optlen); } +absl::Status PosixSystemApi::SetSocketNoSigpipeIfPossible( + FileDescriptor fd) const { +#ifdef GRPC_HAVE_SO_NOSIGPIPE + int val = 1; + int newval; + socklen_t intlen = sizeof(newval); + if (0 != SetSockOpt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno))); + } + if (0 != GetSockOpt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("getsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno))); + } + if ((newval != 0) != (val != 0)) { + return absl::Status(absl::StatusCode::kInternal, + "Failed to set SO_NOSIGPIPE"); + } +#else + (void)fd; // Makes the compiler error go away +#endif + return absl::OkStatus(); +} + } // namespace experimental } // namespace grpc_event_engine @@ -144,6 +170,10 @@ int PosixSystemApi::SetSockOpt(FileDescriptor fd, int level, int optname, grpc_core::Crash("unimplemented"); } +absl::Status PosixSystemApi::SetSocketNoSigpipeIfPossible(FileDescriptor fd) { + grpc_core::Crash("unimplemented"); +} + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/posix_engine/posix_system_api.h b/src/core/lib/event_engine/posix_engine/posix_system_api.h index 2be7dc2c60deb..69dd9730bddb4 100644 --- a/src/core/lib/event_engine/posix_engine/posix_system_api.h +++ b/src/core/lib/event_engine/posix_engine/posix_system_api.h @@ -41,6 +41,8 @@ class PosixSystemApi : public SystemApi { int flags) const override; int SetSockOpt(FileDescriptor fd, int level, int optname, const void* optval, socklen_t optlen) const override; + + absl::Status SetSocketNoSigpipeIfPossible(FileDescriptor fd) const override; }; } // namespace experimental diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc index 60119f184e468..405a94a9d1e9d 100644 --- a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc +++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc @@ -48,7 +48,6 @@ #include "absl/log/check.h" #include "absl/log/log.h" #include "absl/status/status.h" -#include "src/core/lib/event_engine/extensions/supports_fd.h" #include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/util/status_helper.h" #include "src/core/util/strerror.h" @@ -144,9 +143,9 @@ absl::Status PrepareTcpClientSocket(const SystemApi& system_api, GRPC_RETURN_IF_ERROR(sock.SetSocketDscp(system_api, options.dscp)); sock.TrySetSocketTcpUserTimeout(system_api, options, true); } - GRPC_RETURN_IF_ERROR(sock.SetSocketNoSigpipeIfPossible(system_api)); - GRPC_RETURN_IF_ERROR(sock.ApplySocketMutatorInOptions( - GRPC_FD_CLIENT_CONNECTION_USAGE, options)); + GRPC_RETURN_IF_ERROR(system_api.SetSocketNoSigpipeIfPossible(sock.Fd())); + GRPC_RETURN_IF_ERROR(ApplySocketMutatorInOptions( + sock.Fd(), GRPC_FD_CLIENT_CONNECTION_USAGE, options)); // No errors. Set close_fd to false to ensure the socket is not closed. close_fd = false; return absl::OkStatus(); @@ -311,21 +310,6 @@ void UnlinkIfUnixDomainSocket( #endif } -// Instruct the kernel to wait for specified number of bytes to be received on -// the socket before generating an interrupt for packet receive. If the call -// succeeds, it returns the number of bytes (wait threshold) that was actually -// set. -absl::StatusOr PosixSocketWrapper::SetSocketRcvLowat( - const SystemApi& system_api, int bytes) { - if (system_api.SetSockOpt(fd_, SOL_SOCKET, SO_RCVLOWAT, &bytes, - sizeof(bytes)) != 0) { - return absl::Status( - absl::StatusCode::kInternal, - absl::StrCat("setsockopt(SO_RCVLOWAT): ", grpc_core::StrError(errno))); - } - return bytes; -} - // Set a socket to use zerocopy absl::Status PosixSocketWrapper::SetSocketZeroCopy( const SystemApi& system_api) { @@ -366,34 +350,6 @@ absl::Status PosixSocketWrapper::SetSocketNonBlocking( return absl::OkStatus(); } -absl::Status PosixSocketWrapper::SetSocketNoSigpipeIfPossible( - const SystemApi& system_api) { -#ifdef GRPC_HAVE_SO_NOSIGPIPE - int val = 1; - int newval; - socklen_t intlen = sizeof(newval); - if (0 != - system_api.SetSockOpt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) { - return absl::Status( - absl::StatusCode::kInternal, - absl::StrCat("setsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno))); - } - if (0 != - system_api.GetSockOpt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) { - return absl::Status( - absl::StatusCode::kInternal, - absl::StrCat("getsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno))); - } - if ((newval != 0) != (val != 0)) { - return absl::Status(absl::StatusCode::kInternal, - "Failed to set SO_NOSIGPIPE"); - } -#else - (void)system_api; // Makes the compiler error go away -#endif - return absl::OkStatus(); -} - absl::Status PosixSocketWrapper::SetSocketIpPktInfoIfPossible( const SystemApi& system_api) { #ifdef GRPC_HAVE_IP_PKTINFO @@ -705,22 +661,22 @@ void PosixSocketWrapper::TrySetSocketTcpUserTimeout( } // Set a socket using a grpc_socket_mutator -absl::Status PosixSocketWrapper::SetSocketMutator( - grpc_fd_usage usage, grpc_socket_mutator* mutator) { +absl::Status SetSocketMutator(FileDescriptor fd, grpc_fd_usage usage, + grpc_socket_mutator* mutator) { CHECK(mutator); - if (!grpc_socket_mutator_mutate_fd(mutator, fd_.fd(), usage)) { + if (!grpc_socket_mutator_mutate_fd(mutator, fd.fd(), usage)) { return absl::Status(absl::StatusCode::kInternal, "grpc_socket_mutator failed."); } return absl::OkStatus(); } -absl::Status PosixSocketWrapper::ApplySocketMutatorInOptions( - grpc_fd_usage usage, const PosixTcpOptions& options) { +absl::Status ApplySocketMutatorInOptions(FileDescriptor fd, grpc_fd_usage usage, + const PosixTcpOptions& options) { if (options.socket_mutator == nullptr) { return absl::OkStatus(); } - return SetSocketMutator(usage, options.socket_mutator); + return SetSocketMutator(fd, usage, options.socket_mutator); } bool PosixSocketWrapper::IsIpv6LoopbackAvailable() { diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h index 106d44d035e97..955d585ed3349 100644 --- a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h +++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h @@ -169,12 +169,6 @@ class PosixSocketWrapper { ~PosixSocketWrapper() = default; - // Instruct the kernel to wait for specified number of bytes to be received on - // the socket before generating an interrupt for packet receive. If the call - // succeeds, it returns the number of bytes (wait threshold) that was actually - // set. - absl::StatusOr SetSocketRcvLowat(const SystemApi& system_api, int bytes); - // Set socket to use zerocopy absl::Status SetSocketZeroCopy(const SystemApi& system_api); @@ -203,10 +197,6 @@ class PosixSocketWrapper { const PosixTcpOptions& options, bool is_client); - // Tries to set SO_NOSIGPIPE if available on this platform. - // If SO_NO_SIGPIPE is not available, returns not OK status. - absl::Status SetSocketNoSigpipeIfPossible(const SystemApi& system_api); - // Tries to set IP_PKTINFO if available on this platform. If IP_PKTINFO is not // available, returns not OK status. absl::Status SetSocketIpPktInfoIfPossible(const SystemApi& system_api); @@ -223,14 +213,6 @@ class PosixSocketWrapper { absl::Status SetSocketRcvBuf(const SystemApi& system_api, int buffer_size_bytes); - // Tries to set the socket using a grpc_socket_mutator - absl::Status SetSocketMutator(grpc_fd_usage usage, - grpc_socket_mutator* mutator); - - // Extracts the first socket mutator from config if any and applies on the fd. - absl::Status ApplySocketMutatorInOptions(grpc_fd_usage usage, - const PosixTcpOptions& options); - // Return LocalAddress as EventEngine::ResolvedAddress absl::StatusOr LocalAddress( const SystemApi& system_api); @@ -335,6 +317,14 @@ struct PosixSocketWrapper::PosixSocketCreateResult { bool SetSocketDualStack(int fd); +// Tries to set the socket using a grpc_socket_mutator +absl::Status SetSocketMutator(FileDescriptor fd, grpc_fd_usage usage, + grpc_socket_mutator* mutator); + +// Extracts the first socket mutator from config if any and applies on the fd. +absl::Status ApplySocketMutatorInOptions(FileDescriptor fd, grpc_fd_usage usage, + const PosixTcpOptions& options); + } // namespace experimental } // namespace grpc_event_engine diff --git a/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc b/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc index 0cdc6b8439546..7e7f8d26e4a0a 100644 --- a/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc +++ b/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc @@ -120,29 +120,25 @@ TEST(TcpPosixSocketUtilsTest, SocketMutatorTest) { mutator.option_value = IPTOS_LOWDELAY; EXPECT_TRUE( - posix_sock - .SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE, - reinterpret_cast(&mutator)) + SetSocketMutator(posix_sock.Fd(), GRPC_FD_CLIENT_CONNECTION_USAGE, + reinterpret_cast(&mutator)) .ok()); mutator.option_value = IPTOS_THROUGHPUT; EXPECT_TRUE( - posix_sock - .SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE, - reinterpret_cast(&mutator)) + SetSocketMutator(posix_sock.Fd(), GRPC_FD_CLIENT_CONNECTION_USAGE, + reinterpret_cast(&mutator)) .ok()); mutator.option_value = IPTOS_RELIABILITY; EXPECT_TRUE( - posix_sock - .SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE, - reinterpret_cast(&mutator)) + SetSocketMutator(posix_sock.Fd(), GRPC_FD_CLIENT_CONNECTION_USAGE, + reinterpret_cast(&mutator)) .ok()); mutator.option_value = -1; EXPECT_FALSE( - posix_sock - .SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE, - reinterpret_cast(&mutator)) + SetSocketMutator(posix_sock.Fd(), GRPC_FD_CLIENT_CONNECTION_USAGE, + reinterpret_cast(&mutator)) .ok()); system_api.Close(sock); };