Skip to content

Commit 017b73d

Browse files
committed
container: Add setEgressTcp capnp capability and setEgress
This is part of the workstream to give a container access to a WorkerEntrypoint. How it works is that subrequest channels can implement a writeServiceDesignator function that they can fill with metadata appropiate to connect to the worker on the same machine. On workerd, this should just be a name and an entrypoint.
1 parent ccc4937 commit 017b73d

File tree

10 files changed

+63
-1
lines changed

10 files changed

+63
-1
lines changed

src/workerd/api/container.c++

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@ Container::Container(rpc::Container::Client rpcClient, bool running)
1717
: rpcClient(IoContext::current().addObject(kj::heap(kj::mv(rpcClient)))),
1818
running(running) {}
1919

20+
void Container::setEgress(jsg::Lock& js, kj::String addr, jsg::Ref<api::Fetcher> binding) {
21+
auto subrequestChannel = binding->getSubrequestChannel(IoContext::current());
22+
subrequestChannel->requireAllowsTransfer();
23+
auto request = rpcClient->setEgressTcpRequest();
24+
auto serviceDesignator = request.initService();
25+
subrequestChannel->writeServiceDesignator(serviceDesignator);
26+
IoContext::current().addTask(request.sendIgnoringResult());
27+
}
28+
2029
void Container::start(jsg::Lock& js, jsg::Optional<StartupOptions> maybeOptions) {
2130
auto flags = FeatureFlags::get(js);
2231
JSG_REQUIRE(!running, Error, "start() cannot be called on a container that is already running.");

src/workerd/api/container.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ class Container: public jsg::Object {
6363
jsg::Ref<Fetcher> getTcpPort(jsg::Lock& js, int port);
6464
jsg::Promise<void> setInactivityTimeout(jsg::Lock& js, int64_t durationMs);
6565

66+
void setEgress(jsg::Lock& js, kj::String addr, jsg::Ref<api::Fetcher> binding);
67+
6668
// TODO(containers): listenTcp()
6769

6870
JSG_RESOURCE_TYPE(Container, CompatibilityFlags::Reader flags) {
@@ -73,6 +75,9 @@ class Container: public jsg::Object {
7375
JSG_METHOD(signal);
7476
JSG_METHOD(getTcpPort);
7577
JSG_METHOD(setInactivityTimeout);
78+
if (flags.getWorkerdExperimental()) {
79+
JSG_METHOD(setEgress);
80+
}
7681
}
7782

7883
void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {

src/workerd/io/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ wd_capnp_library(src = "frankenvalue.capnp")
375375
wd_capnp_library(
376376
src = "container.capnp",
377377
deps = [
378+
":worker-interface_capnp",
378379
"@capnp-cpp//src/capnp/compat:byte-stream_capnp",
379380
],
380381
)

src/workerd/io/container.capnp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ $Cxx.namespace("workerd::rpc");
55
$Cxx.allowCancellation;
66

77
using import "/capnp/compat/byte-stream.capnp".ByteStream;
8+
using import "/workerd/io/worker-interface.capnp".ServiceDesignator;
89

910
interface Container @0x9aaceefc06523bca {
1011
# RPC interface to talk to a container, for containers attached to Durable Objects.
@@ -100,7 +101,7 @@ interface Container @0x9aaceefc06523bca {
100101
# attempting to connect.
101102
}
102103

103-
setInactivityTimeout @7 (durationMs :Int64);
104+
setInactivityTimeout @7 (durationMs :Int64);
104105
# Configures the duration where the runtime should shutdown the container after there is
105106
# no connections or activity to the Container.
106107
#
@@ -110,4 +111,8 @@ interface Container @0x9aaceefc06523bca {
110111
# Note that if there is an open connection to the container, the runtime must not shutdown the container.
111112
# If there is no activity timeout duration configured and no container connection, it's up to the runtime
112113
# to decide when to signal the container to exit.
114+
115+
setEgressTcp @8 (addr: Text, service: ServiceDesignator);
116+
# setEgressTcp will configure the container to send traffic to this
117+
# service
113118
}

src/workerd/io/io-channels.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ class IoChannelFactory {
157157
// the returned WorkerInterface.
158158
virtual kj::Own<WorkerInterface> startRequest(SubrequestMetadata metadata) = 0;
159159

160+
virtual void writeServiceDesignator(rpc::ServiceDesignator::Builder builder) {
161+
JSG_FAIL_REQUIRE(Error, "You can't transfer to a service across an IO boundary");
162+
}
163+
160164
kj::Own<CapTableEntry> clone() override final {
161165
return kj::addRef(*this);
162166
}

src/workerd/io/worker-interface.capnp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,35 @@ using import "/workerd/io/script-version.capnp".ScriptVersion;
1717
using import "/workerd/io/trace.capnp".TagValue;
1818
using import "/workerd/io/trace.capnp".UserSpanData;
1919

20+
struct ServiceDesignator {
21+
# ServiceDesignator serves as a way to transfer binding information to external services
22+
23+
name @0 :Text;
24+
# The service name.
25+
26+
entrypoint @1 :Text;
27+
# A modules-syntax Worker can export multiple named entrypoints. `export default {` specifies
28+
# the default entrypoint, whereas `export let foo = {` defines an entrypoint named `foo`. If
29+
# `entrypoint` is specified here, it names an alternate entrypoint to use on the target worker,
30+
# otherwise the default is used.
31+
32+
props :union {
33+
# Value to provide in `ctx.props` in the target worker
34+
# when invoked by this service.
35+
36+
empty @2 :Void;
37+
# Empty object. (This is the default.)
38+
39+
json @3 :Text;
40+
# A JSON-encoded value.
41+
}
42+
43+
metadata @4: Text;
44+
# A JSON-encoded value that carries information of the
45+
# service in how-to communicate back.
46+
# In workerd, this can be empty.
47+
}
48+
2049
# A 128-bit trace ID used to identify traces.
2150
struct TraceId {
2251
high @0 :UInt64;

src/workerd/server/container-client.c++

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,12 @@ kj::Promise<void> ContainerClient::setInactivityTimeout(SetInactivityTimeoutCont
431431
co_return;
432432
}
433433

434+
kj::Promise<void> ContainerClient::setEgressTcp(SetEgressTcpContext context) {
435+
// TODO: Implement setEgress
436+
KJ_UNIMPLEMENTED(
437+
"setEgress is not implemented in local development as it's a experimental feature");
438+
}
439+
434440
kj::Promise<void> ContainerClient::getTcpPort(GetTcpPortContext context) {
435441
const auto params = context.getParams();
436442
uint16_t port = params.getPort();

src/workerd/server/container-client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class ContainerClient final: public rpc::Container::Server {
4040
kj::Promise<void> getTcpPort(GetTcpPortContext context) override;
4141
kj::Promise<void> listenTcp(ListenTcpContext context) override;
4242
kj::Promise<void> setInactivityTimeout(SetInactivityTimeoutContext context) override;
43+
kj::Promise<void> setEgressTcp(SetEgressTcpContext context) override;
4344

4445
private:
4546
capnp::ByteStreamFactory& byteStreamFactory;

types/generated-snapshot/experimental/index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3827,6 +3827,7 @@ interface Container {
38273827
signal(signo: number): void;
38283828
getTcpPort(port: number): Fetcher;
38293829
setInactivityTimeout(durationMs: number | bigint): Promise<void>;
3830+
setEgress(addr: string, binding: Fetcher): void;
38303831
}
38313832
interface ContainerStartupOptions {
38323833
entrypoint?: string[];

types/generated-snapshot/experimental/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3838,6 +3838,7 @@ export interface Container {
38383838
signal(signo: number): void;
38393839
getTcpPort(port: number): Fetcher;
38403840
setInactivityTimeout(durationMs: number | bigint): Promise<void>;
3841+
setEgress(addr: string, binding: Fetcher): void;
38413842
}
38423843
export interface ContainerStartupOptions {
38433844
entrypoint?: string[];

0 commit comments

Comments
 (0)