Skip to content

Commit 6078af6

Browse files
committed
[FLINK-14068][core] Removes API that uses org.apache.flink.api.common.time.Time
1 parent 8dc40ab commit 6078af6

File tree

390 files changed

+1345
-1817
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

390 files changed

+1345
-1817
lines changed

docs/content.zh/docs/dev/datastream/fault-tolerance/state.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ env.execute()
302302
```java
303303
import org.apache.flink.api.common.state.StateTtlConfig;
304304
import org.apache.flink.api.common.state.ValueStateDescriptor;
305-
import org.apache.flink.api.common.time.Time;
305+
import java.time.Duration;
306306

307307
StateTtlConfig ttlConfig = StateTtlConfig
308308
.newBuilder(Duration.ofSeconds(1))
@@ -318,7 +318,7 @@ stateDescriptor.enableTimeToLive(ttlConfig);
318318
```scala
319319
import org.apache.flink.api.common.state.StateTtlConfig
320320
import org.apache.flink.api.common.state.ValueStateDescriptor
321-
import org.apache.flink.api.common.time.Time
321+
import java.time.Duration
322322

323323
val ttlConfig = StateTtlConfig
324324
.newBuilder(Duration.ofSeconds(1))
@@ -337,7 +337,7 @@ from pyflink.common.typeinfo import Types
337337
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig
338338

339339
ttl_config = StateTtlConfig \
340-
.new_builder(Time.seconds(1)) \
340+
.new_builder(Duration.ofSeconds(1)) \
341341
.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
342342
.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
343343
.build()
@@ -420,7 +420,7 @@ from pyflink.common.time import Time
420420
from pyflink.datastream.state import StateTtlConfig
421421

422422
ttl_config = StateTtlConfig \
423-
.new_builder(Time.seconds(1)) \
423+
.new_builder(Duration.ofSeconds(1)) \
424424
.disable_cleanup_in_background() \
425425
.build()
426426
```
@@ -438,7 +438,7 @@ ttl_config = StateTtlConfig \
438438
{{< tab "Java" >}}
439439
```java
440440
import org.apache.flink.api.common.state.StateTtlConfig;
441-
import org.apache.flink.api.common.time.Time;
441+
import java.time.Duration;
442442

443443
StateTtlConfig ttlConfig = StateTtlConfig
444444
.newBuilder(Duration.ofSeconds(1))
@@ -449,7 +449,7 @@ StateTtlConfig ttlConfig = StateTtlConfig
449449
{{< tab "Scala" >}}
450450
```scala
451451
import org.apache.flink.api.common.state.StateTtlConfig
452-
import org.apache.flink.api.common.time.Time
452+
import java.time.Duration
453453

454454
val ttlConfig = StateTtlConfig
455455
.newBuilder(Duration.ofSeconds(1))
@@ -463,7 +463,7 @@ from pyflink.common.time import Time
463463
from pyflink.datastream.state import StateTtlConfig
464464

465465
ttl_config = StateTtlConfig \
466-
.new_builder(Time.seconds(1)) \
466+
.new_builder(Duration.ofSeconds(1)) \
467467
.cleanup_full_snapshot() \
468468
.build()
469469
```
@@ -507,7 +507,7 @@ from pyflink.common.time import Time
507507
from pyflink.datastream.state import StateTtlConfig
508508

509509
ttl_config = StateTtlConfig \
510-
.new_builder(Time.seconds(1)) \
510+
.new_builder(Duration.ofSeconds(1)) \
511511
.cleanup_incrementally(10, True) \
512512
.build()
513513
```

docs/content.zh/docs/ops/state/task_failure_recovery.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
6969
val env = StreamExecutionEnvironment.getExecutionEnvironment()
7070
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
7171
3, // 尝试重启的次数
72-
Time.of(10, TimeUnit.SECONDS) // 延时
72+
Duration.ofSeconds(10) // 延时
7373
))
7474
```
7575
{{< /tab >}}
@@ -126,7 +126,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
126126
val env = StreamExecutionEnvironment.getExecutionEnvironment()
127127
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
128128
3, // 尝试重启的次数
129-
Time.of(10, TimeUnit.SECONDS) // 延时
129+
Duration.ofSeconds(10) // 延时
130130
))
131131
```
132132
{{< /tab >}}
@@ -184,10 +184,10 @@ env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
184184
```scala
185185
val env = StreamExecutionEnvironment.getExecutionEnvironment()
186186
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
187-
Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts
188-
Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts
187+
Duration.ofMillis(1), // initial delay between restarts
188+
Duration.ofMillis(1000), // maximum delay between restarts
189189
1.1, // exponential multiplier
190-
Time.of(2, TimeUnit.SECONDS), // 重置延迟时间到初始值的阈值
190+
Duration.ofSeconds(2), // 重置延迟时间到初始值的阈值
191191
0.1 // jitter
192192
))
193193
```
@@ -279,8 +279,8 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
279279
val env = StreamExecutionEnvironment.getExecutionEnvironment()
280280
env.setRestartStrategy(RestartStrategies.failureRateRestart(
281281
3, // 每个时间间隔的最大故障次数
282-
Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
283-
Time.of(10, TimeUnit.SECONDS) // 延时
282+
Duration.ofMinutes(5), // 测量故障率的时间间隔
283+
Duration.ofSeconds(10) // 延时
284284
))
285285
```
286286
{{< /tab >}}

docs/content/docs/dev/datastream/fault-tolerance/state.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ functionality can then be enabled in any state descriptor by passing the configu
341341
```java
342342
import org.apache.flink.api.common.state.StateTtlConfig;
343343
import org.apache.flink.api.common.state.ValueStateDescriptor;
344-
import org.apache.flink.api.common.time.Time;
344+
import java.time.Duration;
345345

346346
StateTtlConfig ttlConfig = StateTtlConfig
347347
.newBuilder(Duration.ofSeconds(1))
@@ -357,7 +357,7 @@ stateDescriptor.enableTimeToLive(ttlConfig);
357357
```scala
358358
import org.apache.flink.api.common.state.StateTtlConfig
359359
import org.apache.flink.api.common.state.ValueStateDescriptor
360-
import org.apache.flink.api.common.time.Time
360+
import java.time.Duration
361361

362362
val ttlConfig = StateTtlConfig
363363
.newBuilder(Duration.ofSeconds(1))
@@ -376,7 +376,7 @@ from pyflink.common.typeinfo import Types
376376
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig
377377

378378
ttl_config = StateTtlConfig \
379-
.new_builder(Time.seconds(1)) \
379+
.new_builder(Duration.ofSeconds(1)) \
380380
.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
381381
.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
382382
.build()
@@ -467,7 +467,7 @@ from pyflink.common.time import Time
467467
from pyflink.datastream.state import StateTtlConfig
468468

469469
ttl_config = StateTtlConfig \
470-
.new_builder(Time.seconds(1)) \
470+
.new_builder(Duration.ofSeconds(1)) \
471471
.disable_cleanup_in_background() \
472472
.build()
473473
```
@@ -488,7 +488,7 @@ It can be configured in `StateTtlConfig`:
488488
{{< tab "Java" >}}
489489
```java
490490
import org.apache.flink.api.common.state.StateTtlConfig;
491-
import org.apache.flink.api.common.time.Time;
491+
import java.time.Duration;
492492

493493
StateTtlConfig ttlConfig = StateTtlConfig
494494
.newBuilder(Duration.ofSeconds(1))
@@ -499,7 +499,7 @@ StateTtlConfig ttlConfig = StateTtlConfig
499499
{{< tab "Scala" >}}
500500
```scala
501501
import org.apache.flink.api.common.state.StateTtlConfig
502-
import org.apache.flink.api.common.time.Time
502+
import java.time.Duration
503503

504504
val ttlConfig = StateTtlConfig
505505
.newBuilder(Duration.ofSeconds(1))
@@ -513,7 +513,7 @@ from pyflink.common.time import Time
513513
from pyflink.datastream.state import StateTtlConfig
514514

515515
ttl_config = StateTtlConfig \
516-
.new_builder(Time.seconds(1)) \
516+
.new_builder(Duration.ofSeconds(1)) \
517517
.cleanup_full_snapshot() \
518518
.build()
519519
```
@@ -563,7 +563,7 @@ from pyflink.common.time import Time
563563
from pyflink.datastream.state import StateTtlConfig
564564

565565
ttl_config = StateTtlConfig \
566-
.new_builder(Time.seconds(1)) \
566+
.new_builder(Duration.ofSeconds(1)) \
567567
.cleanup_incrementally(10, True) \
568568
.build()
569569
```
@@ -622,7 +622,7 @@ from pyflink.common.time import Time
622622
from pyflink.datastream.state import StateTtlConfig
623623

624624
ttl_config = StateTtlConfig \
625-
.new_builder(Time.seconds(1)) \
625+
.new_builder(Duration.ofSeconds(1)) \
626626
.cleanup_in_rocksdb_compact_filter(1000, Duration.of_hours(1)) \
627627
.build()
628628
```

docs/content/docs/ops/state/task_failure_recovery.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
7373
val env = StreamExecutionEnvironment.getExecutionEnvironment()
7474
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
7575
3, // number of restart attempts
76-
Time.of(10, TimeUnit.SECONDS) // delay
76+
Duration.ofSeconds(10) // delay
7777
))
7878
```
7979
{{< /tab >}}
@@ -129,7 +129,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
129129
val env = StreamExecutionEnvironment.getExecutionEnvironment()
130130
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
131131
3, // number of restart attempts
132-
Time.of(10, TimeUnit.SECONDS) // delay
132+
Duration.ofSeconds(10) // delay
133133
))
134134
```
135135
{{< /tab >}}
@@ -188,10 +188,10 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
188188
```scala
189189
val env = StreamExecutionEnvironment.getExecutionEnvironment()
190190
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
191-
Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts
192-
Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts
191+
Duration.ofMillis(1), // initial delay between restarts
192+
Duration.ofMillis(1000), // maximum delay between restarts
193193
1.1, // exponential multiplier
194-
Time.of(2, TimeUnit.SECONDS), // threshold duration to reset delay to its initial value
194+
Duration.ofSeconds(2), // threshold duration to reset delay to its initial value
195195
0.1 // jitter
196196
))
197197
```
@@ -281,8 +281,8 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
281281
val env = StreamExecutionEnvironment.getExecutionEnvironment()
282282
env.setRestartStrategy(RestartStrategies.failureRateRestart(
283283
3, // max failures per unit
284-
Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
285-
Time.of(10, TimeUnit.SECONDS) // delay
284+
Duration.ofMinutes(5), //time interval for measuring failure rate
285+
Duration.ofSeconds(10) // delay
286286
))
287287
```
288288
{{< /tab >}}

flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.annotation.VisibleForTesting;
2323
import org.apache.flink.api.common.JobID;
24-
import org.apache.flink.api.common.time.Time;
2524
import org.apache.flink.client.ClientUtils;
2625
import org.apache.flink.client.cli.ClientOptions;
2726
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
@@ -50,6 +49,7 @@
5049
import org.slf4j.Logger;
5150
import org.slf4j.LoggerFactory;
5251

52+
import java.time.Duration;
5353
import java.util.ArrayList;
5454
import java.util.Collection;
5555
import java.util.Collections;
@@ -364,10 +364,8 @@ private CompletableFuture<JobResult> getJobResult(
364364
final JobID jobId,
365365
final ScheduledExecutor scheduledExecutor,
366366
final boolean tolerateMissingResult) {
367-
final Time timeout =
368-
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
369-
final Time retryPeriod =
370-
Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
367+
final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
368+
final Duration retryPeriod = configuration.get(ClientOptions.CLIENT_RETRY_PERIOD);
371369
final CompletableFuture<JobResult> jobResultFuture =
372370
JobStatusPollingUtils.getJobResult(
373371
dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);

flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.api.common.JobID;
2424
import org.apache.flink.api.common.JobStatus;
2525
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
26-
import org.apache.flink.api.common.time.Time;
2726
import org.apache.flink.core.execution.JobClient;
2827
import org.apache.flink.core.execution.SavepointFormatType;
2928
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -40,6 +39,7 @@
4039
import javax.annotation.Nullable;
4140

4241
import java.io.IOException;
42+
import java.time.Duration;
4343
import java.util.Map;
4444
import java.util.concurrent.CompletableFuture;
4545
import java.util.concurrent.CompletionException;
@@ -59,15 +59,15 @@ public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway
5959

6060
private final ScheduledExecutor retryExecutor;
6161

62-
private final Time timeout;
62+
private final Duration timeout;
6363

6464
private final ClassLoader classLoader;
6565

6666
public EmbeddedJobClient(
6767
final JobID jobId,
6868
final DispatcherGateway dispatcherGateway,
6969
final ScheduledExecutor retryExecutor,
70-
final Time rpcTimeout,
70+
final Duration rpcTimeout,
7171
final ClassLoader classLoader) {
7272
this.jobId = checkNotNull(jobId);
7373
this.dispatcherGateway = checkNotNull(dispatcherGateway);
@@ -136,7 +136,7 @@ public CompletableFuture<Map<String, Object>> getAccumulators() {
136136
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
137137
checkNotNull(classLoader);
138138

139-
final Time retryPeriod = Time.milliseconds(100L);
139+
final Duration retryPeriod = Duration.ofMillis(100L);
140140
return JobStatusPollingUtils.getJobResult(
141141
dispatcherGateway, jobId, retryExecutor, timeout, retryPeriod)
142142
.thenApply(

flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import org.apache.flink.annotation.VisibleForTesting;
2222
import org.apache.flink.api.common.JobID;
2323
import org.apache.flink.api.common.JobStatus;
24-
import org.apache.flink.api.common.time.Time;
2524
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
2625
import org.apache.flink.runtime.jobmaster.JobResult;
2726
import org.apache.flink.util.concurrent.ScheduledExecutor;
2827

28+
import java.time.Duration;
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.function.Supplier;
@@ -50,14 +50,14 @@ static CompletableFuture<JobResult> getJobResult(
5050
final DispatcherGateway dispatcherGateway,
5151
final JobID jobId,
5252
final ScheduledExecutor scheduledExecutor,
53-
final Time rpcTimeout,
54-
final Time retryPeriod) {
53+
final Duration rpcTimeout,
54+
final Duration retryPeriod) {
5555

5656
return pollJobResultAsync(
5757
() -> dispatcherGateway.requestJobStatus(jobId, rpcTimeout),
5858
() -> dispatcherGateway.requestJobResult(jobId, rpcTimeout),
5959
scheduledExecutor,
60-
retryPeriod.toMilliseconds());
60+
retryPeriod.toMillis());
6161
}
6262

6363
@VisibleForTesting

flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.api.common.JobID;
23-
import org.apache.flink.api.common.time.Time;
2423
import org.apache.flink.api.dag.Pipeline;
2524
import org.apache.flink.client.cli.ClientOptions;
2625
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
@@ -43,6 +42,7 @@
4342

4443
import java.net.InetSocketAddress;
4544
import java.net.MalformedURLException;
45+
import java.time.Duration;
4646
import java.util.Collection;
4747
import java.util.List;
4848
import java.util.Optional;
@@ -136,8 +136,7 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture(
136136
final Configuration configuration,
137137
final ClassLoader userCodeClassloader)
138138
throws MalformedURLException {
139-
final Time timeout =
140-
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
139+
final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
141140

142141
final JobGraph jobGraph =
143142
PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
@@ -191,7 +190,7 @@ private static CompletableFuture<JobID> submitJob(
191190
final Configuration configuration,
192191
final DispatcherGateway dispatcherGateway,
193192
final JobGraph jobGraph,
194-
final Time rpcTimeout) {
193+
final Duration rpcTimeout) {
195194
checkNotNull(jobGraph);
196195

197196
LOG.info("Submitting Job with JobId={}.", jobGraph.getJobID());

0 commit comments

Comments
 (0)