diff --git a/main.tf b/main.tf index 2c1388c53c..b6ddc56192 100644 --- a/main.tf +++ b/main.tf @@ -219,6 +219,7 @@ module "nomad" { # Orchestrator orchestrator_port = var.orchestrator_port + orchestrator_proxy_port = var.orchestrator_proxy_port fc_env_pipeline_bucket_name = module.buckets.fc_env_pipeline_bucket_name # Template manager diff --git a/packages/client-proxy/main.go b/packages/client-proxy/main.go index a517403932..8b849af9d7 100644 --- a/packages/client-proxy/main.go +++ b/packages/client-proxy/main.go @@ -28,12 +28,13 @@ import ( ) const ( - ServiceName = "client-proxy" - dnsServer = "api.service.consul:5353" - healthCheckPort = 3001 - port = 3002 - sandboxPort = 3003 - maxRetries = 3 + ServiceName = "client-proxy" + dnsServer = "api.service.consul:5353" + healthCheckPort = 3001 + port = 3002 + sandboxPort = 3003 // legacy session proxy port + orchestratorProxyPort = 5007 // orchestrator proxy port + maxRetries = 3 ) var commitSHA string diff --git a/packages/nomad/main.tf b/packages/nomad/main.tf index 7a0ee6d75f..b41f9c06fd 100644 --- a/packages/nomad/main.tf +++ b/packages/nomad/main.tf @@ -324,6 +324,7 @@ resource "nomad_job" "orchestrator" { jobspec = templatefile("${path.module}/orchestrator.hcl", { gcp_zone = var.gcp_zone port = var.orchestrator_port + proxy_port = var.orchestrator_proxy_port environment = var.environment consul_acl_token = var.consul_acl_token_secret diff --git a/packages/nomad/orchestrator.hcl b/packages/nomad/orchestrator.hcl index 6cf4f49fec..cfc882b593 100644 --- a/packages/nomad/orchestrator.hcl +++ b/packages/nomad/orchestrator.hcl @@ -25,6 +25,11 @@ job "orchestrator" { } } + service { + name = "orchestrator-proxy" + port = "${proxy_port}" + } + task "start" { driver = "raw_exec" @@ -45,7 +50,7 @@ job "orchestrator" { config { command = "/bin/bash" - args = ["-c", " chmod +x local/orchestrator && local/orchestrator --port ${port}"] + args = ["-c", " chmod +x local/orchestrator && local/orchestrator --port ${port} --proxy-port ${proxy_port}"] } artifact { diff --git a/packages/nomad/variables.tf b/packages/nomad/variables.tf index caed5d6a7a..353b5fa1b3 100644 --- a/packages/nomad/variables.tf +++ b/packages/nomad/variables.tf @@ -185,6 +185,10 @@ variable "orchestrator_port" { type = number } +variable "orchestrator_proxy_port" { + type = number +} + variable "fc_env_pipeline_bucket_name" { type = string } diff --git a/packages/orchestrator/cmd/mock-sandbox/mock.go b/packages/orchestrator/cmd/mock-sandbox/mock.go index b983801ba6..3d76dbefac 100644 --- a/packages/orchestrator/cmd/mock-sandbox/mock.go +++ b/packages/orchestrator/cmd/mock-sandbox/mock.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/otel" "github.com/e2b-dev/infra/packages/orchestrator/internal/dns" + "github.com/e2b-dev/infra/packages/orchestrator/internal/proxy" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network" @@ -51,6 +52,8 @@ func main() { }() dnsServer := dns.New() + proxyServer := proxy.New(3333) + go func() { log.Printf("Starting DNS server") @@ -87,6 +90,7 @@ func main() { *buildId, *sandboxId+"-"+strconv.Itoa(v), dnsServer, + proxyServer, time.Duration(*keepAlive)*time.Second, networkPool, templateCache, @@ -104,6 +108,7 @@ func mockSandbox( buildId, sandboxId string, dns *dns.DNS, + proxy *proxy.SandboxProxy, keepAlive time.Duration, networkPool *network.Pool, templateCache *template.Cache, @@ -128,6 +133,7 @@ func mockSandbox( childCtx, tracer, dns, + proxy, networkPool, templateCache, &orchestrator.SandboxConfig{ diff --git a/packages/orchestrator/cmd/mock-snapshot/mock.go b/packages/orchestrator/cmd/mock-snapshot/mock.go index 4b25a60ded..ae659bc73f 100644 --- a/packages/orchestrator/cmd/mock-snapshot/mock.go +++ b/packages/orchestrator/cmd/mock-snapshot/mock.go @@ -14,6 +14,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/e2b-dev/infra/packages/orchestrator/internal/dns" + "github.com/e2b-dev/infra/packages/orchestrator/internal/proxy" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network" @@ -51,6 +52,7 @@ func main() { cancel() }() + proxyServer := proxy.New(3333) dnsServer := dns.New() go func() { log.Printf("Starting DNS server") @@ -90,6 +92,7 @@ func main() { *buildId, *sandboxId+"-"+strconv.Itoa(v), dnsServer, + proxyServer, time.Duration(*keepAlive)*time.Second, networkPool, templateCache, @@ -113,6 +116,7 @@ func mockSnapshot( buildId, sandboxId string, dns *dns.DNS, + proxy *proxy.SandboxProxy, keepAlive time.Duration, networkPool *network.Pool, templateCache *template.Cache, @@ -137,6 +141,7 @@ func mockSnapshot( childCtx, tracer, dns, + proxy, networkPool, templateCache, &orchestrator.SandboxConfig{ @@ -242,6 +247,7 @@ func mockSnapshot( childCtx, tracer, dns, + proxy, networkPool, templateCache, &orchestrator.SandboxConfig{ diff --git a/packages/orchestrator/internal/proxy/proxy.go b/packages/orchestrator/internal/proxy/proxy.go new file mode 100644 index 0000000000..23fcc587c8 --- /dev/null +++ b/packages/orchestrator/internal/proxy/proxy.go @@ -0,0 +1,196 @@ +package proxy + +import ( + "bytes" + "context" + _ "embed" + "encoding/json" + "fmt" + "go.uber.org/zap" + "html/template" + "net/http" + "net/http/httputil" + "net/url" + "regexp" + "strconv" + "strings" + "time" + + "github.com/e2b-dev/infra/packages/shared/pkg/meters" + "github.com/e2b-dev/infra/packages/shared/pkg/smap" +) + +//go:embed proxy_browser_502.html +var proxyBrowser502PageHtml string + +var browserRegex = regexp.MustCompile(`(?i)mozilla|chrome|safari|firefox|edge|opera|msie`) +var browserTemplate = template.Must(template.New("template").Parse(proxyBrowser502PageHtml)) + +type htmlTemplateData struct { + SandboxId string + SandboxHost string + SandboxPort string +} + +type jsonTemplateData struct { + Error string `json:"error"` + SandboxId string `json:"sandboxId"` + Port uint64 `json:"port"` +} + +type SandboxProxy struct { + sandboxes *smap.Map[string] + server *http.Server +} + +func New(port uint) *SandboxProxy { + server := &http.Server{Addr: fmt.Sprintf(":%d", port)} + + return &SandboxProxy{ + server: server, + sandboxes: smap.New[string](), + } +} + +func (p *SandboxProxy) AddSandbox(sandboxID, ip string) { + p.sandboxes.Insert(sandboxID, ip) +} + +func (p *SandboxProxy) RemoveSandbox(sandboxID string, ip string) { + p.sandboxes.RemoveCb(sandboxID, func(k string, v string, ok bool) bool { return ok && v == ip }) +} + +func (p *SandboxProxy) Start() error { + // similar values to our old the nginx configuration + serverTransport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + MaxIdleConns: 1024, // Matches worker_connections + MaxIdleConnsPerHost: 8192, // Matches keepalive_requests + IdleConnTimeout: 620 * time.Second, // Matches keepalive_timeout + TLSHandshakeTimeout: 10 * time.Second, // Similar to client_header_timeout + ResponseHeaderTimeout: 24 * time.Hour, // Matches proxy_read_timeout + DisableKeepAlives: true, // Disable keep-alives, envd doesn't support idle connections + } + + p.server.Handler = http.HandlerFunc(p.proxyHandler(serverTransport)) + return p.server.ListenAndServe() +} + +func (p *SandboxProxy) Shutdown(ctx context.Context) { + err := p.server.Shutdown(ctx) + if err != nil { + zap.L().Error("failed to shutdown proxy server", zap.Error(err)) + } +} + +func (p *SandboxProxy) proxyHandler(transport *http.Transport) func(w http.ResponseWriter, r *http.Request) { + activeConnections, err := meters.GetUpDownCounter(meters.OrchestratorProxyActiveConnectionsCounterMeterName) + if err != nil { + zap.L().Error("failed to create active connections counter", zap.Error(err)) + } + + return func(w http.ResponseWriter, r *http.Request) { + if activeConnections != nil { + activeConnections.Add(r.Context(), 1) + defer func() { + activeConnections.Add(r.Context(), -1) + }() + } + + // Extract sandbox id from the host (--.e2b.dev) + hostSplit := strings.Split(r.Host, "-") + if len(hostSplit) < 2 { + zap.L().Warn("invalid host to proxy", zap.String("host", r.Host)) + http.Error(w, "Invalid host", http.StatusBadRequest) + return + } + + sandboxID := hostSplit[1] + sandboxPortRaw := hostSplit[0] + sandboxPort, sandboxPortErr := strconv.ParseUint(sandboxPortRaw, 10, 64) + if sandboxPortErr != nil { + zap.L().Warn("invalid sandbox port", zap.String("sandbox_port", sandboxPortRaw)) + http.Error(w, "Invalid sandbox port", http.StatusBadRequest) + } + + sbxIp, sbxFound := p.sandboxes.Get(sandboxID) + if !sbxFound { + zap.L().Warn("sandbox not found", zap.String("sandbox_id", sandboxID)) + http.Error(w, "Sandbox not found", http.StatusNotFound) + return + } + + logger := zap.L().With(zap.String("sandbox_id", sandboxID), zap.String("sandbox_ip", sbxIp), zap.Uint64("sandbox_req_port", sandboxPort), zap.String("sandbox_port_path", r.URL.Path)) + + // We've resolved the node to proxy the request to + logger.Debug("Proxying request") + targetUrl := &url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%d", sbxIp, sandboxPort), + } + + // Proxy the request + proxy := httputil.NewSingleHostReverseProxy(targetUrl) + proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { + logger.Error("Reverse proxy error", zap.Error(err)) + + if p.isBrowser(r.UserAgent()) { + res, resErr := p.buildHtmlClosedPortError(sandboxID, r.Host, sandboxPort) + if resErr != nil { + logger.Error("Failed to build HTML error response", zap.Error(resErr)) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusBadGateway) + w.Header().Add("Content-Type", "text/html") + w.Write(res) + return + } + + w.WriteHeader(http.StatusBadGateway) + w.Header().Add("Content-Type", "application/json") + w.Write(p.buildJsonClosedPortError(sandboxID, sandboxPort)) + } + + proxy.ModifyResponse = func(resp *http.Response) error { + if resp.StatusCode >= 500 { + logger.Error("Backend responded with error", zap.Int("status_code", resp.StatusCode)) + } else { + logger.Info("Backend responded", zap.Int("status_code", resp.StatusCode)) + } + + return nil + } + + proxy.Transport = transport + proxy.ServeHTTP(w, r) + } +} + +func (p *SandboxProxy) buildHtmlClosedPortError(sandboxId string, host string, port uint64) ([]byte, error) { + htmlResponse := new(bytes.Buffer) + htmlVars := htmlTemplateData{SandboxId: sandboxId, SandboxHost: host, SandboxPort: strconv.FormatUint(port, 10)} + + err := browserTemplate.Execute(htmlResponse, htmlVars) + if err != nil { + return nil, err + } + + return htmlResponse.Bytes(), nil +} + +func (p *SandboxProxy) buildJsonClosedPortError(sandboxId string, port uint64) []byte { + response := jsonTemplateData{ + Error: "The sandbox is running but port is not open", + SandboxId: sandboxId, + Port: port, + } + + responseBytes, _ := json.Marshal(response) + return responseBytes +} + +func (p *SandboxProxy) isBrowser(userAgent string) bool { + return browserRegex.MatchString(userAgent) +} diff --git a/packages/orchestrator/internal/proxy/proxy_browser_502.html b/packages/orchestrator/internal/proxy/proxy_browser_502.html new file mode 100644 index 0000000000..d7125f4779 --- /dev/null +++ b/packages/orchestrator/internal/proxy/proxy_browser_502.html @@ -0,0 +1,23 @@ + + + + + Closed Port Error + + + +
+ +
+

Closed Port Error

+

The sandbox {{.SandboxId}} is running but there's no service running on port {{.SandboxPort}}.

+
+
+ {{.SandboxHost}} +
Connection refused on port {{.SandboxPort}}
+
+

Please ensure that your service is properly configured and running on the specified port.

+ Check the sandbox logs for more information → +
+ + \ No newline at end of file diff --git a/packages/orchestrator/internal/sandbox/sandbox_linux.go b/packages/orchestrator/internal/sandbox/sandbox_linux.go index 35b7618737..f20df3ac67 100644 --- a/packages/orchestrator/internal/sandbox/sandbox_linux.go +++ b/packages/orchestrator/internal/sandbox/sandbox_linux.go @@ -21,6 +21,7 @@ import ( "golang.org/x/sys/unix" "github.com/e2b-dev/infra/packages/orchestrator/internal/dns" + "github.com/e2b-dev/infra/packages/orchestrator/internal/proxy" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/build" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/fc" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd" @@ -86,6 +87,7 @@ func NewSandbox( ctx context.Context, tracer trace.Tracer, dns *dns.DNS, + proxy *proxy.SandboxProxy, networkPool *network.Pool, templateCache *template.Cache, config *orchestrator.SandboxConfig, @@ -314,11 +316,13 @@ func NewSandbox( sbx.StartedAt = time.Now() dns.Add(config.SandboxId, ips.HostIP()) + proxy.AddSandbox(config.SandboxId, ips.HostIP()) telemetry.ReportEvent(childCtx, "added DNS record", attribute.String("ip", ips.HostIP()), attribute.String("hostname", config.SandboxId)) cleanup.Add(func() error { dns.Remove(config.SandboxId, ips.HostIP()) + proxy.RemoveSandbox(config.SandboxId, ips.HostIP()) return nil }) diff --git a/packages/orchestrator/internal/sandbox/sandbox_other.go b/packages/orchestrator/internal/sandbox/sandbox_other.go index 2390318800..25374a9565 100644 --- a/packages/orchestrator/internal/sandbox/sandbox_other.go +++ b/packages/orchestrator/internal/sandbox/sandbox_other.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/e2b-dev/infra/packages/orchestrator/internal/dns" + "github.com/e2b-dev/infra/packages/orchestrator/internal/proxy" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/build" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/nbd" "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network" @@ -74,6 +75,7 @@ func NewSandbox( ctx context.Context, tracer trace.Tracer, dns *dns.DNS, + proxy *proxy.SandboxProxy, networkPool *network.Pool, templateCache *template.Cache, config *orchestrator.SandboxConfig, diff --git a/packages/orchestrator/internal/server/main.go b/packages/orchestrator/internal/server/main.go index c71460633c..e2eb3a57c8 100644 --- a/packages/orchestrator/internal/server/main.go +++ b/packages/orchestrator/internal/server/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/e2b-dev/infra/packages/orchestrator/internal/proxy" "math" "net" "os" @@ -38,6 +39,7 @@ type server struct { orchestrator.UnimplementedSandboxServiceServer sandboxes *smap.Map[*sandbox.Sandbox] dns *dns.DNS + proxy *proxy.SandboxProxy tracer trace.Tracer networkPool *network.Pool templateCache *template.Cache @@ -54,6 +56,7 @@ type Service struct { server *server grpc *grpc.Server dns *dns.DNS + proxy *proxy.SandboxProxy port uint16 shutdown struct { once sync.Once @@ -68,7 +71,7 @@ type Service struct { useClickhouseMetrics string } -func New(ctx context.Context, port uint, clientID string) (*Service, error) { +func New(ctx context.Context, port uint, clientID string, proxy *proxy.SandboxProxy) (*Service, error) { if port > math.MaxUint16 { return nil, fmt.Errorf("%d is larger than maximum possible port %d", port, math.MaxInt16) } @@ -92,6 +95,7 @@ func New(ctx context.Context, port uint, clientID string) (*Service, error) { // BLOCK: initialize services { srv.dns = dns.New() + srv.proxy = proxy opts := []logging.Option{ logging.WithLogOnEvents(logging.StartCall, logging.PayloadReceived, logging.PayloadSent, logging.FinishCall), @@ -143,6 +147,7 @@ func New(ctx context.Context, port uint, clientID string) (*Service, error) { srv.server = &server{ tracer: otel.Tracer(ServiceName), dns: srv.dns, + proxy: srv.proxy, sandboxes: smap.New[*sandbox.Sandbox](), networkPool: networkPool, templateCache: templateCache, diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index 5c33189abb..26e63283b4 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -49,6 +49,7 @@ func (s *server) Create(ctxConn context.Context, req *orchestrator.SandboxCreate childCtx, s.tracer, s.dns, + s.proxy, s.networkPool, s.templateCache, req.Sandbox, @@ -182,6 +183,7 @@ func (s *server) Delete(ctxConn context.Context, in *orchestrator.SandboxDeleteR // Don't allow connecting to the sandbox anymore. s.dns.Remove(in.SandboxId, sbx.Slot.HostIP()) + s.proxy.RemoveSandbox(in.SandboxId, sbx.Slot.HostIP()) // Remove the sandbox from the cache to prevent loading it again in API during the time the instance is stopping. // Old comment: diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 400fe5375d..ffd6e4eb72 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -5,6 +5,8 @@ import ( "errors" "flag" "fmt" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" "log" "os" "os/signal" @@ -12,10 +14,8 @@ import ( "sync/atomic" "syscall" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "github.com/e2b-dev/infra/packages/orchestrator/internal/consul" + "github.com/e2b-dev/infra/packages/orchestrator/internal/proxy" "github.com/e2b-dev/infra/packages/orchestrator/internal/server" "github.com/e2b-dev/infra/packages/shared/pkg/env" "github.com/e2b-dev/infra/packages/shared/pkg/logger" @@ -24,8 +24,9 @@ import ( ) const ( - defaultPort = 5008 - ServiceName = "orchestrator" + defaultPort = 5008 + defaultProxyPort = 5007 + ServiceName = "orchestrator" ) var commitSHA string @@ -38,8 +39,10 @@ func main() { defer sigCancel() var port uint - flag.UintVar(&port, "port", defaultPort, "orchestrator server port") + + var proxyPort uint + flag.UintVar(&proxyPort, "proxy-port", defaultProxyPort, "orchestrator proxy port") flag.Parse() wg := &sync.WaitGroup{} @@ -97,8 +100,9 @@ func main() { log.Println("Starting orchestrator", "commit", commitSHA) clientID := consul.GetClientID() + sessionProxy := proxy.New(proxyPort) - srv, err := server.New(ctx, port, clientID) + srv, err := server.New(ctx, port, clientID, sessionProxy) if err != nil { zap.L().Fatal("failed to create server", zap.Error(err)) } @@ -149,6 +153,29 @@ func main() { } }() + wg.Add(1) + go func() { + defer wg.Done() + + errChan := make(chan error, 1) + go func() { + errChan <- sessionProxy.Start() + }() + + select { + case <-ctx.Done(): + case err = <-errChan: + if err != nil { + zap.L().Error("session proxy failed", zap.Error(err)) + exitCode.Add(1) + cancel() + } + } + + // close sandbox proxy, this will wait until all sessions are closed + defer sessionProxy.Shutdown(context.Background()) + }() + wg.Wait() os.Exit(int(exitCode.Load())) diff --git a/packages/shared/pkg/meters/main.go b/packages/shared/pkg/meters/main.go index 9fccb31f98..1a15882618 100644 --- a/packages/shared/pkg/meters/main.go +++ b/packages/shared/pkg/meters/main.go @@ -16,12 +16,13 @@ const ( type UpDownCounterType string const ( - SandboxCountMeterName UpDownCounterType = "api.env.instance.running" - BuildCounterMeterName = "api.env.build.running" - NewNetworkSlotSPoolCounterMeterName = "orchestrator.network.slots_pool.new" - ReusedNetworkSlotSPoolCounterMeterName = "orchestrator.network.slots_pool.reused" - NBDkSlotSReadyPoolCounterMeterName = "orchestrator.nbd.slots_pool.read" - ActiveConnectionsCounterMeterName = "client_proxy.connections.active" + SandboxCountMeterName UpDownCounterType = "api.env.instance.running" + BuildCounterMeterName = "api.env.build.running" + NewNetworkSlotSPoolCounterMeterName = "orchestrator.network.slots_pool.new" + ReusedNetworkSlotSPoolCounterMeterName = "orchestrator.network.slots_pool.reused" + NBDkSlotSReadyPoolCounterMeterName = "orchestrator.nbd.slots_pool.read" + ActiveConnectionsCounterMeterName = "client_proxy.connections.active" + OrchestratorProxyActiveConnectionsCounterMeterName = "orchestrator.proxy.connections.active" ) var meter = otel.GetMeterProvider().Meter("nomad") diff --git a/variables.tf b/variables.tf index e6268ffd3d..307d5fb482 100644 --- a/variables.tf +++ b/variables.tf @@ -159,6 +159,11 @@ variable "orchestrator_port" { default = 5008 } +variable "orchestrator_proxy_port" { + type = number + default = 5007 +} + variable "template_manager_port" { type = number default = 5009