Skip to content

Commit 5dbd704

Browse files
committed
Fixes Python API
1 parent 4765e90 commit 5dbd704

File tree

7 files changed

+24
-200
lines changed

7 files changed

+24
-200
lines changed

flink-core/src/main/java/org/apache/flink/api/common/time/Time.java

Lines changed: 0 additions & 172 deletions
This file was deleted.

flink-python/pyflink/common/restart_strategy.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from py4j.java_gateway import get_java_class
2323

2424
from pyflink.java_gateway import get_gateway
25-
from pyflink.util.java_utils import to_j_flink_time, from_j_flink_time
25+
from pyflink.util.java_utils import to_j_duration, from_j_duration
2626

2727
__all__ = ['RestartStrategies', 'RestartStrategyConfiguration']
2828

@@ -92,7 +92,7 @@ def __init__(self, restart_attempts=None, delay_between_attempts_interval=None,
9292
self._j_restart_strategy_configuration = \
9393
gateway.jvm.RestartStrategies\
9494
.fixedDelayRestart(
95-
restart_attempts, to_j_flink_time(delay_between_attempts_interval))
95+
restart_attempts, to_j_duration(delay_between_attempts_interval))
9696
super(RestartStrategies.FixedDelayRestartStrategyConfiguration, self)\
9797
.__init__(self._j_restart_strategy_configuration)
9898
else:
@@ -103,7 +103,7 @@ def get_restart_attempts(self) -> int:
103103
return self._j_restart_strategy_configuration.getRestartAttempts()
104104

105105
def get_delay_between_attempts_interval(self) -> timedelta:
106-
return from_j_flink_time(
106+
return from_j_duration(
107107
self._j_restart_strategy_configuration.getDelayBetweenAttemptsInterval())
108108

109109
class FailureRateRestartStrategyConfiguration(RestartStrategyConfiguration):
@@ -129,8 +129,8 @@ def __init__(self,
129129
self._j_restart_strategy_configuration = \
130130
gateway.jvm.RestartStrategies\
131131
.FailureRateRestartStrategyConfiguration(max_failure_rate,
132-
to_j_flink_time(failure_interval),
133-
to_j_flink_time(
132+
to_j_duration(failure_interval),
133+
to_j_duration(
134134
delay_between_attempts_interval))
135135
super(RestartStrategies.FailureRateRestartStrategyConfiguration, self)\
136136
.__init__(self._j_restart_strategy_configuration)
@@ -142,11 +142,11 @@ def get_max_failure_rate(self) -> int:
142142
return self._j_restart_strategy_configuration.getMaxFailureRate()
143143

144144
def get_failure_interval(self) -> timedelta:
145-
return from_j_flink_time(self._j_restart_strategy_configuration.getFailureInterval())
145+
return from_j_duration(self._j_restart_strategy_configuration.getFailureInterval())
146146

147147
def get_delay_between_attempts_interval(self) -> timedelta:
148-
return from_j_flink_time(self._j_restart_strategy_configuration
149-
.getDelayBetweenAttemptsInterval())
148+
return from_j_duration(self._j_restart_strategy_configuration
149+
.getDelayBetweenAttemptsInterval())
150150

151151
class FallbackRestartStrategyConfiguration(RestartStrategyConfiguration):
152152
"""

flink-python/pyflink/fn_execution/embedded/java_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141
# Java StateTtlConfig
4242
JStateTtlConfig = findClass('org.apache.flink.api.common.state.StateTtlConfig')
43-
JTime = findClass('org.apache.flink.api.common.time.Time')
43+
JDuration = findClass('java.time.Duration')
4444
JUpdateType = findClass('org.apache.flink.api.common.state.StateTtlConfig$UpdateType')
4545
JStateVisibility = findClass('org.apache.flink.api.common.state.StateTtlConfig$StateVisibility')
4646

@@ -140,7 +140,7 @@ def to_java_typeinfo(type_info: TypeInformation):
140140

141141
def to_java_state_ttl_config(ttl_config: StateTtlConfig):
142142
j_ttl_config_builder = JStateTtlConfig.newBuilder(
143-
JTime.milliseconds(ttl_config.get_ttl().to_milliseconds()))
143+
JDuration.ofMillis(ttl_config.get_ttl().to_milliseconds()))
144144

145145
update_type = ttl_config.get_update_type()
146146
if update_type == StateTtlConfig.UpdateType.Disabled:

flink-python/pyflink/table/table_config.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,9 @@ def set_idle_state_retention_time(self,
190190
least 5 minutes greater than minTime. Set to
191191
0 (zero) to never clean-up the state.
192192
"""
193-
j_time_class = get_gateway().jvm.org.apache.flink.api.common.time.Time
194-
j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
195-
j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
193+
j_duration_class = get_gateway().jvm.java.time.Duration
194+
j_min_time = j_duration_class.ofMillis(long(round(min_time.total_seconds() * 1000)))
195+
j_max_time = j_duration_class.ofMillis(long(round(max_time.total_seconds() * 1000)))
196196
self._j_table_config.setIdleStateRetentionTime(j_min_time, j_max_time)
197197

198198
def set_idle_state_retention(self, duration: datetime.timedelta):

flink-python/pyflink/util/java_utils.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,19 @@ def to_jarray(j_type, arr):
3838
return j_arr
3939

4040

41-
def to_j_flink_time(time_delta):
41+
def to_j_duration(time_delta):
4242
gateway = get_gateway()
43-
TimeUnit = gateway.jvm.java.util.concurrent.TimeUnit
44-
Time = gateway.jvm.org.apache.flink.api.common.time.Time
43+
Duration = gateway.jvm.java.time.Duration
4544
if isinstance(time_delta, timedelta):
46-
total_microseconds = round(time_delta.total_seconds() * 1000 * 1000)
47-
return Time.of(total_microseconds, TimeUnit.MICROSECONDS)
45+
total_milliseconds = round(time_delta.total_seconds() * 1000)
4846
else:
49-
# time delta in milliseconds
5047
total_milliseconds = time_delta
51-
return Time.milliseconds(total_milliseconds)
5248

49+
return Duration.ofMillis(total_milliseconds)
5350

54-
def from_j_flink_time(j_flink_time):
55-
total_milliseconds = j_flink_time.toMilliseconds()
51+
52+
def from_j_duration(j_duration):
53+
total_milliseconds = j_duration.toMillis()
5654
return timedelta(milliseconds=total_milliseconds)
5755

5856

flink-python/src/main/java/org/apache/flink/python/util/ProtoUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.api.common.functions.RuntimeContext;
2323
import org.apache.flink.api.common.state.StateTtlConfig;
24-
import org.apache.flink.api.common.time.Time;
2524
import org.apache.flink.api.common.typeinfo.TypeInformation;
2625
import org.apache.flink.fnexecution.v1.FlinkFnApi;
2726
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
@@ -38,6 +37,7 @@
3837
import com.google.protobuf.ByteString;
3938
import org.apache.beam.model.pipeline.v1.RunnerApi;
4039

40+
import java.time.Duration;
4141
import java.util.ArrayList;
4242
import java.util.List;
4343
import java.util.Map;
@@ -439,7 +439,7 @@ private static FlinkFnApi.CoderInfoDescriptor createCoderInfoDescriptorProto(
439439
public static StateTtlConfig parseStateTtlConfigFromProto(
440440
FlinkFnApi.StateDescriptor.StateTTLConfig stateTTLConfigProto) {
441441
StateTtlConfig.Builder builder =
442-
StateTtlConfig.newBuilder(Time.milliseconds(stateTTLConfigProto.getTtl()))
442+
StateTtlConfig.newBuilder(Duration.ofMillis(stateTTLConfigProto.getTtl()))
443443
.setUpdateType(
444444
parseUpdateTypeFromProto(stateTTLConfigProto.getUpdateType()))
445445
.setStateVisibility(

flink-python/src/test/java/org/apache/flink/streaming/api/utils/ProtoUtilsTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@
1919
package org.apache.flink.streaming.api.utils;
2020

2121
import org.apache.flink.api.common.state.StateTtlConfig;
22-
import org.apache.flink.api.common.time.Time;
2322
import org.apache.flink.fnexecution.v1.FlinkFnApi;
2423
import org.apache.flink.python.util.ProtoUtils;
2524

2625
import org.junit.jupiter.api.Test;
2726

2827
import java.time.Duration;
29-
import java.util.concurrent.TimeUnit;
3028

3129
import static org.assertj.core.api.Assertions.assertThat;
3230

@@ -82,7 +80,7 @@ void testParseStateTtlConfigFromProto() {
8280
.build();
8381
FlinkFnApi.StateDescriptor.StateTTLConfig stateTTLConfigProto =
8482
FlinkFnApi.StateDescriptor.StateTTLConfig.newBuilder()
85-
.setTtl(Time.of(1000, TimeUnit.MILLISECONDS).toMilliseconds())
83+
.setTtl(1000)
8684
.setUpdateType(
8785
FlinkFnApi.StateDescriptor.StateTTLConfig.UpdateType
8886
.OnCreateAndWrite)
@@ -99,7 +97,7 @@ void testParseStateTtlConfigFromProto() {
9997
.isEqualTo(StateTtlConfig.UpdateType.OnCreateAndWrite);
10098
assertThat(stateTTLConfig.getStateVisibility())
10199
.isEqualTo(StateTtlConfig.StateVisibility.NeverReturnExpired);
102-
assertThat(stateTTLConfig.getTtl()).isEqualTo(Time.milliseconds(1000));
100+
assertThat(stateTTLConfig.getTimeToLive()).isEqualTo(Duration.ofMillis(1000));
103101
assertThat(stateTTLConfig.getTtlTimeCharacteristic())
104102
.isEqualTo(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime);
105103

0 commit comments

Comments
 (0)