diff --git a/packages/orchestrator/internal/server/main.go b/packages/orchestrator/internal/server/main.go
index c71460633c..126e9001be 100644
--- a/packages/orchestrator/internal/server/main.go
+++ b/packages/orchestrator/internal/server/main.go
@@ -9,6 +9,7 @@ import (
 	"os"
 	"sync"
 
+	"github.com/go-redis/redis/v8"
 	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
 	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
 	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/selector"
@@ -48,6 +49,9 @@ type server struct {
 
 	useLokiMetrics       string
 	useClickhouseMetrics string
+
+	// redisClient is nil when REDIS_URL is unset; nil-check it before every use.
+	redisClient *redis.Client
 }
 
 type Service struct {
@@ -91,7 +95,21 @@ func New(ctx context.Context, port uint, clientID string) (*Service, error) {
 
 	// BLOCK: initialize services
 	{
-		srv.dns = dns.New()
+		// Redis is optional: when REDIS_URL is unset the client stays nil and nil is passed to dns.New.
+		var redisClient *redis.Client
+		if rurl := os.Getenv("REDIS_URL"); rurl != "" {
+			opts, err := redis.ParseURL(rurl)
+			if err != nil {
+				// NOTE(review): rurl may embed credentials and is logged verbatim here; confirm this is acceptable.
+				zap.L().Fatal("invalid redis URL", zap.String("url", rurl), zap.Error(err))
+			}
+
+			redisClient = redis.NewClient(opts)
+		} else {
+			zap.L().Warn("REDIS_URL not set, using local caches")
+		}
+
+		srv.dns = dns.New(redisClient)
 
 		opts := []logging.Option{
 			logging.WithLogOnEvents(logging.StartCall, logging.PayloadReceived, logging.PayloadSent, logging.FinishCall),
@@ -151,6 +169,7 @@ func New(ctx context.Context, port uint, clientID string) (*Service, error) {
 			clickhouseStore:      clickhouseStore,
 			useLokiMetrics:       useLokiMetrics,
 			useClickhouseMetrics: useClickhouseMetrics,
+			redisClient:          redisClient,
 		}
 	}
 
diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go
index 5c33189abb..24c7f229db 100644
--- a/packages/orchestrator/internal/server/sandboxes.go
+++ b/packages/orchestrator/internal/server/sandboxes.go
@@ -78,6 +78,15 @@ func (s *server) Create(ctxConn context.Context, req *orchestrator.SandboxCreate
 		waitErr := sbx.Wait()
 		if waitErr != nil {
 			sbxlogger.I(sbx).Error("failed to wait for sandbox, cleaning up", zap.Error(waitErr))
+
+			// redisClient is nil when REDIS_URL is unset, so guard the call to avoid a nil dereference.
+			if s.redisClient != nil {
+				key := fmt.Sprintf("sbx:%s:error", req.Sandbox.SandboxId)
+				// Expiration 0 stores the key without a TTL.
+				if setErr := s.redisClient.Set(ctx, key, waitErr.Error(), 0).Err(); setErr != nil {
+					sbxlogger.I(sbx).Error("failed to record sandbox error in redis", zap.Error(setErr))
+				}
+			}
 		}
 
 		cleanupErr := cleanup.Run()
@@ -181,7 +190,7 @@ func (s *server) Delete(ctxConn context.Context, in *orchestrator.SandboxDeleteR
 	}
 
 	// Don't allow connecting to the sandbox anymore.
-	s.dns.Remove(in.SandboxId, sbx.Slot.HostIP())
+	s.dns.Remove(ctx, in.SandboxId, sbx.Slot.HostIP())
 
 	// Remove the sandbox from the cache to prevent loading it again in API during the time the instance is stopping.
 	// Old comment:
@@ -235,7 +244,7 @@ func (s *server) Pause(ctx context.Context, in *orchestrator.SandboxPauseRequest
 		return nil, status.New(codes.NotFound, errMsg.Error()).Err()
 	}
 
-	s.dns.Remove(in.SandboxId, sbx.Slot.HostIP())
+	s.dns.Remove(ctx, in.SandboxId, sbx.Slot.HostIP())
 
 	s.sandboxes.Remove(in.SandboxId)
 	s.pauseMu.Unlock()