Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create missing resources in sync, allow blob to overlap #2894

Merged
merged 16 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (raw rawCopyCmdArgs) stripTrailingWildcardOnRemoteSource(location common.Lo
gURLParts := common.NewGenericResourceURLParts(*resourceURL, location)

if err != nil {
err = fmt.Errorf("failed to parse url %s; %s", result, err)
err = fmt.Errorf("failed to parse url %s; %w", result, err)
return
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/copyEnumeratorInit.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrde
containers, err := acctTraverser.listContainers()

if err != nil {
return nil, fmt.Errorf("failed to list containers: %s", err)
return nil, fmt.Errorf("failed to list containers: %w", err)
}

// Resolve all container names up front.
Expand Down
4 changes: 2 additions & 2 deletions cmd/jobsResume.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (rca resumeCmdArgs) getSourceAndDestinationServiceClients(
jobID, err := common.ParseJobID(rca.jobID)
if err != nil {
// Error for invalid JobId format
return nil, nil, fmt.Errorf("error parsing the jobId %s. Failed with error %s", rca.jobID, err.Error())
return nil, nil, fmt.Errorf("error parsing the jobId %s. Failed with error %w", rca.jobID, err)
}

// But we don't want to supply a reauth token if we're not using OAuth. That could cause problems if say, a SAS is invalid.
Expand Down Expand Up @@ -358,7 +358,7 @@ func (rca resumeCmdArgs) process() error {
jobID, err := common.ParseJobID(rca.jobID)
if err != nil {
// If parsing gives an error, hence it is not a valid JobId format
return fmt.Errorf("error parsing the jobId %s. Failed with error %s", rca.jobID, err.Error())
return fmt.Errorf("error parsing the jobId %s. Failed with error %w", rca.jobID, err)
}

// if no logging, set this empty so that we don't display the log location
Expand Down
4 changes: 4 additions & 0 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type rawSyncCmdArgs struct {
backupMode bool
putMd5 bool
md5ValidationOption string
includeRoot bool
// this flag indicates the user agreement with respect to deleting the extra files at the destination
// which do not exists at source. With this flag turned on/off, users will not be asked for permission.
// otherwise the user is prompted to make a decision
Expand Down Expand Up @@ -385,6 +386,7 @@ func (raw *rawSyncCmdArgs) cook() (cookedSyncCmdArgs, error) {
cooked.deleteDestinationFileIfNecessary = raw.deleteDestinationFileIfNecessary

cooked.includeDirectoryStubs = raw.includeDirectoryStubs
cooked.includeRoot = raw.includeRoot

return cooked, nil
}
Expand Down Expand Up @@ -436,6 +438,7 @@ type cookedSyncCmdArgs struct {
forceIfReadOnly bool
backupMode bool
includeDirectoryStubs bool
includeRoot bool

// commandString hold the user given command which is logged to the Job log file
commandString string
Expand Down Expand Up @@ -870,6 +873,7 @@ func init() {
syncCmd.PersistentFlags().StringVar(&raw.trailingDot, "trailing-dot", "", "'Enable' by default to treat file share related operations in a safe manner. Available options: "+strings.Join(common.ValidTrailingDotOptions(), ", ")+". "+
"Choose 'Disable' to go back to legacy (potentially unsafe) treatment of trailing dot files where the file service will trim any trailing dots in paths. This can result in potential data corruption if the transfer contains two paths that differ only by a trailing dot (ex: mypath and mypath.). If this flag is set to 'Disable' and AzCopy encounters a trailing dot file, it will warn customers in the scanning log but will not attempt to abort the operation."+
"If the destination does not support trailing dot files (Windows or Blob Storage), AzCopy will fail if the trailing dot file is the root of the transfer and skip any trailing dot paths encountered during enumeration.")
syncCmd.PersistentFlags().BoolVar(&raw.includeRoot, "include-root", false, "Disabled by default. Enable to include the root directory's properties when persisting properties such as SMB or HNS ACLs")

syncCmd.PersistentFlags().StringVar(&raw.compareHash, "compare-hash", "None", "Inform sync to rely on hashes as an alternative to LMT. Missing hashes at a remote source will throw an error. (None, MD5) Default: None")
syncCmd.PersistentFlags().StringVar(&common.LocalHashDir, "hash-meta-dir", "", "When using `--local-hash-storage-mode=HiddenFiles` you can specify an alternate directory to store hash metadata files in (as opposed to next to the related files in the source)")
Expand Down
58 changes: 52 additions & 6 deletions cmd/syncEnumerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
"context"
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror"
"runtime"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -93,11 +97,51 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s

// verify that the traversers are targeting the same type of resources
sourceIsDir, _ := sourceTraverser.IsDirectory(true)
destIsDir, _ := destinationTraverser.IsDirectory(true)
if sourceIsDir != destIsDir {
return nil, errors.New("trying to sync between different resource types (either file <-> directory or directory <-> file) which is not allowed." +
"sync must happen between source and destination of the same type, e.g. either file <-> file or directory <-> directory." +
"To make sure target is handled as a directory, add a trailing '/' to the target.")
destIsDir, err := destinationTraverser.IsDirectory(true)

var resourceMismatchError = errors.New("trying to sync between different resource types (either file <-> directory or directory <-> file) which is not allowed." +
adreed-msft marked this conversation as resolved.
Show resolved Hide resolved
"sync must happen between source and destination of the same type, e.g. either file <-> file or directory <-> directory." +
"To make sure target is handled as a directory, add a trailing '/' to the target.")

if cca.fromTo.To() == common.ELocation.Blob() || cca.fromTo.To() == common.ELocation.BlobFS() {
adreed-msft marked this conversation as resolved.
Show resolved Hide resolved

/*
This is an "opinionated" choice. Blob has no formal understanding of directories. As such, we don't care about if it's a directory.

If they sync a lone blob, they sync a lone blob.
If it lands on a directory stub, FNS is OK with this, but HNS isn't. It'll fail in that case. This is still semantically valid in FNS.
If they sync a prefix of blobs, they sync a prefix of blobs. This will always succeed, and won't break any semantics about FNS.

So my (Adele's) opinion moving forward is:
- Hierarchies don't exist in flat namespaces.
- Instead, there are objects and prefixes.
- Stubs exist to clarify prefixes.
- Stubs do not exist to enforce naming conventions.
- We are a tool, tools can be misused. It is up to the customer to validate everything they intend to do.
*/

if bloberror.HasCode(err, bloberror.ContainerNotFound) { // We can resolve a missing container. Let's create it.
bt := destinationTraverser.(*blobTraverser)
sc := bt.serviceClient // it being a blob traverser is a relatively safe assumption, because
bUrlParts, _ := blob.ParseURL(bt.rawURL) // it should totally have succeeded by now anyway
_, err = sc.NewContainerClient(bUrlParts.ContainerName).Create(ctx, nil) // If it doesn't work out, this will surely bubble up later anyway. It won't be long.
if err != nil {
glcm.Warn(fmt.Sprintf("Failed to create the missing destination container: %v", err))
}
// At this point, we'll let the destination be written to with the original resource type.
}
} else if err != nil && fileerror.HasCode(err, fileerror.ShareNotFound) { // We can resolve a missing share. Let's create it.
ft := destinationTraverser.(*fileTraverser)
sc := ft.serviceClient
fUrlParts, _ := file.ParseURL(ft.rawURL) // this should have succeeded by now.
_, err = sc.NewShareClient(fUrlParts.ShareName).Create(ctx, nil) // If it doesn't work out, this will surely bubble up later anyway. It won't be long.
if err != nil {
glcm.Warn(fmt.Sprintf("Failed to create the missing destination container: %v", err))
}
// At this point, we'll let the destination be written to with the original resource type, as it will get created in this transfer.
} else if err == nil && sourceIsDir != destIsDir {
// If the destination exists, and isn't blob though, we have to match resource types.
return nil, resourceMismatchError
}

// set up the filters in the right order
Expand Down Expand Up @@ -129,7 +173,8 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
}

// decide our folder transfer strategy
fpo, folderMessage := NewFolderPropertyOption(cca.fromTo, cca.recursive, true, filters, cca.preserveSMBInfo, cca.preservePermissions.IsTruthy(), false, strings.EqualFold(cca.destination.Value, common.Dev_Null), cca.includeDirectoryStubs) // sync always acts like stripTopDir=true
// sync always acts like stripTopDir=true, but if we intend to persist the root, we must tell NewFolderPropertyOption stripTopDir=false.
fpo, folderMessage := NewFolderPropertyOption(cca.fromTo, cca.recursive, !cca.includeRoot, filters, cca.preserveSMBInfo, cca.preservePermissions.IsTruthy(), false, strings.EqualFold(cca.destination.Value, common.Dev_Null), cca.includeDirectoryStubs)
if !cca.dryrunMode {
glcm.Info(folderMessage)
}
Expand Down Expand Up @@ -328,6 +373,7 @@ func IsDestinationCaseInsensitive(fromTo common.FromTo) bool {
} else {
return false
}

}

func quitIfInSync(transferJobInitiated, anyDestinationFileDeleted bool, cca *cookedSyncCmdArgs) {
Expand Down
22 changes: 20 additions & 2 deletions cmd/zc_enumerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (
"context"
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/datalakeerror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror"
"net/url"
"os"
"path/filepath"
"runtime"
"strings"
Expand Down Expand Up @@ -291,7 +295,7 @@ func newStoredObject(morpher objectMorpher, name string, relativePath string, en
// pass each StoredObject to the given objectProcessor if it passes all the filters
type ResourceTraverser interface {
Traverse(preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) error
IsDirectory(isSource bool) (bool, error)
IsDirectory(isSource bool) (isDirectory bool, err error)
// isDirectory has an isSource flag for a single exception to blob.
// Blob should ONLY check remote if it's a source.
// On destinations, because blobs and virtual directories can share names, we should support placing in both ways.
Expand Down Expand Up @@ -739,17 +743,31 @@ func newSyncEnumerator(primaryTraverser, secondaryTraverser ResourceTraverser, i
}

func (e *syncEnumerator) enumerate() (err error) {
handleAcceptableErrors := func() {
switch {
case err == nil: // don't do any error checking
case fileerror.HasCode(err, fileerror.ResourceNotFound),
datalakeerror.HasCode(err, datalakeerror.ResourceNotFound),
bloberror.HasCode(err, bloberror.BlobNotFound),
strings.Contains(err.Error(), "The system cannot find the"),
errors.Is(err, os.ErrNotExist):
err = nil // Oh no! Oh well. We'll create it later.
}
}

// enumerate the primary resource and build lookup map
err = e.primaryTraverser.Traverse(noPreProccessor, e.objectIndexer.store, e.filters)
handleAcceptableErrors()
if err != nil {
return
return err
}

// enumerate the secondary resource and as the objects pass the filters
// they will be passed to the object comparator
// which can process given objects based on what's already indexed
// note: transferring can start while scanning is ongoing
err = e.secondaryTraverser.Traverse(noPreProccessor, e.objectComparator, e.filters)
handleAcceptableErrors()
if err != nil {
return
}
Expand Down
36 changes: 24 additions & 12 deletions cmd/zc_traverser_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,34 @@ type blobTraverser struct {
isDFS bool
}

func (t *blobTraverser) IsDirectory(isSource bool) (bool, error) {
var NonErrorDirectoryStubOverlappable = errors.New("The directory stub exists, and can overlap.")

func (t *blobTraverser) IsDirectory(isSource bool) (isDirectory bool, err error) {
isDirDirect := copyHandlerUtil{}.urlIsContainerOrVirtualDirectory(t.rawURL)

blobURLParts, err := blob.ParseURL(t.rawURL)
if err != nil {
return false, err
}

// Skip the single blob check if we're checking a destination.
// This is an individual exception for blob because blob supports virtual directories and blobs sharing the same name.
// On HNS accounts, we would still perform this test. The user may have provided directory name without path-separator
if isDirDirect { // a container or a path ending in '/' is always directory
return true, nil
if blobURLParts.ContainerName != "" && blobURLParts.BlobName == "" {
// If it's a container, let's ensure that container exists. Listing is a safe assumption to be valid, because how else would we enumerate?
adreed-msft marked this conversation as resolved.
Show resolved Hide resolved
containerClient := t.serviceClient.NewContainerClient(blobURLParts.ContainerName)
p := containerClient.NewListBlobsFlatPager(nil)
_, err = p.NextPage(t.ctx)

if bloberror.HasCode(err, bloberror.AuthorizationPermissionMismatch) {
// Maybe we don't have the ability to list? Can we get container properties as a fallback?
_, propErr := containerClient.GetProperties(t.ctx, nil)
err = common.Iff(propErr == nil, nil, err)
}
}

return true, err
}
if !isSource && !t.isDFS {
// destination on blob endpoint. If it does not end in '/' it is a file
Expand All @@ -98,10 +118,6 @@ func (t *blobTraverser) IsDirectory(isSource bool) (bool, error) {
return isDirStub, nil
}

blobURLParts, err := blob.ParseURL(t.rawURL)
if err != nil {
return false, err
}
containerClient := t.serviceClient.NewContainerClient(blobURLParts.ContainerName)
searchPrefix := strings.TrimSuffix(blobURLParts.BlobName, common.AZCOPY_PATH_SEPARATOR_STRING) + common.AZCOPY_PATH_SEPARATOR_STRING
maxResults := int32(1)
Expand All @@ -116,12 +132,8 @@ func (t *blobTraverser) IsDirectory(isSource bool) (bool, error) {
}

if len(resp.Segment.BlobItems) == 0 {
// Not a directory
// If the blob is not found return the error to throw
if bloberror.HasCode(blobErr, bloberror.BlobNotFound) {
return false, errors.New(common.FILE_NOT_FOUND)
}
return false, blobErr
// Not a directory, but there was also no file on site. Therefore, there's nothing.
return false, errors.New(common.FILE_NOT_FOUND)
}

return true, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/zc_traverser_blob_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type blobAccountTraverser struct {
excludeContainerName []ObjectFilter
}

func (t *blobAccountTraverser) IsDirectory(_ bool) (bool, error) {
func (t *blobAccountTraverser) IsDirectory(isSource bool) (bool, error) {
return true, nil // Returns true as account traversal is inherently folder-oriented and recursive.
}

Expand Down
15 changes: 12 additions & 3 deletions cmd/zc_traverser_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,16 @@ func createFileClientFromServiceClient(fileURLParts file.URLParts, client *servi
func (t *fileTraverser) IsDirectory(bool) (bool, error) {
// Azure file share case
if gCopyUtil.urlIsContainerOrVirtualDirectory(t.rawURL) {
return true, nil
// Let's at least test if it exists, that way we toss an error.
fileURLParts, err := file.ParseURL(t.rawURL)
if err != nil {
return true, err
}
directoryClient := t.serviceClient.NewShareClient(fileURLParts.ShareName)
p := directoryClient.NewRootDirectoryClient().NewListFilesAndDirectoriesPager(nil)
_, err = p.NextPage(t.ctx)

return true, err
}

// Need make request to ensure if it's directory
Expand All @@ -95,7 +104,7 @@ func (t *fileTraverser) IsDirectory(bool) (bool, error) {
if azcopyScanningLogger != nil {
azcopyScanningLogger.Log(common.LogWarning, fmt.Sprintf("Failed to check if the destination is a folder or a file (Azure Files). Assuming the destination is a file: %s", err))
}
return false, nil
return false, err
}

return true, nil
Expand Down Expand Up @@ -284,7 +293,7 @@ func (t *fileTraverser) Traverse(preprocessor objectMorpher, processor objectPro
for pager.More() {
lResp, err := pager.NextPage(t.ctx)
if err != nil {
return fmt.Errorf("cannot list files due to reason %s", err)
return fmt.Errorf("cannot list files due to reason %w", err)
}
for _, fileInfo := range lResp.Segment.Files {
if invalidBlobOrWindowsName(*fileInfo.Name) {
Expand Down
16 changes: 8 additions & 8 deletions cmd/zc_traverser_file_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type fileAccountTraverser struct {

// a generic function to notify that a new stored object has been enumerated
incrementEnumerationCounter enumerationCounterFunc
trailingDot common.TrailingDotOption
destination *common.Location
trailingDot common.TrailingDotOption
destination *common.Location
}

func (t *fileAccountTraverser) IsDirectory(isSource bool) (bool, error) {
Expand Down Expand Up @@ -107,13 +107,13 @@ func (t *fileAccountTraverser) Traverse(preprocessor objectMorpher, processor ob

func newFileAccountTraverser(serviceClient *service.Client, shareName string, ctx context.Context, getProperties bool, incrementEnumerationCounter enumerationCounterFunc, trailingDot common.TrailingDotOption, destination *common.Location) (t *fileAccountTraverser) {
t = &fileAccountTraverser{
ctx: ctx,
ctx: ctx,
incrementEnumerationCounter: incrementEnumerationCounter,
serviceClient: serviceClient,
sharePattern: shareName,
getProperties: getProperties,
trailingDot: trailingDot,
destination: destination,
serviceClient: serviceClient,
sharePattern: shareName,
getProperties: getProperties,
trailingDot: trailingDot,
destination: destination,
}
return
}
2 changes: 1 addition & 1 deletion cmd/zc_traverser_gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (t *gcpTraverser) IsDirectory(isSource bool) (bool, error) {
//Directories do not have attributes and hence throw error
_, err := obj.Attrs(t.ctx)
if err == gcpUtils.ErrObjectNotExist {
return true, nil
return true, err
}
return false, nil
}
Expand Down
Loading
Loading