processFunction);
+ /**
+ * Adds a type information hint about the return type of this operator. This method can be used
+ * in cases where Flink cannot determine automatically what the produced type of a function is.
+ * That can be the case if the function uses generic type variables in the return type that
+ * cannot be inferred from the input type.
+ *
+ * In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)} are
+ * preferable.
+ *
+ * @param typeInfo type information as a return type hint
+ * @return This operator with a given return type hint.
+ */
+ GlobalStream returns(TypeInformation typeInfo);
+
/**
* Apply a two output operation to this {@link GlobalStream}.
*
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
index 2ca8feca27545..f1f7d1542b435 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.api.stream;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
@@ -50,6 +51,20 @@ ProcessConfigurableAndKeyedPartitionStream process(
OneInputStreamProcessFunction processFunction,
KeySelector newKeySelector);
+ /**
+ * Adds a type information hint about the return type of this operator. This method can be used
+ * in cases where Flink cannot determine automatically what the produced type of a function is.
+ * That can be the case if the function uses generic type variables in the return type that
+ * cannot be inferred from the input type.
+ *
+ * In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)} are
+ * preferable.
+ *
+ * @param typeInfo type information as a return type hint
+ * @return This operator with a given return type hint.
+ */
+ KeyedPartitionStream returns(TypeInformation typeInfo);
+
/**
* Apply an operation to this {@link KeyedPartitionStream}.
*
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
index 165071ce75bb0..156b2dd4d930c 100644
--- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.api.stream;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
@@ -41,6 +42,20 @@ public interface NonKeyedPartitionStream extends DataStream {
ProcessConfigurableAndNonKeyedPartitionStream process(
OneInputStreamProcessFunction processFunction);
+ /**
+ * Adds a type information hint about the return type of this operator. This method can be used
+ * in cases where Flink cannot determine automatically what the produced type of a function is.
+ * That can be the case if the function uses generic type variables in the return type that
+ * cannot be inferred from the input type.
+ *
+ * In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)} are
+ * preferable.
+ *
+ * @param typeInfo type information as a return type hint
+ * @return This operator with a given return type hint.
+ */
+ NonKeyedPartitionStream returns(TypeInformation typeInfo);
+
/**
* Apply a two output operation to this {@link NonKeyedPartitionStream}.
*
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
index 9e3888df7bb43..21c08571d7424 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java
@@ -56,6 +56,7 @@
import java.util.Collections;
import java.util.HashSet;
+import static java.util.Objects.requireNonNull;
import static org.apache.flink.datastream.impl.utils.StreamUtils.validateStates;
/** The implementation of {@link GlobalStream}. */
@@ -101,6 +102,13 @@ public ProcessConfigurableAndGlobalStream process(
}
}
+ @Override
+ public GlobalStream returns(TypeInformation typeInfo) {
+ requireNonNull(typeInfo, "TypeInformation must not be null");
+ transformation.setOutputType(typeInfo);
+ return this;
+ }
+
@Override
public TwoGlobalStreams process(
TwoOutputStreamProcessFunction processFunction) {
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
index 6eaf7dbac3e53..3375ff503a3df 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
@@ -59,6 +59,7 @@
import java.util.Collections;
import java.util.HashSet;
+import static java.util.Objects.requireNonNull;
import static org.apache.flink.datastream.impl.utils.StreamUtils.validateStates;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -189,6 +190,13 @@ public ProcessConfigurableAndKeyedPartitionStream process(
TypeExtractor.getKeySelectorTypes(newKeySelector, outputStream.getType())));
}
+ @Override
+ public KeyedPartitionStream returns(TypeInformation typeInfo) {
+ requireNonNull(typeInfo, "TypeInformation must not be null");
+ transformation.setOutputType(typeInfo);
+ return this;
+ }
+
@Override
public ProcessConfigurableAndTwoKeyedPartitionStreams process(
TwoOutputStreamProcessFunction processFunction,
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
index be523dc0f58f9..4c9ce921c67bb 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java
@@ -57,6 +57,7 @@
import java.util.Collections;
import java.util.HashSet;
+import static java.util.Objects.requireNonNull;
import static org.apache.flink.datastream.impl.utils.StreamUtils.validateStates;
import static org.apache.flink.util.Preconditions.checkState;
@@ -111,6 +112,13 @@ public ProcessConfigurableAndNonKeyedPartitionStream process(
new NonKeyedPartitionStreamImpl<>(environment, outputTransform));
}
+ @Override
+ public NonKeyedPartitionStream returns(TypeInformation typeInfo) {
+ requireNonNull(typeInfo, "TypeInformation must not be null");
+ transformation.setOutputType(typeInfo);
+ return this;
+ }
+
@Override
public ProcessConfigurableAndTwoNonKeyedPartitionStream process(
TwoOutputStreamProcessFunction processFunction) {
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndGlobalStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndGlobalStreamImpl.java
index 02c464127ac10..22d12dced7543 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndGlobalStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndGlobalStreamImpl.java
@@ -18,6 +18,7 @@
package org.apache.flink.datastream.impl.stream;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
@@ -30,6 +31,8 @@
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.ProcessConfigurable;
+import static java.util.Objects.requireNonNull;
+
/**
* The implementation of {@link ProcessConfigurableAndGlobalStream}. This forwarding all process
* methods to the underlying stream.
@@ -50,6 +53,13 @@ public ProcessConfigurableAndGlobalStream process(
return stream.process(processFunction);
}
+ @Override
+ public GlobalStream returns(TypeInformation typeInfo) {
+ requireNonNull(typeInfo, "TypeInformation must not be null");
+ transformation.setOutputType(typeInfo);
+ return this;
+ }
+
@Override
public TwoGlobalStreams process(
TwoOutputStreamProcessFunction processFunction) {
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java
index ccaacb0cbb84b..11e6a70f88fae 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndKeyedPartitionStreamImpl.java
@@ -18,6 +18,7 @@
package org.apache.flink.datastream.impl.stream;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
@@ -33,6 +34,8 @@
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.ProcessConfigurable;
+import static java.util.Objects.requireNonNull;
+
/**
* The implementation of {@link ProcessConfigurableAndKeyedPartitionStream}. This forwarding all
* process methods to the underlying stream.
@@ -54,6 +57,13 @@ public ProcessConfigurableAndKeyedPartitionStream process(
return stream.process(processFunction, newKeySelector);
}
+ @Override
+ public KeyedPartitionStream returns(TypeInformation typeInfo) {
+ requireNonNull(typeInfo, "TypeInformation must not be null");
+ transformation.setOutputType(typeInfo);
+ return this;
+ }
+
@Override
public ProcessConfigurableAndNonKeyedPartitionStream process(
OneInputStreamProcessFunction processFunction) {
diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java
index 82f958d7e0a6a..3664e521a3034 100644
--- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java
+++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableAndNonKeyedPartitionStreamImpl.java
@@ -18,6 +18,7 @@
package org.apache.flink.datastream.impl.stream;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
@@ -31,6 +32,8 @@
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.ProcessConfigurable;
+import static java.util.Objects.requireNonNull;
+
/**
* The implementation of {@link ProcessConfigurableAndNonKeyedPartitionStream}. This forwarding all
* process methods to the underlying stream.
@@ -52,6 +55,13 @@ public ProcessConfigurableAndNonKeyedPartitionStream process(
return stream.process(processFunction);
}
+ @Override
+ public NonKeyedPartitionStream returns(TypeInformation typeInfo) {
+ requireNonNull(typeInfo, "TypeInformation must not be null");
+ this.transformation.setOutputType(typeInfo);
+ return this;
+ }
+
@Override
public ProcessConfigurableAndTwoNonKeyedPartitionStream process(
TwoOutputStreamProcessFunction processFunction) {
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java
index 1f99c1c8fd4d5..dda5cb8a81aab 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java
@@ -23,6 +23,8 @@
import org.apache.flink.api.common.state.StateDeclarations;
import org.apache.flink.api.common.state.ValueStateDeclaration;
import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.api.connector.dsv2.WrappedSink;
import org.apache.flink.api.connector.dsv2.WrappedSource;
@@ -153,8 +155,23 @@ public static void main(String[] args) throws Exception {
// The text lines read from the source are split into words
// using a user-defined process function. The tokenizer, implemented below,
// will output each word as a (2-tuple) containing (word, 1)
- text.process(new Tokenizer())
+ // (OneInputStreamProcessFunction
+ text.process(
+ (OneInputStreamProcessFunction>)
+ (record, output, ctx) -> {
+ // normalize and split the line
+ String[] tokens = record.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (String token : tokens) {
+ if (!token.isEmpty()) {
+ output.collect(new Tuple2<>(token, 1));
+ }
+ }
+ })
.withName("tokenizer")
+ .returns(TypeInformation.of(new TypeHint>() {}))
+
// keyBy groups tuples based on the first field, the word.
// Using a keyBy allows performing aggregations and other
// stateful transformations over data on a per-key basis.
@@ -203,32 +220,6 @@ public static void main(String[] args) throws Exception {
// USER PROCESS FUNCTIONS
// *************************************************************************
- /**
- * Implements the string tokenizer that splits sentences into words as a user-defined
- * ProcessFunction. The process function takes a line (String) and splits it into multiple pairs
- * in the form of "(word,1)" ({@code Tuple2}).
- */
- public static final class Tokenizer
- implements OneInputStreamProcessFunction> {
-
- @Override
- public void processRecord(
- String record,
- Collector> output,
- PartitionedContext> ctx)
- throws Exception {
- // normalize and split the line
- String[] tokens = record.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (!token.isEmpty()) {
- output.collect(new Tuple2<>(token, 1));
- }
- }
- }
- }
-
/**
* Implements a word counter as a user-defined ProcessFunction that counts received words in
* streaming mode. The function uses a ValueState to store the count of each word, it will
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/join/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/join/JoinITCase.java
index a5bf9ee94400f..e000ec4c06693 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/join/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/join/JoinITCase.java
@@ -19,6 +19,8 @@
package org.apache.flink.test.streaming.api.datastream.extension.join;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.api.connector.dsv2.WrappedSink;
import org.apache.flink.api.connector.sink2.Sink;
@@ -33,7 +35,6 @@
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.builtin.BuiltinFuncs;
import org.apache.flink.datastream.api.common.Collector;
-import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.context.RuntimeContext;
import org.apache.flink.datastream.api.extension.join.JoinFunction;
import org.apache.flink.datastream.api.extension.join.JoinType;
@@ -95,12 +96,24 @@ void testInnerJoinWithSameKey() throws Exception {
NonKeyedPartitionStream joinedStream =
BuiltinFuncs.join(
- source1,
- (KeySelector) keyAndValue -> keyAndValue.key,
- source2,
- (KeySelector) keyAndValue -> keyAndValue.key,
- new TestJoinFunction(),
- JoinType.INNER);
+ source1,
+ (KeySelector) keyAndValue -> keyAndValue.key,
+ source2,
+ (KeySelector) keyAndValue -> keyAndValue.key,
+ (JoinFunction)
+ (leftRecord, rightRecord, output, ctx) -> {
+ assertThat(leftRecord.key).isNotNull();
+ assertThat(leftRecord.key).isEqualTo(rightRecord.key);
+ String result =
+ leftRecord.key
+ + ":"
+ + leftRecord.value
+ + ":"
+ + rightRecord.value;
+ output.collect(result);
+ },
+ JoinType.INNER)
+ .returns(TypeInformation.of(String.class));
joinedStream.toSink(new WrappedSink<>(new TestSink()));
env.execute("testInnerJoinWithSameKey");
@@ -131,12 +144,24 @@ void testInnerJoinWithMultipleKeys() throws Exception {
NonKeyedPartitionStream joinedStream =
BuiltinFuncs.join(
- source1,
- (KeySelector) keyAndValue -> keyAndValue.key,
- source2,
- (KeySelector) keyAndValue -> keyAndValue.key,
- new TestJoinFunction(),
- JoinType.INNER);
+ source1,
+ (KeySelector) keyAndValue -> keyAndValue.key,
+ source2,
+ (KeySelector) keyAndValue -> keyAndValue.key,
+ (JoinFunction)
+ (leftRecord, rightRecord, output, ctx) -> {
+ assertThat(leftRecord.key).isNotNull();
+ assertThat(leftRecord.key).isEqualTo(rightRecord.key);
+ String result =
+ leftRecord.key
+ + ":"
+ + leftRecord.value
+ + ":"
+ + rightRecord.value;
+ output.collect(result);
+ },
+ JoinType.INNER)
+ .returns(TypeInformation.of(String.class));
joinedStream.toSink(new WrappedSink<>(new TestSink()));
env.execute("testInnerJoinWithMultipleKeys");
@@ -333,26 +358,23 @@ void testJoinWithTuple() throws Exception {
NonKeyedPartitionStream> joinedStream =
BuiltinFuncs.join(
- source1,
- (KeySelector, String>) elem -> elem.f0,
- source2,
- (KeySelector, String>) elem -> elem.f0,
- new JoinFunction<
- Tuple2,
- Tuple2,
- Tuple3>() {
-
- @Override
- public void processRecord(
- Tuple2 leftRecord,
- Tuple2 rightRecord,
- Collector> output,
- RuntimeContext ctx)
- throws Exception {
- output.collect(
- Tuple3.of(leftRecord.f0, leftRecord.f1, rightRecord.f1));
- }
- });
+ source1,
+ (KeySelector, String>) elem -> elem.f0,
+ source2,
+ (KeySelector, String>) elem -> elem.f0,
+ (JoinFunction<
+ Tuple2,
+ Tuple2,
+ Tuple3>)
+ (leftRecord, rightRecord, output, ctx) ->
+ output.collect(
+ Tuple3.of(
+ leftRecord.f0,
+ leftRecord.f1,
+ rightRecord.f1)))
+ .returns(
+ TypeInformation.of(
+ new TypeHint>() {}));
joinedStream.toSink(
new WrappedSink<>(
@@ -410,14 +432,9 @@ private NonKeyedPartitionStream getEmptySourceStream(
NonKeyedPartitionStream source =
env.fromSource(DataStreamV2SourceUtils.fromData(data), sourceName);
return source.process(
- new OneInputStreamProcessFunction() {
- @Override
- public void processRecord(
- KeyAndValue record,
- Collector output,
- PartitionedContext ctx)
- throws Exception {}
- });
+ (OneInputStreamProcessFunction)
+ (record, output, ctx) -> {})
+ .returns(TypeInformation.of(KeyAndValue.class));
}
private static class TestJoinFunction