diff --git a/bench/micro/walkspeed/go.mod b/bench/micro/walkspeed/go.mod new file mode 100644 index 00000000000..cc5605b3bed --- /dev/null +++ b/bench/micro/walkspeed/go.mod @@ -0,0 +1,5 @@ +module github.com/NVIDIA/aistore/bench/walk + +go 1.24.0 + +require github.com/karrick/godirwalk v1.17.0 diff --git a/bench/micro/walkspeed/go.sum b/bench/micro/walkspeed/go.sum new file mode 100644 index 00000000000..ec4a9a1683e --- /dev/null +++ b/bench/micro/walkspeed/go.sum @@ -0,0 +1,2 @@ +github.com/karrick/godirwalk v1.17.0 h1:b4kY7nqDdioR/6qnbHQyDvmA17u5G1cZ6J+CZXwSWoI= +github.com/karrick/godirwalk v1.17.0/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk= diff --git a/bench/micro/walkspeed/walk_test.go b/bench/micro/walkspeed/walk_test.go new file mode 100644 index 00000000000..88c7dd57bbc --- /dev/null +++ b/bench/micro/walkspeed/walk_test.go @@ -0,0 +1,181 @@ +// Package walkspeed compares godirwalk and filepath.WalkDir walking performance +/* + * Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved. + */ +package walkspeed + +import ( + iofs "io/fs" + "os" + "path/filepath" + "strconv" + "sync/atomic" + "testing" + + "github.com/karrick/godirwalk" +) + +// Examples: +// +// go test -bench=. -benchtime=10s -benchmem + +const ( + numFilesDflt = 500_000 // NOTE: 50K files is way too small + scratchDftl = 64 * 1024 + depthDflt = 16 +) + +// setupLargeDir creates a flat directory with numFiles files. +func setupLargeDir(b *testing.B, numFiles int) string { + dir := "large_dir" + if err := os.RemoveAll(dir); err != nil { + b.Fatalf("Failed to remove large_dir: %v", err) + } + if err := os.Mkdir(dir, 0o755); err != nil { + b.Fatalf("Failed to create large_dir: %v", err) + } + for i := range numFiles { + fname := filepath.Join(dir, "file_"+strconv.Itoa(i)) + if err := os.WriteFile(fname, []byte{}, 0o600); err != nil { + b.Fatalf("Failed to create file %s: %v", fname, err) + } + } + return dir +} + +// setupDeepTree creates a nested directory with numFiles files across depth levels. +func setupDeepTree(b *testing.B, numFiles, depth int) string { + dir := "deep_tree" + if err := os.RemoveAll(dir); err != nil { + b.Fatalf("Failed to remove deep_tree: %v", err) + } + if err := os.Mkdir(dir, 0o755); err != nil { + b.Fatalf("Failed to create deep_tree: %v", err) + } + + filesPerLevel := numFiles / depth + remainder := numFiles % depth + fileCount := 0 + currentDir := dir + + for d := 0; d < depth && fileCount < numFiles; d++ { + levelFiles := filesPerLevel + if d < remainder { + levelFiles++ + } + + for f := 0; f < levelFiles && fileCount < numFiles; f++ { + fname := filepath.Join(currentDir, "file_"+strconv.Itoa(fileCount)) + if err := os.WriteFile(fname, []byte{}, 0o600); err != nil { + b.Fatalf("Failed to create file %s: %v", fname, err) + } + fileCount++ + } + + if fileCount < numFiles { + currentDir = filepath.Join(currentDir, "subdir_"+strconv.Itoa(d)) + if err := os.Mkdir(currentDir, 0o755); err != nil { + b.Fatalf("Failed to create directory %s: %v", currentDir, err) + } + } + } + return dir +} + +// BenchmarkGodirwalkLargeDir benchmarks godirwalk on a flat directory. +func BenchmarkGodirwalkLargeDir(b *testing.B) { + dir := setupLargeDir(b, numFilesDflt) + defer os.RemoveAll(dir) + + b.ResetTimer() + var count int64 + b.RunParallel(func(pb *testing.PB) { + scratch := make([]byte, scratchDftl) + for pb.Next() { + err := godirwalk.Walk(dir, &godirwalk.Options{ + Callback: func(_ string, de *godirwalk.Dirent) error { + if de.IsRegular() { + atomic.AddInt64(&count, 1) + } + return nil + }, + Unsorted: false, + ScratchBuffer: scratch, + }) + if err != nil { + b.Fatalf("godirwalk error: %v", err) + } + } + }) +} + +// BenchmarkWalkDirLargeDir benchmarks filepath.WalkDir on a flat directory. +func BenchmarkWalkDirLargeDir(b *testing.B) { + dir := setupLargeDir(b, numFilesDflt) + defer os.RemoveAll(dir) + + b.ResetTimer() + var count int64 + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + err := filepath.WalkDir(dir, func(_ string, d iofs.DirEntry, _ error) error { + if d.Type().IsRegular() { + atomic.AddInt64(&count, 1) + } + return nil + }) + if err != nil { + b.Fatalf("WalkDir error: %v", err) + } + } + }) +} + +// BenchmarkGodirwalkDeepTree benchmarks godirwalk on a nested directory. +func BenchmarkGodirwalkDeepTree(b *testing.B) { + dir := setupDeepTree(b, numFilesDflt, depthDflt) + defer os.RemoveAll(dir) + + b.ResetTimer() + var count int64 + b.RunParallel(func(pb *testing.PB) { + scratch := make([]byte, scratchDftl) + for pb.Next() { + err := godirwalk.Walk(dir, &godirwalk.Options{ + Callback: func(_ string, de *godirwalk.Dirent) error { + if de.IsRegular() { + atomic.AddInt64(&count, 1) + } + return nil + }, + Unsorted: false, + ScratchBuffer: scratch, + }) + if err != nil { + b.Fatalf("godirwalk error: %v", err) + } + } + }) +} + +// BenchmarkWalkDirDeepTree benchmarks filepath.WalkDir on a nested directory. +func BenchmarkWalkDirDeepTree(b *testing.B) { + dir := setupDeepTree(b, numFilesDflt, depthDflt) + defer os.RemoveAll(dir) + + b.ResetTimer() + var count int64 + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + err := filepath.WalkDir(dir, func(_ string, d iofs.DirEntry, _ error) error { + if d.Type().IsRegular() { + atomic.AddInt64(&count, 1) + } + return nil + }) + if err != nil { + b.Fatalf("WalkDir error: %v", err) + } + } + }) +} diff --git a/fs/walk.go b/fs/walk.go index bb413e0e2b1..ba7473b30f9 100644 --- a/fs/walk.go +++ b/fs/walk.go @@ -5,19 +5,15 @@ package fs import ( - "context" iofs "io/fs" "os" "path/filepath" - "sort" "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/atomic" "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/cmn/debug" "github.com/NVIDIA/aistore/cmn/feat" - "github.com/NVIDIA/aistore/cmn/nlog" - "github.com/NVIDIA/aistore/memsys" "github.com/karrick/godirwalk" ) @@ -45,7 +41,7 @@ type ( Dir string Prefix string CTs []string - Sorted bool + Sorted bool // Ignored when using stdlib implementation (always sorted). } errCallbackWrapper struct { @@ -57,62 +53,31 @@ type ( dir string // root pathname errCallbackWrapper } -) -// PathErrToAction is a default error callback for fast godirwalk.Walk. -// The idea is that on any error that was produced during the walk we dispatch -// this handler and act upon the error. -// -// By default it halts on bucket level errors because there is no option to -// continue walking if there is a problem with a bucket. Also we count "soft" -// errors and abort if we reach certain amount of them. -func (ew *errCallbackWrapper) PathErrToAction(_ string, err error) godirwalk.ErrorAction { - if cmn.IsErrBucketLevel(err) { - return godirwalk.Halt - } - if ew.counter.Load() > errThreshold { - return godirwalk.Halt + walker interface { + walk(fqn []string, opts *WalkOpts) error + mpathChildren(opts *WalkOpts) ([]string, error) } - if cmn.IsErrObjLevel(err) { - ew.counter.Inc() - return godirwalk.SkipNode - } - return godirwalk.Halt -} - -// godirwalk is used by default. To switch to standard filepath.Walk: -// 1. Rewrite `callback` as follows: -// -// func (opts *WalkOpts) callback(fqn string, de os.FileInfo, err error) error { -// if err != nil { -// if err := cmn.PathWalkErr(err); err != nil { -// return err -// } -// return nil -// } -// return opts.callback(fqn, de) -// } -// -// 2. Replace `Walk` body with one-liner: -// return filepath.Walk(fqn, opts.callback) -// -// No more changes required. -// -// NOTE: for standard filepath.Walk option 'Sorted' is ignored +) -// interface guard -var _ DirEntry = (*godirwalk.Dirent)(nil) +// ///////////////////////////////// +// HARDCODED TO GODIRWALK IMPLEMENTATION FOR BETTER MEMORY EFFICIENCY (15-30%) +// TO SWITCH TO STANDARD LIBRARY VERSION: +// REPLACE WITH: var useWalker walker = &stdlib{} +// IN wd.wcb() REPLACE PathErrToAction and godirwalk.Halt +// WITH PathWalkError and err +// ///////////////////////////////// +var useWalker walker = &godir{} -func (opts *WalkOpts) callback(fqn string, de *godirwalk.Dirent) error { - return opts.Callback(fqn, de) +func Walk(opts *WalkOpts) error { + fqns, err := resolveFQNs(opts) + if err != nil { + return err + } + return useWalker.walk(fqns, opts) } -func Walk(opts *WalkOpts) error { - var ( - fqns []string - err error - ew = &errCallbackWrapper{} - ) +func resolveFQNs(opts *WalkOpts) (fqns []string, err error) { switch { case opts.Dir != "": debug.Assert(opts.Prefix == "") @@ -131,41 +96,8 @@ func Walk(opts *WalkOpts) error { default: // all buckets debug.Assert(len(opts.CTs) > 0) fqns, err = allMpathCTpaths(opts) - if len(fqns) == 0 || err != nil { - return err - } } - scratch, slab := memsys.PageMM().AllocSize(memsys.DefaultBufSize) - gOpts := &godirwalk.Options{ - ErrorCallback: ew.PathErrToAction, // "halts the walk" or "skips the node" (detailed comment above) - Callback: opts.callback, - Unsorted: !opts.Sorted, - ScratchBuffer: scratch, - } - for _, fqn := range fqns { - err1 := godirwalk.Walk(fqn, gOpts) - if err1 == nil || os.IsNotExist(err1) { - continue - } - if cmn.IsErrMpathNotFound(err1) { - nlog.Errorln(err1) // mountpath is getting detached or disabled - continue - } - if cmn.IsErrAborted(err1) { - // Errors different from cmn.ErrAborted should not be overwritten - // by cmn.ErrAborted. Assign err = err1 only when there wasn't any other error - if err == nil { - err = err1 - } - continue - } - if err1 != context.Canceled && !cmn.IsErrObjNought(err1) { - nlog.Errorln(err) - } - err = err1 - } - slab.Free(scratch) - return err + return } func _join(bdir, prefix string) string { @@ -188,7 +120,7 @@ func _join(bdir, prefix string) string { } func allMpathCTpaths(opts *WalkOpts) (fqns []string, err error) { - children, erc := mpathChildren(opts) + children, erc := useWalker.mpathChildren(opts) if erc != nil { return nil, erc } @@ -216,7 +148,7 @@ func allMpathCTpaths(opts *WalkOpts) (fqns []string, err error) { } func AllMpathBcks(opts *WalkOpts) (bcks []cmn.Bck, err error) { - children, erc := mpathChildren(opts) + children, erc := useWalker.mpathChildren(opts) if erc != nil { return nil, erc } @@ -231,25 +163,6 @@ func AllMpathBcks(opts *WalkOpts) (bcks []cmn.Bck, err error) { return } -func mpathChildren(opts *WalkOpts) (children []string, err error) { - var ( - fqn = opts.Mi.MakePathBck(&opts.Bck) - scratch, slab = memsys.PageMM().AllocSize(memsys.DefaultBufSize) - ) - children, err = godirwalk.ReadDirnames(fqn, scratch) - slab.Free(scratch) - if err != nil { - if os.IsNotExist(err) { - err = nil - } - return - } - if opts.Sorted { - sort.Strings(children) - } - return -} - //////////////////// // WalkDir & walkDirWrapper - non-recursive walk //////////////////// @@ -261,10 +174,11 @@ func WalkDir(dir string, ucb func(string, DirEntry) error) error { return filepath.WalkDir(dir, wd.wcb) } -// wraps around user callback to implement default error handling and skipping +// wraps around user callback to implement default error handling and skipping. func (wd *walkDirWrapper) wcb(path string, de iofs.DirEntry, err error) error { if err != nil { - // Walk and WalkDir share the same error-processing logic (hence, godirwalk enum) + // Walk and WalkDir share the same error-processing logic + // IF USING THE STANDARD LIBRARY: REPLACE WITH PathWalkError AND err if path != wd.dir && wd.PathErrToAction(path, err) != godirwalk.Halt { err = nil } diff --git a/fs/walk_godir.go b/fs/walk_godir.go new file mode 100644 index 00000000000..6e973884376 --- /dev/null +++ b/fs/walk_godir.go @@ -0,0 +1,103 @@ +// Package fs provides mountpath and FQN abstractions and methods to resolve/map stored content +/* + * Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved. + */ +package fs + +import ( + "context" + "os" + "sort" + + "github.com/NVIDIA/aistore/cmn" + "github.com/NVIDIA/aistore/cmn/nlog" + "github.com/NVIDIA/aistore/memsys" + + "github.com/karrick/godirwalk" +) + +type godir struct{} + +// interface guard +var _ DirEntry = (*godirwalk.Dirent)(nil) + +// PathErrToAction is a default error callback for fast godirwalk.Walk. +// The idea is that on any error that was produced during the walk we dispatch +// this handler and act upon the error. +// +// By default it halts on bucket level errors because there is no option to +// continue walking if there is a problem with a bucket. Also we count "soft" +// errors and abort if we reach certain amount of them. +func (ew *errCallbackWrapper) PathErrToAction(_ string, err error) godirwalk.ErrorAction { + if cmn.IsErrBucketLevel(err) { + return godirwalk.Halt + } + if ew.counter.Load() > errThreshold { + return godirwalk.Halt + } + if cmn.IsErrObjLevel(err) { + ew.counter.Inc() + return godirwalk.SkipNode + } + return godirwalk.Halt +} + +func (godir) walk(fqns []string, opts *WalkOpts) error { + var ( + err error + ew = &errCallbackWrapper{} + ) + scratch, slab := memsys.PageMM().AllocSize(memsys.DefaultBufSize) + gOpts := &godirwalk.Options{ + ErrorCallback: ew.PathErrToAction, // "halts the walk" or "skips the node" (detailed comment above) + Callback: func(fqn string, de *godirwalk.Dirent) error { + return opts.Callback(fqn, de) + }, + Unsorted: !opts.Sorted, + ScratchBuffer: scratch, + } + for _, fqn := range fqns { + err1 := godirwalk.Walk(fqn, gOpts) + if err1 == nil || os.IsNotExist(err1) { + continue + } + if cmn.IsErrMpathNotFound(err1) { + nlog.Errorln(err1) // mountpath is getting detached or disabled + continue + } + if cmn.IsErrAborted(err1) { + // Errors different from cmn.ErrAborted should not be overwritten + // by cmn.ErrAborted. Assign err = err1 only when there wasn't any other error + if err == nil { + err = err1 + } + continue + } + if err1 != context.Canceled && !cmn.IsErrObjNought(err1) { + nlog.Errorln(err1) + } + err = err1 + } + slab.Free(scratch) + return err +} + +func (godir) mpathChildren(opts *WalkOpts) (children []string, err error) { + var ( + fqn = opts.Mi.MakePathBck(&opts.Bck) + scratch, slab = memsys.PageMM().AllocSize(memsys.DefaultBufSize) + ) + + children, err = godirwalk.ReadDirnames(fqn, scratch) + slab.Free(scratch) + if err != nil { + if os.IsNotExist(err) { + err = nil + } + return + } + if opts.Sorted { + sort.Strings(children) + } + return +} diff --git a/fs/walk_stdlib.go b/fs/walk_stdlib.go new file mode 100644 index 00000000000..1a3bf86ea2b --- /dev/null +++ b/fs/walk_stdlib.go @@ -0,0 +1,94 @@ +// Package fs provides mountpath and FQN abstractions and methods to resolve/map stored content +/* + * Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved. + */ +package fs + +import ( + "context" + iofs "io/fs" + "os" + "path/filepath" + + "github.com/NVIDIA/aistore/cmn" + "github.com/NVIDIA/aistore/cmn/nlog" +) + +//nolint:unused // alternative implementation; unused because godirwalk is hardcoded in walk.go +type stdlib struct{} + +// PathWalkError is a default error callback for filepath.WalkDir +// +// By default it halts on bucket level errors because there is no option to +// continue walking if there is a problem with a bucket. Also we count "soft" +// errors and abort if we reach certain amount of them. +func (ew *errCallbackWrapper) PathWalkError(_ string, err error) error { + if cmn.IsErrBucketLevel(err) { + return err + } + if ew.counter.Load() > errThreshold { + return err + } + if cmn.IsErrObjLevel(err) { + ew.counter.Inc() + return nil + } + return err +} + +//nolint:unused // alternative implementation; unused because godirwalk is hardcoded in walk.go +func (stdlib) walk(fqns []string, opts *WalkOpts) error { + var ( + err error + ew = &errCallbackWrapper{} + ) + walkFn := func(path string, de iofs.DirEntry, err error) error { + if err != nil { + if path != "" && ew.PathWalkError(path, err) != err { + return nil + } + return err + } + return opts.Callback(path, de) + } + for _, fqn := range fqns { + err1 := filepath.WalkDir(fqn, walkFn) + if err1 == nil || os.IsNotExist(err1) { + continue + } + if cmn.IsErrMpathNotFound(err1) { + nlog.Errorln(err1) + continue + } + if cmn.IsErrAborted(err1) { + if err == nil { + err = err1 + } + continue + } + if err1 != context.Canceled && !cmn.IsErrObjNought(err1) { + nlog.Errorln(err1) + } + err = err1 + } + return err +} + +//nolint:unused // alternative implementation; unused because godirwalk is hardcoded in walk.go +func (stdlib) mpathChildren(opts *WalkOpts) (children []string, err error) { + fqn := opts.Mi.MakePathBck(&opts.Bck) + // os.ReadDir returns the entries in lexical order so additional + // sorting is not needed. + entries, err := os.ReadDir(fqn) + if err != nil { + if os.IsNotExist(err) { + err = nil + } + return + } + children = make([]string, 0, len(entries)) + for _, entry := range entries { + children = append(children, entry.Name()) + } + return +}