Skip to content

Commit 3d660be

Browse files
committed
[FLINK-14068][core] Removes API that uses org.apache.flink.api.common.time.Time
1 parent ad9c4f6 commit 3d660be

File tree

389 files changed

+1324
-1627
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

389 files changed

+1324
-1627
lines changed

docs/content.zh/docs/dev/datastream/fault-tolerance/state.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ env.execute()
302302
```java
303303
import org.apache.flink.api.common.state.StateTtlConfig;
304304
import org.apache.flink.api.common.state.ValueStateDescriptor;
305-
import org.apache.flink.api.common.time.Time;
305+
import java.time.Duration;
306306

307307
StateTtlConfig ttlConfig = StateTtlConfig
308308
.newBuilder(Duration.ofSeconds(1))
@@ -318,7 +318,7 @@ stateDescriptor.enableTimeToLive(ttlConfig);
318318
```scala
319319
import org.apache.flink.api.common.state.StateTtlConfig
320320
import org.apache.flink.api.common.state.ValueStateDescriptor
321-
import org.apache.flink.api.common.time.Time
321+
import java.time.Duration
322322

323323
val ttlConfig = StateTtlConfig
324324
.newBuilder(Duration.ofSeconds(1))
@@ -438,7 +438,7 @@ ttl_config = StateTtlConfig \
438438
{{< tab "Java" >}}
439439
```java
440440
import org.apache.flink.api.common.state.StateTtlConfig;
441-
import org.apache.flink.api.common.time.Time;
441+
import java.time.Duration;
442442

443443
StateTtlConfig ttlConfig = StateTtlConfig
444444
.newBuilder(Duration.ofSeconds(1))
@@ -449,7 +449,7 @@ StateTtlConfig ttlConfig = StateTtlConfig
449449
{{< tab "Scala" >}}
450450
```scala
451451
import org.apache.flink.api.common.state.StateTtlConfig
452-
import org.apache.flink.api.common.time.Time
452+
import java.time.Duration
453453

454454
val ttlConfig = StateTtlConfig
455455
.newBuilder(Duration.ofSeconds(1))

docs/content/docs/dev/datastream/fault-tolerance/state.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ functionality can then be enabled in any state descriptor by passing the configu
341341
```java
342342
import org.apache.flink.api.common.state.StateTtlConfig;
343343
import org.apache.flink.api.common.state.ValueStateDescriptor;
344-
import org.apache.flink.api.common.time.Time;
344+
import java.time.Duration;
345345

346346
StateTtlConfig ttlConfig = StateTtlConfig
347347
.newBuilder(Duration.ofSeconds(1))
@@ -357,7 +357,7 @@ stateDescriptor.enableTimeToLive(ttlConfig);
357357
```scala
358358
import org.apache.flink.api.common.state.StateTtlConfig
359359
import org.apache.flink.api.common.state.ValueStateDescriptor
360-
import org.apache.flink.api.common.time.Time
360+
import java.time.Duration
361361

362362
val ttlConfig = StateTtlConfig
363363
.newBuilder(Duration.ofSeconds(1))
@@ -488,7 +488,7 @@ It can be configured in `StateTtlConfig`:
488488
{{< tab "Java" >}}
489489
```java
490490
import org.apache.flink.api.common.state.StateTtlConfig;
491-
import org.apache.flink.api.common.time.Time;
491+
import java.time.Duration;
492492

493493
StateTtlConfig ttlConfig = StateTtlConfig
494494
.newBuilder(Duration.ofSeconds(1))
@@ -499,7 +499,7 @@ StateTtlConfig ttlConfig = StateTtlConfig
499499
{{< tab "Scala" >}}
500500
```scala
501501
import org.apache.flink.api.common.state.StateTtlConfig
502-
import org.apache.flink.api.common.time.Time
502+
import java.time.Duration
503503

504504
val ttlConfig = StateTtlConfig
505505
.newBuilder(Duration.ofSeconds(1))

flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.annotation.VisibleForTesting;
2323
import org.apache.flink.api.common.JobID;
24-
import org.apache.flink.api.common.time.Time;
2524
import org.apache.flink.client.ClientUtils;
2625
import org.apache.flink.client.cli.ClientOptions;
2726
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
@@ -50,6 +49,7 @@
5049
import org.slf4j.Logger;
5150
import org.slf4j.LoggerFactory;
5251

52+
import java.time.Duration;
5353
import java.util.ArrayList;
5454
import java.util.Collection;
5555
import java.util.Collections;
@@ -364,10 +364,8 @@ private CompletableFuture<JobResult> getJobResult(
364364
final JobID jobId,
365365
final ScheduledExecutor scheduledExecutor,
366366
final boolean tolerateMissingResult) {
367-
final Time timeout =
368-
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
369-
final Time retryPeriod =
370-
Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
367+
final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
368+
final Duration retryPeriod = configuration.get(ClientOptions.CLIENT_RETRY_PERIOD);
371369
final CompletableFuture<JobResult> jobResultFuture =
372370
JobStatusPollingUtils.getJobResult(
373371
dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);

flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.api.common.JobID;
2424
import org.apache.flink.api.common.JobStatus;
2525
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
26-
import org.apache.flink.api.common.time.Time;
2726
import org.apache.flink.core.execution.JobClient;
2827
import org.apache.flink.core.execution.SavepointFormatType;
2928
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -40,6 +39,7 @@
4039
import javax.annotation.Nullable;
4140

4241
import java.io.IOException;
42+
import java.time.Duration;
4343
import java.util.Map;
4444
import java.util.concurrent.CompletableFuture;
4545
import java.util.concurrent.CompletionException;
@@ -59,15 +59,15 @@ public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway
5959

6060
private final ScheduledExecutor retryExecutor;
6161

62-
private final Time timeout;
62+
private final Duration timeout;
6363

6464
private final ClassLoader classLoader;
6565

6666
public EmbeddedJobClient(
6767
final JobID jobId,
6868
final DispatcherGateway dispatcherGateway,
6969
final ScheduledExecutor retryExecutor,
70-
final Time rpcTimeout,
70+
final Duration rpcTimeout,
7171
final ClassLoader classLoader) {
7272
this.jobId = checkNotNull(jobId);
7373
this.dispatcherGateway = checkNotNull(dispatcherGateway);
@@ -136,7 +136,7 @@ public CompletableFuture<Map<String, Object>> getAccumulators() {
136136
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
137137
checkNotNull(classLoader);
138138

139-
final Time retryPeriod = Time.milliseconds(100L);
139+
final Duration retryPeriod = Duration.ofMillis(100L);
140140
return JobStatusPollingUtils.getJobResult(
141141
dispatcherGateway, jobId, retryExecutor, timeout, retryPeriod)
142142
.thenApply(

flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import org.apache.flink.annotation.VisibleForTesting;
2222
import org.apache.flink.api.common.JobID;
2323
import org.apache.flink.api.common.JobStatus;
24-
import org.apache.flink.api.common.time.Time;
2524
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
2625
import org.apache.flink.runtime.jobmaster.JobResult;
2726
import org.apache.flink.util.concurrent.ScheduledExecutor;
2827

28+
import java.time.Duration;
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.function.Supplier;
@@ -50,14 +50,14 @@ static CompletableFuture<JobResult> getJobResult(
5050
final DispatcherGateway dispatcherGateway,
5151
final JobID jobId,
5252
final ScheduledExecutor scheduledExecutor,
53-
final Time rpcTimeout,
54-
final Time retryPeriod) {
53+
final Duration rpcTimeout,
54+
final Duration retryPeriod) {
5555

5656
return pollJobResultAsync(
5757
() -> dispatcherGateway.requestJobStatus(jobId, rpcTimeout),
5858
() -> dispatcherGateway.requestJobResult(jobId, rpcTimeout),
5959
scheduledExecutor,
60-
retryPeriod.toMilliseconds());
60+
retryPeriod.toMillis());
6161
}
6262

6363
@VisibleForTesting

flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.api.common.JobID;
23-
import org.apache.flink.api.common.time.Time;
2423
import org.apache.flink.api.dag.Pipeline;
2524
import org.apache.flink.client.cli.ClientOptions;
2625
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
@@ -43,6 +42,7 @@
4342

4443
import java.net.InetSocketAddress;
4544
import java.net.MalformedURLException;
45+
import java.time.Duration;
4646
import java.util.Collection;
4747
import java.util.List;
4848
import java.util.Optional;
@@ -136,8 +136,7 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture(
136136
final Configuration configuration,
137137
final ClassLoader userCodeClassloader)
138138
throws MalformedURLException {
139-
final Time timeout =
140-
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
139+
final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
141140

142141
final JobGraph jobGraph =
143142
PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
@@ -191,7 +190,7 @@ private static CompletableFuture<JobID> submitJob(
191190
final Configuration configuration,
192191
final DispatcherGateway dispatcherGateway,
193192
final JobGraph jobGraph,
194-
final Time rpcTimeout) {
193+
final Duration rpcTimeout) {
195194
checkNotNull(jobGraph);
196195

197196
LOG.info("Submitting Job with JobId={}.", jobGraph.getJobID());

flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.api.common.JobID;
23-
import org.apache.flink.api.common.time.Time;
2423
import org.apache.flink.client.cli.ClientOptions;
2524
import org.apache.flink.client.deployment.application.EmbeddedJobClient;
2625
import org.apache.flink.configuration.Configuration;
@@ -29,6 +28,7 @@
2928
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
3029
import org.apache.flink.util.concurrent.ScheduledExecutor;
3130

31+
import java.time.Duration;
3232
import java.util.Collection;
3333

3434
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -81,9 +81,7 @@ public PipelineExecutor getExecutor(final Configuration configuration) {
8181
dispatcherGateway,
8282
configuration,
8383
(jobId, userCodeClassloader) -> {
84-
final Time timeout =
85-
Time.milliseconds(
86-
configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
84+
final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
8785
return new EmbeddedJobClient(
8886
jobId, dispatcherGateway, retryExecutor, timeout, userCodeClassloader);
8987
});

flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.api.common.JobStatus;
2424
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
2525
import org.apache.flink.api.common.cache.DistributedCache;
26-
import org.apache.flink.api.common.time.Time;
2726
import org.apache.flink.api.java.tuple.Tuple2;
2827
import org.apache.flink.client.ClientUtils;
2928
import org.apache.flink.client.program.ClusterClient;
@@ -300,7 +299,7 @@ public void close() {
300299
TimeUnit.MILLISECONDS,
301300
retryExecutorService);
302301

303-
this.restClient.shutdown(Time.seconds(5));
302+
this.restClient.shutdown(Duration.ofSeconds(5));
304303
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);
305304

306305
try {

flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.flink.connector.base.sink;
1919

2020
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
21-
import org.apache.flink.api.common.time.Time;
2221
import org.apache.flink.runtime.client.JobExecutionException;
2322
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
2423
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -29,6 +28,8 @@
2928
import org.junit.jupiter.api.extension.ExtendWith;
3029
import org.junit.jupiter.api.extension.RegisterExtension;
3130

31+
import java.time.Duration;
32+
3233
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3334

3435
/** Integration tests of a baseline generic sink that implements the AsyncSinkBase. */
@@ -66,7 +67,7 @@ public void testFailuresOnPersistingToDestinationAreCaughtAndRaised() {
6667
@Test
6768
public void testThatNoIssuesOccurWhenCheckpointingIsEnabled() throws Exception {
6869
env.enableCheckpointing(20);
69-
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(200)));
70+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(200)));
7071
env.fromSequence(1, 10_000).map(Object::toString).sinkTo(new ArrayListAsyncSink());
7172
env.execute("Integration Test: AsyncSinkBaseITCase");
7273
}

flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.api.common.RuntimeExecutionMode;
2222
import org.apache.flink.api.common.functions.RichMapFunction;
2323
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
24-
import org.apache.flink.api.common.time.Time;
2524
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
2625
import org.apache.flink.api.connector.source.Boundedness;
2726
import org.apache.flink.configuration.Configuration;
@@ -34,6 +33,8 @@
3433
import org.apache.flink.streaming.api.graph.StreamGraph;
3534
import org.apache.flink.streaming.api.operators.StreamSource;
3635

36+
import java.time.Duration;
37+
3738
/** Tests the functionality of the {@link FileSink} in BATCH mode. */
3839
class BatchExecutionFileSinkITCase extends FileSinkITBase {
3940

@@ -49,7 +50,7 @@ protected JobGraph createJobGraph(boolean triggerFailover, String path) {
4950
env.configure(config, getClass().getClassLoader());
5051

5152
if (triggerFailover) {
52-
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100)));
53+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(100)));
5354
} else {
5455
env.setRestartStrategy(RestartStrategies.noRestart());
5556
}

flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2323
import org.apache.flink.api.common.state.ListState;
2424
import org.apache.flink.api.common.state.ListStateDescriptor;
25-
import org.apache.flink.api.common.time.Time;
2625
import org.apache.flink.configuration.Configuration;
2726
import org.apache.flink.configuration.ExecutionOptions;
2827
import org.apache.flink.core.execution.CheckpointingMode;
@@ -39,6 +38,7 @@
3938
import org.junit.jupiter.api.AfterEach;
4039
import org.junit.jupiter.api.BeforeEach;
4140

41+
import java.time.Duration;
4242
import java.util.Collections;
4343
import java.util.Map;
4444
import java.util.UUID;
@@ -82,7 +82,7 @@ protected JobGraph createJobGraph(boolean triggerFailover, String path) {
8282
env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE);
8383

8484
if (triggerFailover) {
85-
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100)));
85+
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(100)));
8686
} else {
8787
env.setRestartStrategy(RestartStrategies.noRestart());
8888
}

flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -236,11 +236,7 @@ void testTableRollingOnProcessingTime(@TempDir java.nio.file.Path tmpDir) throws
236236
Path path = new Path(outDir.toURI());
237237

238238
FileSystemTableSink.TableRollingPolicy tableRollingPolicy =
239-
new FileSystemTableSink.TableRollingPolicy(
240-
false,
241-
Long.MAX_VALUE,
242-
Duration.ofMillis(20).toMillis(),
243-
Duration.ofMillis(10).toMillis());
239+
new FileSystemTableSink.TableRollingPolicy(false, Long.MAX_VALUE, 20L, 10L);
244240

245241
TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
246242
FileWriterBucket<RowData> bucket =

flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.flink.streaming.connectors.wikiedits;
2020

21-
import org.apache.flink.api.common.time.Time;
2221
import org.apache.flink.streaming.api.functions.source.SourceFunction;
2322
import org.apache.flink.streaming.api.watermark.Watermark;
2423
import org.apache.flink.testutils.junit.RetryOnFailure;
@@ -31,6 +30,7 @@
3130

3231
import java.net.InetSocketAddress;
3332
import java.net.Socket;
33+
import java.time.Duration;
3434
import java.util.Objects;
3535
import java.util.concurrent.ArrayBlockingQueue;
3636
import java.util.concurrent.BlockingQueue;
@@ -53,7 +53,7 @@ class WikipediaEditsSourceTest {
5353
@RetryOnFailure(times = 1)
5454
void testWikipediaEditsSource() throws Exception {
5555
if (canConnect(1, TimeUnit.SECONDS)) {
56-
final Time testTimeout = Time.seconds(60);
56+
final Duration testTimeout = Duration.ofSeconds(60);
5757
final WikipediaEditsSource wikipediaEditsSource = new WikipediaEditsSource();
5858

5959
ExecutorService executorService = null;
@@ -116,8 +116,8 @@ void testWikipediaEditsSource() throws Exception {
116116
}
117117
}
118118

119-
private long deadlineNanos(Time testTimeout) {
120-
return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMilliseconds());
119+
private long deadlineNanos(Duration testTimeout) {
120+
return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMillis());
121121
}
122122

123123
private static class CollectingSourceContext<T> implements SourceFunction.SourceContext<T> {

0 commit comments

Comments
 (0)