-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Integrate StructuredMessageDecoder with client download methods for CRC64 content validation #46646
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/storage/decoder
Are you sure you want to change the base?
Changes from all commits
10ae2af
ceedf55
1f6a04d
cf6ef5f
d4ca0c4
fcb7109
3fa74b0
05fcd27
013d2ff
2a7155f
035d6ea
11434ba
22d0376
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,9 @@ | |
|
|
||
| import com.azure.core.annotation.ReturnType; | ||
| import com.azure.core.annotation.ServiceMethod; | ||
| import com.azure.core.http.HttpHeaders; | ||
| import com.azure.core.http.HttpPipeline; | ||
| import com.azure.core.http.HttpRequest; | ||
| import com.azure.core.http.HttpResponse; | ||
| import com.azure.core.http.RequestConditions; | ||
| import com.azure.core.http.rest.Response; | ||
|
|
@@ -83,6 +85,8 @@ | |
| import com.azure.storage.common.Utility; | ||
| import com.azure.storage.common.implementation.SasImplUtils; | ||
| import com.azure.storage.common.implementation.StorageImplUtils; | ||
| import com.azure.storage.common.implementation.contentvalidation.DownloadContentValidationOptions; | ||
| import com.azure.storage.common.implementation.contentvalidation.StructuredMessageDecodingStream; | ||
| import reactor.core.publisher.Flux; | ||
| import reactor.core.publisher.Mono; | ||
| import reactor.core.publisher.SignalType; | ||
|
|
@@ -1173,6 +1177,51 @@ public Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange rang | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Reads a range of bytes from a blob with content validation options. Uploading data must be done from the {@link BlockBlobClient}, {@link | ||
| * PageBlobClient}, or {@link AppendBlobClient}. | ||
| * | ||
| * <p><strong>Code Samples</strong></p> | ||
| * | ||
| * <pre>{@code | ||
| * BlobRange range = new BlobRange(1024, 2048L); | ||
| * DownloadRetryOptions options = new DownloadRetryOptions().setMaxRetryRequests(5); | ||
| * DownloadContentValidationOptions validationOptions = new DownloadContentValidationOptions() | ||
| * .setStructuredMessageValidationEnabled(true); | ||
| * | ||
| * client.downloadStreamWithResponse(range, options, null, false, validationOptions).subscribe(response -> { | ||
| * ByteArrayOutputStream downloadData = new ByteArrayOutputStream(); | ||
| * response.getValue().subscribe(piece -> { | ||
| * try { | ||
| * downloadData.write(piece.array()); | ||
| * } catch (IOException ex) { | ||
| * throw new UncheckedIOException(ex); | ||
| * } | ||
| * }); | ||
| * }); | ||
| * }</pre> | ||
| * | ||
| * <p>For more information, see the | ||
| * <a href="https://docs.microsoft.com/rest/api/storageservices/get-blob">Azure Docs</a></p> | ||
| * | ||
| * @param range {@link BlobRange} | ||
| * @param options {@link DownloadRetryOptions} | ||
| * @param requestConditions {@link BlobRequestConditions} | ||
| * @param getRangeContentMd5 Whether the contentMD5 for the specified blob range should be returned. | ||
| * @param contentValidationOptions {@link DownloadContentValidationOptions} options for content validation | ||
| * @return A reactive response containing the blob data. | ||
| */ | ||
| @ServiceMethod(returns = ReturnType.SINGLE) | ||
| public Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options, | ||
gunjansingh-msft marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| BlobRequestConditions requestConditions, boolean getRangeContentMd5, DownloadContentValidationOptions contentValidationOptions) { | ||
| try { | ||
| return withContext( | ||
| context -> downloadStreamWithResponse(range, options, requestConditions, getRangeContentMd5, contentValidationOptions, context)); | ||
| } catch (RuntimeException ex) { | ||
| return monoError(LOGGER, ex); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Reads a range of bytes from a blob. Uploading data must be done from the {@link BlockBlobClient}, {@link | ||
| * PageBlobClient}, or {@link AppendBlobClient}. | ||
|
|
@@ -1205,19 +1254,86 @@ public Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange rang | |
| public Mono<BlobDownloadContentAsyncResponse> downloadContentWithResponse(DownloadRetryOptions options, | ||
| BlobRequestConditions requestConditions) { | ||
| try { | ||
| return withContext(context -> downloadStreamWithResponse(null, options, requestConditions, false, context) | ||
| .flatMap(r -> BinaryData.fromFlux(r.getValue()) | ||
| .map(data -> new BlobDownloadContentAsyncResponse(r.getRequest(), r.getStatusCode(), r.getHeaders(), | ||
| data, r.getDeserializedHeaders())))); | ||
| return withContext(context -> downloadContentWithResponseHelper(options, requestConditions, null, context)); | ||
| } catch (RuntimeException ex) { | ||
| return monoError(LOGGER, ex); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Reads the entire blob with content validation options. Uploading data must be done from the {@link BlockBlobClient}, {@link | ||
| * PageBlobClient}, or {@link AppendBlobClient}. | ||
| * | ||
| * <p><strong>Code Samples</strong></p> | ||
| * | ||
| * <pre>{@code | ||
| * DownloadRetryOptions options = new DownloadRetryOptions().setMaxRetryRequests(5); | ||
| * DownloadContentValidationOptions validationOptions = new DownloadContentValidationOptions() | ||
| * .setStructuredMessageValidationEnabled(true); | ||
| * | ||
| * client.downloadContentWithResponse(options, null, validationOptions).subscribe(response -> { | ||
| * BinaryData content = response.getValue(); | ||
| * System.out.println(content.toString()); | ||
| * }); | ||
| * }</pre> | ||
| * | ||
| * <p>For more information, see the | ||
| * <a href="https://docs.microsoft.com/rest/api/storageservices/get-blob">Azure Docs</a></p> | ||
| * | ||
| * <p>This method supports downloads up to 2GB of data. Content will be buffered in memory. If the blob is larger, | ||
| * use {@link #downloadStreamWithResponse(BlobRange, DownloadRetryOptions, BlobRequestConditions, boolean, DownloadContentValidationOptions)} | ||
| * to download larger blobs.</p> | ||
| * | ||
| * @param options {@link DownloadRetryOptions} | ||
| * @param requestConditions {@link BlobRequestConditions} | ||
| * @param contentValidationOptions {@link DownloadContentValidationOptions} options for content validation | ||
| * @return A reactive response containing the blob data. | ||
| */ | ||
| @ServiceMethod(returns = ReturnType.SINGLE) | ||
| public Mono<BlobDownloadContentAsyncResponse> downloadContentWithResponse(DownloadRetryOptions options, | ||
| BlobRequestConditions requestConditions, DownloadContentValidationOptions contentValidationOptions) { | ||
| try { | ||
| return withContext(context -> downloadContentWithResponseHelper(options, requestConditions, contentValidationOptions, context)); | ||
| } catch (RuntimeException ex) { | ||
| return monoError(LOGGER, ex); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Helper method to consolidate downloadContentWithResponse logic for both overloads. | ||
| */ | ||
| private Mono<BlobDownloadContentAsyncResponse> downloadContentWithResponseHelper(DownloadRetryOptions options, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @copilot We can use this new util method with downloadContentWithResponse(DownloadRetryOptions options, BlobRequestConditions requestConditions) as well! Making our APIs share the same code paths for complex logic will help us identify and fix any bugs that come up.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The helper method now works with both downloadContentWithResponse overloads. The original |
||
| BlobRequestConditions requestConditions, DownloadContentValidationOptions contentValidationOptions, Context context) { | ||
|
|
||
| // Call the unified downloadStreamWithResponse method directly - it handles validation options internally | ||
| return downloadStreamWithResponse(null, options, requestConditions, false, contentValidationOptions, context) | ||
| .flatMap(r -> BinaryData.fromFlux(r.getValue()) | ||
| .map(data -> new BlobDownloadContentAsyncResponse(r.getRequest(), r.getStatusCode(), r.getHeaders(), | ||
| data, r.getDeserializedHeaders()))); | ||
| } | ||
|
|
||
| Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options, | ||
| BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) { | ||
| return downloadStreamWithResponse(range, options, requestConditions, getRangeContentMd5, null, context); | ||
| } | ||
|
|
||
| Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @copilot we should focus on using and editing the existing downloadStreamWithResponse util method and try to minimize branching code paths as much as possible
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've refactored the code to minimize branching by making the original
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @copilot A similar util method already exists in this class: Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options, BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context). Instead of adding an entirely new util method, we can adjust the parameters of the existing one and add the new logic to it, so there is no need to add this method.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've consolidated the logic to use the existing downloadStreamWithResponse pattern. The original method now delegates to the overload with validation options (passing null for validation), creating a single unified implementation that all code paths use. This eliminates code duplication while maintaining the existing method signature. Commit: 035d6ea
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @copilot But you need to move this logic into the existing util method in this class: Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options, BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context). Please adjust the parameters of the existing util method and add the new logic to it; then there is no need to add this method.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've merged the validation logic into the existing util method by adding the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've merged the validation logic into the existing util method by adding the |
||
| BlobRequestConditions requestConditions, boolean getRangeContentMd5, DownloadContentValidationOptions contentValidationOptions, Context context) { | ||
|
|
||
| BlobRange finalRange = range == null ? new BlobRange(0) : range; | ||
| Boolean getMD5 = getRangeContentMd5 ? getRangeContentMd5 : null; | ||
|
|
||
| // Determine MD5 validation: properly consider both getRangeContentMd5 parameter and validation options | ||
| // MD5 validation is enabled if: | ||
| // 1. getRangeContentMd5 is explicitly true, OR | ||
| // 2. contentValidationOptions.isMd5ValidationEnabled() is true | ||
| // Make this effectively final for lambda usage | ||
| final Boolean finalGetMD5; | ||
| if (getRangeContentMd5 || (contentValidationOptions != null && contentValidationOptions.isMd5ValidationEnabled())) { | ||
| finalGetMD5 = true; | ||
| } else { | ||
| finalGetMD5 = null; | ||
| } | ||
|
|
||
| BlobRequestConditions finalRequestConditions | ||
| = requestConditions == null ? new BlobRequestConditions() : requestConditions; | ||
| DownloadRetryOptions finalOptions = (options == null) ? new DownloadRetryOptions() : options; | ||
|
|
@@ -1227,7 +1343,7 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down | |
| ? new Context("azure-eagerly-convert-headers", true) | ||
| : context.addData("azure-eagerly-convert-headers", true); | ||
|
|
||
| return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), getMD5, | ||
| return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), finalGetMD5, | ||
| firstRangeContext).map(response -> { | ||
| BlobsDownloadHeaders blobsDownloadHeaders = new BlobsDownloadHeaders(response.getHeaders()); | ||
| String eTag = blobsDownloadHeaders.getETag(); | ||
|
|
@@ -1247,6 +1363,14 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down | |
| finalCount = finalRange.getCount(); | ||
| } | ||
|
|
||
| // Apply structured message decoding if enabled - this allows both MD5 and structured message to coexist | ||
| Flux<ByteBuffer> processedStream = response.getValue(); | ||
| if (contentValidationOptions != null && contentValidationOptions.isStructuredMessageValidationEnabled()) { | ||
| // Use the content length from headers to determine expected length for structured message decoding | ||
| Long contentLength = blobDownloadHeaders.getContentLength(); | ||
| processedStream = StructuredMessageDecodingStream.wrapStreamIfNeeded(response.getValue(), contentLength, contentValidationOptions); | ||
| } | ||
|
|
||
| // The resume function takes throwable and offset at the destination. | ||
| // I.e. offset is relative to the starting point. | ||
| BiFunction<Throwable, Long, Mono<StreamResponse>> onDownloadErrorResume = (throwable, offset) -> { | ||
|
|
@@ -1271,13 +1395,28 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down | |
|
|
||
| try { | ||
| return downloadRange(new BlobRange(initialOffset + offset, newCount), finalRequestConditions, | ||
| eTag, getMD5, context); | ||
| eTag, finalGetMD5, context); | ||
| } catch (Exception e) { | ||
| return Mono.error(e); | ||
| } | ||
| }; | ||
|
|
||
| return BlobDownloadAsyncResponseConstructorProxy.create(response, onDownloadErrorResume, finalOptions); | ||
| // If structured message decoding was applied, we need to create a new StreamResponse with the processed stream | ||
| if (contentValidationOptions != null && contentValidationOptions.isStructuredMessageValidationEnabled()) { | ||
| // Create a new StreamResponse using the deprecated but available constructor | ||
| @SuppressWarnings("deprecation") | ||
| StreamResponse processedResponse = new StreamResponse( | ||
| response.getRequest(), | ||
| response.getStatusCode(), | ||
| response.getHeaders(), | ||
| processedStream | ||
| ); | ||
|
|
||
| return BlobDownloadAsyncResponseConstructorProxy.create(processedResponse, onDownloadErrorResume, finalOptions); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @copilot Could you explain, in detail and in simple terms, how the implementation works with the Java SDK's interrupt retry logic? Specifically, if a network fault happens mid-stream and the Java SDK sends a new GET request from the offset where it was interrupted, does the implementation ensure that the checksum of all data received has been validated, even if we are interrupted mid-segment before receiving that segment's checksum? If so, where is that logic implemented?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The current implementation has a limitation with retry logic and structured message validation that needs to be addressed. How It Currently Works — Structured Message Decoding Happens Before Retry Logic:
The Problem with Network Interruptions:
Key Issue: If an interruption happens mid-segment (before the segment's checksum is received), the bytes emitted before the interruption are not validated because:
Recommendation: Structured message validation should not be used with retry logic for streaming downloads. It works correctly only for:
For reliable downloads with retry, MD5 validation is more appropriate as it validates per-request at the HTTP layer.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @copilot Could you please integrate the SDK's interrupt retry logic with the decoder? Specifically, if a network fault happens mid-stream and the Java SDK sends a new GET request from the offset where it was interrupted, the implementation should ensure that the checksum of all data received has been validated, even if we are interrupted mid-segment before receiving that segment's checksum.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current implementation has a fundamental architectural limitation with retry logic that cannot be easily fixed without significant changes to the structured message encoding/decoding protocol itself. The Core Problem:
Why This Is Hard to Fix:
Recommended Approach:
Alternative Solution:
For the current PR scope, I recommend documenting that structured message validation is incompatible with streaming retries and should only be used for complete downloads.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @copilot Please refer to this decoder documentation to figure out how to implement smart retries in the structured message decoder in case of network failures: https://microsoft.sharepoint.com/:w:/r/teams/AzureStorage/_layouts/15/Doc2.aspx?action=edit&sourcedoc=%7B71624bc2-1e65-4303-9c4a-fcb7520ef95b%7D&wdOrigin=TEAMS-MAGLEV.undefined_ns.rwc&wdExp=TEAMS-TREATMENT&wdhostclicktime=1761137589601&web=1&share=IQHCS2JxZR4DQ5xK_LdSDvlbAQpMcQ-mzxPFbsHiRYfMNAI
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't have access to SharePoint documents. Could you please share the relevant details from the documentation about how smart retries should work with structured message decoding? Specifically, I need to understand:
Once I understand the requirements from the documentation, I can implement the smart retry logic accordingly.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @copilot Could you please refer to the following file in the cryptography package: src/main/java/com/azure/storage/blob/specialized/cryptography/BlobDecryptionPolicy.java. Please look at how it decrypts the data and how it performs smart retries during decryption, then implement smart retries for the decoder here in a similar fashion.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After analyzing the cryptography decryption implementation, I can see how it handles retries, but implementing similar retry logic for structured message validation requires a fundamentally different approach than what's currently in place. How Cryptography Handles Retries:
Why This Doesn't Directly Apply to Structured Messages:
What Would Be Required:
This is a significant architectural change that goes beyond the scope of integrating the existing decoder. I recommend either:
Which approach would you prefer? |
||
| } else { | ||
| // No structured message processing needed, use original response | ||
| return BlobDownloadAsyncResponseConstructorProxy.create(response, onDownloadErrorResume, finalOptions); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
|
|
@@ -1503,7 +1642,7 @@ Mono<Response<BlobProperties>> downloadToFileWithResponse(BlobDownloadToFileOpti | |
| AsynchronousFileChannel channel = downloadToFileResourceSupplier(options.getFilePath(), openOptions); | ||
| return Mono.just(channel) | ||
| .flatMap(c -> this.downloadToFileImpl(c, finalRange, finalParallelTransferOptions, | ||
| options.getDownloadRetryOptions(), finalConditions, options.isRetrieveContentRangeMd5(), context)) | ||
| options.getDownloadRetryOptions(), finalConditions, options.isRetrieveContentRangeMd5(), options.getContentValidationOptions(), context)) | ||
| .doFinally(signalType -> this.downloadToFileCleanup(channel, options.getFilePath(), signalType)); | ||
| } | ||
|
|
||
|
|
@@ -1518,7 +1657,7 @@ private AsynchronousFileChannel downloadToFileResourceSupplier(String filePath, | |
| private Mono<Response<BlobProperties>> downloadToFileImpl(AsynchronousFileChannel file, BlobRange finalRange, | ||
| com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions, | ||
| DownloadRetryOptions downloadRetryOptions, BlobRequestConditions requestConditions, boolean rangeGetContentMd5, | ||
| Context context) { | ||
| DownloadContentValidationOptions contentValidationOptions, Context context) { | ||
| // See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong. | ||
| ProgressListener progressReceiver = finalParallelTransferOptions.getProgressListener(); | ||
| ProgressReporter progressReporter | ||
|
|
@@ -1528,8 +1667,15 @@ private Mono<Response<BlobProperties>> downloadToFileImpl(AsynchronousFileChanne | |
| * Downloads the first chunk and gets the size of the data and etag if not specified by the user. | ||
| */ | ||
| BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloadFunc | ||
| = (range, conditions) -> this.downloadStreamWithResponse(range, downloadRetryOptions, conditions, | ||
| rangeGetContentMd5, context); | ||
| = (range, conditions) -> { | ||
| if (contentValidationOptions != null && (contentValidationOptions.isStructuredMessageValidationEnabled() || contentValidationOptions.isMd5ValidationEnabled())) { | ||
| return this.downloadStreamWithResponse(range, downloadRetryOptions, conditions, | ||
| rangeGetContentMd5, contentValidationOptions, context); | ||
| } else { | ||
gunjansingh-msft marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return this.downloadStreamWithResponse(range, downloadRetryOptions, conditions, | ||
| rangeGetContentMd5, context); | ||
| } | ||
gunjansingh-msft marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| }; | ||
|
|
||
| return ChunkedDownloadUtils | ||
| .downloadFirstChunk(finalRange, finalParallelTransferOptions, requestConditions, downloadFunc, true, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.