Skip to content

Commit

Permalink
fix: purge
Browse files Browse the repository at this point in the history
Signed-off-by: Luka Brecic <[email protected]>
  • Loading branch information
lbrecic committed Dec 23, 2024
1 parent b67a139 commit 84a5d97
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 32 deletions.
59 changes: 53 additions & 6 deletions pkg/cmd/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (
"errors"
"fmt"
"os"
"path/filepath"

"github.com/daytonaio/daytona/cmd/daytona/config"
"github.com/daytonaio/daytona/internal"
"github.com/daytonaio/daytona/internal/util/apiclient"
"github.com/daytonaio/daytona/pkg/cmd/bootstrap"
server_cmd "github.com/daytonaio/daytona/pkg/cmd/server"
"github.com/daytonaio/daytona/pkg/cmd/workspace/create"
"github.com/daytonaio/daytona/pkg/posthogservice"
"github.com/daytonaio/daytona/pkg/runner"
"github.com/daytonaio/daytona/pkg/server"
"github.com/daytonaio/daytona/pkg/server/headscale"
"github.com/daytonaio/daytona/pkg/telemetry"
Expand Down Expand Up @@ -46,12 +49,6 @@ var purgeCmd = &cobra.Command{
return err
}

// FIXME: TODO
// runnerConfig, err := runner.GetConfig()
// if err != nil {
// return err
// }

serverConfigDir, err := server.GetConfigDir()
if err != nil {
return err
Expand Down Expand Up @@ -164,6 +161,47 @@ var purgeCmd = &cobra.Command{
return err
}

localRunnerErrChan := make(chan error)
var localRunner runner.IRunner

go func() {
if serverConfig.LocalRunnerDisabled != nil && *serverConfig.LocalRunnerDisabled {
err = server_cmd.HandleDisabledLocalRunner()
if err != nil {
localRunnerErrChan <- err
}
return
}

localRunnerConfig := server_cmd.GetLocalRunnerConfig(filepath.Join(serverConfigDir, "local-runner"))

err = server_cmd.EnsureRunnerRegistered()
if err != nil {
localRunnerErrChan <- err
}

params := bootstrap.LocalRunnerParams{
ServerConfig: serverConfig,
RunnerConfig: localRunnerConfig,
ConfigDir: serverConfigDir,
TelemetryService: telemetryService,
}

localRunner, err = bootstrap.GetLocalRunner(params)
if err != nil {
localRunnerErrChan <- err
}

localRunnerErrChan <- localRunner.Start(context.Background())
}()

if serverConfig.LocalRunnerDisabled != nil && !*serverConfig.LocalRunnerDisabled {
err = server_cmd.AwaitLocalRunnerStarted()
if err != nil {
localRunnerErrChan <- err
}
}

errs := server.Purge(ctx, forceFlag)
if len(errs) > 0 {
errMessage := ""
Expand Down Expand Up @@ -196,6 +234,15 @@ var purgeCmd = &cobra.Command{
}
}

if localRunner != nil {
fmt.Println("Purging providers...")
err = localRunner.Purge(ctx)
if err != nil {
return err
}
fmt.Println("Providers purged.")
}

fmt.Println("Server purged.")

fmt.Println("\nDeleting the SSH configuration file")
Expand Down
37 changes: 22 additions & 15 deletions pkg/cmd/server/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,33 @@ var ServeCmd = &cobra.Command{

go func() {
if c.LocalRunnerDisabled != nil && *c.LocalRunnerDisabled {
err = handleDisabledLocalRunner()
err = HandleDisabledLocalRunner()
if err != nil {
localRunnerErrChan <- err
}
return
}

localRunnerConfig := getLocalRunnerConfig(filepath.Join(configDir, "local-runner"))
localRunnerConfig := GetLocalRunnerConfig(filepath.Join(configDir, "local-runner"))

localRunnerErrChan <- startLocalRunner(bootstrap.LocalRunnerParams{
err = EnsureRunnerRegistered()
if err != nil {
localRunnerErrChan <- err
}

params := bootstrap.LocalRunnerParams{
ServerConfig: c,
RunnerConfig: localRunnerConfig,
ConfigDir: configDir,
TelemetryService: telemetryService,
})
}

runner, err := bootstrap.GetLocalRunner(params)
if err != nil {
localRunnerErrChan <- err
}

localRunnerErrChan <- runner.Start(context.Background())
}()

err = waitForApiServerToStart(apiServer)
Expand All @@ -158,7 +170,7 @@ var ServeCmd = &cobra.Command{
}

if c.LocalRunnerDisabled != nil && !*c.LocalRunnerDisabled {
err = awaitLocalRunnerStarted()
err = AwaitLocalRunnerStarted()
if err != nil {
localRunnerErrChan <- err
}
Expand Down Expand Up @@ -239,7 +251,7 @@ func ensureDefaultProfile(server *server.Server, apiPort uint32) error {
})
}

func startLocalRunner(params bootstrap.LocalRunnerParams) error {
func EnsureRunnerRegistered() error {
runnerService := server.GetInstance(nil).RunnerService

_, err := runnerService.GetRunner(context.Background(), bootstrap.LOCAL_RUNNER_ID)
Expand All @@ -257,15 +269,10 @@ func startLocalRunner(params bootstrap.LocalRunnerParams) error {
}
}

runner, err := bootstrap.GetLocalRunner(params)
if err != nil {
return err
}

return runner.Start(context.Background())
return err
}

func getLocalRunnerConfig(configDir string) *runner.Config {
func GetLocalRunnerConfig(configDir string) *runner.Config {
providersDir := filepath.Join(configDir, "providers")
logFilePath := filepath.Join(configDir, "runner.log")

Expand All @@ -277,7 +284,7 @@ func getLocalRunnerConfig(configDir string) *runner.Config {
}
}

func awaitLocalRunnerStarted() error {
func AwaitLocalRunnerStarted() error {
server := server.GetInstance(nil)
startTime := time.Now()

Expand All @@ -302,7 +309,7 @@ func awaitLocalRunnerStarted() error {
return nil
}

func handleDisabledLocalRunner() error {
func HandleDisabledLocalRunner() error {
runnerService := server.GetInstance(nil).RunnerService

_, err := runnerService.GetRunner(context.Background(), bootstrap.LOCAL_RUNNER_ID)
Expand Down
5 changes: 5 additions & 0 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const RUNNER_METADATA_UPDATE_INTERVAL = 2 * time.Second
type IRunner interface {
Start(ctx context.Context) error
CheckAndRunJobs(ctx context.Context) error
Purge(ctx context.Context) error
}

type RunnerConfig struct {
Expand Down Expand Up @@ -173,6 +174,10 @@ func (r *Runner) CheckAndRunJobs(ctx context.Context) error {
return nil
}

func (r *Runner) Purge(ctx context.Context) error {
return r.providerManager.Purge()
}

func (r *Runner) runJob(ctx context.Context, j *models.Job) error {
var job jobs.IJob

Expand Down
68 changes: 57 additions & 11 deletions pkg/server/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,44 @@ func (s *Server) Purge(ctx context.Context, force bool) []error {
}
}

fmt.Println("Deleting all workspaces...")

workspaces, err := s.WorkspaceService.ListWorkspaces(ctx, services.WorkspaceRetrievalParams{})
if err != nil {
s.trackPurgeError(ctx, force, err)
if !force {
return []error{err}
}
}

if err == nil {
for _, workspace := range workspaces {
err := s.WorkspaceService.RemoveWorkspace(ctx, workspace.Id)
if err != nil {
s.trackPurgeError(ctx, force, err)
if !force {
return []error{err}
} else {
fmt.Printf("Failed to delete %s: %v\n", workspace.Name, err)
}
} else {
fmt.Printf("Workspace %s deleted\n", workspace.Name)
}
}
} else {
fmt.Printf("Failed to list workspaces: %v\n", err)
}

err = s.WorkspaceService.AwaitEmptyList(ctx, time.Minute)
if err != nil {
s.trackPurgeError(ctx, force, err)
if !force {
return []error{err}
} else {
fmt.Printf("Failed to await empty workspace list: %v\n", err)
}
}

fmt.Println("Deleting all targets...")

targets, err := s.TargetService.ListTargets(ctx, nil, services.TargetRetrievalParams{})
Expand Down Expand Up @@ -57,17 +95,25 @@ func (s *Server) Purge(ctx context.Context, force bool) []error {
fmt.Printf("Failed to list targets: %v\n", err)
}

// FIXME: todo
// fmt.Println("Purging providers...")
// err = s.ProviderManager.Purge()
// if err != nil {
// s.trackPurgeError(ctx, force, err)
// if !force {
// return []error{err}
// } else {
// fmt.Printf("Failed to purge providers: %v\n", err)
// }
// }
err = s.TargetService.AwaitEmptyList(ctx, time.Minute)
if err != nil {
s.trackPurgeError(ctx, force, err)
if !force {
return []error{err}
} else {
fmt.Printf("Failed to await empty target list: %v\n", err)
}
}

err = s.TargetService.AwaitEmptyList(ctx, time.Minute)
if err != nil {
s.trackPurgeError(ctx, force, err)
if !force {
return []error{err}
} else {
fmt.Printf("Failed to await empty target list: %v\n", err)
}
}

fmt.Println("Purging builds...")
errs := s.BuildService.Delete(ctx, nil, force)
Expand Down
25 changes: 25 additions & 0 deletions pkg/server/targets/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package targets

import (
"context"
"errors"
"io"
"time"

"github.com/daytonaio/daytona/pkg/logs"
"github.com/daytonaio/daytona/pkg/models"
Expand Down Expand Up @@ -88,3 +90,26 @@ func (s *TargetService) UpdateTargetProviderMetadata(ctx context.Context, target
tg.ProviderMetadata = &metadata
return s.targetStore.Save(ctx, tg)
}

func (s *TargetService) AwaitEmptyList(ctx context.Context, waitTime time.Duration) error {
timeout := time.NewTimer(waitTime)
defer timeout.Stop()

for {
select {
case <-timeout.C:
return errors.New("awaiting empty build list timed out")
default:
targets, err := s.ListTargets(ctx, nil, services.TargetRetrievalParams{})
if err != nil {
return err
}

if len(targets) == 0 {
return nil
}

time.Sleep(time.Second)
}
}
}
25 changes: 25 additions & 0 deletions pkg/server/workspaces/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package workspaces

import (
"context"
"errors"
"io"
"time"

"github.com/daytonaio/daytona/pkg/gitprovider"
"github.com/daytonaio/daytona/pkg/logs"
Expand Down Expand Up @@ -103,3 +105,26 @@ func (s *WorkspaceService) UpdateWorkspaceProviderMetadata(ctx context.Context,
w.ProviderMetadata = &metadata
return s.workspaceStore.Save(ctx, w)
}

func (s *WorkspaceService) AwaitEmptyList(ctx context.Context, waitTime time.Duration) error {
timeout := time.NewTimer(waitTime)
defer timeout.Stop()

for {
select {
case <-timeout.C:
return errors.New("awaiting empty build list timed out")
default:
workspaces, err := s.ListWorkspaces(ctx, services.WorkspaceRetrievalParams{})
if err != nil {
return err
}

if len(workspaces) == 0 {
return nil
}

time.Sleep(time.Second)
}
}
}
2 changes: 2 additions & 0 deletions pkg/services/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"io"
"time"

"github.com/daytonaio/daytona/pkg/models"
"github.com/daytonaio/daytona/pkg/stores"
Expand All @@ -24,6 +25,7 @@ type ITargetService interface {
RemoveTarget(ctx context.Context, targetId string) error
ForceRemoveTarget(ctx context.Context, targetId string) error
HandleSuccessfulCreation(ctx context.Context, targetId string) error
AwaitEmptyList(ctx context.Context, waitTime time.Duration) error

GetTargetLogReader(ctx context.Context, targetId string) (io.Reader, error)
GetTargetLogWriter(ctx context.Context, targetId string) (io.WriteCloser, error)
Expand Down
2 changes: 2 additions & 0 deletions pkg/services/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"io"
"time"

"github.com/daytonaio/daytona/pkg/gitprovider"
"github.com/daytonaio/daytona/pkg/models"
Expand All @@ -21,6 +22,7 @@ type IWorkspaceService interface {
RemoveWorkspace(ctx context.Context, workspaceId string) error
ForceRemoveWorkspace(ctx context.Context, workspaceId string) error
UpdateWorkspaceProviderMetadata(ctx context.Context, workspaceId, metadata string) error
AwaitEmptyList(ctx context.Context, waitTime time.Duration) error

GetWorkspaceLogReader(ctx context.Context, workspaceId string) (io.Reader, error)
GetWorkspaceLogWriter(ctx context.Context, workspaceId string) (io.WriteCloser, error)
Expand Down

0 comments on commit 84a5d97

Please sign in to comment.