Skip to content

Commit b014e45

Browse files
committed
usage of session proxy inside orchestrator
1 parent 3d3514f commit b014e45

File tree

7 files changed

+58
-6
lines changed

7 files changed

+58
-6
lines changed

packages/orchestrator/cmd/mock-sandbox/mock.go

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
"github.com/e2b-dev/infra/packages/orchestrator/internal/proxy"
78
"log"
89
"os"
910
"os/signal"
@@ -104,6 +105,7 @@ func mockSandbox(
104105
buildId,
105106
sandboxId string,
106107
dns *dns.DNS,
108+
proxy *proxy.SessionProxy,
107109
keepAlive time.Duration,
108110
networkPool *network.Pool,
109111
templateCache *template.Cache,
@@ -128,6 +130,7 @@ func mockSandbox(
128130
childCtx,
129131
tracer,
130132
dns,
133+
proxy,
131134
networkPool,
132135
templateCache,
133136
&orchestrator.SandboxConfig{

packages/orchestrator/cmd/mock-snapshot/mock.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
"github.com/e2b-dev/infra/packages/orchestrator/internal/proxy"
78
"log"
89
"os"
910
"os/signal"
@@ -113,6 +114,7 @@ func mockSnapshot(
113114
buildId,
114115
sandboxId string,
115116
dns *dns.DNS,
117+
proxy *proxy.SessionProxy,
116118
keepAlive time.Duration,
117119
networkPool *network.Pool,
118120
templateCache *template.Cache,
@@ -137,6 +139,7 @@ func mockSnapshot(
137139
childCtx,
138140
tracer,
139141
dns,
142+
proxy,
140143
networkPool,
141144
templateCache,
142145
&orchestrator.SandboxConfig{
@@ -234,14 +237,15 @@ func mockSnapshot(
234237
return fmt.Errorf("failed to add snapshot to template cache: %w", err)
235238
}
236239

237-
fmt.Println("Add snapshot to template cache time: ", time.Since(snapshotTime).Milliseconds())
240+
fmt.Println("AddSandbox snapshot to template cache time: ", time.Since(snapshotTime).Milliseconds())
238241

239242
start = time.Now()
240243

241244
sbx, cleanup2, err := sandbox.NewSandbox(
242245
childCtx,
243246
tracer,
244247
dns,
248+
proxy,
245249
networkPool,
246250
templateCache,
247251
&orchestrator.SandboxConfig{

packages/orchestrator/internal/sandbox/sandbox_linux.go

+4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"github.com/e2b-dev/infra/packages/orchestrator/internal/proxy"
1011
"net/http"
1112
"os"
1213
"sync/atomic"
@@ -86,6 +87,7 @@ func NewSandbox(
8687
ctx context.Context,
8788
tracer trace.Tracer,
8889
dns *dns.DNS,
90+
proxy *proxy.SessionProxy,
8991
networkPool *network.Pool,
9092
templateCache *template.Cache,
9193
config *orchestrator.SandboxConfig,
@@ -314,11 +316,13 @@ func NewSandbox(
314316
sbx.StartedAt = time.Now()
315317

316318
dns.Add(config.SandboxId, ips.HostIP())
319+
proxy.AddSandbox(config.SandboxId, ips.HostIP())
317320

318321
telemetry.ReportEvent(childCtx, "added DNS record", attribute.String("ip", ips.HostIP()), attribute.String("hostname", config.SandboxId))
319322

320323
cleanup.Add(func() error {
321324
dns.Remove(config.SandboxId, ips.HostIP())
325+
proxy.RemoveSandbox(config.SandboxId)
322326

323327
return nil
324328
})

packages/orchestrator/internal/sandbox/sandbox_other.go

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"github.com/e2b-dev/infra/packages/orchestrator/internal/proxy"
1011
"net/http"
1112
"sync/atomic"
1213
"time"
@@ -74,6 +75,7 @@ func NewSandbox(
7475
ctx context.Context,
7576
tracer trace.Tracer,
7677
dns *dns.DNS,
78+
proxy *proxy.SessionProxy,
7779
networkPool *network.Pool,
7880
templateCache *template.Cache,
7981
config *orchestrator.SandboxConfig,

packages/orchestrator/internal/server/main.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"github.com/e2b-dev/infra/packages/orchestrator/internal/proxy"
78
"math"
89
"net"
910
"os"
@@ -38,6 +39,7 @@ type server struct {
3839
orchestrator.UnimplementedSandboxServiceServer
3940
sandboxes *smap.Map[*sandbox.Sandbox]
4041
dns *dns.DNS
42+
proxy *proxy.SessionProxy
4143
tracer trace.Tracer
4244
networkPool *network.Pool
4345
templateCache *template.Cache
@@ -54,6 +56,7 @@ type Service struct {
5456
server *server
5557
grpc *grpc.Server
5658
dns *dns.DNS
59+
proxy *proxy.SessionProxy
5760
port uint16
5861
shutdown struct {
5962
once sync.Once
@@ -68,7 +71,7 @@ type Service struct {
6871
useClickhouseMetrics string
6972
}
7073

71-
func New(ctx context.Context, port uint, clientID string) (*Service, error) {
74+
func New(ctx context.Context, port uint, clientID string, proxy *proxy.SessionProxy) (*Service, error) {
7275
if port > math.MaxUint16 {
7376
return nil, fmt.Errorf("%d is larger than maximum possible port %d", port, math.MaxInt16)
7477
}
@@ -92,6 +95,7 @@ func New(ctx context.Context, port uint, clientID string) (*Service, error) {
9295
// BLOCK: initialize services
9396
{
9497
srv.dns = dns.New()
98+
srv.proxy = proxy
9599

96100
opts := []logging.Option{
97101
logging.WithLogOnEvents(logging.StartCall, logging.PayloadReceived, logging.PayloadSent, logging.FinishCall),
@@ -143,6 +147,7 @@ func New(ctx context.Context, port uint, clientID string) (*Service, error) {
143147
srv.server = &server{
144148
tracer: otel.Tracer(ServiceName),
145149
dns: srv.dns,
150+
proxy: srv.proxy,
146151
sandboxes: smap.New[*sandbox.Sandbox](),
147152
networkPool: networkPool,
148153
templateCache: templateCache,

packages/orchestrator/internal/server/sandboxes.go

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func (s *server) Create(ctxConn context.Context, req *orchestrator.SandboxCreate
4949
childCtx,
5050
s.tracer,
5151
s.dns,
52+
s.proxy,
5253
s.networkPool,
5354
s.templateCache,
5455
req.Sandbox,
@@ -182,6 +183,7 @@ func (s *server) Delete(ctxConn context.Context, in *orchestrator.SandboxDeleteR
182183

183184
// Don't allow connecting to the sandbox anymore.
184185
s.dns.Remove(in.SandboxId, sbx.Slot.HostIP())
186+
s.proxy.RemoveSandbox(in.SandboxId)
185187

186188
// Remove the sandbox from the cache to prevent loading it again in API during the time the instance is stopping.
187189
// Old comment:

packages/orchestrator/main.go

+36-4
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import (
55
"errors"
66
"flag"
77
"fmt"
8+
"github.com/e2b-dev/infra/packages/orchestrator/internal/proxy"
89
"log"
910
"os"
1011
"os/signal"
1112
"sync"
1213
"sync/atomic"
1314
"syscall"
15+
"time"
1416

1517
"go.uber.org/zap"
1618
"go.uber.org/zap/zapcore"
@@ -24,8 +26,9 @@ import (
2426
)
2527

2628
const (
27-
defaultPort = 5008
28-
ServiceName = "orchestrator"
29+
defaultPort = 5008
30+
defaultProxyPort = 5007
31+
ServiceName = "orchestrator"
2932
)
3033

3134
var commitSHA string
@@ -38,8 +41,10 @@ func main() {
3841
defer sigCancel()
3942

4043
var port uint
41-
4244
flag.UintVar(&port, "port", defaultPort, "orchestrator server port")
45+
46+
var proxyPort uint
47+
flag.UintVar(&proxyPort, "proxy-port", defaultProxyPort, "orchestrator proxy port")
4348
flag.Parse()
4449

4550
wg := &sync.WaitGroup{}
@@ -97,8 +102,9 @@ func main() {
97102
log.Println("Starting orchestrator", "commit", commitSHA)
98103

99104
clientID := consul.GetClientID()
105+
sessionProxy := proxy.New(proxyPort)
100106

101-
srv, err := server.New(ctx, port, clientID)
107+
srv, err := server.New(ctx, port, clientID, sessionProxy)
102108
if err != nil {
103109
zap.L().Fatal("failed to create server", zap.Error(err))
104110
}
@@ -149,6 +155,32 @@ func main() {
149155
}
150156
}()
151157

158+
wg.Add(1)
159+
go func() {
160+
defer wg.Done()
161+
162+
errChan := make(chan error, 1)
163+
go func() {
164+
err := sessionProxy.Start()
165+
errChan <- err
166+
}()
167+
168+
select {
169+
case <-ctx.Done():
170+
case <-errChan:
171+
if err != nil {
172+
zap.L().Error("session proxy failed", zap.Error(err))
173+
exitCode.Add(1)
174+
cancel()
175+
}
176+
}
177+
178+
// close session proxy, wait 5 seconds until all connections are closed
179+
shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 5*time.Second)
180+
defer shutdownCtxCancel()
181+
defer sessionProxy.Shutdown(shutdownCtx)
182+
}()
183+
152184
wg.Wait()
153185

154186
os.Exit(int(exitCode.Load()))

0 commit comments

Comments
 (0)