Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/core/azure-core-management/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

### Features Added

- Added continuation token support for ARM Long-Running Operations (LROs):
- `PollingState.toContinuationToken()` - Serialize polling state to a Base64-encoded token
- `PollingState.fromContinuationToken()` - Deserialize token back to polling state
- `SyncPollerFactory.resumeFromToken()` - Resume a SyncPoller from a continuation token
- `ArmLroSyncPoller` - New implementation that supports continuation tokens for ARM LROs

### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,81 @@ public <T> void store(PollingContext<T> context) {
}
}

/**
* Serializes the current PollingState into a continuation token string that can be used to resume
* the long-running operation at a later time or in a different process.
* <p>
* The continuation token is a Base64-encoded JSON representation of the polling state. It contains
* all necessary information to reconstruct the poller and continue polling the operation, including:
* <ul>
* <li>Operation URL and HTTP method</li>
* <li>Current provisioning state</li>
* <li>Polling URLs (Azure-AsyncOperation, Location headers, etc.)</li>
* <li>Last known response data</li>
* </ul>
* <p>
* <strong>Security Note:</strong> The continuation token contains the operation URL. Ensure tokens
* are stored securely and transmitted over secure channels.
* <p>
* <strong>Compatibility Note:</strong> The token format is internal and may change between SDK versions.
* Tokens should only be used with the same version of the SDK that generated them.
*
* @return A Base64-encoded continuation token string representing the current polling state.
* @throws RuntimeException if the state cannot be serialized into a token.
*/
public String toContinuationToken() {
try {
String jsonState = this.serializerAdapter.serialize(this, SerializerEncoding.JSON);
return java.util.Base64.getEncoder()
.encodeToString(jsonState.getBytes(java.nio.charset.StandardCharsets.UTF_8));
} catch (IOException ioe) {
throw LOGGER.logExceptionAsError(
new RuntimeException("Failed to serialize PollingState to continuation token.", ioe));
}
}

/**
* Deserializes a continuation token string into a PollingState object that can be used to resume
* a long-running operation.
* <p>
* This method is the counterpart to {@link #toContinuationToken()} and reconstructs a PollingState
* from a previously serialized token. The reconstructed state can then be used to create a new
* SyncPoller that continues polling from where the previous poller left off.
*
* @param continuationToken The Base64-encoded continuation token string, previously obtained from
* {@link #toContinuationToken()}.
* @param serializerAdapter The serializer for decoding the token. This should be the same serializer
* type used by the service client.
* @return A PollingState object reconstructed from the continuation token.
* @throws IllegalArgumentException if the {@code continuationToken} or {@code serializerAdapter} is null or empty.
* @throws RuntimeException if the token cannot be decoded or deserialized. This may occur if:
* <ul>
* <li>The token is malformed or corrupted</li>
* <li>The token was created with a different SDK version</li>
* <li>The token format has changed</li>
* </ul>
*/
public static PollingState fromContinuationToken(String continuationToken, SerializerAdapter serializerAdapter) {
Objects.requireNonNull(continuationToken, "'continuationToken' cannot be null.");
Objects.requireNonNull(serializerAdapter, "'serializerAdapter' cannot be null.");

if (continuationToken.isEmpty()) {
throw LOGGER.logExceptionAsError(new IllegalArgumentException("'continuationToken' cannot be empty."));
}

try {
byte[] decodedBytes = java.util.Base64.getDecoder().decode(continuationToken);
String jsonState = new String(decodedBytes, java.nio.charset.StandardCharsets.UTF_8);
return PollingState.from(serializerAdapter, jsonState);
} catch (IllegalArgumentException iae) {
throw LOGGER.logExceptionAsError(new RuntimeException(
"Failed to decode continuation token. The token may be malformed or corrupted.", iae));
} catch (RuntimeException re) {
throw LOGGER.logExceptionAsError(new RuntimeException("Failed to deserialize continuation token. "
+ "The token may have been created with a different SDK version.", re));
}
}

/**
* @return the current status of the long-running-operation.
*/
Expand Down Expand Up @@ -277,7 +352,7 @@ FinalResult getFinalResult() {
/**
* @return the last response body this PollingState received
*/
String getLastResponseBody() {
public String getLastResponseBody() {
return this.lastResponseBody;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.management.polling;

import com.azure.core.management.implementation.polling.PollingState;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.polling.PollResponse;
import com.azure.core.util.polling.PollingContext;
import com.azure.core.util.polling.SyncPoller;
import com.azure.core.util.serializer.SerializerAdapter;

import java.time.Duration;
import java.util.Objects;

/**
* Azure Resource Manager (ARM) Long-Running Operation SyncPoller implementation with continuation token support.
* <p>
* This implementation wraps a standard {@link SyncPoller} and adds ARM-specific functionality including
* the ability to serialize the poller state to a continuation token and resume from such a token.
* <p>
* This class is package-private and should only be created through {@link SyncPollerFactory}.
*
* @param <T> The type of poll response value.
* @param <U> The type of the final result of long-running operation.
*/
final class ArmLroSyncPoller<T, U> implements SyncPoller<PollResult<T>, U> {
private static final ClientLogger LOGGER = new ClientLogger(ArmLroSyncPoller.class);

private final SyncPoller<PollResult<T>, U> innerPoller;
private final SerializerAdapter serializerAdapter;
// We'll need a way to access the PollingContext from the inner poller
// Since SimpleSyncPoller doesn't expose it, we'll need to track it ourselves
private final PollingContextAccessor<PollResult<T>> contextAccessor;

/**
* Functional interface to access the PollingContext from a poller.
*/
@FunctionalInterface
interface PollingContextAccessor<T> {
PollingContext<T> getContext();
}

/**
* Creates an ArmLroSyncPoller.
*
* @param innerPoller The underlying SyncPoller implementation.
* @param serializerAdapter The serializer for encoding/decoding.
* @param contextAccessor Accessor to get the current PollingContext.
*/
ArmLroSyncPoller(SyncPoller<PollResult<T>, U> innerPoller, SerializerAdapter serializerAdapter,
PollingContextAccessor<PollResult<T>> contextAccessor) {
this.innerPoller = Objects.requireNonNull(innerPoller, "'innerPoller' cannot be null.");
this.serializerAdapter = Objects.requireNonNull(serializerAdapter, "'serializerAdapter' cannot be null.");
this.contextAccessor = Objects.requireNonNull(contextAccessor, "'contextAccessor' cannot be null.");
}

@Override
public PollResponse<PollResult<T>> poll() {
return innerPoller.poll();
}

@Override
public PollResponse<PollResult<T>> waitForCompletion() {
return innerPoller.waitForCompletion();
}

@Override
public PollResponse<PollResult<T>> waitForCompletion(Duration timeout) {
return innerPoller.waitForCompletion(timeout);
}

@Override
public PollResponse<PollResult<T>>
waitUntil(com.azure.core.util.polling.LongRunningOperationStatus statusToWaitFor) {
return innerPoller.waitUntil(statusToWaitFor);
}

@Override
public PollResponse<PollResult<T>> waitUntil(Duration timeout,
com.azure.core.util.polling.LongRunningOperationStatus statusToWaitFor) {
return innerPoller.waitUntil(timeout, statusToWaitFor);
}

@Override
public U getFinalResult() {
return innerPoller.getFinalResult();
}

@Override
public U getFinalResult(Duration timeout) {
return innerPoller.getFinalResult(timeout);
}

@Override
public void cancelOperation() {
innerPoller.cancelOperation();
}

@Override
public SyncPoller<PollResult<T>, U> setPollInterval(Duration pollInterval) {
innerPoller.setPollInterval(pollInterval);
return this;
}

@Override
public String serializeContinuationToken() {
try {
PollingContext<PollResult<T>> context = contextAccessor.getContext();
PollingState pollingState = PollingState.from(serializerAdapter, context);
return pollingState.toContinuationToken();
} catch (Exception e) {
throw LOGGER.logExceptionAsError(new RuntimeException("Failed to serialize continuation token. "
+ "The poller may not have been started or the polling state may be unavailable.", e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.Response;
import com.azure.core.management.implementation.polling.PollOperation;
import com.azure.core.management.implementation.polling.PollingState;
import com.azure.core.management.implementation.polling.SyncPollOperation;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.polling.PollResponse;
import com.azure.core.util.polling.PollingContext;
import com.azure.core.util.polling.SyncPoller;
import com.azure.core.util.serializer.SerializerAdapter;

import java.lang.reflect.Type;
import java.time.Duration;
import java.util.function.Function;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -62,10 +67,132 @@ public static <T, U> SyncPoller<PollResult<T>, U> create(SerializerAdapter seria
public static <T, U> SyncPoller<PollResult<T>, U> create(SerializerAdapter serializerAdapter,
HttpPipeline httpPipeline, Type pollResultType, Type finalResultType, Duration defaultPollDuration,
Supplier<Response<BinaryData>> lroInitialResponseSupplier, Context context) {
return SyncPoller.createPoller(defaultPollDuration,
SyncPollOperation.activationFunction(serializerAdapter, pollResultType, lroInitialResponseSupplier),

// Create a holder for the PollingContext that we can access later
@SuppressWarnings({ "unchecked", "rawtypes" })
final PollingContext<PollResult<T>>[] contextHolder = (PollingContext<PollResult<T>>[]) new PollingContext[1];

// Wrap the activation function to capture the context
Function<PollingContext<PollResult<T>>, PollResponse<PollResult<T>>> wrappedActivation = pollingContext -> {
contextHolder[0] = pollingContext;
return SyncPollOperation
.<T>activationFunction(serializerAdapter, pollResultType, lroInitialResponseSupplier)
.apply(pollingContext);
};

SyncPoller<PollResult<T>, U> innerPoller = SyncPoller.createPoller(defaultPollDuration, wrappedActivation,
SyncPollOperation.pollFunction(serializerAdapter, httpPipeline, pollResultType, context),
SyncPollOperation.cancelFunction(context),
SyncPollOperation.fetchResultFunction(serializerAdapter, httpPipeline, finalResultType, context));

// Wrap in ArmLroSyncPoller to add continuation token support
return new ArmLroSyncPoller<>(innerPoller, serializerAdapter, () -> contextHolder[0]);
}

/**
* Resumes a SyncPoller for an Azure Resource Manager (ARM) long-running-operation (LRO) from a continuation token.
* <p>
* This method recreates a SyncPoller from a previously serialized continuation token, allowing the polling
* operation to be resumed from its last known state. This is useful for scenarios where a process needs to
* survive restarts or where polling needs to be transferred between different processes or instances.
* <p>
* The continuation token must have been obtained from a previous poller via
* {@link SyncPoller#serializeContinuationToken()}.
* <p>
* <strong>Example: Resuming a server creation operation</strong>
* <pre>{@code
* // Original process - start operation and get token
* SyncPoller<PollResult<ServerInner>, ServerInner> poller =
* client.beginCreate(resourceGroup, serverName, parameters);
* String token = poller.serializeContinuationToken();
* // Store token...
*
* // Later, in a different process - resume from token
* SyncPoller<PollResult<ServerInner>, ServerInner> resumedPoller =
* SyncPollerFactory.resumeFromToken(
* token,
* client.getSerializerAdapter(),
* client.getHttpPipeline(),
* new TypeReference<PollResult<ServerInner>>() {}.getJavaType(),
* ServerInner.class,
* Duration.ofSeconds(30),
* Context.NONE);
*
* // Continue polling until completion
* ServerInner result = resumedPoller.getFinalResult();
* }</pre>
*
* @param continuationToken The Base64-encoded continuation token string obtained from a previous poller.
* @param serializerAdapter The serializer for any encoding and decoding. This should be the same type
* as used by the original poller.
* @param httpPipeline The HttpPipeline for making HTTP requests (e.g., poll requests). This should be
* configured with the same authentication and policies as the original poller.
* @param pollResultType The type of the poll result. If no result is expected, this should be Void.class.
* @param finalResultType The type of the final result. If no result is expected, this should be Void.class.
* @param defaultPollDuration The default poll interval to use if the service does not return a retry-after value.
* @param context The context shared by all requests.
* @param <T> The type of poll result.
* @param <U> The type of final result.
* @return A SyncPoller that resumes polling from the state captured in the continuation token.
* @throws IllegalArgumentException if {@code continuationToken} or {@code serializerAdapter} is null or empty.
* @throws RuntimeException if the token cannot be decoded or deserialized, which may occur if:
* <ul>
* <li>The token is malformed or corrupted</li>
* <li>The token was created with a different SDK version</li>
* <li>The token format has changed</li>
* </ul>
*/
public static <T, U> SyncPoller<PollResult<T>, U> resumeFromToken(String continuationToken,
SerializerAdapter serializerAdapter, HttpPipeline httpPipeline, Type pollResultType, Type finalResultType,
Duration defaultPollDuration, Context context) {

// Deserialize the continuation token to get the PollingState
PollingState pollingState = PollingState.fromContinuationToken(continuationToken, serializerAdapter);

// Create a holder for the PollingContext
@SuppressWarnings({ "unchecked", "rawtypes" })
final PollingContext<PollResult<T>>[] contextHolder = (PollingContext<PollResult<T>>[]) new PollingContext[1];

// Create an activation function that returns the current state as the activation response
Function<PollingContext<PollResult<T>>, PollResponse<PollResult<T>>> activationFunction = pollingContext -> {
contextHolder[0] = pollingContext;
pollingState.store(pollingContext);
T result = PollOperation.deserialize(serializerAdapter, pollingState.getLastResponseBody(), pollResultType);
return new PollResponse<>(pollingState.getOperationStatus(), new PollResult<>(result),
pollingState.getPollDelay());
};

// Create the poller with the standard poll, cancel, and fetch result functions
SyncPoller<PollResult<T>, U> innerPoller = SyncPoller.createPoller(defaultPollDuration, activationFunction,
SyncPollOperation.pollFunction(serializerAdapter, httpPipeline, pollResultType, context),
SyncPollOperation.cancelFunction(context),
SyncPollOperation.fetchResultFunction(serializerAdapter, httpPipeline, finalResultType, context));

// Wrap in ArmLroSyncPoller to add continuation token support
return new ArmLroSyncPoller<>(innerPoller, serializerAdapter, () -> contextHolder[0]);
}

/**
* Resumes a SyncPoller for an Azure Resource Manager (ARM) long-running-operation (LRO) from a continuation token.
* <p>
* This is a convenience overload that uses {@link Context#NONE} for the context parameter.
*
* @param continuationToken The Base64-encoded continuation token string obtained from a previous poller.
* @param serializerAdapter The serializer for any encoding and decoding.
* @param httpPipeline The HttpPipeline for making HTTP requests.
* @param pollResultType The type of the poll result.
* @param finalResultType The type of the final result.
* @param defaultPollDuration The default poll interval to use.
* @param <T> The type of poll result.
* @param <U> The type of final result.
* @return A SyncPoller that resumes polling from the state captured in the continuation token.
* @throws IllegalArgumentException if {@code continuationToken} or {@code serializerAdapter} is null or empty.
* @throws RuntimeException if the token cannot be decoded or deserialized.
*/
public static <T, U> SyncPoller<PollResult<T>, U> resumeFromToken(String continuationToken,
SerializerAdapter serializerAdapter, HttpPipeline httpPipeline, Type pollResultType, Type finalResultType,
Duration defaultPollDuration) {
return resumeFromToken(continuationToken, serializerAdapter, httpPipeline, pollResultType, finalResultType,
defaultPollDuration, Context.NONE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ public class FooWithProvisioningState {
public FooWithProvisioningState() {
}

FooWithProvisioningState(String state) {
public FooWithProvisioningState(String state) {
this.properties = new Properties();
this.properties.provisioningState = state;
}

FooWithProvisioningState(String state, String resourceId) {
public FooWithProvisioningState(String state, String resourceId) {
this.properties = new Properties();
this.properties.provisioningState = state;
this.properties.resourceId = resourceId;
Expand Down
Loading
Loading