
Commit f7ccb97

Revert "fix python"
This reverts commit c540f35.
1 parent c540f35 commit f7ccb97

19 files changed: +148 −115 lines changed


flink-python/pyflink/common/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -64,7 +64,7 @@
 from pyflink.common.serializer import TypeSerializer
 from pyflink.common.typeinfo import Types, TypeInformation
 from pyflink.common.types import Row, RowKind
-from pyflink.common.time import Duration, Instant
+from pyflink.common.time import Duration, Instant, Time
 from pyflink.common.watermark_strategy import WatermarkStrategy, \
     AssignerWithPeriodicWatermarksWrapper

@@ -94,6 +94,7 @@
     "WatermarkStrategy",
     "Duration",
     "Instant",
+    "Time",
     "AssignerWithPeriodicWatermarksWrapper"
 ]
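With the export restored, Time can again be imported straight from the top-level pyflink.common package; a minimal sketch of the restored surface, covering nothing beyond what the __all__ change above re-exposes:

# Time is re-exported from pyflink.common alongside Duration and Instant.
from pyflink.common import Duration, Instant, Time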

flink-python/pyflink/common/time.py

Lines changed: 40 additions & 4 deletions
@@ -17,7 +17,7 @@
 ################################################################################
 from pyflink.java_gateway import get_gateway

-__all__ = ['Duration', 'Instant']
+__all__ = ['Duration', 'Instant', 'Time']


 class Duration(object):
@@ -51,9 +51,6 @@ def of_nanos(nanos: int):
     def of_seconds(seconds: int):
         return Duration(get_gateway().jvm.java.time.Duration.ofSeconds(seconds))

-    def to_millis(self):
-        return self._j_duration.to_millis()
-
     def __eq__(self, other):
         return isinstance(other, Duration) and self._j_duration.equals(other._j_duration)

@@ -86,3 +83,42 @@ def __eq__(self, other):

     def __repr__(self):
         return 'Instant<{}, {}>'.format(self.seconds, self.nanos)
+
+
+class Time(object):
+    """
+    The definition of a time interval.
+    """
+
+    def __init__(self, milliseconds: int):
+        self._milliseconds = milliseconds
+
+    def to_milliseconds(self) -> int:
+        return self._milliseconds
+
+    @staticmethod
+    def milliseconds(milliseconds: int):
+        return Time(milliseconds)
+
+    @staticmethod
+    def seconds(seconds: int):
+        return Time.milliseconds(seconds * 1000)
+
+    @staticmethod
+    def minutes(minutes: int):
+        return Time.seconds(minutes * 60)
+
+    @staticmethod
+    def hours(hours: int):
+        return Time.minutes(hours * 60)
+
+    @staticmethod
+    def days(days: int):
+        return Time.hours(days * 24)
+
+    def __eq__(self, other):
+        return (self.__class__ == other.__class__ and
+                self._milliseconds == other._milliseconds)
+
+    def __str__(self):
+        return "{} ms".format(self._milliseconds)

flink-python/pyflink/datastream/data_stream.py

Lines changed: 6 additions & 6 deletions
@@ -1888,7 +1888,7 @@ def side_output_late_data(self, output_tag: OutputTag) -> 'WindowedStream':

         >>> tag = OutputTag("late-data", Types.TUPLE([Types.INT(), Types.STRING()]))
         >>> main_stream = ds.key_by(lambda x: x[1]) \\
-        ...     .window(TumblingEventTimeWindows.of(Duration.of_seconds(5))) \\
+        ...     .window(TumblingEventTimeWindows.of(Time.seconds(5))) \\
         ...     .side_output_late_data(tag) \\
         ...     .reduce(lambda a, b: a[0] + b[0], b[1])
         >>> late_stream = main_stream.get_side_output(tag)
@@ -1918,7 +1918,7 @@ def reduce(self,
         ::

         >>> ds.key_by(lambda x: x[1]) \\
-        ...     .window(TumblingEventTimeWindows.of(Duration.of_seconds(5))) \\
+        ...     .window(TumblingEventTimeWindows.of(Time.seconds(5))) \\
         ...     .reduce(lambda a, b: a[0] + b[0], b[1])

         :param reduce_function: The reduce function.
@@ -1982,7 +1982,7 @@ def aggregate(self,
         ...     def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]:
         ...         return a[0] + b[0], a[1] + b[1]
         >>> ds.key_by(lambda x: x[1]) \\
-        ...     .window(TumblingEventTimeWindows.of(Duration.of_seconds(5))) \\
+        ...     .window(TumblingEventTimeWindows.of(Time.seconds(5))) \\
         ...     .aggregate(AverageAggregate(),
         ...                accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]),
         ...                output_type=Types.DOUBLE())
@@ -2167,7 +2167,7 @@ def side_output_late_data(self, output_tag: OutputTag) -> 'AllWindowedStream':
         ::

         >>> tag = OutputTag("late-data", Types.TUPLE([Types.INT(), Types.STRING()]))
-        >>> main_stream = ds.window_all(TumblingEventTimeWindows.of(Duration.of_seconds(5))) \\
+        >>> main_stream = ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\
         ...     .side_output_late_data(tag) \\
         ...     .process(MyProcessAllWindowFunction(),
         ...              Types.TUPLE([Types.LONG(), Types.LONG(), Types.INT()]))
@@ -2190,7 +2190,7 @@ def reduce(self,
         Example:
         ::

-        >>> ds.window_all(TumblingEventTimeWindows.of(Duration.of_seconds(5))) \\
+        >>> ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\
         ...     .reduce(lambda a, b: a[0] + b[0], b[1])

         :param reduce_function: The reduce function.
@@ -2255,7 +2255,7 @@ def aggregate(self,
         ...     def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]:
         ...         return a[0] + b[0], a[1] + b[1]
         ...
-        >>> ds.window_all(TumblingEventTimeWindows.of(Duration.of_seconds(5))) \\
+        >>> ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\
         ...     .aggregate(AverageAggregate(),
         ...                accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]),
         ...                output_type=Types.DOUBLE())
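Every docstring change in this file follows the same pattern: window assigners are parameterised with Time rather than Duration. A minimal sketch of that pattern follows; the element schema and source collection are illustrative, not taken from the diff, and a timestamp/watermark assigner (as in the tests further below) would still be needed for event-time windows to fire:

from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()
# Hypothetical (count, key) records used only for illustration.
ds = env.from_collection(
    [(1, 'a'), (2, 'a'), (3, 'b')],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

# Keyed tumbling event-time windows of five seconds, mirroring the updated docstrings.
ds.key_by(lambda x: x[1]) \
    .window(TumblingEventTimeWindows.of(Time.seconds(5))) \
    .reduce(lambda a, b: (a[0] + b[0], a[1]))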

flink-python/pyflink/datastream/state.py

Lines changed: 7 additions & 7 deletions
@@ -19,7 +19,7 @@
 from enum import Enum
 from typing import TypeVar, Generic, Iterable, List, Iterator, Dict, Tuple, Optional

-from pyflink.common.time import Duration
+from pyflink.common.time import Duration, Time
 from pyflink.common.typeinfo import TypeInformation, Types

 __all__ = [
@@ -623,7 +623,7 @@ def __init__(self,
                  update_type: UpdateType,
                  state_visibility: StateVisibility,
                  ttl_time_characteristic: TtlTimeCharacteristic,
-                 ttl: Duration,
+                 ttl: Time,
                  cleanup_strategies: 'StateTtlConfig.CleanupStrategies'):
         self._update_type = update_type
         self._state_visibility = state_visibility
@@ -632,7 +632,7 @@ def __init__(self,
         self._cleanup_strategies = cleanup_strategies

     @staticmethod
-    def new_builder(ttl: Duration):
+    def new_builder(ttl: Time):
         return StateTtlConfig.Builder(ttl)

     def get_update_type(self) -> 'StateTtlConfig.UpdateType':
@@ -641,7 +641,7 @@ def get_update_type(self) -> 'StateTtlConfig.UpdateType':
     def get_state_visibility(self) -> 'StateTtlConfig.StateVisibility':
         return self._state_visibility

-    def get_time_to_live(self) -> Duration:
+    def get_ttl(self) -> Time:
         return self._ttl

     def get_ttl_time_characteristic(self) -> 'StateTtlConfig.TtlTimeCharacteristic':
@@ -669,7 +669,7 @@ def _from_proto(proto):
         state_visibility = StateTtlConfig.StateVisibility._from_proto(proto.state_visibility)
         ttl_time_characteristic = \
             StateTtlConfig.TtlTimeCharacteristic._from_proto(proto.ttl_time_characteristic)
-        ttl = Duration.of_millis(proto.ttl)
+        ttl = Time.milliseconds(proto.ttl)
         cleanup_strategies = StateTtlConfig.CleanupStrategies._from_proto(proto.cleanup_strategies)
         builder = StateTtlConfig.new_builder(ttl) \
             .set_update_type(update_type) \
@@ -694,7 +694,7 @@ class Builder(object):
         Builder for the StateTtlConfig.
         """

-        def __init__(self, ttl: Duration):
+        def __init__(self, ttl: Time):
             self._ttl = ttl
             self._update_type = StateTtlConfig.UpdateType.OnCreateAndWrite
             self._state_visibility = StateTtlConfig.StateVisibility.NeverReturnExpired
@@ -847,7 +847,7 @@ def disable_cleanup_in_background(self) -> 'StateTtlConfig.Builder':
             self._is_cleanup_in_background = False
             return self

-        def set_ttl(self, ttl: Duration) -> 'StateTtlConfig.Builder':
+        def set_ttl(self, ttl: Time) -> 'StateTtlConfig.Builder':
             """
             Sets the ttl time.
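Since new_builder, set_ttl and the Builder constructor take Time again, and the accessor is back to get_ttl, a TTL configuration is assembled as below; a minimal sketch using only calls visible in this diff and in the tests that follow:

from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
    .new_builder(Time.seconds(1)) \
    .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) \
    .build()

# The renamed accessor returns the Time passed to the builder.
assert ttl_config.get_ttl() == Time.seconds(1)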

flink-python/pyflink/datastream/tests/test_data_stream.py

Lines changed: 3 additions & 3 deletions
@@ -23,7 +23,7 @@
 from typing import Tuple

 from pyflink.common import Row, Configuration
-from pyflink.common.time import Duration
+from pyflink.common.time import Time
 from pyflink.common.typeinfo import Types
 from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
 from pyflink.datastream import (TimeCharacteristic, RuntimeContext, SlotSharingGroup,
@@ -151,7 +151,7 @@ def open(self, runtime_context: RuntimeContext):
         self.list_state = runtime_context.get_list_state(list_state_descriptor)
         map_state_descriptor = MapStateDescriptor('map_state', Types.INT(), Types.STRING())
         state_ttl_config = StateTtlConfig \
-            .new_builder(Duration.of_seconds(1)) \
+            .new_builder(Time.seconds(1)) \
             .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
             .set_state_visibility(
                 StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) \
@@ -268,7 +268,7 @@ def open(self, runtime_context: RuntimeContext):
         descriptor = AggregatingStateDescriptor(
             'aggregating_state', MyAggregateFunction(), Types.INT())
         state_ttl_config = StateTtlConfig \
-            .new_builder(Duration.of_seconds(1)) \
+            .new_builder(Time.seconds(1)) \
             .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
             .disable_cleanup_in_background() \
             .build()

flink-python/pyflink/datastream/tests/test_window.py

Lines changed: 18 additions & 18 deletions
@@ -18,7 +18,7 @@
 from typing import Iterable, Tuple, Dict

 from pyflink.common import Configuration
-from pyflink.common.time import Duration
+from pyflink.common.time import Time
 from pyflink.common.typeinfo import Types
 from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
 from pyflink.datastream.data_stream import DataStream
@@ -59,7 +59,7 @@ def test_event_time_tumbling_window(self):
             .with_timestamp_assigner(SecondColumnTimestampAssigner())
         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
             .key_by(lambda x: x[0], key_type=Types.STRING()) \
-            .window(TumblingEventTimeWindows.of(Duration.of_millis(5))) \
+            .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
             .process(CountWindowProcessFunction(),
                      Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
             .add_sink(self.test_sink)
@@ -94,7 +94,7 @@ def test_event_time_sliding_window(self):

         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
             .key_by(lambda x: x[0], key_type=Types.STRING()) \
-            .window(SlidingEventTimeWindows.of(Duration.of_millis(5), Duration.of_millis(2))) \
+            .window(SlidingEventTimeWindows.of(Time.milliseconds(5), Time.milliseconds(2))) \
             .process(CountWindowProcessFunction(),
                      Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
             .add_sink(self.test_sink)
@@ -128,7 +128,7 @@ def test_event_time_session_window(self):

         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
             .key_by(lambda x: x[0], key_type=Types.STRING()) \
-            .window(EventTimeSessionWindows.with_gap(Duration.of_millis(5))) \
+            .window(EventTimeSessionWindows.with_gap(Time.milliseconds(5))) \
             .process(CountWindowProcessFunction(),
                      Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
             .add_sink(self.test_sink)
@@ -166,7 +166,7 @@ def test_window_reduce_passthrough(self):
             .with_timestamp_assigner(SecondColumnTimestampAssigner())
         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
             .key_by(lambda x: x[0], key_type=Types.STRING()) \
-            .window(EventTimeSessionWindows.with_gap(Duration.of_millis(2))) \
+            .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
             .reduce(lambda a, b: (b[0], a[1] + b[1]),
                     output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
             .add_sink(self.test_sink)
@@ -194,7 +194,7 @@ def process(self, key, context: ProcessWindowFunction.Context,

         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
             .key_by(lambda x: x[0], key_type=Types.STRING()) \
-            .window(EventTimeSessionWindows.with_gap(Duration.of_millis(2))) \
+            .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
             .reduce(lambda a, b: (b[0], a[1] + b[1]),
                     window_function=MyProcessFunction(),
                     output_type=Types.STRING()) \
@@ -243,7 +243,7 @@ def merge(self, acc_a: Tuple[str, Dict[int, int]], acc_b: Tuple[str, Dict[int, i

         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
             .key_by(lambda x: x[0], key_type=Types.STRING()) \
-            .window(EventTimeSessionWindows.with_gap(Duration.of_millis(2))) \
+            .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
             .aggregate(MyAggregateFunction(),
                        output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
             .add_sink(self.test_sink)
@@ -276,7 +276,7 @@ def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):

         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
             .key_by(lambda x: x[0], key_type=Types.STRING()) \
-            .window(EventTimeSessionWindows.with_gap(Duration.of_millis(2))) \
+            .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
             .aggregate(MyAggregateFunction(),
                        accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
                        output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
@@ -317,7 +317,7 @@ def process(self, key: str, context: ProcessWindowFunction.Context,

         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
             .key_by(lambda x: x[0], key_type=Types.STRING()) \
-            .window(EventTimeSessionWindows.with_gap(Duration.of_millis(2))) \
+            .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
             .aggregate(MyAggregateFunction(),
                        window_function=MyProcessWindowFunction(),
                        accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
@@ -341,7 +341,7 @@ def test_session_window_late_merge(self):
             .with_timestamp_assigner(SecondColumnTimestampAssigner())
         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
             .key_by(lambda x: x[0], key_type=Types.STRING()) \
-            .window(EventTimeSessionWindows.with_gap(Duration.of_millis(5))) \
+            .window(EventTimeSessionWindows.with_gap(Time.milliseconds(5))) \
             .process(CountWindowProcessFunction(),
                      Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
             .add_sink(self.test_sink)
@@ -360,7 +360,7 @@ def test_event_time_session_window_with_purging_trigger(self):

         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
             .key_by(lambda x: x[0], key_type=Types.STRING()) \
-            .window(EventTimeSessionWindows.with_gap(Duration.of_millis(3))) \
+            .window(EventTimeSessionWindows.with_gap(Time.milliseconds(3))) \
             .trigger(PurgingTrigger.of(EventTimeTrigger.create())) \
             .process(CountWindowProcessFunction(),
                      Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
@@ -408,7 +408,7 @@ def test_event_time_tumbling_window_all(self):
             .with_timestamp_assigner(SecondColumnTimestampAssigner())

         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
-            .window_all(TumblingEventTimeWindows.of(Duration.of_millis(5))) \
+            .window_all(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
             .process(CountAllWindowProcessFunction(),
                      Types.TUPLE([Types.LONG(), Types.LONG(), Types.INT()])) \
             .add_sink(self.test_sink)
@@ -426,7 +426,7 @@ def test_window_all_reduce(self):
         watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
             .with_timestamp_assigner(SecondColumnTimestampAssigner())
         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
-            .window_all(EventTimeSessionWindows.with_gap(Duration.of_millis(2))) \
+            .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
             .reduce(lambda a, b: (a[0], a[1] + b[1]),
                     output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
             .add_sink(self.test_sink)
@@ -454,7 +454,7 @@ def process(self, context: 'ProcessAllWindowFunction.Context',
             )

         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
-            .window_all(EventTimeSessionWindows.with_gap(Duration.of_millis(2))) \
+            .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
             .reduce(lambda a, b: (a[0], a[1] + b[1]),
                     window_function=MyProcessFunction(),
                     output_type=Types.STRING()) \
@@ -501,7 +501,7 @@ def merge(self, acc_a: Tuple[str, Dict[int, int]], acc_b: Tuple[str, Dict[int, i
             return acc_a[0], new_number_map

         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
-            .window_all(EventTimeSessionWindows.with_gap(Duration.of_millis(2))) \
+            .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
             .aggregate(MyAggregateFunction(),
                        output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
             .add_sink(self.test_sink)
@@ -540,7 +540,7 @@ def process(self, context: ProcessAllWindowFunction.Context,
             yield "key {} timestamp sum {}".format(agg_result[0], agg_result[1])

         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
-            .window_all(EventTimeSessionWindows.with_gap(Duration.of_millis(2))) \
+            .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
             .aggregate(MyAggregateFunction(),
                        window_function=MyProcessWindowFunction(),
                        accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
@@ -573,7 +573,7 @@ def test_side_output_late_data(self):
             type_info=Types.ROW([Types.STRING(), Types.INT()]))
         ds2 = ds1.assign_timestamps_and_watermarks(watermark_strategy) \
             .key_by(lambda e: e[0]) \
-            .window(TumblingEventTimeWindows.of(Duration.of_millis(5))) \
+            .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
             .allowed_lateness(0) \
             .side_output_late_data(tag) \
             .process(CountWindowProcessFunction(),
@@ -619,7 +619,7 @@ def extract_timestamp(self, value: tuple, record_timestamp: int) -> int:
         ds.key_by(
             lambda x: (x[0], x[1], x[2])
         ).window(
-            TumblingEventTimeWindows.of(Duration.of_minutes(1))
+            TumblingEventTimeWindows.of(Time.minutes(1))
         ).reduce(
             lambda x, y: (x[0], x[1], x[2], x[3] + y[3]),
             output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING(), Types.INT()])
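The test updates apply the same substitution to every window assigner flavour; for reference, the three assigners exercised above are now constructed as follows. A small sketch of just the assigner calls, detached from the test harness (the import path is the standard pyflink.datastream.window module, not shown in this diff):

from pyflink.common.time import Time
from pyflink.datastream.window import (EventTimeSessionWindows,
                                       SlidingEventTimeWindows,
                                       TumblingEventTimeWindows)

tumbling = TumblingEventTimeWindows.of(Time.milliseconds(5))                      # fixed 5 ms windows
sliding = SlidingEventTimeWindows.of(Time.milliseconds(5), Time.milliseconds(2))  # 5 ms windows, sliding every 2 ms
session = EventTimeSessionWindows.with_gap(Time.milliseconds(5))                  # sessions closed after a 5 ms gap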
