Skip to content

Commit

Permalink
[FLINK-36949][Runtime] Make getCurrentKey of async state operators re…
Browse files Browse the repository at this point in the history
…turn the ground truth (#25841)
  • Loading branch information
Zakelly authored Dec 23, 2024
1 parent 5b65539 commit f8014b6
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ public void setCurrentContext(RecordContext<K> switchingContext) {
}
}

public RecordContext<K> getCurrentContext() {
return currentContext;
}

/**
* Dispose a context.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,13 @@ public void setKeyContextElement2(StreamRecord record) throws Exception {

@Override
public Object getCurrentKey() {
return currentProcessingContext.getKey();
RecordContext currentContext = asyncExecutionController.getCurrentContext();
if (currentContext == null) {
throw new UnsupportedOperationException(
"Have not set the current key yet, this may because the operator has not "
+ "started to run, or you are invoking this under a non-keyed context.");
}
return currentContext.getKey();
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,13 @@ public final <T> void setAsyncKeyedContextElement(

@Override
public Object getCurrentKey() {
return currentProcessingContext.getKey();
RecordContext currentContext = asyncExecutionController.getCurrentContext();
if (currentContext == null) {
throw new UnsupportedOperationException(
"Have not set the current key yet, this may because the operator has not "
+ "started to run, or you are invoking this under a non-keyed context.");
}
return currentContext.getKey();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ void testWatermarkHooks() throws Exception {
testOperator.asyncProcessWithKey(
1L,
() -> {
assertThat(testOperator.getCurrentKey()).isEqualTo(1L);
testOperator.output(watermark.getTimestamp() + 1000L);
});
if (counter.incrementAndGet() % 2 == 0) {
Expand Down Expand Up @@ -484,13 +485,15 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(
new StreamRecord<>(
"EventTimer-" + timer.getKey() + "-" + timer.getTimestamp()));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(
new StreamRecord<>(
"ProcessingTimer-" + timer.getKey() + "-" + timer.getTimestamp()));
Expand Down Expand Up @@ -601,12 +604,14 @@ public void postProcessWatermark(Watermark watermark) throws Exception {

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(new StreamRecord<>(timer.getTimestamp()));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer)
throws Exception {}
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
}

@Override
public void processElement1(StreamRecord<Long> element) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ void testWatermarkHooks() throws Exception {
operator.asyncProcessWithKey(
1L,
() -> {
assertThat(operator.getCurrentKey()).isEqualTo(1L);
operator.output(watermark.getTimestamp() + 1000L);
});
if (counter.incrementAndGet() % 2 == 0) {
Expand Down Expand Up @@ -620,13 +621,15 @@ public ElementOrder getElementOrder() {

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(
new StreamRecord<>(
"EventTimer-" + timer.getKey() + "-" + timer.getTimestamp()));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(
new StreamRecord<>(
"ProcessingTimer-" + timer.getKey() + "-" + timer.getTimestamp()));
Expand Down Expand Up @@ -842,12 +845,14 @@ public void postProcessWatermark(Watermark watermark) throws Exception {

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(new StreamRecord<>(timer.getTimestamp()));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer)
throws Exception {}
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
}

private Input<Long> createInput(int idx) {
return new AbstractInput<Long, Long>(this, idx) {
Expand Down

0 comments on commit f8014b6

Please sign in to comment.