Skip to content

Commit

Permalink
[eventengine] Wrap more FDs
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Nov 11, 2024
1 parent f90e2c8 commit 54b227f
Show file tree
Hide file tree
Showing 35 changed files with 425 additions and 241 deletions.
52 changes: 52 additions & 0 deletions include/grpc/event_engine/event_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H
#define GRPC_EVENT_ENGINE_EVENT_ENGINE_H

#include <fcntl.h>
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/extensible.h>
#include <grpc/event_engine/memory_allocator.h>
Expand All @@ -23,6 +24,10 @@

#include <vector>

#ifdef GRPC_LINUX_EPOLL
#include <sys/epoll.h>
#endif

#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
Expand Down Expand Up @@ -162,6 +167,53 @@ class EventEngine : public std::enable_shared_from_this<EventEngine>,
socklen_t size_ = 0;
};

class FileDescriptor {
public:
static FileDescriptor MakeEventFd(int initval, int flags);
// return absl::Status(absl::StatusCode::kInternal,
// absl::StrCat("pipe: ", grpc_core::StrError(errno)));
static FileDescriptor FromIomgr(int fd);
static FileDescriptor FromAresSocket(int ares_socket);
static int socketpair(int domain, int type, int protocol,
absl::Span<FileDescriptor> sv);

FileDescriptor() = default;
FileDescriptor(const FileDescriptor& other) = default;

// Id is based on fd but is not fd. IO will fail.
int id() const { return ~fd_; }
// int fd() const { return fd_; }
bool ready() const { return fd_ > 0; }
void close() const;
bool epoll_ctl(int op, EventEngine::FileDescriptor epfd, void* event) const;
int getsockopt(int level, int optname, void* optval,
socklen_t* optlen) const;
int setsockopt(int level, int optname, const void* optval,
socklen_t optlen) const;
int ioctl(int op, void* arg);
int fcntl(int op, int args);
void invalidate() { fd_ = -1; }
int getsockname(struct sockaddr* addr, socklen_t* addrlen);
int getpeername(struct sockaddr* addr, socklen_t* addrlen);
ssize_t read(void* buf, size_t nbyte);
#ifdef GRPC_LINUX_EPOLL
int epoll_ctl(int op, int fd, struct epoll_event* event);
#endif
EventEngine::FileDescriptor accept(struct sockaddr* address,
socklen_t* address_len);
ssize_t write(const void* buf, size_t nbyte);
int connect(const struct sockaddr* addr, socklen_t addrlen);

bool grpc_socket_mutator_mutate_fd(void* mutator, int usage);
int file_descriptor_for_polling() const;
int file_descriptor_for_iomgr() const;

private:
explicit FileDescriptor(int fd) : fd_(fd) {}

int fd_ = -1;
};

/// One end of a connection between a gRPC client and server. Endpoints are
/// created when connections are established, and Endpoint operations are
/// gRPC's primary means of communication.
Expand Down
12 changes: 11 additions & 1 deletion src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2105,6 +2105,7 @@ grpc_cc_library(
"absl/strings",
],
deps = [
"event_engine_extensions",
"event_engine_poller",
"forkable",
"posix_event_engine_closure",
Expand Down Expand Up @@ -2156,7 +2157,11 @@ grpc_cc_library(
"lib/event_engine/posix_engine/wakeup_fd_posix.h",
],
external_deps = ["absl/status"],
deps = ["//:gpr_platform"],
deps = [
"//:event_engine_base_hdrs",
"//:gpr_platform",
"event_engine_extensions"
],
)

grpc_cc_library(
Expand Down Expand Up @@ -2349,8 +2354,10 @@ grpc_cc_library(
"absl/log:log",
],
deps = [
"event_engine_extensions",
"iomgr_port",
"strerror",
"//:event_engine_base_hdrs",
"//:gpr",
],
)
Expand All @@ -2370,8 +2377,10 @@ grpc_cc_library(
"absl/types:optional",
],
deps = [
"event_engine_extensions",
"iomgr_port",
"posix_event_engine_internal_errqueue",
"//:event_engine_base_hdrs",
"//:gpr",
],
)
Expand Down Expand Up @@ -2494,6 +2503,7 @@ grpc_cc_library(
"absl/strings",
],
deps = [
"event_engine_extensions",
"event_engine_tcp_socket_utils",
"iomgr_port",
"posix_event_engine_tcp_socket_utils",
Expand Down
21 changes: 21 additions & 0 deletions src/core/lib/event_engine/extensions/system_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/port_platform.h>

#ifdef GRPC_LINUX_EPOLL
#include <sys/epoll.h>
#endif

namespace grpc_event_engine {
namespace experimental {

Expand All @@ -34,18 +38,31 @@ class FileDescriptor {
int fd_;
};

#ifdef GRPC_LINUX_EPOLL
using EpollEvent = epoll_event;
#else
struct EpollEvent {};
#endif

class SystemApi {
public:
virtual ~SystemApi() = default;

// Factories
virtual FileDescriptor AdoptExternalFd(int fd) const = 0;
virtual FileDescriptor EventFd(unsigned int initval, int flags) const = 0;
virtual FileDescriptor Socket(int domain, int type, int protocol) const = 0;

// Functions operating on file descriptors
virtual int Bind(FileDescriptor fd, const struct sockaddr* addr,
socklen_t addrlen) const = 0;
virtual void Close(FileDescriptor fd) const = 0;
virtual int EpollWait(FileDescriptor epfd, EpollEvent* events, int maxevents,
int timeout) const = 0;
virtual int EpollCtl(FileDescriptor epfd, int op, FileDescriptor fd,
EpollEvent* event) const = 0;
virtual int EventfdRead(FileDescriptor fd) const = 0;
virtual int EventfdWrite(FileDescriptor fd, uint64_t counter) const = 0;
virtual int Fcntl(FileDescriptor fd, int op, int args) const = 0;
virtual int GetSockOpt(FileDescriptor fd, int level, int optname,
void* optval, socklen_t* optlen) const = 0;
Expand All @@ -54,12 +71,16 @@ class SystemApi {
virtual int GetPeerName(FileDescriptor fd, struct sockaddr* addr,
socklen_t* addrlen) const = 0;
virtual int Listen(FileDescriptor fd, int backlog) const = 0;
virtual long Read(FileDescriptor fd, void* buf, size_t nbyte) const = 0;
virtual long RecvMsg(FileDescriptor fd, struct msghdr* msg,
int flags) const = 0;
virtual long SendMsg(FileDescriptor fd, const struct msghdr* message,
int flags) const = 0;
virtual int SetSockOpt(FileDescriptor fd, int level, int optname,
const void* optval, socklen_t optlen) const = 0;
virtual void Shutdown(FileDescriptor fd, int how) const = 0;
virtual long Write(FileDescriptor fd, const void* buf,
size_t nbyte) const = 0;
};

} // namespace experimental
Expand Down
3 changes: 2 additions & 1 deletion src/core/lib/event_engine/grpc_polled_fd.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class GrpcPolledFdFactory {
virtual std::unique_ptr<GrpcPolledFd> NewGrpcPolledFdLocked(
ares_socket_t as) = 0;
// Optionally configures the ares channel after creation
virtual void ConfigureAresChannelLocked(ares_channel channel) = 0;
virtual void ConfigureAresChannelLocked(const SystemApi& api,
ares_channel channel) = 0;
};

} // namespace experimental
Expand Down
Loading

0 comments on commit 54b227f

Please sign in to comment.