Skip to content

Commit 0dd9112

Browse files
committed
Add continuation token support for SyncPoller ARM LROs
- Add PollingState.toContinuationToken() and fromContinuationToken() methods - Add SyncPoller.serializeContinuationToken() interface method - Implement ArmLroSyncPoller wrapper with continuation token support - Add SyncPollerFactory.resumeFromToken() to reconstruct pollers from tokens - Add comprehensive unit and integration tests (12 tests, all passing) - Update CHANGELOGs for azure-core and azure-core-management This enables ARM Long-Running Operations to be resumed across process restarts by serializing the poller state to a Base64-encoded token.
1 parent 83ca5b7 commit 0dd9112

File tree

9 files changed

+803
-5
lines changed

9 files changed

+803
-5
lines changed

sdk/core/azure-core-management/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@
44

55
### Features Added
66

7+
- Added continuation token support for ARM Long-Running Operations (LROs):
8+
- `PollingState.toContinuationToken()` - Serialize polling state to a Base64-encoded token
9+
- `PollingState.fromContinuationToken()` - Deserialize token back to polling state
10+
- `SyncPollerFactory.resumeFromToken()` - Resume a SyncPoller from a continuation token
11+
- `ArmLroSyncPoller` - New implementation that supports continuation tokens for ARM LROs
12+
713
### Breaking Changes
814

915
### Bugs Fixed

sdk/core/azure-core-management/src/main/java/com/azure/core/management/implementation/polling/PollingState.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,81 @@ public <T> void store(PollingContext<T> context) {
169169
}
170170
}
171171

172+
/**
173+
* Serializes the current PollingState into a continuation token string that can be used to resume
174+
* the long-running operation at a later time or in a different process.
175+
* <p>
176+
* The continuation token is a Base64-encoded JSON representation of the polling state. It contains
177+
* all necessary information to reconstruct the poller and continue polling the operation, including:
178+
* <ul>
179+
* <li>Operation URL and HTTP method</li>
180+
* <li>Current provisioning state</li>
181+
* <li>Polling URLs (Azure-AsyncOperation, Location headers, etc.)</li>
182+
* <li>Last known response data</li>
183+
* </ul>
184+
* <p>
185+
* <strong>Security Note:</strong> The continuation token contains the operation URL. Ensure tokens
186+
* are stored securely and transmitted over secure channels.
187+
* <p>
188+
* <strong>Compatibility Note:</strong> The token format is internal and may change between SDK versions.
189+
* Tokens should only be used with the same version of the SDK that generated them.
190+
*
191+
* @return A Base64-encoded continuation token string representing the current polling state.
192+
* @throws RuntimeException if the state cannot be serialized into a token.
193+
*/
194+
public String toContinuationToken() {
195+
try {
196+
String jsonState = this.serializerAdapter.serialize(this, SerializerEncoding.JSON);
197+
return java.util.Base64.getEncoder()
198+
.encodeToString(jsonState.getBytes(java.nio.charset.StandardCharsets.UTF_8));
199+
} catch (IOException ioe) {
200+
throw LOGGER.logExceptionAsError(
201+
new RuntimeException("Failed to serialize PollingState to continuation token.", ioe));
202+
}
203+
}
204+
205+
/**
206+
* Deserializes a continuation token string into a PollingState object that can be used to resume
207+
* a long-running operation.
208+
* <p>
209+
* This method is the counterpart to {@link #toContinuationToken()} and reconstructs a PollingState
210+
* from a previously serialized token. The reconstructed state can then be used to create a new
211+
* SyncPoller that continues polling from where the previous poller left off.
212+
*
213+
* @param continuationToken The Base64-encoded continuation token string, previously obtained from
214+
* {@link #toContinuationToken()}.
215+
* @param serializerAdapter The serializer for decoding the token. This should be the same serializer
216+
* type used by the service client.
217+
* @return A PollingState object reconstructed from the continuation token.
218+
* @throws IllegalArgumentException if the {@code continuationToken} or {@code serializerAdapter} is null or empty.
219+
* @throws RuntimeException if the token cannot be decoded or deserialized. This may occur if:
220+
* <ul>
221+
* <li>The token is malformed or corrupted</li>
222+
* <li>The token was created with a different SDK version</li>
223+
* <li>The token format has changed</li>
224+
* </ul>
225+
*/
226+
public static PollingState fromContinuationToken(String continuationToken, SerializerAdapter serializerAdapter) {
227+
Objects.requireNonNull(continuationToken, "'continuationToken' cannot be null.");
228+
Objects.requireNonNull(serializerAdapter, "'serializerAdapter' cannot be null.");
229+
230+
if (continuationToken.isEmpty()) {
231+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("'continuationToken' cannot be empty."));
232+
}
233+
234+
try {
235+
byte[] decodedBytes = java.util.Base64.getDecoder().decode(continuationToken);
236+
String jsonState = new String(decodedBytes, java.nio.charset.StandardCharsets.UTF_8);
237+
return PollingState.from(serializerAdapter, jsonState);
238+
} catch (IllegalArgumentException iae) {
239+
throw LOGGER.logExceptionAsError(new RuntimeException(
240+
"Failed to decode continuation token. The token may be malformed or corrupted.", iae));
241+
} catch (RuntimeException re) {
242+
throw LOGGER.logExceptionAsError(new RuntimeException("Failed to deserialize continuation token. "
243+
+ "The token may have been created with a different SDK version.", re));
244+
}
245+
}
246+
172247
/**
173248
* @return the current status of the long-running-operation.
174249
*/
@@ -277,7 +352,7 @@ FinalResult getFinalResult() {
277352
/**
278353
* @return the last response body this PollingState received
279354
*/
280-
String getLastResponseBody() {
355+
public String getLastResponseBody() {
281356
return this.lastResponseBody;
282357
}
283358

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.management.polling;
5+
6+
import com.azure.core.management.implementation.polling.PollingState;
7+
import com.azure.core.util.logging.ClientLogger;
8+
import com.azure.core.util.polling.PollResponse;
9+
import com.azure.core.util.polling.PollingContext;
10+
import com.azure.core.util.polling.SyncPoller;
11+
import com.azure.core.util.serializer.SerializerAdapter;
12+
13+
import java.time.Duration;
14+
import java.util.Objects;
15+
import java.util.function.BiFunction;
16+
import java.util.function.Function;
17+
18+
/**
19+
* Azure Resource Manager (ARM) Long-Running Operation SyncPoller implementation with continuation token support.
20+
* <p>
21+
* This implementation wraps a standard {@link SyncPoller} and adds ARM-specific functionality including
22+
* the ability to serialize the poller state to a continuation token and resume from such a token.
23+
* <p>
24+
* This class is package-private and should only be created through {@link SyncPollerFactory}.
25+
*
26+
* @param <T> The type of poll response value.
27+
* @param <U> The type of the final result of long-running operation.
28+
*/
29+
final class ArmLroSyncPoller<T, U> implements SyncPoller<PollResult<T>, U> {
30+
private static final ClientLogger LOGGER = new ClientLogger(ArmLroSyncPoller.class);
31+
32+
private final SyncPoller<PollResult<T>, U> innerPoller;
33+
private final SerializerAdapter serializerAdapter;
34+
// We'll need a way to access the PollingContext from the inner poller
35+
// Since SimpleSyncPoller doesn't expose it, we'll need to track it ourselves
36+
private final PollingContextAccessor<PollResult<T>> contextAccessor;
37+
38+
/**
39+
* Functional interface to access the PollingContext from a poller.
40+
*/
41+
@FunctionalInterface
42+
interface PollingContextAccessor<T> {
43+
PollingContext<T> getContext();
44+
}
45+
46+
/**
47+
* Creates an ArmLroSyncPoller.
48+
*
49+
* @param innerPoller The underlying SyncPoller implementation.
50+
* @param serializerAdapter The serializer for encoding/decoding.
51+
* @param contextAccessor Accessor to get the current PollingContext.
52+
*/
53+
ArmLroSyncPoller(SyncPoller<PollResult<T>, U> innerPoller, SerializerAdapter serializerAdapter,
54+
PollingContextAccessor<PollResult<T>> contextAccessor) {
55+
this.innerPoller = Objects.requireNonNull(innerPoller, "'innerPoller' cannot be null.");
56+
this.serializerAdapter = Objects.requireNonNull(serializerAdapter, "'serializerAdapter' cannot be null.");
57+
this.contextAccessor = Objects.requireNonNull(contextAccessor, "'contextAccessor' cannot be null.");
58+
}
59+
60+
@Override
61+
public PollResponse<PollResult<T>> poll() {
62+
return innerPoller.poll();
63+
}
64+
65+
@Override
66+
public PollResponse<PollResult<T>> waitForCompletion() {
67+
return innerPoller.waitForCompletion();
68+
}
69+
70+
@Override
71+
public PollResponse<PollResult<T>> waitForCompletion(Duration timeout) {
72+
return innerPoller.waitForCompletion(timeout);
73+
}
74+
75+
@Override
76+
public PollResponse<PollResult<T>>
77+
waitUntil(com.azure.core.util.polling.LongRunningOperationStatus statusToWaitFor) {
78+
return innerPoller.waitUntil(statusToWaitFor);
79+
}
80+
81+
@Override
82+
public PollResponse<PollResult<T>> waitUntil(Duration timeout,
83+
com.azure.core.util.polling.LongRunningOperationStatus statusToWaitFor) {
84+
return innerPoller.waitUntil(timeout, statusToWaitFor);
85+
}
86+
87+
@Override
88+
public U getFinalResult() {
89+
return innerPoller.getFinalResult();
90+
}
91+
92+
@Override
93+
public U getFinalResult(Duration timeout) {
94+
return innerPoller.getFinalResult(timeout);
95+
}
96+
97+
@Override
98+
public void cancelOperation() {
99+
innerPoller.cancelOperation();
100+
}
101+
102+
@Override
103+
public SyncPoller<PollResult<T>, U> setPollInterval(Duration pollInterval) {
104+
innerPoller.setPollInterval(pollInterval);
105+
return this;
106+
}
107+
108+
@Override
109+
public String serializeContinuationToken() {
110+
try {
111+
PollingContext<PollResult<T>> context = contextAccessor.getContext();
112+
PollingState pollingState = PollingState.from(serializerAdapter, context);
113+
return pollingState.toContinuationToken();
114+
} catch (Exception e) {
115+
throw LOGGER.logExceptionAsError(new RuntimeException("Failed to serialize continuation token. "
116+
+ "The poller may not have been started or the polling state may be unavailable.", e));
117+
}
118+
}
119+
}

sdk/core/azure-core-management/src/main/java/com/azure/core/management/polling/SyncPollerFactory.java

Lines changed: 129 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@
55

66
import com.azure.core.http.HttpPipeline;
77
import com.azure.core.http.rest.Response;
8+
import com.azure.core.management.implementation.polling.PollOperation;
9+
import com.azure.core.management.implementation.polling.PollingState;
810
import com.azure.core.management.implementation.polling.SyncPollOperation;
911
import com.azure.core.util.BinaryData;
1012
import com.azure.core.util.Context;
13+
import com.azure.core.util.polling.PollResponse;
14+
import com.azure.core.util.polling.PollingContext;
1115
import com.azure.core.util.polling.SyncPoller;
1216
import com.azure.core.util.serializer.SerializerAdapter;
1317

1418
import java.lang.reflect.Type;
1519
import java.time.Duration;
20+
import java.util.function.Function;
1621
import java.util.function.Supplier;
1722

1823
/**
@@ -62,10 +67,132 @@ public static <T, U> SyncPoller<PollResult<T>, U> create(SerializerAdapter seria
6267
public static <T, U> SyncPoller<PollResult<T>, U> create(SerializerAdapter serializerAdapter,
6368
HttpPipeline httpPipeline, Type pollResultType, Type finalResultType, Duration defaultPollDuration,
6469
Supplier<Response<BinaryData>> lroInitialResponseSupplier, Context context) {
65-
return SyncPoller.createPoller(defaultPollDuration,
66-
SyncPollOperation.activationFunction(serializerAdapter, pollResultType, lroInitialResponseSupplier),
70+
71+
// Create a holder for the PollingContext that we can access later
72+
@SuppressWarnings({ "unchecked", "rawtypes" })
73+
final PollingContext<PollResult<T>>[] contextHolder = (PollingContext<PollResult<T>>[]) new PollingContext[1];
74+
75+
// Wrap the activation function to capture the context
76+
Function<PollingContext<PollResult<T>>, PollResponse<PollResult<T>>> wrappedActivation = pollingContext -> {
77+
contextHolder[0] = pollingContext;
78+
return SyncPollOperation
79+
.<T>activationFunction(serializerAdapter, pollResultType, lroInitialResponseSupplier)
80+
.apply(pollingContext);
81+
};
82+
83+
SyncPoller<PollResult<T>, U> innerPoller = SyncPoller.createPoller(defaultPollDuration, wrappedActivation,
84+
SyncPollOperation.pollFunction(serializerAdapter, httpPipeline, pollResultType, context),
85+
SyncPollOperation.cancelFunction(context),
86+
SyncPollOperation.fetchResultFunction(serializerAdapter, httpPipeline, finalResultType, context));
87+
88+
// Wrap in ArmLroSyncPoller to add continuation token support
89+
return new ArmLroSyncPoller<>(innerPoller, serializerAdapter, () -> contextHolder[0]);
90+
}
91+
92+
/**
93+
* Resumes a SyncPoller for an Azure Resource Manager (ARM) long-running-operation (LRO) from a continuation token.
94+
* <p>
95+
* This method recreates a SyncPoller from a previously serialized continuation token, allowing the polling
96+
* operation to be resumed from its last known state. This is useful for scenarios where a process needs to
97+
* survive restarts or where polling needs to be transferred between different processes or instances.
98+
* <p>
99+
* The continuation token must have been obtained from a previous poller via
100+
* {@link SyncPoller#serializeContinuationToken()}.
101+
* <p>
102+
* <strong>Example: Resuming a server creation operation</strong>
103+
* <pre>{@code
104+
* // Original process - start operation and get token
105+
* SyncPoller<PollResult<ServerInner>, ServerInner> poller =
106+
* client.beginCreate(resourceGroup, serverName, parameters);
107+
* String token = poller.serializeContinuationToken();
108+
* // Store token...
109+
*
110+
* // Later, in a different process - resume from token
111+
* SyncPoller<PollResult<ServerInner>, ServerInner> resumedPoller =
112+
* SyncPollerFactory.resumeFromToken(
113+
* token,
114+
* client.getSerializerAdapter(),
115+
* client.getHttpPipeline(),
116+
* new TypeReference<PollResult<ServerInner>>() {}.getJavaType(),
117+
* ServerInner.class,
118+
* Duration.ofSeconds(30),
119+
* Context.NONE);
120+
*
121+
* // Continue polling until completion
122+
* ServerInner result = resumedPoller.getFinalResult();
123+
* }</pre>
124+
*
125+
* @param continuationToken The Base64-encoded continuation token string obtained from a previous poller.
126+
* @param serializerAdapter The serializer for any encoding and decoding. This should be the same type
127+
* as used by the original poller.
128+
* @param httpPipeline The HttpPipeline for making HTTP requests (e.g., poll requests). This should be
129+
* configured with the same authentication and policies as the original poller.
130+
* @param pollResultType The type of the poll result. If no result is expected, this should be Void.class.
131+
* @param finalResultType The type of the final result. If no result is expected, this should be Void.class.
132+
* @param defaultPollDuration The default poll interval to use if the service does not return a retry-after value.
133+
* @param context The context shared by all requests.
134+
* @param <T> The type of poll result.
135+
* @param <U> The type of final result.
136+
* @return A SyncPoller that resumes polling from the state captured in the continuation token.
137+
* @throws IllegalArgumentException if {@code continuationToken} or {@code serializerAdapter} is null or empty.
138+
* @throws RuntimeException if the token cannot be decoded or deserialized, which may occur if:
139+
* <ul>
140+
* <li>The token is malformed or corrupted</li>
141+
* <li>The token was created with a different SDK version</li>
142+
* <li>The token format has changed</li>
143+
* </ul>
144+
*/
145+
public static <T, U> SyncPoller<PollResult<T>, U> resumeFromToken(String continuationToken,
146+
SerializerAdapter serializerAdapter, HttpPipeline httpPipeline, Type pollResultType, Type finalResultType,
147+
Duration defaultPollDuration, Context context) {
148+
149+
// Deserialize the continuation token to get the PollingState
150+
PollingState pollingState = PollingState.fromContinuationToken(continuationToken, serializerAdapter);
151+
152+
// Create a holder for the PollingContext
153+
@SuppressWarnings({ "unchecked", "rawtypes" })
154+
final PollingContext<PollResult<T>>[] contextHolder = (PollingContext<PollResult<T>>[]) new PollingContext[1];
155+
156+
// Create an activation function that returns the current state as the activation response
157+
Function<PollingContext<PollResult<T>>, PollResponse<PollResult<T>>> activationFunction = pollingContext -> {
158+
contextHolder[0] = pollingContext;
159+
pollingState.store(pollingContext);
160+
T result = PollOperation.deserialize(serializerAdapter, pollingState.getLastResponseBody(), pollResultType);
161+
return new PollResponse<>(pollingState.getOperationStatus(), new PollResult<>(result),
162+
pollingState.getPollDelay());
163+
};
164+
165+
// Create the poller with the standard poll, cancel, and fetch result functions
166+
SyncPoller<PollResult<T>, U> innerPoller = SyncPoller.createPoller(defaultPollDuration, activationFunction,
67167
SyncPollOperation.pollFunction(serializerAdapter, httpPipeline, pollResultType, context),
68168
SyncPollOperation.cancelFunction(context),
69169
SyncPollOperation.fetchResultFunction(serializerAdapter, httpPipeline, finalResultType, context));
170+
171+
// Wrap in ArmLroSyncPoller to add continuation token support
172+
return new ArmLroSyncPoller<>(innerPoller, serializerAdapter, () -> contextHolder[0]);
173+
}
174+
175+
/**
176+
* Resumes a SyncPoller for an Azure Resource Manager (ARM) long-running-operation (LRO) from a continuation token.
177+
* <p>
178+
* This is a convenience overload that uses {@link Context#NONE} for the context parameter.
179+
*
180+
* @param continuationToken The Base64-encoded continuation token string obtained from a previous poller.
181+
* @param serializerAdapter The serializer for any encoding and decoding.
182+
* @param httpPipeline The HttpPipeline for making HTTP requests.
183+
* @param pollResultType The type of the poll result.
184+
* @param finalResultType The type of the final result.
185+
* @param defaultPollDuration The default poll interval to use.
186+
* @param <T> The type of poll result.
187+
* @param <U> The type of final result.
188+
* @return A SyncPoller that resumes polling from the state captured in the continuation token.
189+
* @throws IllegalArgumentException if {@code continuationToken} or {@code serializerAdapter} is null or empty.
190+
* @throws RuntimeException if the token cannot be decoded or deserialized.
191+
*/
192+
public static <T, U> SyncPoller<PollResult<T>, U> resumeFromToken(String continuationToken,
193+
SerializerAdapter serializerAdapter, HttpPipeline httpPipeline, Type pollResultType, Type finalResultType,
194+
Duration defaultPollDuration) {
195+
return resumeFromToken(continuationToken, serializerAdapter, httpPipeline, pollResultType, finalResultType,
196+
defaultPollDuration, Context.NONE);
70197
}
71198
}

sdk/core/azure-core-management/src/test/java/com/azure/core/management/implementation/polling/FooWithProvisioningState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ public class FooWithProvisioningState {
1212
public FooWithProvisioningState() {
1313
}
1414

15-
FooWithProvisioningState(String state) {
15+
public FooWithProvisioningState(String state) {
1616
this.properties = new Properties();
1717
this.properties.provisioningState = state;
1818
}
1919

20-
FooWithProvisioningState(String state, String resourceId) {
20+
public FooWithProvisioningState(String state, String resourceId) {
2121
this.properties = new Properties();
2222
this.properties.provisioningState = state;
2323
this.properties.resourceId = resourceId;

0 commit comments

Comments
 (0)