From 2ac09728f6c7695ba071c68e8499129073e7ca6a Mon Sep 17 00:00:00 2001 From: 0div Date: Mon, 10 Mar 2025 17:39:02 -0700 Subject: [PATCH] start using redis client in ochestrator server and storing sandbox errors in it --- packages/orchestrator/internal/server/main.go | 19 ++++++++++++++++++- .../orchestrator/internal/server/sandboxes.go | 5 +++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/packages/orchestrator/internal/server/main.go b/packages/orchestrator/internal/server/main.go index edb2c266e1..d558c2feda 100644 --- a/packages/orchestrator/internal/server/main.go +++ b/packages/orchestrator/internal/server/main.go @@ -6,8 +6,10 @@ import ( "fmt" "math" "net" + "os" "sync" + "github.com/go-redis/redis/v8" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/selector" @@ -43,6 +45,8 @@ type server struct { pauseMu sync.Mutex clientID string // nomad node id devicePool *nbd.DevicePool + + redisClient *redis.Client } type Service struct { @@ -80,7 +84,19 @@ func New(ctx context.Context, port uint, clientID string) (*Service, error) { // BLOCK: initialize services { - srv.dns = dns.New() + var redisClient *redis.Client + if rurl := os.Getenv("REDIS_URL"); rurl != "" { + opts, err := redis.ParseURL(rurl) + if err != nil { + zap.L().Fatal("invalid redis URL", zap.String("url", rurl), zap.Error(err)) + } + + redisClient = redis.NewClient(opts) + } else { + zap.L().Warn("REDIS_URL not set, using local caches") + } + + srv.dns = dns.New(redisClient) opts := []logging.Option{ logging.WithLogOnEvents(logging.StartCall, logging.PayloadReceived, logging.PayloadSent, logging.FinishCall), @@ -117,6 +133,7 @@ func New(ctx context.Context, port uint, clientID string) (*Service, error) { templateCache: templateCache, clientID: clientID, devicePool: devicePool, + redisClient: redisClient, } } diff --git a/packages/orchestrator/internal/server/sandboxes.go b/packages/orchestrator/internal/server/sandboxes.go index b553273552..f79bb50277 100644 --- a/packages/orchestrator/internal/server/sandboxes.go +++ b/packages/orchestrator/internal/server/sandboxes.go @@ -76,6 +76,7 @@ func (s *server) Create(ctxConn context.Context, req *orchestrator.SandboxCreate waitErr := sbx.Wait() if waitErr != nil { sbxlogger.I(sbx).Error("failed to wait for sandbox, cleaning up", zap.Error(waitErr)) + s.redisClient.Set(ctx, fmt.Sprintf("sbx:%s:error", req.Sandbox.SandboxId), waitErr.Error(), 0) } cleanupErr := cleanup.Run() @@ -166,7 +167,7 @@ func (s *server) Delete(ctxConn context.Context, in *orchestrator.SandboxDeleteR } // Don't allow connecting to the sandbox anymore. - s.dns.Remove(in.SandboxId, sbx.Slot.HostIP()) + s.dns.Remove(ctx, in.SandboxId, sbx.Slot.HostIP()) // Remove the sandbox from the cache to prevent loading it again in API during the time the instance is stopping. // Old comment: @@ -220,7 +221,7 @@ func (s *server) Pause(ctx context.Context, in *orchestrator.SandboxPauseRequest return nil, status.New(codes.NotFound, errMsg.Error()).Err() } - s.dns.Remove(in.SandboxId, sbx.Slot.HostIP()) + s.dns.Remove(ctx, in.SandboxId, sbx.Slot.HostIP()) s.sandboxes.Remove(in.SandboxId) s.pauseMu.Unlock()