Skip to content

Commit bd3fbc2

Browse files
committed
Add continuation token support for SyncPoller ARM LROs
- Add PollingState.toContinuationToken() and fromContinuationToken() methods - Add SyncPoller.serializeContinuationToken() interface method - Implement ArmLroSyncPoller wrapper with continuation token support - Add SyncPollerFactory.resumeFromToken() to reconstruct pollers from tokens - Add comprehensive unit and integration tests (12 tests, all passing) - Update CHANGELOGs for azure-core and azure-core-management This enables ARM Long-Running Operations to be resumed across process restarts by serializing the poller state to a Base64-encoded token.
1 parent 83ca5b7 commit bd3fbc2

File tree

10 files changed

+927
-5
lines changed

10 files changed

+927
-5
lines changed

sdk/core/azure-core-management/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@
44

55
### Features Added
66

7+
- Added continuation token support for ARM Long-Running Operations (LROs):
8+
- `PollingState.toContinuationToken()` - Serialize polling state to a Base64-encoded token
9+
- `PollingState.fromContinuationToken()` - Deserialize token back to polling state
10+
- `SyncPollerFactory.resumeFromToken()` - Resume a SyncPoller from a continuation token
11+
- `ArmLroSyncPoller` - New implementation that supports continuation tokens for ARM LROs
12+
- `PollerResumeContext` - Helper class to simplify resuming pollers by bundling infrastructure parameters
13+
714
### Breaking Changes
815

916
### Bugs Fixed

sdk/core/azure-core-management/src/main/java/com/azure/core/management/implementation/polling/PollingState.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,81 @@ public <T> void store(PollingContext<T> context) {
169169
}
170170
}
171171

172+
/**
173+
* Serializes the current PollingState into a continuation token string that can be used to resume
174+
* the long-running operation at a later time or in a different process.
175+
* <p>
176+
* The continuation token is a Base64-encoded JSON representation of the polling state. It contains
177+
* all necessary information to reconstruct the poller and continue polling the operation, including:
178+
* <ul>
179+
* <li>Operation URL and HTTP method</li>
180+
* <li>Current provisioning state</li>
181+
* <li>Polling URLs (Azure-AsyncOperation, Location headers, etc.)</li>
182+
* <li>Last known response data</li>
183+
* </ul>
184+
* <p>
185+
* <strong>Security Note:</strong> The continuation token contains the operation URL. Ensure tokens
186+
* are stored securely and transmitted over secure channels.
187+
* <p>
188+
* <strong>Compatibility Note:</strong> The token format is internal and may change between SDK versions.
189+
* Tokens should only be used with the same version of the SDK that generated them.
190+
*
191+
* @return A Base64-encoded continuation token string representing the current polling state.
192+
* @throws RuntimeException if the state cannot be serialized into a token.
193+
*/
194+
public String toContinuationToken() {
195+
try {
196+
String jsonState = this.serializerAdapter.serialize(this, SerializerEncoding.JSON);
197+
return java.util.Base64.getEncoder()
198+
.encodeToString(jsonState.getBytes(java.nio.charset.StandardCharsets.UTF_8));
199+
} catch (IOException ioe) {
200+
throw LOGGER.logExceptionAsError(
201+
new RuntimeException("Failed to serialize PollingState to continuation token.", ioe));
202+
}
203+
}
204+
205+
/**
206+
* Deserializes a continuation token string into a PollingState object that can be used to resume
207+
* a long-running operation.
208+
* <p>
209+
* This method is the counterpart to {@link #toContinuationToken()} and reconstructs a PollingState
210+
* from a previously serialized token. The reconstructed state can then be used to create a new
211+
* SyncPoller that continues polling from where the previous poller left off.
212+
*
213+
* @param continuationToken The Base64-encoded continuation token string, previously obtained from
214+
* {@link #toContinuationToken()}.
215+
* @param serializerAdapter The serializer for decoding the token. This should be the same serializer
216+
* type used by the service client.
217+
* @return A PollingState object reconstructed from the continuation token.
218+
* @throws IllegalArgumentException if the {@code continuationToken} or {@code serializerAdapter} is null or empty.
219+
* @throws RuntimeException if the token cannot be decoded or deserialized. This may occur if:
220+
* <ul>
221+
* <li>The token is malformed or corrupted</li>
222+
* <li>The token was created with a different SDK version</li>
223+
* <li>The token format has changed</li>
224+
* </ul>
225+
*/
226+
public static PollingState fromContinuationToken(String continuationToken, SerializerAdapter serializerAdapter) {
227+
Objects.requireNonNull(continuationToken, "'continuationToken' cannot be null.");
228+
Objects.requireNonNull(serializerAdapter, "'serializerAdapter' cannot be null.");
229+
230+
if (continuationToken.isEmpty()) {
231+
throw LOGGER.logExceptionAsError(new IllegalArgumentException("'continuationToken' cannot be empty."));
232+
}
233+
234+
try {
235+
byte[] decodedBytes = java.util.Base64.getDecoder().decode(continuationToken);
236+
String jsonState = new String(decodedBytes, java.nio.charset.StandardCharsets.UTF_8);
237+
return PollingState.from(serializerAdapter, jsonState);
238+
} catch (IllegalArgumentException iae) {
239+
throw LOGGER.logExceptionAsError(new RuntimeException(
240+
"Failed to decode continuation token. The token may be malformed or corrupted.", iae));
241+
} catch (RuntimeException re) {
242+
throw LOGGER.logExceptionAsError(new RuntimeException("Failed to deserialize continuation token. "
243+
+ "The token may have been created with a different SDK version.", re));
244+
}
245+
}
246+
172247
/**
173248
* @return the current status of the long-running-operation.
174249
*/
@@ -277,7 +352,7 @@ FinalResult getFinalResult() {
277352
/**
278353
* @return the last response body this PollingState received
279354
*/
280-
String getLastResponseBody() {
355+
public String getLastResponseBody() {
281356
return this.lastResponseBody;
282357
}
283358

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.management.polling;
5+
6+
import com.azure.core.management.implementation.polling.PollingState;
7+
import com.azure.core.util.logging.ClientLogger;
8+
import com.azure.core.util.polling.PollResponse;
9+
import com.azure.core.util.polling.PollingContext;
10+
import com.azure.core.util.polling.SyncPoller;
11+
import com.azure.core.util.serializer.SerializerAdapter;
12+
13+
import java.time.Duration;
14+
import java.util.Objects;
15+
import java.util.function.BiFunction;
16+
import java.util.function.Function;
17+
18+
/**
19+
* Azure Resource Manager (ARM) Long-Running Operation SyncPoller implementation with continuation token support.
20+
* <p>
21+
* This implementation wraps a standard {@link SyncPoller} and adds ARM-specific functionality including
22+
* the ability to serialize the poller state to a continuation token and resume from such a token.
23+
* <p>
24+
* This class is package-private and should only be created through {@link SyncPollerFactory}.
25+
*
26+
* @param <T> The type of poll response value.
27+
* @param <U> The type of the final result of long-running operation.
28+
*/
29+
final class ArmLroSyncPoller<T, U> implements SyncPoller<PollResult<T>, U> {
30+
private static final ClientLogger LOGGER = new ClientLogger(ArmLroSyncPoller.class);
31+
32+
private final SyncPoller<PollResult<T>, U> innerPoller;
33+
private final SerializerAdapter serializerAdapter;
34+
// We'll need a way to access the PollingContext from the inner poller
35+
// Since SimpleSyncPoller doesn't expose it, we'll need to track it ourselves
36+
private final PollingContextAccessor<PollResult<T>> contextAccessor;
37+
38+
/**
39+
* Functional interface to access the PollingContext from a poller.
40+
*/
41+
@FunctionalInterface
42+
interface PollingContextAccessor<T> {
43+
PollingContext<T> getContext();
44+
}
45+
46+
/**
47+
* Creates an ArmLroSyncPoller.
48+
*
49+
* @param innerPoller The underlying SyncPoller implementation.
50+
* @param serializerAdapter The serializer for encoding/decoding.
51+
* @param contextAccessor Accessor to get the current PollingContext.
52+
*/
53+
ArmLroSyncPoller(SyncPoller<PollResult<T>, U> innerPoller, SerializerAdapter serializerAdapter,
54+
PollingContextAccessor<PollResult<T>> contextAccessor) {
55+
this.innerPoller = Objects.requireNonNull(innerPoller, "'innerPoller' cannot be null.");
56+
this.serializerAdapter = Objects.requireNonNull(serializerAdapter, "'serializerAdapter' cannot be null.");
57+
this.contextAccessor = Objects.requireNonNull(contextAccessor, "'contextAccessor' cannot be null.");
58+
}
59+
60+
@Override
61+
public PollResponse<PollResult<T>> poll() {
62+
return innerPoller.poll();
63+
}
64+
65+
@Override
66+
public PollResponse<PollResult<T>> waitForCompletion() {
67+
return innerPoller.waitForCompletion();
68+
}
69+
70+
@Override
71+
public PollResponse<PollResult<T>> waitForCompletion(Duration timeout) {
72+
return innerPoller.waitForCompletion(timeout);
73+
}
74+
75+
@Override
76+
public PollResponse<PollResult<T>>
77+
waitUntil(com.azure.core.util.polling.LongRunningOperationStatus statusToWaitFor) {
78+
return innerPoller.waitUntil(statusToWaitFor);
79+
}
80+
81+
@Override
82+
public PollResponse<PollResult<T>> waitUntil(Duration timeout,
83+
com.azure.core.util.polling.LongRunningOperationStatus statusToWaitFor) {
84+
return innerPoller.waitUntil(timeout, statusToWaitFor);
85+
}
86+
87+
@Override
88+
public U getFinalResult() {
89+
return innerPoller.getFinalResult();
90+
}
91+
92+
@Override
93+
public U getFinalResult(Duration timeout) {
94+
return innerPoller.getFinalResult(timeout);
95+
}
96+
97+
@Override
98+
public void cancelOperation() {
99+
innerPoller.cancelOperation();
100+
}
101+
102+
@Override
103+
public SyncPoller<PollResult<T>, U> setPollInterval(Duration pollInterval) {
104+
innerPoller.setPollInterval(pollInterval);
105+
return this;
106+
}
107+
108+
@Override
109+
public String serializeContinuationToken() {
110+
try {
111+
PollingContext<PollResult<T>> context = contextAccessor.getContext();
112+
PollingState pollingState = PollingState.from(serializerAdapter, context);
113+
return pollingState.toContinuationToken();
114+
} catch (Exception e) {
115+
throw LOGGER.logExceptionAsError(new RuntimeException("Failed to serialize continuation token. "
116+
+ "The poller may not have been started or the polling state may be unavailable.", e));
117+
}
118+
}
119+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.management.polling;
5+
6+
import com.azure.core.http.HttpPipeline;
7+
import com.azure.core.util.Context;
8+
import com.azure.core.util.serializer.SerializerAdapter;
9+
10+
import java.lang.reflect.Type;
11+
import java.time.Duration;
12+
import java.util.Objects;
13+
14+
/**
15+
* Context object that holds the infrastructure components needed to resume a SyncPoller from a continuation token.
16+
* This class simplifies the API for resuming pollers, especially when calling from languages like Scala,
17+
* by bundling related parameters together.
18+
*
19+
* <p><strong>Example usage:</strong></p>
20+
* <pre>{@code
21+
* // Create a reusable context
22+
* PollerResumeContext context = new PollerResumeContext(
23+
* manager.serviceClient().getSerializerAdapter(),
24+
* manager.serviceClient().getHttpPipeline(),
25+
* Duration.ofSeconds(30)
26+
* );
27+
*
28+
* // Resume from token with simpler API
29+
* SyncPoller<?, ?> poller = context.resumeFromToken(
30+
* token,
31+
* ServerInner.class,
32+
* ServerInner.class
33+
* );
34+
* }</pre>
35+
*/
36+
public final class PollerResumeContext {
37+
private final SerializerAdapter serializerAdapter;
38+
private final HttpPipeline httpPipeline;
39+
private final Duration defaultPollDuration;
40+
private final Context context;
41+
42+
/**
43+
* Creates a PollerResumeContext with the specified infrastructure components.
44+
*
45+
* @param serializerAdapter The serializer for encoding/decoding.
46+
* @param httpPipeline The HTTP pipeline for making requests.
47+
* @param defaultPollDuration The default poll interval.
48+
*/
49+
public PollerResumeContext(SerializerAdapter serializerAdapter, HttpPipeline httpPipeline,
50+
Duration defaultPollDuration) {
51+
this(serializerAdapter, httpPipeline, defaultPollDuration, Context.NONE);
52+
}
53+
54+
/**
55+
* Creates a PollerResumeContext with the specified infrastructure components and context.
56+
*
57+
* @param serializerAdapter The serializer for encoding/decoding.
58+
* @param httpPipeline The HTTP pipeline for making requests.
59+
* @param defaultPollDuration The default poll interval.
60+
* @param context The context for requests.
61+
*/
62+
public PollerResumeContext(SerializerAdapter serializerAdapter, HttpPipeline httpPipeline,
63+
Duration defaultPollDuration, Context context) {
64+
this.serializerAdapter = Objects.requireNonNull(serializerAdapter, "'serializerAdapter' cannot be null.");
65+
this.httpPipeline = Objects.requireNonNull(httpPipeline, "'httpPipeline' cannot be null.");
66+
this.defaultPollDuration
67+
= Objects.requireNonNull(defaultPollDuration, "'defaultPollDuration' cannot be null.");
68+
this.context = Objects.requireNonNull(context, "'context' cannot be null.");
69+
}
70+
71+
/**
72+
* Resumes a SyncPoller from a continuation token using this context.
73+
*
74+
* @param continuationToken The continuation token.
75+
* @param pollResultType The type of the poll result.
76+
* @param finalResultType The type of the final result.
77+
* @param <T> The type of poll result.
78+
* @param <U> The type of final result.
79+
* @return A SyncPoller that resumes polling from the token.
80+
*/
81+
public <T, U> com.azure.core.util.polling.SyncPoller<PollResult<T>, U> resumeFromToken(String continuationToken,
82+
Type pollResultType, Type finalResultType) {
83+
return SyncPollerFactory.resumeFromToken(continuationToken, serializerAdapter, httpPipeline, pollResultType,
84+
finalResultType, defaultPollDuration, context);
85+
}
86+
87+
/**
88+
* Gets the serializer adapter.
89+
*
90+
* @return The serializer adapter.
91+
*/
92+
public SerializerAdapter getSerializerAdapter() {
93+
return serializerAdapter;
94+
}
95+
96+
/**
97+
* Gets the HTTP pipeline.
98+
*
99+
* @return The HTTP pipeline.
100+
*/
101+
public HttpPipeline getHttpPipeline() {
102+
return httpPipeline;
103+
}
104+
105+
/**
106+
* Gets the default poll duration.
107+
*
108+
* @return The default poll duration.
109+
*/
110+
public Duration getDefaultPollDuration() {
111+
return defaultPollDuration;
112+
}
113+
114+
/**
115+
* Gets the context.
116+
*
117+
* @return The context.
118+
*/
119+
public Context getContext() {
120+
return context;
121+
}
122+
}
123+

0 commit comments

Comments
 (0)