
[FLINK-37443] Add returns() method to DataStream V2 API for specifying output types with lambda expressions #26284

Open · wants to merge 10 commits into base: master
flink-datastream-api/pom.xml (7 additions & 1 deletion)
@@ -40,5 +40,11 @@ under the License.
<artifactId>flink-core-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
Contributor:

The API modules should not depend on non-API modules, because this would require users to depend on the flink-core module when developing DataStream V2 programs, while flink-core should only be needed at application runtime.

You might consider moving TypeInformation to an API module, such as flink-core-api. However, I understand that this is challenging due to its dependencies on numerous other classes.

Contributor Author:

I can copy it, but would it be okay to have this duplicated code in the codebase? Or is there a better way to do it?

Contributor Author:

The dependency on the flink-core module through TypeInformation is only used in the interfaces located in flink-datastream-api; the implementations are defined in the flink-datastream module, which already uses flink-core.

I think it does not make much sense to copy and maintain a large number of files just for a small feature, and only for defining interfaces at that. I am open to other ideas.

Contributor:

> The dependency on the flink-core module through TypeInformation is only used in the interfaces located in flink-datastream-api; the implementations are defined in the flink-datastream module, which already uses flink-core.
>
> I think it does not make much sense to copy and maintain a large number of files just for a small feature, and only for defining interfaces at that. I am open to other ideas.

Sorry for the delayed reply. I think we can introduce a class, such as TypeHint, at the API level that allows users to capture a generic type by defining a subclass. This TypeHint can then be converted into TypeInformation at the implementation level.
During this process, you can leverage TypeDescriptor to describe common types within flink-datastream-api. Additionally, it would be advantageous to incorporate the tuple type into TypeDescriptor.
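
A minimal sketch of that idea, for illustration only: an API-level hint class that captures its generic parameter via the super-type-token pattern. The name ApiTypeHint and its placement are hypothetical, and the suggestion that the implementation module converts the captured Type into TypeInformation is an assumption, not part of this PR.

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical API-level hint class; depends only on the JDK, not on flink-core.
public abstract class ApiTypeHint<T> {
    private final Type capturedType;

    protected ApiTypeHint() {
        // The anonymous subclass created at the call site records T in its
        // generic superclass signature, so the type survives erasure.
        Type superClass = getClass().getGenericSuperclass();
        if (!(superClass instanceof ParameterizedType)) {
            throw new IllegalStateException(
                    "ApiTypeHint must be instantiated with a concrete type argument, "
                            + "e.g. new ApiTypeHint<Tuple2<String, Integer>>() {}");
        }
        this.capturedType = ((ParameterizedType) superClass).getActualTypeArguments()[0];
    }

    // The implementation module (flink-datastream) could translate this Type into a
    // TypeInformation, keeping flink-core out of the API module's dependencies.
    public Type getCapturedType() {
        return capturedType;
    }
}

Usage would mirror the existing TypeHint: new ApiTypeHint<Tuple2<String, Integer>>() {}.getCapturedType() yields the full parameterized type.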

<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
GlobalStream.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.api.stream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
@@ -37,6 +38,20 @@ public interface GlobalStream<T> extends DataStream {
<OUT> ProcessConfigurableAndGlobalStream<OUT> process(
OneInputStreamProcessFunction<T, OUT> processFunction);

/**
* Adds a type information hint about the return type of this operator. This method can be used
* in cases where Flink cannot determine automatically what the produced type of a function is.
* That can be the case if the function uses generic type variables in the return type that
* cannot be inferred from the input type.
*
* <p>The type information can conveniently be created via {@link TypeInformation#of(Class)},
* or from a {@link org.apache.flink.api.common.typeinfo.TypeHint} for generic types.
*
* @param typeInfo type information as a return type hint
* @return This operator with a given return type hint.
*/
GlobalStream<T> returns(TypeInformation<T> typeInfo);

/**
* Apply a two output operation to this {@link GlobalStream}.
*
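For context, a hedged usage sketch of the proposed hint. The environment and source setup (ExecutionEnvironment.getInstance(), DataStreamV2SourceUtils.fromData, env.execute) is assumed from the DataStream V2 examples elsewhere in the codebase; only the returns(...) call reflects what this PR adds.

import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;

public class ReturnsHintSketch {
    public static void main(String[] args) throws Exception {
        // Environment and source construction as in the DSv2 WordCount example
        // at the end of this PR (assumed API).
        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
        NonKeyedPartitionStream<String> words =
                env.fromSource(
                        DataStreamV2SourceUtils.fromData(Arrays.asList("to", "be", "or", "not")),
                        "words");

        // The lambda's Tuple2<String, Integer> output type is erased at runtime, so
        // Flink cannot extract it; the new returns(...) supplies it explicitly.
        words.process(
                        (OneInputStreamProcessFunction<String, Tuple2<String, Integer>>)
                                (record, output, ctx) -> output.collect(new Tuple2<>(record, 1)))
                .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        env.execute("returns-hint-sketch");
    }
}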
KeyedPartitionStream.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.api.stream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
@@ -50,6 +51,20 @@ <OUT> ProcessConfigurableAndKeyedPartitionStream<K, OUT> process(
OneInputStreamProcessFunction<T, OUT> processFunction,
KeySelector<OUT, K> newKeySelector);

/**
* Adds a type information hint about the return type of this operator. This method can be used
* in cases where Flink cannot determine automatically what the produced type of a function is.
* That can be the case if the function uses generic type variables in the return type that
* cannot be inferred from the input type.
*
* <p>The type information can conveniently be created via {@link TypeInformation#of(Class)},
* or from a {@link org.apache.flink.api.common.typeinfo.TypeHint} for generic types.
*
* @param typeInfo type information as a return type hint
* @return This operator with a given return type hint.
*/
KeyedPartitionStream<K, T> returns(TypeInformation<T> typeInfo);

/**
* Apply an operation to this {@link KeyedPartitionStream}.
*
NonKeyedPartitionStream.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.api.stream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
@@ -41,6 +42,20 @@ public interface NonKeyedPartitionStream<T> extends DataStream {
<OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> process(
OneInputStreamProcessFunction<T, OUT> processFunction);

/**
* Adds a type information hint about the return type of this operator. This method can be used
* in cases where Flink cannot determine automatically what the produced type of a function is.
* That can be the case if the function uses generic type variables in the return type that
* cannot be inferred from the input type.
*
* <p>The type information can conveniently be created via {@link TypeInformation#of(Class)},
* or from a {@link org.apache.flink.api.common.typeinfo.TypeHint} for generic types.
*
* @param typeInfo type information as a return type hint
* @return This operator with a given return type hint.
*/
NonKeyedPartitionStream<T> returns(TypeInformation<T> typeInfo);
Contributor:

KeyedPartitionStream and GlobalStream also need the returns method.

Contributor Author:

Okay, I will add it.

Contributor Author:

Added.

Contributor:

> Okay, I will add it.

BroadcastStream also needs the returns method.

Contributor Author:

Could it just be part of the DataStream interface? Then all the other streams would inherit it by default.

Contributor:

> Could it just be part of the DataStream interface? Then all the other streams would inherit it by default.

Thank you for the reminder. You can add the withReturnType method to the ProcessConfigurable interface, as it would be useful in most scenarios. For situations with two output streams, users can obtain each stream individually and then apply withReturnType to it. Here's an example to illustrate:

NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<Integer, String>
        twoOutputStream = ...;
twoOutputStream.getFirst().withReturnType(TypeDescriptor.INT).process(...);
twoOutputStream.getSecond().withReturnType(TypeDescriptor.STRING).process(...);


/**
* Apply a two output operation to this {@link NonKeyedPartitionStream}.
*
GlobalStreamImpl.java
@@ -56,6 +56,7 @@
import java.util.Collections;
import java.util.HashSet;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.datastream.impl.utils.StreamUtils.validateStates;

/** The implementation of {@link GlobalStream}. */
@@ -101,6 +102,13 @@ public <OUT> ProcessConfigurableAndGlobalStream<OUT> process(
}
}

@Override
public GlobalStream<T> returns(TypeInformation<T> typeInfo) {
requireNonNull(typeInfo, "TypeInformation must not be null");
transformation.setOutputType(typeInfo);
return this;
}

@Override
public <OUT1, OUT2> TwoGlobalStreams<OUT1, OUT2> process(
TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) {
KeyedPartitionStreamImpl.java
@@ -59,6 +59,7 @@
import java.util.Collections;
import java.util.HashSet;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.datastream.impl.utils.StreamUtils.validateStates;
import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -189,6 +190,13 @@ public <OUT> ProcessConfigurableAndKeyedPartitionStream<K, OUT> process(
TypeExtractor.getKeySelectorTypes(newKeySelector, outputStream.getType())));
}

@Override
public KeyedPartitionStream<K, V> returns(TypeInformation<V> typeInfo) {
requireNonNull(typeInfo, "TypeInformation must not be null");
transformation.setOutputType(typeInfo);
return this;
}

@Override
public <OUT1, OUT2> ProcessConfigurableAndTwoKeyedPartitionStreams<K, OUT1, OUT2> process(
TwoOutputStreamProcessFunction<V, OUT1, OUT2> processFunction,
NonKeyedPartitionStreamImpl.java
@@ -57,6 +57,7 @@
import java.util.Collections;
import java.util.HashSet;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.datastream.impl.utils.StreamUtils.validateStates;
import static org.apache.flink.util.Preconditions.checkState;

@@ -111,6 +112,13 @@ public <OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> process(
new NonKeyedPartitionStreamImpl<>(environment, outputTransform));
}

@Override
public NonKeyedPartitionStream<T> returns(TypeInformation<T> typeInfo) {
requireNonNull(typeInfo, "TypeInformation must not be null");
transformation.setOutputType(typeInfo);
return this;
}

@Override
public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> process(
TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) {
ProcessConfigurableAndGlobalStreamImpl.java
@@ -18,6 +18,7 @@

package org.apache.flink.datastream.impl.stream;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
@@ -30,6 +31,8 @@
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.ProcessConfigurable;

import static java.util.Objects.requireNonNull;

/**
* The implementation of {@link ProcessConfigurableAndGlobalStream}. This forwards all process
* methods to the underlying stream.
@@ -50,6 +53,13 @@ public <OUT> ProcessConfigurableAndGlobalStream<OUT> process(
return stream.process(processFunction);
}

@Override
public GlobalStream<T> returns(TypeInformation<T> typeInfo) {
requireNonNull(typeInfo, "TypeInformation must not be null");
transformation.setOutputType(typeInfo);
return this;
}

@Override
public <OUT1, OUT2> TwoGlobalStreams<OUT1, OUT2> process(
TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) {
ProcessConfigurableAndKeyedPartitionStreamImpl.java
@@ -18,6 +18,7 @@

package org.apache.flink.datastream.impl.stream;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
@@ -33,6 +34,8 @@
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.ProcessConfigurable;

import static java.util.Objects.requireNonNull;

/**
* The implementation of {@link ProcessConfigurableAndKeyedPartitionStream}. This forwards all
* process methods to the underlying stream.
@@ -54,6 +57,13 @@ public <OUT> ProcessConfigurableAndKeyedPartitionStream<K, OUT> process(
return stream.process(processFunction, newKeySelector);
}

@Override
public KeyedPartitionStream<K, T> returns(TypeInformation<T> typeInfo) {
requireNonNull(typeInfo, "TypeInformation must not be null");
transformation.setOutputType(typeInfo);
return this;
}

@Override
public <OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> process(
OneInputStreamProcessFunction<T, OUT> processFunction) {
ProcessConfigurableAndNonKeyedPartitionStreamImpl.java
@@ -18,6 +18,7 @@

package org.apache.flink.datastream.impl.stream;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
@@ -31,6 +32,8 @@
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
import org.apache.flink.datastream.api.stream.ProcessConfigurable;

import static java.util.Objects.requireNonNull;

/**
* The implementation of {@link ProcessConfigurableAndNonKeyedPartitionStream}. This forwards all
* process methods to the underlying stream.
@@ -52,6 +55,13 @@ public <OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> process(
return stream.process(processFunction);
}

@Override
public NonKeyedPartitionStream<T> returns(TypeInformation<T> typeInfo) {
requireNonNull(typeInfo, "TypeInformation must not be null");
this.transformation.setOutputType(typeInfo);
return this;
}

@Override
public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> process(
TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction) {
WordCount.java (DataStream V2 example)
@@ -23,6 +23,8 @@
import org.apache.flink.api.common.state.StateDeclarations;
import org.apache.flink.api.common.state.ValueStateDeclaration;
import org.apache.flink.api.common.typeinfo.TypeDescriptors;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.api.connector.dsv2.WrappedSink;
import org.apache.flink.api.connector.dsv2.WrappedSource;
@@ -153,8 +155,23 @@ public static void main(String[] args) throws Exception {
// The text lines read from the source are split into words
// using a user-defined process function. The tokenizer, implemented below,
// will output each word as a (2-tuple) containing (word, 1)
text.process(new Tokenizer())
// (OneInputStreamProcessFunction<String, Tuple2<String, Integer>>)
text.process(
(OneInputStreamProcessFunction<String, Tuple2<String, Integer>>)
(record, output, ctx) -> {
// normalize and split the line
String[] tokens = record.toLowerCase().split("\\W+");

// emit the pairs
for (String token : tokens) {
if (!token.isEmpty()) {
output.collect(new Tuple2<>(token, 1));
}
}
})
.withName("tokenizer")
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))

// keyBy groups tuples based on the first field, the word.
// Using a keyBy allows performing aggregations and other
// stateful transformations over data on a per-key basis.
@@ -203,32 +220,6 @@ public static void main(String[] args) throws Exception {
// USER PROCESS FUNCTIONS
// *************************************************************************

/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* ProcessFunction. The process function takes a line (String) and splits it into multiple pairs
* in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
*/
public static final class Tokenizer
implements OneInputStreamProcessFunction<String, Tuple2<String, Integer>> {

@Override
public void processRecord(
String record,
Collector<Tuple2<String, Integer>> output,
PartitionedContext<Tuple2<String, Integer>> ctx)
throws Exception {
// normalize and split the line
String[] tokens = record.toLowerCase().split("\\W+");

// emit the pairs
for (String token : tokens) {
if (!token.isEmpty()) {
output.collect(new Tuple2<>(token, 1));
}
}
}
}

/**
* Implements a word counter as a user-defined ProcessFunction that counts received words in
* streaming mode. The function uses a ValueState to store the count of each word, it will