From 509776b9255b4418428d809c287da3952bfaf10f Mon Sep 17 00:00:00 2001 From: "John Rusk [MSFT]" Date: Wed, 16 Oct 2019 15:50:22 +1300 Subject: [PATCH 1/9] Use actual chunk size for progress calculations (#684) Because for PageBlobs and AzureFiles, the size is capped below what the user may request, so just using the "requested" size gave wrong results in % complete --- common/chunkStatusLogger.go | 8 +++++++- ste/mgr-JobPartTransferMgr.go | 20 ++------------------ ste/xfer-anyToRemote.go | 3 ++- ste/xfer-remoteToLocal.go | 3 ++- 4 files changed, 13 insertions(+), 21 deletions(-) diff --git a/common/chunkStatusLogger.go b/common/chunkStatusLogger.go index ab150df73..ffe88ef0d 100644 --- a/common/chunkStatusLogger.go +++ b/common/chunkStatusLogger.go @@ -34,6 +34,7 @@ import ( type ChunkID struct { Name string offsetInFile int64 + length int64 // What is this chunk's progress currently waiting on? // Must be a pointer, because the ChunkID itself is a struct. @@ -55,12 +56,13 @@ type ChunkID struct { // And maybe at that point, we would also put Length into chunkID, and use that in jptm.ReportChunkDone } -func NewChunkID(name string, offsetInFile int64) ChunkID { +func NewChunkID(name string, offsetInFile int64, length int64) ChunkID { dummyWaitReasonIndex := int32(0) zeroNotificationState := int32(0) return ChunkID{ Name: name, offsetInFile: offsetInFile, + length: length, waitReasonIndex: &dummyWaitReasonIndex, // must initialize, so don't get nil pointer on usage completionNotifiedToJptm: &zeroNotificationState, } @@ -94,6 +96,10 @@ func (id ChunkID) IsPseudoChunk() bool { return id.offsetInFile < 0 } +func (id ChunkID) Length() int64 { + return id.length +} + var EWaitReason = WaitReason{0, ""} // WaitReason identifies the one thing that a given chunk is waiting on, at a given moment.
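The hunk above threads the chunk's actual byte count through ChunkID so that later progress accounting can credit exactly what was transferred, rather than assuming every completed chunk was one full block. A minimal standalone sketch of the idea (simplified types and names, not the real azcopy structs):

package main

import "fmt"

// chunkID identifies one chunk of a transfer and now carries its actual length.
type chunkID struct {
	name         string
	offsetInFile int64
	length       int64 // real bytes in this chunk; the final chunk is usually shorter than a block
}

func newChunkID(name string, offset, length int64) chunkID {
	return chunkID{name: name, offsetInFile: offset, length: length}
}

func main() {
	const srcSize, blockSize = int64(10000), int64(4096)
	var done int64
	for off := int64(0); off < srcSize; off += blockSize {
		length := blockSize
		if remaining := srcSize - off; remaining < length {
			length = remaining // cap the last chunk at whatever is left
		}
		id := newChunkID("file.bin", off, length)
		done += id.length // progress uses the real chunk size, so it sums exactly to srcSize
		fmt.Printf("chunk at %d: %d bytes, %.1f%% complete\n", off, id.length, float64(done)*100/float64(srcSize))
	}
}
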
diff --git a/ste/mgr-JobPartTransferMgr.go b/ste/mgr-JobPartTransferMgr.go index 06f8ca10d..3873c7e8c 100644 --- a/ste/mgr-JobPartTransferMgr.go +++ b/ste/mgr-JobPartTransferMgr.go @@ -42,7 +42,6 @@ type IJobPartTransferMgr interface { ShouldDecompress() bool GetSourceCompressionType() (common.CompressionType, error) ReportChunkDone(id common.ChunkID) (lastChunk bool, chunksDone uint32) - UnsafeReportChunkDone() (lastChunk bool, chunksDone uint32) TransferStatusIgnoringCancellation() common.TransferStatus SetStatus(status common.TransferStatus) SetErrorCode(errorCode int32) @@ -136,7 +135,6 @@ type jobPartTransferMgr struct { // used to show whether THIS jptm holds the destination lock atomicDestLockHeldIndicator uint32 - jobPartMgr IJobPartMgr // Refers to the "owning" Job Part jobPartPlanTransfer *JobPartPlanTransfer transferIndex uint32 @@ -425,17 +423,8 @@ func (jptm *jobPartTransferMgr) ReportChunkDone(id common.ChunkID) (lastChunk bo // track progress if jptm.IsLive() { - info := jptm.Info() - successBytesDelta := common.AtomicMorphInt64(&jptm.atomicSuccessfulBytes, func(old int64) (new int64, delta interface{}) { - new = old + int64(info.BlockSize) // assume we just completed a full block - if new > info.SourceSize { // but if that assumption gives over-counts total bytes in blob, make a correction - new = info.SourceSize // (we do it this way because we don't have the actual chunk size available to us here) - } - delta = new - old - return - }).(int64) - - JobsAdmin.AddSuccessfulBytesInActiveFiles(successBytesDelta) + atomic.AddInt64(&jptm.atomicSuccessfulBytes, id.Length()) + JobsAdmin.AddSuccessfulBytesInActiveFiles(id.Length()) } // Do our actual processing @@ -448,11 +437,6 @@ func (jptm *jobPartTransferMgr) ReportChunkDone(id common.ChunkID) (lastChunk bo return lastChunk, chunksDone } -// TODO: phase this method out. It's just here to support parts of the codebase that don't yet have chunk IDs -func (jptm *jobPartTransferMgr) UnsafeReportChunkDone() (lastChunk bool, chunksDone uint32) { - return jptm.ReportChunkDone(common.NewChunkID("", 0)) -} - // If an automatic action has been specified for after the last chunk, run it now // (Prior to introduction of this routine, individual chunkfuncs had to check the return values // of ReportChunkDone and then implement their own versions of the necessary transfer epilogue code. 
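For comparison, here is a hedged sketch of the two accounting styles this patch swaps between: the removed code (paraphrasing the AtomicMorphInt64 helper as a plain compare-and-swap loop) had to guess a full block per completed chunk and clamp at the source size, while the new code simply adds the length carried by the ChunkID. Field and function names below are illustrative, not the real jptm members:

package main

import (
	"fmt"
	"sync/atomic"
)

// Old style: the real chunk size was unknown at this point, so each completed chunk
// was assumed to be one full block, and the running total was clamped at the source size.
func reportChunkDoneOld(successfulBytes *int64, blockSize, sourceSize int64) (delta int64) {
	for {
		old := atomic.LoadInt64(successfulBytes)
		updated := old + blockSize
		if updated > sourceSize {
			updated = sourceSize // correction for the over-count on the final (or capped) chunk
		}
		if atomic.CompareAndSwapInt64(successfulBytes, old, updated) {
			return updated - old
		}
	}
}

// New style: the ChunkID carries its length, so the update is a single atomic add.
func reportChunkDoneNew(successfulBytes *int64, chunkLength int64) (delta int64) {
	atomic.AddInt64(successfulBytes, chunkLength)
	return chunkLength
}

func main() {
	var oldTotal, newTotal int64
	// A 10 MiB page blob transferred as two 4 MiB chunks plus a 2 MiB tail.
	for _, length := range []int64{4 << 20, 4 << 20, 2 << 20} {
		reportChunkDoneOld(&oldTotal, 4<<20, 10<<20)
		reportChunkDoneNew(&newTotal, length)
	}
	fmt.Println(oldTotal, newTotal) // both end at 10485760, but only the new style is exact per chunk
}
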
diff --git a/ste/xfer-anyToRemote.go b/ste/xfer-anyToRemote.go index f35034158..8d8903a3e 100644 --- a/ste/xfer-anyToRemote.go +++ b/ste/xfer-anyToRemote.go @@ -210,7 +210,6 @@ func scheduleSendChunks(jptm IJobPartTransferMgr, srcPath string, srcFile common chunkIDCount := int32(0) for startIndex := int64(0); startIndex < srcSize || isDummyChunkInEmptyFile(startIndex, srcSize); startIndex += int64(chunkSize) { - id := common.NewChunkID(srcPath, startIndex) adjustedChunkSize := int64(chunkSize) // compute actual size of the chunk @@ -218,6 +217,8 @@ func scheduleSendChunks(jptm IJobPartTransferMgr, srcPath string, srcFile common adjustedChunkSize = srcSize - startIndex } + id := common.NewChunkID(srcPath, startIndex, adjustedChunkSize) // TODO: stop using adjustedChunkSize, below, and use the size that's in the ID + if srcInfoProvider.IsLocal() { if jptm.WasCanceled() { prefetchErr = jobCancelledLocalPrefetchErr diff --git a/ste/xfer-remoteToLocal.go b/ste/xfer-remoteToLocal.go index 48a0f26a4..a9fb6323e 100644 --- a/ste/xfer-remoteToLocal.go +++ b/ste/xfer-remoteToLocal.go @@ -189,7 +189,6 @@ func remoteToLocal(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pacer, d chunkCount := uint32(0) for startIndex := int64(0); startIndex < fileSize; startIndex += downloadChunkSize { - id := common.NewChunkID(info.Destination, startIndex) adjustedChunkSize := downloadChunkSize // compute exact size of the chunk @@ -197,6 +196,8 @@ func remoteToLocal(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pacer, d adjustedChunkSize = fileSize - startIndex } + id := common.NewChunkID(info.Destination, startIndex, adjustedChunkSize) // TODO: stop using adjustedChunkSize, below, and use the size that's in the ID + // Wait until its OK to schedule it // To prevent excessive RAM consumption, we have a limit on the amount of scheduled-but-not-yet-saved data // TODO: as per comment above, currently, if there's an error here we must continue because we must schedule all chunks From 1f9c9b180de5bd55d261911d42c0dd5b82ff8af6 Mon Sep 17 00:00:00 2001 From: Ze Qian Zhang Date: Wed, 16 Oct 2019 00:36:27 -0700 Subject: [PATCH 2/9] Added back cancel-from-stdin option (#686) * Added back cancel-from-stdin option * Added comments to clarify intent * Added clarifying comment --- cmd/copy.go | 6 +++ cmd/remove.go | 5 ++ cmd/root.go | 7 +++ cmd/sync.go | 5 ++ common/lifecyleMgr.go | 116 ++++++++++++++++++++++++++++++------------ 5 files changed, 107 insertions(+), 32 deletions(-) diff --git a/cmd/copy.go b/cmd/copy.go index 28e7ae21f..e02d7c575 100644 --- a/cmd/copy.go +++ b/cmd/copy.go @@ -1295,6 +1295,12 @@ func init() { } else if len(args) == 2 { // normal copy raw.src = args[0] raw.dst = args[1] + + // under normal copy, we may ask the user questions such as whether to overwrite a file + glcm.EnableInputWatcher() + if cancelFromStdin { + glcm.EnableCancelFromStdIn() + } } else { return errors.New("wrong number of arguments, please refer to the help page on usage of this command") } diff --git a/cmd/remove.go b/cmd/remove.go index f0d15e926..b475e2111 100644 --- a/cmd/remove.go +++ b/cmd/remove.go @@ -62,6 +62,11 @@ func init() { return nil }, Run: func(cmd *cobra.Command, args []string) { + glcm.EnableInputWatcher() + if cancelFromStdin { + glcm.EnableCancelFromStdIn() + } + cooked, err := raw.cook() if err != nil { glcm.Error("failed to parse user input due to error: " + err.Error()) diff --git a/cmd/root.go b/cmd/root.go index a7b17e308..af5f24a4e 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -39,6 +39,7 @@ var 
azcopyLogPathFolder string var azcopyJobPlanFolder string var azcopyMaxFileAndSocketHandles int var outputFormatRaw string +var cancelFromStdin bool var azcopyOutputFormat common.OutputFormat var cmdLineCapMegaBitsPerSecond uint32 @@ -110,6 +111,12 @@ func init() { rootCmd.PersistentFlags().Uint32Var(&cmdLineCapMegaBitsPerSecond, "cap-mbps", 0, "Caps the transfer rate, in megabits per second. Moment-by-moment throughput might vary slightly from the cap. If this option is set to zero, or it is omitted, the throughput isn't capped.") rootCmd.PersistentFlags().StringVar(&outputFormatRaw, "output-type", "text", "Format of the command's output. The choices include: text, json. The default value is 'text'.") + + // Note: this is due to Windows not supporting signals properly + rootCmd.PersistentFlags().BoolVar(&cancelFromStdin, "cancel-from-stdin", false, "Used by partner teams to send in `cancel` through stdin to stop a job.") + + // reserved for partner teams + rootCmd.PersistentFlags().MarkHidden("cancel-from-stdin") } // always spins up a new goroutine, because sometimes the aka.ms URL can't be reached (e.g. a constrained environment where diff --git a/cmd/sync.go b/cmd/sync.go index 1e6aafcc4..0ddce31f7 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -523,6 +523,11 @@ func init() { return nil }, Run: func(cmd *cobra.Command, args []string) { + glcm.EnableInputWatcher() + if cancelFromStdin { + glcm.EnableCancelFromStdIn() + } + cooked, err := raw.cook() if err != nil { glcm.Error("error parsing the input given by the user. Failed with error " + err.Error()) diff --git a/common/lifecyleMgr.go b/common/lifecyleMgr.go index 960d2bc3c..a3f23a2f3 100644 --- a/common/lifecyleMgr.go +++ b/common/lifecyleMgr.go @@ -3,7 +3,6 @@ package common import ( "bufio" "fmt" - "io" "os" "os/signal" "runtime" @@ -18,16 +17,22 @@ import ( // only one instance of the formatter should exist var lcm = func() (lcmgr *lifecycleMgr) { lcmgr = &lifecycleMgr{ - msgQueue: make(chan outputMessage, 1000), - progressCache: "", - cancelChannel: make(chan os.Signal, 1), - outputFormat: EOutputFormat.Text(), // output text by default - logSanitizer: NewAzCopyLogSanitizer(), + msgQueue: make(chan outputMessage, 1000), + progressCache: "", + cancelChannel: make(chan os.Signal, 1), + outputFormat: EOutputFormat.Text(), // output text by default + logSanitizer: NewAzCopyLogSanitizer(), + inputQueue: make(chan userInput, 1000), + allowCancelFromStdIn: false, + allowWatchInput: false, } // kick off the single routine that processes output go lcmgr.processOutputMessage() + // and process input + go lcmgr.watchInputs() + // Check if need to do CPU profiling, and do CPU profiling accordingly when azcopy life start. 
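The lifecycle manager changes in this patch follow a single-reader pattern: exactly one goroutine owns os.Stdin, routes a literal "cancel" onto the same channel already used for Ctrl-C, and queues everything else with a timestamp so that a prompt only accepts input typed after the question was shown. A compact standalone sketch of that pattern, with illustrative names and fmt standing in for the real output machinery:

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
	"time"
)

type userInput struct {
	timeReceived time.Time
	content      string
}

// watchStdin is the only reader of os.Stdin; everything else consumes the channels.
func watchStdin(cancel chan<- os.Signal, answers chan<- userInput, allowCancel bool) {
	reader := bufio.NewReader(os.Stdin)
	for {
		line, err := reader.ReadString('\n')
		received := time.Now()
		if err != nil {
			return // e.g. stdin was closed
		}
		msg := strings.TrimSpace(line)
		if allowCancel && strings.EqualFold(msg, "cancel") {
			cancel <- os.Interrupt // reuse the same path as Ctrl-C
			continue
		}
		answers <- userInput{timeReceived: received, content: msg}
	}
}

// answerAfter ignores anything typed before the prompt was shown,
// so stale input is never mistaken for an answer to the current question.
func answerAfter(answers <-chan userInput, asked time.Time) string {
	for a := range answers {
		if a.timeReceived.After(asked) {
			return a.content
		}
	}
	return ""
}

func main() {
	cancel := make(chan os.Signal, 1)
	answers := make(chan userInput, 10)
	go watchStdin(cancel, answers, true)

	asked := time.Now()
	fmt.Print("Overwrite foo.txt? (yes/no) ")
	fmt.Println("->", answerAfter(answers, asked))
}
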
lcmgr.checkAndStartCPUProfiling() @@ -49,6 +54,8 @@ type LifecycleMgr interface { GetEnvironmentVariable(EnvironmentVariable) string // get the environment variable or its default value ClearEnvironmentVariable(EnvironmentVariable) // clears the environment variable SetOutputFormat(OutputFormat) // change the output format of the entire application + EnableInputWatcher() // depending on the command, we may allow user to give input through Stdin + EnableCancelFromStdIn() // allow user to send in `cancel` to stop the job } func GetLifecycleMgr() LifecycleMgr { @@ -57,12 +64,72 @@ func GetLifecycleMgr() LifecycleMgr { // single point of control for all outputs type lifecycleMgr struct { - msgQueue chan outputMessage - progressCache string // useful for keeping job progress on the last line - cancelChannel chan os.Signal - waitEverCalled int32 - outputFormat OutputFormat - logSanitizer pipeline.LogSanitizer + msgQueue chan outputMessage + progressCache string // useful for keeping job progress on the last line + cancelChannel chan os.Signal + waitEverCalled int32 + outputFormat OutputFormat + logSanitizer pipeline.LogSanitizer + inputQueue chan userInput // msgs from the user + allowWatchInput bool // accept user inputs and place then in the inputQueue + allowCancelFromStdIn bool // allow user to send in 'cancel' from the stdin to stop the current job +} + +type userInput struct { + timeReceived time.Time + content string +} + +// should be started in a single go routine +func (lcm *lifecycleMgr) watchInputs() { + consoleReader := bufio.NewReader(os.Stdin) + for { + // sleep for a bit, the option might be enabled later + if !lcm.allowWatchInput { + time.Sleep(time.Microsecond * 500) + continue + } + + // reads input until the first occurrence of \n in the input, + input, err := consoleReader.ReadString('\n') + timeReceived := time.Now() + if err != nil { + continue + } + + // remove spaces before/after the content + msg := strings.TrimSpace(input) + + if lcm.allowCancelFromStdIn && strings.EqualFold(msg, "cancel") { + lcm.cancelChannel <- os.Interrupt + } else { + lcm.inputQueue <- userInput{timeReceived: timeReceived, content: msg} + } + } +} + +// get the answer to a question that was asked at a certain time +// only user input after the specified time is returned to make sure that we are getting the right answer to our question +// NOTE: to ask a question, go through Prompt, to guarantee that only 1 question is asked at a time +func (lcm *lifecycleMgr) getInputAfterTime(time time.Time) string { + for { + msg := <-lcm.inputQueue + + // keep reading until we find an input that came in after the user specified time + if msg.timeReceived.After(time) { + return msg.content + } + + // otherwise keep waiting as it's possible that the user has not typed it in yet + } +} + +func (lcm *lifecycleMgr) EnableInputWatcher() { + lcm.allowWatchInput = true +} + +func (lcm *lifecycleMgr) EnableCancelFromStdIn() { + lcm.allowCancelFromStdIn = true } func (lcm *lifecycleMgr) ClearEnvironmentVariable(variable EnvironmentVariable) { @@ -261,6 +328,7 @@ func (lcm *lifecycleMgr) processNoneOutput(msgToOutput outputMessage) { func (lcm *lifecycleMgr) processJSONOutput(msgToOutput outputMessage) { msgType := msgToOutput.msgType + questionTime := time.Now() // simply output the json message // we assume the msgContent is already formatted correctly @@ -272,7 +340,7 @@ func (lcm *lifecycleMgr) processJSONOutput(msgToOutput outputMessage) { os.Exit(int(msgToOutput.exitCode)) } else if msgType == 
eOutputMessageType.Prompt() { // read the response to the prompt and send it back through the channel - msgToOutput.inputChannel <- lcm.readInCleanLineFromStdIn() + msgToOutput.inputChannel <- lcm.getInputAfterTime(questionTime) } } @@ -325,6 +393,8 @@ func (lcm *lifecycleMgr) processTextOutput(msgToOutput outputMessage) { fmt.Println(msgToOutput.msgContent) } case eOutputMessageType.Prompt(): + questionTime := time.Now() + if lcm.progressCache != "" { // a progress status is already on the last line // print the prompt from the beginning on current line fmt.Print("\r") @@ -345,7 +415,7 @@ func (lcm *lifecycleMgr) processTextOutput(msgToOutput outputMessage) { } // read the response to the prompt and send it back through the channel - msgToOutput.inputChannel <- lcm.readInCleanLineFromStdIn() + msgToOutput.inputChannel <- lcm.getInputAfterTime(questionTime) } } @@ -388,24 +458,6 @@ func (lcm *lifecycleMgr) InitiateProgressReporting(jc WorkController) { }() } -// reads in a single line from stdin -// trims the new line, and also the extra spaces around the content -func (lcm *lifecycleMgr) readInCleanLineFromStdIn() string { - consoleReader := bufio.NewReader(os.Stdin) - - // reads input until the first occurrence of \n in the input, - input, err := consoleReader.ReadString('\n') - // When the user cancel the job more than one time before providing the - // input there will be an EOF Error. - if err == io.EOF { - return "" - } - - // remove the delimiter "\n" and spaces before/after the content - input = strings.TrimSpace(input) - return strings.Trim(input, " ") -} - func (lcm *lifecycleMgr) GetEnvironmentVariable(env EnvironmentVariable) string { value := os.Getenv(env.Name) if value == "" { From 18c0876a41a8cf1a5bed96f9d53e7dc61c6058a1 Mon Sep 17 00:00:00 2001 From: "John Rusk [MSFT]" Date: Wed, 16 Oct 2019 20:38:05 +1300 Subject: [PATCH 3/9] Fix race condition in shutdown of decompressingWriter (#688) --- common/decompressingWriter.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/common/decompressingWriter.go b/common/decompressingWriter.go index e53083507..38e46463d 100644 --- a/common/decompressingWriter.go +++ b/common/decompressingWriter.go @@ -65,17 +65,20 @@ func (d decompressingWriter) decompressorFactory(tp CompressionType, preader *io func (d decompressingWriter) worker(tp CompressionType, preader *io.PipeReader, destination io.WriteCloser, workerError chan error) { + var err error + var dec io.ReadCloser + defer func() { _ = destination.Close() // always close the destination file before we exit, since its a WriteCloser _ = preader.Close() + workerError <- err // send the error AFTER we have closed everything, to avoid race conditions where callers assume all closes are completed when we return }() // make the decompressor. Must be in the worker method because, // like the rest of read, this reads from the pipe. 
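The race fix above moves the send on workerError into the deferred cleanup, after destination and preader are closed, so a caller that treats receipt of the result as "worker completely finished" can no longer observe it before the closes have happened. A minimal sketch of that ordering with generic stand-in types (not the azcopy decompressingWriter itself):

package main

import (
	"fmt"
	"io"
	"strings"
)

// runWorker copies src to dst and reports exactly one result value, but only after
// all cleanup is done, so the receiver may safely assume dst is closed on receipt.
func runWorker(dst io.WriteCloser, src io.Reader, result chan<- error) {
	var err error
	defer func() {
		_ = dst.Close()
		result <- err // signal completion only after Close, avoiding the shutdown race
	}()
	_, err = io.Copy(dst, src)
}

type discardCloser struct{ io.Writer }

func (discardCloser) Close() error { return nil }

func main() {
	result := make(chan error, 1)
	go runWorker(discardCloser{io.Discard}, strings.NewReader("some payload"), result)
	fmt.Println("worker error:", <-result) // worker error: <nil>
}
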
// (Factory reads from pipe to read the zip/gzip file header) - dec, err := d.decompressorFactory(tp, preader) + dec, err = d.decompressorFactory(tp, preader) if err != nil { - workerError <- err return } @@ -84,7 +87,7 @@ func (d decompressingWriter) worker(tp CompressionType, preader *io.PipeReader, b := decompressingWriterBufferPool.RentSlice(decompressingWriterCopyBufferSize) _, err = io.CopyBuffer(destination, dec, b) // returns err==nil if hits EOF, as per docs decompressingWriterBufferPool.ReturnSlice(b) - workerError <- err + return } From eb6eb5e04ecc43f335accdfe95ef309d94ad0e6d Mon Sep 17 00:00:00 2001 From: Ze Qian Zhang Date: Wed, 16 Oct 2019 10:51:37 -0700 Subject: [PATCH 4/9] Added flag to define delete snapshot options (#690) * Added flag to define delete snapshot options * Minor fix for common.DeleteSnapshotsOption parsing * Updated delete-snapshots flag to be consistent with convention --- cmd/copy.go | 9 +++++++++ cmd/remove.go | 1 + cmd/removeProcessor.go | 3 ++- common/fe-ste-models.go | 35 +++++++++++++++++++++++++++++++++++ common/rpc-models.go | 29 +++++++++++++++-------------- ste/JobPartPlan.go | 5 ++++- ste/JobPartPlanFileName.go | 1 + ste/mgr-JobPartMgr.go | 4 ++++ ste/mgr-JobPartTransferMgr.go | 5 +++++ ste/xfer-deleteBlob.go | 12 ++++++++++-- 10 files changed, 86 insertions(+), 18 deletions(-) diff --git a/cmd/copy.go b/cmd/copy.go index e02d7c575..85934b9e0 100644 --- a/cmd/copy.go +++ b/cmd/copy.go @@ -96,6 +96,7 @@ type rawCopyCmdArgs struct { putMd5 bool md5ValidationOption string CheckLength bool + deleteSnapshotsOption string // defines the type of the blob at the destination in case of upload / account to account copy blobType string blockBlobTier string @@ -400,6 +401,12 @@ func (raw rawCopyCmdArgs) cookWithId(jobId common.JobID) (cookedCopyCmdArgs, err cooked.noGuessMimeType = raw.noGuessMimeType cooked.preserveLastModifiedTime = raw.preserveLastModifiedTime + // Make sure the given input is the one of the enums given by the blob SDK + err = cooked.deleteSnapshotsOption.Parse(raw.deleteSnapshotsOption) + if err != nil { + return cooked, err + } + if cooked.contentType != "" { cooked.noGuessMimeType = true // As specified in the help text, noGuessMimeType is inferred here. } @@ -656,6 +663,7 @@ type cookedCopyCmdArgs struct { cacheControl string noGuessMimeType bool preserveLastModifiedTime bool + deleteSnapshotsOption common.DeleteSnapshotsOption putMd5 bool md5ValidationOption common.HashValidationOption CheckLength bool @@ -873,6 +881,7 @@ func (cca *cookedCopyCmdArgs) processCopyJobPartOrders() (err error) { PreserveLastModifiedTime: cca.preserveLastModifiedTime, PutMd5: cca.putMd5, MD5ValidationOption: cca.md5ValidationOption, + DeleteSnapshotsOption: cca.deleteSnapshotsOption, }, // source sas is stripped from the source given by the user and it will not be stored in the part plan file. SourceSAS: cca.sourceSAS, diff --git a/cmd/remove.go b/cmd/remove.go index b475e2111..90e9ed867 100644 --- a/cmd/remove.go +++ b/cmd/remove.go @@ -91,4 +91,5 @@ func init() { deleteCmd.PersistentFlags().StringVar(&raw.excludePath, "exclude-path", "", "Exclude these paths when removing. "+ "This option does not support wildcard characters (*). Checks relative path prefix. For example: myFolder;myFolder/subDirName/file.pdf") deleteCmd.PersistentFlags().StringVar(&raw.listOfFilesToCopy, "list-of-files", "", "Defines the location of a file which contains the list of files and directories to be deleted. 
The relative paths should be delimited by line breaks, and the paths should NOT be URL-encoded.") + deleteCmd.PersistentFlags().StringVar(&raw.deleteSnapshotsOption, "delete-snapshots", "", "By default, the delete operation fails if a blob has snapshots. Specify 'include' to remove the root blob and all its snapshots; alternatively specify 'only' to remove only the snapshots but keep the root blob.") } diff --git a/cmd/removeProcessor.go b/cmd/removeProcessor.go index e4e29fa7e..8e4f9341c 100644 --- a/cmd/removeProcessor.go +++ b/cmd/removeProcessor.go @@ -37,7 +37,8 @@ func newRemoveTransferProcessor(cca *cookedCopyCmdArgs, numOfTransfersPerPart in SourceSAS: cca.sourceSAS, // flags - LogLevel: cca.logVerbosity, + LogLevel: cca.logVerbosity, + BlobAttributes: common.BlobTransferAttributes{DeleteSnapshotsOption: cca.deleteSnapshotsOption}, } reportFirstPart := func(jobStarted bool) { diff --git a/common/fe-ste-models.go b/common/fe-ste-models.go index 95bd5984b..d2b588938 100644 --- a/common/fe-ste-models.go +++ b/common/fe-ste-models.go @@ -111,6 +111,41 @@ type PartNumber uint32 type Version uint32 type Status uint32 +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +var EDeleteSnapshotsOption = DeleteSnapshotsOption(0) + +type DeleteSnapshotsOption uint8 + +func (DeleteSnapshotsOption) None() DeleteSnapshotsOption { return DeleteSnapshotsOption(0) } +func (DeleteSnapshotsOption) Include() DeleteSnapshotsOption { return DeleteSnapshotsOption(1) } +func (DeleteSnapshotsOption) Only() DeleteSnapshotsOption { return DeleteSnapshotsOption(2) } + +func (d DeleteSnapshotsOption) String() string { + return enum.StringInt(d, reflect.TypeOf(d)) +} + +func (d *DeleteSnapshotsOption) Parse(s string) error { + // allow empty to mean "None" + if s == "" { + *d = EDeleteSnapshotsOption.None() + return nil + } + + val, err := enum.ParseInt(reflect.TypeOf(d), s, true, true) + if err == nil { + *d = val.(DeleteSnapshotsOption) + } + return err +} + +func (d DeleteSnapshotsOption) ToDeleteSnapshotsOptionType() azblob.DeleteSnapshotsOptionType { + if d == EDeleteSnapshotsOption.None() { + return azblob.DeleteSnapshotsOptionNone + } + + return azblob.DeleteSnapshotsOptionType(strings.ToLower(d.String())) +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// type DeleteDestination uint32 diff --git a/common/rpc-models.go b/common/rpc-models.go index 951207c20..72de39c62 100644 --- a/common/rpc-models.go +++ b/common/rpc-models.go @@ -104,20 +104,21 @@ type ListRequest struct { // This struct represents the optional attribute for blob request header type BlobTransferAttributes struct { - BlobType BlobType // The type of a blob - BlockBlob, PageBlob, AppendBlob - ContentType string // The content type specified for the blob. - ContentEncoding string // Specifies which content encodings have been applied to the blob. - ContentLanguage string // Specifies the language of the content - ContentDisposition string // Specifies the content disposition - CacheControl string // Specifies the cache control header - BlockBlobTier BlockBlobTier // Specifies the tier to set on the block blobs. - PageBlobTier PageBlobTier // Specifies the tier to set on the page blobs. 
- Metadata string // User-defined Name-value pairs associated with the blob - NoGuessMimeType bool // represents user decision to interpret the content-encoding from source file - PreserveLastModifiedTime bool // when downloading, tell engine to set file's timestamp to timestamp of blob - PutMd5 bool // when uploading, should we create and PUT Content-MD5 hashes - MD5ValidationOption HashValidationOption // when downloading, how strictly should we validate MD5 hashes? - BlockSizeInBytes uint32 + BlobType BlobType // The type of a blob - BlockBlob, PageBlob, AppendBlob + ContentType string // The content type specified for the blob. + ContentEncoding string // Specifies which content encodings have been applied to the blob. + ContentLanguage string // Specifies the language of the content + ContentDisposition string // Specifies the content disposition + CacheControl string // Specifies the cache control header + BlockBlobTier BlockBlobTier // Specifies the tier to set on the block blobs. + PageBlobTier PageBlobTier // Specifies the tier to set on the page blobs. + Metadata string // User-defined Name-value pairs associated with the blob + NoGuessMimeType bool // represents user decision to interpret the content-encoding from source file + PreserveLastModifiedTime bool // when downloading, tell engine to set file's timestamp to timestamp of blob + PutMd5 bool // when uploading, should we create and PUT Content-MD5 hashes + MD5ValidationOption HashValidationOption // when downloading, how strictly should we validate MD5 hashes? + BlockSizeInBytes uint32 // when uploading/downloading/copying, specify the size of each chunk + DeleteSnapshotsOption DeleteSnapshotsOption // when deleting, specify what to do with the snapshots } type JobIDDetails struct { diff --git a/ste/JobPartPlan.go b/ste/JobPartPlan.go index 0924fe4f7..d63fd395c 100644 --- a/ste/JobPartPlan.go +++ b/ste/JobPartPlan.go @@ -14,7 +14,7 @@ import ( // dataSchemaVersion defines the data schema version of JobPart order files supported by // current version of azcopy // To be Incremented every time when we release azcopy with changed dataSchema -const DataSchemaVersion common.Version = 9 +const DataSchemaVersion common.Version = 10 const ( CustomHeaderMaxBytes = 256 @@ -74,6 +74,9 @@ type JobPartPlanHeader struct { // jobStatus_doNotUse is a private member whose value can be accessed by Status and SetJobStatus // jobStatus_doNotUse should not be directly accessed anywhere except by the Status and SetJobStatus atomicJobStatus common.JobStatus + + // For delete operation specify what to do with snapshots + DeleteSnapshotsOption common.DeleteSnapshotsOption } // Status returns the job status stored in JobPartPlanHeader in thread-safe manner diff --git a/ste/JobPartPlanFileName.go b/ste/JobPartPlanFileName.go index ccbae577f..0aefefdd1 100644 --- a/ste/JobPartPlanFileName.go +++ b/ste/JobPartPlanFileName.go @@ -185,6 +185,7 @@ func (jpfn JobPartPlanFileName) Create(order common.CopyJobPartOrderRequest) { S2SInvalidMetadataHandleOption: order.S2SInvalidMetadataHandleOption, DestLengthValidation: order.DestLengthValidation, atomicJobStatus: common.EJobStatus.InProgress(), // We default to InProgress + DeleteSnapshotsOption: order.BlobAttributes.DeleteSnapshotsOption, } // Copy any strings into their respective fields diff --git a/ste/mgr-JobPartMgr.go b/ste/mgr-JobPartMgr.go index cc821765e..b834a7f2c 100644 --- a/ste/mgr-JobPartMgr.go +++ b/ste/mgr-JobPartMgr.go @@ -629,6 +629,10 @@ func (jpm *jobPartMgr) localDstData() 
*JobPartPlanDstLocal { return &jpm.Plan().DstLocalData } +func (jpm *jobPartMgr) deleteSnapshotsOption() common.DeleteSnapshotsOption { + return jpm.Plan().DeleteSnapshotsOption +} + // Call Done when a transfer has completed its epilog; this method returns the number of transfers completed so far func (jpm *jobPartMgr) ReportTransferDone() (transfersDone uint32) { transfersDone = atomic.AddUint32(&jpm.atomicTransfersDone, 1) diff --git a/ste/mgr-JobPartTransferMgr.go b/ste/mgr-JobPartTransferMgr.go index 3873c7e8c..fdfd9964c 100644 --- a/ste/mgr-JobPartTransferMgr.go +++ b/ste/mgr-JobPartTransferMgr.go @@ -82,6 +82,7 @@ type IJobPartTransferMgr interface { LogAtLevelForCurrentTransfer(level pipeline.LogLevel, msg string) GetOverwritePrompter() *overwritePrompter common.ILogger + DeleteSnapshotsOption() common.DeleteSnapshotsOption } type TransferInfo struct { @@ -385,6 +386,10 @@ func (jptm *jobPartTransferMgr) MD5ValidationOption() common.HashValidationOptio return jptm.jobPartMgr.(*jobPartMgr).localDstData().MD5VerificationOption } +func (jptm *jobPartTransferMgr) DeleteSnapshotsOption() common.DeleteSnapshotsOption { + return jptm.jobPartMgr.(*jobPartMgr).deleteSnapshotsOption() +} + func (jptm *jobPartTransferMgr) BlobTypeOverride() common.BlobType { return jptm.jobPartMgr.BlobTypeOverride() } diff --git a/ste/xfer-deleteBlob.go b/ste/xfer-deleteBlob.go index 73740c399..77358463b 100644 --- a/ste/xfer-deleteBlob.go +++ b/ste/xfer-deleteBlob.go @@ -5,12 +5,15 @@ import ( "net/http" "net/url" "strings" + "sync" "github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-storage-azcopy/common" "github.com/Azure/azure-storage-blob-go/azblob" ) +var explainedSkippedRemoveOnce sync.Once + func DeleteBlobPrologue(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pacer) { info := jptm.Info() @@ -32,6 +35,10 @@ func DeleteBlobPrologue(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pac if status == common.ETransferStatus.Failed() { jptm.LogError(info.Source, "DELETE ERROR ", err) } else if status == common.ETransferStatus.SkippedBlobHasSnapshots() { + explainedSkippedRemoveOnce.Do(func() { + common.GetLifecycleMgr().Info("Blobs with snapshots are skipped. Please specify the --delete-snapshots flag for alternative behaviors.") + }) + // log at error level so that it's clear why the transfer was skipped even when the log level is set to error jptm.Log(pipeline.LogError, fmt.Sprintf("DELETE SKIPPED(blob has snapshots): %s", strings.Split(info.Destination, "?")[0])) } else { @@ -42,8 +49,9 @@ func DeleteBlobPrologue(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pac jptm.ReportTransferDone() } - // TODO confirm whether we should add a new flag to allow user to specify whether snapshots should be removed - _, err := srcBlobURL.Delete(jptm.Context(), azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) + // note: if deleteSnapshotsOption is 'only', which means deleting all the snapshots but keep the root blob + // we still count this delete operation as successful since we accomplished the desired outcome + _, err := srcBlobURL.Delete(jptm.Context(), jptm.DeleteSnapshotsOption().ToDeleteSnapshotsOptionType(), azblob.BlobAccessConditions{}) if err != nil { if strErr, ok := err.(azblob.StorageError); ok { // if the delete failed with err 404, i.e resource not found, then mark the transfer as success. 
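The new --delete-snapshots flag accepts an empty value (fail if the blob has snapshots, as before), "include" (delete the root blob and all its snapshots), or "only" (delete just the snapshots and keep the root blob). Roughly, the enum maps onto the service's delete-snapshots header values; the sketch below mirrors that mapping with plain strings so it compiles standalone, rather than going through the azblob types that the real ToDeleteSnapshotsOptionType returns:

package main

import (
	"fmt"
	"strings"
)

type deleteSnapshotsOption uint8

const (
	deleteSnapshotsNone    deleteSnapshotsOption = iota // default: delete fails if the blob has snapshots
	deleteSnapshotsInclude                              // delete the root blob and all of its snapshots
	deleteSnapshotsOnly                                 // delete only the snapshots, keep the root blob
)

func parseDeleteSnapshotsOption(s string) (deleteSnapshotsOption, error) {
	switch strings.ToLower(s) {
	case "", "none":
		return deleteSnapshotsNone, nil
	case "include":
		return deleteSnapshotsInclude, nil
	case "only":
		return deleteSnapshotsOnly, nil
	default:
		return deleteSnapshotsNone, fmt.Errorf("invalid delete-snapshots value %q", s)
	}
}

// headerValue returns the string the service expects for the delete-snapshots option
// ("" means it is omitted, which is why the delete fails on snapshotted blobs by default).
func (d deleteSnapshotsOption) headerValue() string {
	switch d {
	case deleteSnapshotsInclude:
		return "include"
	case deleteSnapshotsOnly:
		return "only"
	default:
		return ""
	}
}

func main() {
	opt, _ := parseDeleteSnapshotsOption("include")
	fmt.Println(opt.headerValue()) // include
}
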
From 5c780fbc6221a1ba53c6b4fd73b4de5c980c8e0d Mon Sep 17 00:00:00 2001 From: zezha-msft Date: Wed, 16 Oct 2019 11:04:10 -0700 Subject: [PATCH 5/9] Fixed mockedLifecycleManager with missing methods --- cmd/zt_interceptors_for_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/zt_interceptors_for_test.go b/cmd/zt_interceptors_for_test.go index 56b3c9d85..391988d18 100644 --- a/cmd/zt_interceptors_for_test.go +++ b/cmd/zt_interceptors_for_test.go @@ -123,6 +123,8 @@ func (*mockedLifecycleManager) GetEnvironmentVariable(env common.EnvironmentVari return value } func (*mockedLifecycleManager) SetOutputFormat(common.OutputFormat) {} +func (*mockedLifecycleManager) EnableInputWatcher() {} +func (*mockedLifecycleManager) EnableCancelFromStdIn() {} type dummyProcessor struct { record []storedObject From cdd1156f79485bd437f3a68b4238b13c62a6d126 Mon Sep 17 00:00:00 2001 From: "John Rusk [MSFT]" Date: Fri, 18 Oct 2019 11:28:48 +1300 Subject: [PATCH 6/9] Write x-ms-request-id into the abbreviated logging for successful operations (#694) Because we may still need to look up these requests in some support situations. --- ste/xferLogPolicy.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/ste/xferLogPolicy.go b/ste/xferLogPolicy.go index 173948499..2124419d1 100644 --- a/ste/xferLogPolicy.go +++ b/ste/xferLogPolicy.go @@ -132,6 +132,7 @@ func NewRequestLogPolicyFactory(o RequestLogOptions) pipeline.Factory { pipeline.WriteRequestWithResponse(b, prepareRequestForLogging(request), response.Response(), err) // only write full headers if debugging or error } else { writeRequestAsOneLine(b, prepareRequestForLogging(request)) + writeActivityId(b, response.Response()) } if logBody { @@ -183,6 +184,17 @@ func writeRequestAsOneLine(b *bytes.Buffer, request *http.Request) { fmt.Fprint(b, " "+request.Method+" "+request.URL.String()+"\n") } +func writeActivityId(b *bytes.Buffer, response *http.Response) { + if response == nil { + return + } + const key = "X-Ms-Request-Id" // use this, rather than client ID, because this one is easier to search by in Service logs + value, ok := response.Header[key] + if ok { + fmt.Fprintf(b, " %s: %+v\n", key, value) + } +} + func prepareRequestForLogging(request pipeline.Request) *http.Request { req := request rawQuery := req.URL.RawQuery From c457d2566c9997ad52e183f1221597bfd000227a Mon Sep 17 00:00:00 2001 From: "John Rusk [MSFT]" Date: Fri, 18 Oct 2019 12:09:59 +1300 Subject: [PATCH 7/9] Fix/extra warnings for parameter changes (#695) * Warn on unsupported wildcard use in parameters * Warn clearly on use of obsolete include and exclude parameters * Add legacy param warning to sync --- cmd/copy.go | 47 +++++++++++++++++++++++++++++++++++++++-------- cmd/sync.go | 13 +++++++++++++ 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/cmd/copy.go b/cmd/copy.go index 85934b9e0..be3dc5c17 100644 --- a/cmd/copy.go +++ b/cmd/copy.go @@ -32,6 +32,7 @@ import ( "os" "runtime" "strings" + "sync" "time" "github.com/Azure/azure-pipeline-go/pipeline" @@ -73,6 +74,8 @@ type rawCopyCmdArgs struct { excludePath string includeFileAttributes string excludeFileAttributes string + legacyInclude string // used only for warnings + legacyExclude string // used only for warnings // filters from flags listOfFilesToCopy string @@ -308,10 +311,17 @@ func (raw rawCopyCmdArgs) cookWithId(jobId common.JobID) (cookedCopyCmdArgs, err // This saves us time because we know *exactly* what we're looking for right off the bat. 
// Note that exclude-path is handled as a filter unlike include-path. + if raw.legacyInclude != "" || raw.legacyExclude != "" { + return cooked, fmt.Errorf("the include and exclude parameters have been replaced by include-pattern; include-path; exclude-pattern and exclude-path. For info, run: azcopy copy help") + } + if (len(raw.include) > 0 || len(raw.exclude) > 0) && cooked.fromTo == common.EFromTo.BlobFSTrash() { return cooked, fmt.Errorf("include/exclude flags are not supported for this destination") } + // warn on exclude unsupported wildcards here. Include have to be later, to cover list-of-files + raw.warnIfHasWildcard(excludeWarningOncer, "exclude-path", raw.excludePath) + // unbuffered so this reads as we need it to rather than all at once in bulk listChan := make(chan string) var f *os.File @@ -330,6 +340,14 @@ func (raw rawCopyCmdArgs) cookWithId(jobId common.JobID) (cookedCopyCmdArgs, err go func() { defer close(listChan) + addToChannel := func(v string, paramName string) { + // empty strings should be ignored, otherwise the source root itself is selected + if len(v) > 0 { + raw.warnIfHasWildcard(includeWarningOncer, paramName, v) + listChan <- v + } + } + if f != nil { scanner := bufio.NewScanner(f) checkBOM := false @@ -364,10 +382,7 @@ func (raw rawCopyCmdArgs) cookWithId(jobId common.JobID) (cookedCopyCmdArgs, err headerLineNum++ } - // empty strings should be ignored, otherwise the source root itself is selected - if len(v) > 0 { - listChan <- v - } + addToChannel(v, "list-of-files") } } @@ -375,10 +390,7 @@ func (raw rawCopyCmdArgs) cookWithId(jobId common.JobID) (cookedCopyCmdArgs, err includePathList := raw.parsePatterns(raw.includePath) for _, v := range includePathList { - // empty strings should be ignored, otherwise the source root itself is selected - if len(v) > 0 { - listChan <- v - } + addToChannel(v, "include-path") } }() @@ -596,6 +608,19 @@ func (raw rawCopyCmdArgs) cookWithId(jobId common.JobID) (cookedCopyCmdArgs, err return cooked, nil } +var excludeWarningOncer = &sync.Once{} +var includeWarningOncer = &sync.Once{} + +func (raw *rawCopyCmdArgs) warnIfHasWildcard(oncer *sync.Once, paramName string, value string) { + if strings.Contains(value, "*") || strings.Contains(value, "?") { + oncer.Do(func() { + glcm.Info(fmt.Sprintf("*** Warning *** The %s parameter does not support wildcards. The wildcard "+ + "character provided will be interpreted literally and will not have any wildcard effect. To use wildcards "+ + "(in filenames only, not paths) use include-pattern or exclude-pattern", paramName)) + }) + } +} + // When other commands use the copy command arguments to cook cook, set the blobType to None and validation option // else parsing the arguments will fail. func (raw *rawCopyCmdArgs) setMandatoryDefaults() { @@ -1394,6 +1419,12 @@ func init() { cpCmd.PersistentFlags().MarkHidden("list-of-files") cpCmd.PersistentFlags().MarkHidden("s2s-get-properties-in-backend") + // temp, to assist users with change in param names, by providing a clearer message when these obsolete ones are accidentally used + cpCmd.PersistentFlags().StringVar(&raw.legacyInclude, "include", "", "Legacy include param. DO NOT USE") + cpCmd.PersistentFlags().StringVar(&raw.legacyExclude, "exclude", "", "Legacy exclude param. DO NOT USE") + cpCmd.PersistentFlags().MarkHidden("include") + cpCmd.PersistentFlags().MarkHidden("exclude") + // Hide the flush-threshold flag since it is implemented only for CI. 
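The wildcard warnings added above are deliberately emitted at most once per parameter, guarded by package-level sync.Once values, so a long list-of-files full of wildcard-looking entries does not flood the console. A compact standalone illustration of that warn-once pattern, with fmt.Printf standing in for glcm.Info:

package main

import (
	"fmt"
	"strings"
	"sync"
)

var includeWarningOncer = &sync.Once{}

func warnIfHasWildcard(oncer *sync.Once, paramName, value string) {
	if strings.ContainsAny(value, "*?") {
		oncer.Do(func() {
			fmt.Printf("*** Warning *** The %s parameter does not support wildcards; "+
				"wildcard characters will be matched literally.\n", paramName)
		})
	}
}

func main() {
	for _, v := range []string{"photos/2019/*", "docs/?.txt", "readme.md"} {
		warnIfHasWildcard(includeWarningOncer, "include-path", v)
	}
	// Only one warning is printed, no matter how many entries contain * or ?.
}
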
cpCmd.PersistentFlags().Uint32Var(&ste.ADLSFlushThreshold, "flush-threshold", 7500, "Adjust the number of blocks to flush at once on accounts that have a hierarchical namespace.") cpCmd.PersistentFlags().MarkHidden("flush-threshold") diff --git a/cmd/sync.go b/cmd/sync.go index 0ddce31f7..b0de51b40 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -47,6 +47,8 @@ type rawSyncCmdArgs struct { exclude string includeFileAttributes string excludeFileAttributes string + legacyInclude string // for warning messages only + legacyExclude string // for warning messages only followSymlinks bool putMd5 bool @@ -152,6 +154,11 @@ func (raw *rawSyncCmdArgs) cook() (cookedSyncCmdArgs, error) { return cooked, err } + // warn on legacy filters + if raw.legacyInclude != "" || raw.legacyExclude != "" { + return cooked, fmt.Errorf("the include and exclude parameters have been replaced by include-pattern and exclude-pattern. They work on filenames only (not paths)") + } + // parse the filter patterns cooked.include = raw.parsePatterns(raw.include) cooked.exclude = raw.parsePatterns(raw.exclude) @@ -555,6 +562,12 @@ func init() { syncCmd.PersistentFlags().BoolVar(&raw.putMd5, "put-md5", false, "Create an MD5 hash of each file, and save the hash as the Content-MD5 property of the destination blob or file. (By default the hash is NOT created.) Only available when uploading.") syncCmd.PersistentFlags().StringVar(&raw.md5ValidationOption, "check-md5", common.DefaultHashValidationOption.String(), "Specifies how strictly MD5 hashes should be validated when downloading. This option is only available when downloading. Available values include: NoCheck, LogOnly, FailIfDifferent, FailIfDifferentOrMissing. (default 'FailIfDifferent').") + // temp, to assist users with change in param names, by providing a clearer message when these obsolete ones are accidentally used + syncCmd.PersistentFlags().StringVar(&raw.legacyInclude, "include", "", "Legacy include param. DO NOT USE") + syncCmd.PersistentFlags().StringVar(&raw.legacyExclude, "exclude", "", "Legacy exclude param. DO NOT USE") + syncCmd.PersistentFlags().MarkHidden("include") + syncCmd.PersistentFlags().MarkHidden("exclude") + // TODO follow sym link is not implemented, clarify behavior first //syncCmd.PersistentFlags().BoolVar(&raw.followSymlinks, "follow-symlinks", false, "follow symbolic links when performing sync from local file system.") From 995332452c6d28685feaca9a8d2493b09444ceed Mon Sep 17 00:00:00 2001 From: zezha-msft Date: Thu, 17 Oct 2019 16:20:32 -0700 Subject: [PATCH 8/9] Bumped version for 10.3.1 and added changelog entry --- ChangeLog.md | 14 ++++++++++++++ common/version.go | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/ChangeLog.md b/ChangeLog.md index c7a25446f..489cfd0e9 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,6 +1,20 @@ # Change Log +## Version 10.3.1 + +### New features + +1. Added helpful deprecation notice for legacy include/exclude flags. +1. Added back request ID at log level INFO. +1. Added back cancel-from-stdin option for partner integration. +1. Added flag to define delete snapshot options for the remove command. + +### Bug fix + +1. Fixed race condition in shutdown of decompressingWriter. +1. Made progress reporting more accurate. 
+ ## Version 10.3.0 ### Breaking changes diff --git a/common/version.go b/common/version.go index cc4d72804..09d80c2fa 100644 --- a/common/version.go +++ b/common/version.go @@ -1,6 +1,6 @@ package common -const AzcopyVersion = "10.3.0" +const AzcopyVersion = "10.3.1" const UserAgent = "AzCopy/" + AzcopyVersion const S3ImportUserAgent = "S3Import " + UserAgent const BenchmarkUserAgent = "Benchmark " + UserAgent From dd5f21fee19b10f75e8a742a69b3c963aa45702c Mon Sep 17 00:00:00 2001 From: zezha-msft Date: Thu, 17 Oct 2019 16:55:25 -0700 Subject: [PATCH 9/9] Minor fix to enable S3 smoke tests if S3_TESTS_OFF is present but empty --- .../scripts/test_service_to_service_copy.py | 42 +++++++++---------- testSuite/scripts/utility.py | 2 +- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/testSuite/scripts/test_service_to_service_copy.py b/testSuite/scripts/test_service_to_service_copy.py index d2840678c..cff780d45 100644 --- a/testSuite/scripts/test_service_to_service_copy.py +++ b/testSuite/scripts/test_service_to_service_copy.py @@ -302,76 +302,76 @@ def test_copy_single_17mb_file_from_file_to_blob_oauth(self): # Test from S3 to blob copy. ################################## def test_copy_single_1kb_file_from_s3_to_blob(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) self.util_test_copy_single_file_from_x_to_x(src_bucket_url, "S3", dst_container_url, "Blob", 1) def test_copy_single_0kb_file_from_s3_to_blob(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) self.util_test_copy_single_file_from_x_to_x(src_bucket_url, "S3", dst_container_url, "Blob", 0) def test_copy_single_63mb_file_from_s3_to_blob(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) self.util_test_copy_single_file_from_x_to_x(src_bucket_url, "S3", dst_container_url, "Blob", 63 * 1024 * 1024) def test_copy_10_files_from_s3_bucket_to_blob_container(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) self.util_test_copy_n_files_from_x_bucket_to_x_bucket(src_bucket_url, "S3", dst_container_url, "Blob") def test_copy_10_files_from_s3_bucket_to_blob_account(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled 
for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) self.util_test_copy_n_files_from_s3_bucket_to_blob_account(src_bucket_url, util.test_s2s_dst_blob_account_url) def test_copy_file_from_s3_bucket_to_blob_container_strip_top_dir_recursive(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) self.util_test_copy_file_from_x_bucket_to_x_bucket_strip_top_dir(src_bucket_url, "S3", dst_container_url, "Blob", True) def test_copy_file_from_s3_bucket_to_blob_container_strip_top_dir_non_recursive(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) self.util_test_copy_file_from_x_bucket_to_x_bucket_strip_top_dir(src_bucket_url, "S3", dst_container_url, "Blob", False) def test_copy_n_files_from_s3_dir_to_blob_dir(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) self.util_test_copy_n_files_from_x_dir_to_x_dir(src_bucket_url, "S3", dst_container_url, "Blob") def test_copy_n_files_from_s3_dir_to_blob_dir_strip_top_dir_recursive(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) self.util_test_copy_n_files_from_x_dir_to_x_dir_strip_top_dir(src_bucket_url, "S3", dst_container_url, "Blob", True) def test_copy_n_files_from_s3_dir_to_blob_dir_strip_top_dir_non_recursive(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) self.util_test_copy_n_files_from_x_dir_to_x_dir_strip_top_dir(src_bucket_url, "S3", dst_container_url, "Blob", False) def test_copy_files_from_s3_service_to_blob_account(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') self.util_test_copy_files_from_x_account_to_x_account( util.test_s2s_src_s3_service_url, @@ -381,7 +381,7 @@ def test_copy_files_from_s3_service_to_blob_account(self): self.bucket_name_s3_blob) def 
test_copy_single_file_from_s3_to_blob_propertyandmetadata(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) @@ -392,7 +392,7 @@ def test_copy_single_file_from_s3_to_blob_propertyandmetadata(self): "Blob") def test_copy_single_file_from_s3_to_blob_no_preserve_propertyandmetadata(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) @@ -404,7 +404,7 @@ def test_copy_single_file_from_s3_to_blob_no_preserve_propertyandmetadata(self): False) def test_copy_file_from_s3_bucket_to_blob_container_propertyandmetadata(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) @@ -415,7 +415,7 @@ def test_copy_file_from_s3_bucket_to_blob_container_propertyandmetadata(self): "Blob") def test_copy_file_from_s3_bucket_to_blob_container_no_preserve_propertyandmetadata(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) @@ -427,7 +427,7 @@ def test_copy_file_from_s3_bucket_to_blob_container_no_preserve_propertyandmetad False) def test_overwrite_copy_single_file_from_s3_to_blob(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) @@ -440,7 +440,7 @@ def test_overwrite_copy_single_file_from_s3_to_blob(self): True) def test_non_overwrite_copy_single_file_from_s3_to_blob(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) @@ -453,7 +453,7 @@ def test_non_overwrite_copy_single_file_from_s3_to_blob(self): False) def test_copy_single_file_from_s3_to_blob_with_url_encoded_slash_as_filename(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke 
test run.') src_bucket_url = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dst_container_url = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) @@ -467,7 +467,7 @@ def test_copy_single_file_from_s3_to_blob_with_url_encoded_slash_as_filename(sel "%252F") #encoded name for %2F, as path will be decoded def test_copy_single_file_from_s3_to_blob_excludeinvalidmetadata(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') self.util_test_copy_single_file_from_s3_to_blob_handleinvalidmetadata( "", # By default it should be ExcludeIfInvalid @@ -476,7 +476,7 @@ def test_copy_single_file_from_s3_to_blob_excludeinvalidmetadata(self): ) def test_copy_single_file_from_s3_to_blob_renameinvalidmetadata(self): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') self.util_test_copy_single_file_from_s3_to_blob_handleinvalidmetadata( "RenameIfInvalid", # By default it should be ExcludeIfInvalid @@ -490,7 +490,7 @@ def util_test_copy_single_file_from_s3_to_blob_handleinvalidmetadata( invalidMetadataHandleOption, srcS3Metadata, expectResolvedMetadata): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": self.skipTest('S3 testing is disabled for this smoke test run.') srcBucketURL = util.get_object_without_sas(util.test_s2s_src_s3_service_url, self.bucket_name_s3_blob) dstBucketURL = util.get_object_sas(util.test_s2s_dst_blob_account_url, self.bucket_name_s3_blob) diff --git a/testSuite/scripts/utility.py b/testSuite/scripts/utility.py index c79e2546f..e27172dbb 100644 --- a/testSuite/scripts/utility.py +++ b/testSuite/scripts/utility.py @@ -108,7 +108,7 @@ def clean_test_blob_account(account): return True def clean_test_s3_account(account): - if 'S3_TESTS_OFF' in os.environ: + if 'S3_TESTS_OFF' in os.environ and os.environ['S3_TESTS_OFF'] != "": return True result = Command("clean").add_arguments(account).add_flags("serviceType", "S3").add_flags("resourceType", "Account").execute_azcopy_clean() if not result: