Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: purge #1456

Draft
wants to merge 1 commit into
base: target-workspace-refactor
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 53 additions & 6 deletions pkg/cmd/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (
"errors"
"fmt"
"os"
"path/filepath"

"github.com/daytonaio/daytona/cmd/daytona/config"
"github.com/daytonaio/daytona/internal"
"github.com/daytonaio/daytona/internal/util/apiclient"
"github.com/daytonaio/daytona/pkg/cmd/bootstrap"
server_cmd "github.com/daytonaio/daytona/pkg/cmd/server"
"github.com/daytonaio/daytona/pkg/cmd/workspace/create"
"github.com/daytonaio/daytona/pkg/posthogservice"
"github.com/daytonaio/daytona/pkg/runner"
"github.com/daytonaio/daytona/pkg/server"
"github.com/daytonaio/daytona/pkg/server/headscale"
"github.com/daytonaio/daytona/pkg/telemetry"
Expand Down Expand Up @@ -46,12 +49,6 @@ var purgeCmd = &cobra.Command{
return err
}

// FIXME: TODO
// runnerConfig, err := runner.GetConfig()
// if err != nil {
// return err
// }

serverConfigDir, err := server.GetConfigDir()
if err != nil {
return err
Expand Down Expand Up @@ -164,6 +161,47 @@ var purgeCmd = &cobra.Command{
return err
}

localRunnerErrChan := make(chan error)
var localRunner runner.IRunner

go func() {
if serverConfig.LocalRunnerDisabled != nil && *serverConfig.LocalRunnerDisabled {
err = server_cmd.HandleDisabledLocalRunner()
if err != nil {
localRunnerErrChan <- err
}
return
}

localRunnerConfig := server_cmd.GetLocalRunnerConfig(filepath.Join(serverConfigDir, "local-runner"))

err = server_cmd.EnsureRunnerRegistered()
if err != nil {
localRunnerErrChan <- err
}

params := bootstrap.LocalRunnerParams{
ServerConfig: serverConfig,
RunnerConfig: localRunnerConfig,
ConfigDir: serverConfigDir,
TelemetryService: telemetryService,
}

localRunner, err = bootstrap.GetLocalRunner(params)
if err != nil {
localRunnerErrChan <- err
}

localRunnerErrChan <- localRunner.Start(context.Background())
}()

if serverConfig.LocalRunnerDisabled != nil && !*serverConfig.LocalRunnerDisabled {
err = server_cmd.AwaitLocalRunnerStarted()
if err != nil {
localRunnerErrChan <- err
}
}

errs := server.Purge(ctx, forceFlag)
if len(errs) > 0 {
errMessage := ""
Expand Down Expand Up @@ -196,6 +234,15 @@ var purgeCmd = &cobra.Command{
}
}

if localRunner != nil {
fmt.Println("Purging providers...")
err = localRunner.Purge(ctx)
if err != nil {
return err
}
fmt.Println("Providers purged.")
}

fmt.Println("Server purged.")

fmt.Println("\nDeleting the SSH configuration file")
Expand Down
37 changes: 22 additions & 15 deletions pkg/cmd/server/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,33 @@ var ServeCmd = &cobra.Command{

go func() {
if c.LocalRunnerDisabled != nil && *c.LocalRunnerDisabled {
err = handleDisabledLocalRunner()
err = HandleDisabledLocalRunner()
if err != nil {
localRunnerErrChan <- err
}
return
}

localRunnerConfig := getLocalRunnerConfig(filepath.Join(configDir, "local-runner"))
localRunnerConfig := GetLocalRunnerConfig(filepath.Join(configDir, "local-runner"))

localRunnerErrChan <- startLocalRunner(bootstrap.LocalRunnerParams{
err = EnsureRunnerRegistered()
if err != nil {
localRunnerErrChan <- err
}

params := bootstrap.LocalRunnerParams{
ServerConfig: c,
RunnerConfig: localRunnerConfig,
ConfigDir: configDir,
TelemetryService: telemetryService,
})
}

runner, err := bootstrap.GetLocalRunner(params)
if err != nil {
localRunnerErrChan <- err
}

localRunnerErrChan <- runner.Start(context.Background())
}()

err = waitForApiServerToStart(apiServer)
Expand All @@ -158,7 +170,7 @@ var ServeCmd = &cobra.Command{
}

if c.LocalRunnerDisabled != nil && !*c.LocalRunnerDisabled {
err = awaitLocalRunnerStarted()
err = AwaitLocalRunnerStarted()
if err != nil {
localRunnerErrChan <- err
}
Expand Down Expand Up @@ -239,7 +251,7 @@ func ensureDefaultProfile(server *server.Server, apiPort uint32) error {
})
}

func startLocalRunner(params bootstrap.LocalRunnerParams) error {
func EnsureRunnerRegistered() error {
runnerService := server.GetInstance(nil).RunnerService

_, err := runnerService.GetRunner(context.Background(), bootstrap.LOCAL_RUNNER_ID)
Expand All @@ -257,15 +269,10 @@ func startLocalRunner(params bootstrap.LocalRunnerParams) error {
}
}

runner, err := bootstrap.GetLocalRunner(params)
if err != nil {
return err
}

return runner.Start(context.Background())
return err
}

func getLocalRunnerConfig(configDir string) *runner.Config {
func GetLocalRunnerConfig(configDir string) *runner.Config {
providersDir := filepath.Join(configDir, "providers")
logFilePath := filepath.Join(configDir, "runner.log")

Expand All @@ -277,7 +284,7 @@ func getLocalRunnerConfig(configDir string) *runner.Config {
}
}

func awaitLocalRunnerStarted() error {
func AwaitLocalRunnerStarted() error {
server := server.GetInstance(nil)
startTime := time.Now()

Expand All @@ -302,7 +309,7 @@ func awaitLocalRunnerStarted() error {
return nil
}

func handleDisabledLocalRunner() error {
func HandleDisabledLocalRunner() error {
runnerService := server.GetInstance(nil).RunnerService

_, err := runnerService.GetRunner(context.Background(), bootstrap.LOCAL_RUNNER_ID)
Expand Down
5 changes: 5 additions & 0 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const RUNNER_METADATA_UPDATE_INTERVAL = 2 * time.Second
type IRunner interface {
Start(ctx context.Context) error
CheckAndRunJobs(ctx context.Context) error
Purge(ctx context.Context) error
}

type RunnerConfig struct {
Expand Down Expand Up @@ -173,6 +174,10 @@ func (r *Runner) CheckAndRunJobs(ctx context.Context) error {
return nil
}

func (r *Runner) Purge(ctx context.Context) error {
return r.providerManager.Purge()
}

func (r *Runner) runJob(ctx context.Context, j *models.Job) error {
var job jobs.IJob

Expand Down
68 changes: 57 additions & 11 deletions pkg/server/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,44 @@ func (s *Server) Purge(ctx context.Context, force bool) []error {
}
}

fmt.Println("Deleting all workspaces...")

workspaces, err := s.WorkspaceService.ListWorkspaces(ctx, services.WorkspaceRetrievalParams{})
if err != nil {
s.trackPurgeError(ctx, force, err)
if !force {
return []error{err}
}
}

if err == nil {
for _, workspace := range workspaces {
err := s.WorkspaceService.RemoveWorkspace(ctx, workspace.Id)
if err != nil {
s.trackPurgeError(ctx, force, err)
if !force {
return []error{err}
} else {
fmt.Printf("Failed to delete %s: %v\n", workspace.Name, err)
}
} else {
fmt.Printf("Workspace %s deleted\n", workspace.Name)
}
}
} else {
fmt.Printf("Failed to list workspaces: %v\n", err)
}

err = s.WorkspaceService.AwaitEmptyList(ctx, time.Minute)
if err != nil {
s.trackPurgeError(ctx, force, err)
if !force {
return []error{err}
} else {
fmt.Printf("Failed to await empty workspace list: %v\n", err)
}
}

fmt.Println("Deleting all targets...")

targets, err := s.TargetService.ListTargets(ctx, nil, services.TargetRetrievalParams{})
Expand Down Expand Up @@ -57,17 +95,25 @@ func (s *Server) Purge(ctx context.Context, force bool) []error {
fmt.Printf("Failed to list targets: %v\n", err)
}

// FIXME: todo
// fmt.Println("Purging providers...")
// err = s.ProviderManager.Purge()
// if err != nil {
// s.trackPurgeError(ctx, force, err)
// if !force {
// return []error{err}
// } else {
// fmt.Printf("Failed to purge providers: %v\n", err)
// }
// }
err = s.TargetService.AwaitEmptyList(ctx, time.Minute)
if err != nil {
s.trackPurgeError(ctx, force, err)
if !force {
return []error{err}
} else {
fmt.Printf("Failed to await empty target list: %v\n", err)
}
}

err = s.TargetService.AwaitEmptyList(ctx, time.Minute)
if err != nil {
s.trackPurgeError(ctx, force, err)
if !force {
return []error{err}
} else {
fmt.Printf("Failed to await empty target list: %v\n", err)
}
}

fmt.Println("Purging builds...")
errs := s.BuildService.Delete(ctx, nil, force)
Expand Down
25 changes: 25 additions & 0 deletions pkg/server/targets/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package targets

import (
"context"
"errors"
"io"
"time"

"github.com/daytonaio/daytona/pkg/logs"
"github.com/daytonaio/daytona/pkg/models"
Expand Down Expand Up @@ -88,3 +90,26 @@ func (s *TargetService) UpdateTargetProviderMetadata(ctx context.Context, target
tg.ProviderMetadata = &metadata
return s.targetStore.Save(ctx, tg)
}

func (s *TargetService) AwaitEmptyList(ctx context.Context, waitTime time.Duration) error {
timeout := time.NewTimer(waitTime)
defer timeout.Stop()

for {
select {
case <-timeout.C:
return errors.New("awaiting empty build list timed out")
default:
targets, err := s.ListTargets(ctx, nil, services.TargetRetrievalParams{})
if err != nil {
return err
}

if len(targets) == 0 {
return nil
}

time.Sleep(time.Second)
}
}
}
25 changes: 25 additions & 0 deletions pkg/server/workspaces/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package workspaces

import (
"context"
"errors"
"io"
"time"

"github.com/daytonaio/daytona/pkg/gitprovider"
"github.com/daytonaio/daytona/pkg/logs"
Expand Down Expand Up @@ -103,3 +105,26 @@ func (s *WorkspaceService) UpdateWorkspaceProviderMetadata(ctx context.Context,
w.ProviderMetadata = &metadata
return s.workspaceStore.Save(ctx, w)
}

func (s *WorkspaceService) AwaitEmptyList(ctx context.Context, waitTime time.Duration) error {
timeout := time.NewTimer(waitTime)
defer timeout.Stop()

for {
select {
case <-timeout.C:
return errors.New("awaiting empty build list timed out")
default:
workspaces, err := s.ListWorkspaces(ctx, services.WorkspaceRetrievalParams{})
if err != nil {
return err
}

if len(workspaces) == 0 {
return nil
}

time.Sleep(time.Second)
}
}
}
2 changes: 2 additions & 0 deletions pkg/services/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"io"
"time"

"github.com/daytonaio/daytona/pkg/models"
"github.com/daytonaio/daytona/pkg/stores"
Expand All @@ -24,6 +25,7 @@ type ITargetService interface {
RemoveTarget(ctx context.Context, targetId string) error
ForceRemoveTarget(ctx context.Context, targetId string) error
HandleSuccessfulCreation(ctx context.Context, targetId string) error
AwaitEmptyList(ctx context.Context, waitTime time.Duration) error

GetTargetLogReader(ctx context.Context, targetId string) (io.Reader, error)
GetTargetLogWriter(ctx context.Context, targetId string) (io.WriteCloser, error)
Expand Down
2 changes: 2 additions & 0 deletions pkg/services/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"io"
"time"

"github.com/daytonaio/daytona/pkg/gitprovider"
"github.com/daytonaio/daytona/pkg/models"
Expand All @@ -21,6 +22,7 @@ type IWorkspaceService interface {
RemoveWorkspace(ctx context.Context, workspaceId string) error
ForceRemoveWorkspace(ctx context.Context, workspaceId string) error
UpdateWorkspaceProviderMetadata(ctx context.Context, workspaceId, metadata string) error
AwaitEmptyList(ctx context.Context, waitTime time.Duration) error

GetWorkspaceLogReader(ctx context.Context, workspaceId string) (io.Reader, error)
GetWorkspaceLogWriter(ctx context.Context, workspaceId string) (io.WriteCloser, error)
Expand Down