[FLINK-36269][python] Remove usage about TableEnvironmentInternal#fromTableSource in python module (#25322)
xuyangzhong authored Jan 2, 2025
1 parent b700f1c commit c3daf84
Showing 9 changed files with 214 additions and 48 deletions.
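At a glance, the change swaps the internal TableEnvironmentInternal#fromTableSource(TableSource) call path for the public, descriptor-based TableEnvironment#from(TableDescriptor). A minimal sketch of the target call shape (illustrative only; it uses the built-in datagen connector as a stand-in, not the new Arrow source):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableDescriptor;
    import org.apache.flink.table.api.TableEnvironment;

    // Sketch only, not part of the commit: the public descriptor-based path that replaces the
    // internal fromTableSource(...) call. The datagen connector is a stand-in for any source.
    public class DescriptorApiSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            TableDescriptor descriptor =
                    TableDescriptor.forConnector("datagen")
                            .schema(Schema.newBuilder().column("id", DataTypes.BIGINT()).build())
                            .option("number-of-rows", "10")
                            .build();
            // Old (internal API): Table t = ((TableEnvironmentInternal) tEnv).fromTableSource(source);
            // New (public API):
            Table t = tEnv.from(descriptor);
            t.execute().print();
        }
    }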
11 changes: 0 additions & 11 deletions docs/content.zh/docs/dev/python/table/table_environment.md
@@ -206,17 +206,6 @@ TableEnvironment API
</tr>
</thead>
<tbody>
- <tr>
- <td>
- <strong>from_table_source(table_source)</strong>
- </td>
- <td>
- 通过 table source 创建一张表。
- </td>
- <td class="text-center">
- {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.from_table_source" name="链接">}}
- </td>
- </tr>
<tr>
<td>
<strong>scan(*table_path)</strong>
11 changes: 0 additions & 11 deletions docs/content/docs/dev/python/table/table_environment.md
@@ -206,17 +206,6 @@ These APIs are used to create/remove Table API/SQL Tables and write queries:
</tr>
</thead>
<tbody>
- <tr>
- <td>
- <strong>from_table_source(table_source)</strong>
- </td>
- <td>
- Creates a table from a table source.
- </td>
- <td class="text-center">
- {{< pythondoc file="pyflink.table.html#pyflink.table.TableEnvironment.from_table_source" name="link">}}
- </td>
- </tr>
<tr>
<td>
<strong>scan(*table_path)</strong>
6 changes: 3 additions & 3 deletions flink-python/pyflink/table/table_environment.py
@@ -1368,10 +1368,10 @@ def from_pandas(self, pdf,
data_type = data_type.bridgedTo(
load_java_class('org.apache.flink.table.data.RowData'))

- j_arrow_table_source = \
- jvm.org.apache.flink.table.runtime.arrow.ArrowUtils.createArrowTableSource(
+ j_arrow_table_source_descriptor = \
+ jvm.org.apache.flink.table.runtime.arrow.ArrowUtils.createArrowTableSourceDesc(
data_type, temp_file.name)
- return Table(self._j_tenv.fromTableSource(j_arrow_table_source), self)
+ return Table(getattr(self._j_tenv, "from")(j_arrow_table_source_descriptor), self)
finally:
os.unlink(temp_file.name)

ArrowUtils.java (org.apache.flink.table.runtime.arrow)
@@ -24,15 +24,18 @@
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.table.api.Table;
+ import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
+ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.operations.OutputConversionModifyOperation;
- import org.apache.flink.table.runtime.arrow.sources.ArrowTableSource;
+ import org.apache.flink.table.runtime.arrow.sources.ArrowTableSourceFactory;
+ import org.apache.flink.table.runtime.arrow.sources.ArrowTableSourceOptions;
import org.apache.flink.table.runtime.arrow.vectors.ArrowArrayColumnVector;
import org.apache.flink.table.runtime.arrow.vectors.ArrowBigIntColumnVector;
import org.apache.flink.table.runtime.arrow.vectors.ArrowBinaryColumnVector;
@@ -159,6 +162,8 @@
import java.util.List;
import java.util.stream.Collectors;

+ import static org.apache.flink.table.types.DataType.getFieldNames;

/** Utilities for Arrow. */
@Internal
public final class ArrowUtils {
@@ -475,10 +480,31 @@ public static ColumnVector createColumnVector(ValueVector vector, LogicalType fi
}
}

- public static ArrowTableSource createArrowTableSource(DataType dataType, String fileName)
- throws IOException {
+ public static TableDescriptor createArrowTableSourceDesc(DataType dataType, String fileName) {
+ List<String> fieldNames = getFieldNames(dataType);
+ List<DataType> fieldTypes = dataType.getChildren();
+ org.apache.flink.table.api.Schema.Builder schemaBuilder =
+ org.apache.flink.table.api.Schema.newBuilder();
+ for (int i = 0; i < fieldNames.size(); i++) {
+ schemaBuilder.column(fieldNames.get(i), fieldTypes.get(i));
+ }
+
+ try {
+ byte[][] data = readArrowBatches(fileName);
+ return TableDescriptor.forConnector(ArrowTableSourceFactory.IDENTIFIER)
+ .option(
+ ArrowTableSourceOptions.DATA,
+ ByteArrayUtils.twoDimByteArrayToString(data))
+ .schema(schemaBuilder.build())
+ .build();
+ } catch (Throwable e) {
+ throw new TableException("Failed to read the arrow data from " + fileName, e);
+ }
+ }
+
+ public static byte[][] readArrowBatches(String fileName) throws IOException {
try (FileInputStream fis = new FileInputStream(fileName)) {
- return new ArrowTableSource(dataType, readArrowBatches(fis.getChannel()));
+ return readArrowBatches(fis.getChannel());
}
}

ByteArrayUtils.java (new file, org.apache.flink.table.runtime.arrow)
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.arrow;

import org.apache.flink.annotation.Internal;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;

/** A utility class for converting byte[][] to String and String to byte[][]. */
@Internal
public class ByteArrayUtils {

/** Convert byte[][] to String. */
public static String twoDimByteArrayToString(byte[][] byteArray) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(byteArray);
oos.flush();
byte[] serializedArray = bos.toByteArray();

return Base64.getEncoder().encodeToString(serializedArray);
}

/** Convert String to byte[][]. */
public static byte[][] stringToTwoDimByteArray(String str)
throws IOException, ClassNotFoundException {
byte[] bytes = Base64.getDecoder().decode(str);

ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis);
return (byte[][]) ois.readObject();
}
}
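A brief usage sketch (not part of the commit) of the helper above: the byte[][] is Java-serialized and Base64-encoded, which is the form the connector's data option carries, and the second method reverses it.

    import java.util.Arrays;

    import org.apache.flink.table.runtime.arrow.ByteArrayUtils;

    // Usage sketch (not part of the commit): round-trip a byte[][] through the
    // Base64/Java-serialization helpers backing the connector's "data" option.
    public class ByteArrayUtilsSketch {
        public static void main(String[] args) throws Exception {
            byte[][] batches = {{1, 2, 3}, {4, 5}};
            String encoded = ByteArrayUtils.twoDimByteArrayToString(batches);
            byte[][] decoded = ByteArrayUtils.stringToTwoDimByteArray(encoded);
            System.out.println(Arrays.deepEquals(batches, decoded)); // prints true
        }
    }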
ArrowTableSource.java (org.apache.flink.table.runtime.arrow.sources)
@@ -19,43 +19,54 @@
package org.apache.flink.table.runtime.arrow.sources;

import org.apache.flink.annotation.Internal;
- import org.apache.flink.legacy.table.sources.StreamTableSource;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.data.RowData;
- import org.apache.flink.table.legacy.api.TableSchema;
+ import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
+ import org.apache.flink.table.api.TableException;
+ import org.apache.flink.table.connector.ChangelogMode;
+ import org.apache.flink.table.connector.source.DynamicTableSource;
+ import org.apache.flink.table.connector.source.ScanTableSource;
+ import org.apache.flink.table.runtime.arrow.ByteArrayUtils;
import org.apache.flink.table.types.DataType;
- import org.apache.flink.table.types.utils.DataTypeUtils;

- /** A {@link StreamTableSource} for serialized arrow record batch data. */
+ /** A {@link ScanTableSource} for serialized arrow record batch data. */
@Internal
- public class ArrowTableSource implements StreamTableSource<RowData> {
+ public class ArrowTableSource implements ScanTableSource {

- final DataType dataType;
- final byte[][] arrowData;
+ private final DataType dataType;

- public ArrowTableSource(DataType dataType, byte[][] arrowData) {
+ private final byte[][] arrowData;
+
+ public ArrowTableSource(DataType dataType, String data) {
this.dataType = dataType;
+ try {
+ this.arrowData = ByteArrayUtils.stringToTwoDimByteArray(data);
+ } catch (Throwable e) {
+ throw new TableException(
+ "Failed to convert the data from String to byte[][].\nThe data is: " + data, e);
+ }
+ }
+
+ private ArrowTableSource(DataType dataType, byte[][] arrowData) {
+ this.dataType = dataType;
this.arrowData = arrowData;
}

@Override
- public boolean isBounded() {
- return true;
+ public DynamicTableSource copy() {
+ return new ArrowTableSource(dataType, arrowData);
}

@Override
- public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
- return execEnv.addSource(new ArrowSourceFunction(dataType, arrowData));
+ public String asSummaryString() {
+ return "ArrowTableSource";
}

@Override
- public TableSchema getTableSchema() {
- return TableSchema.fromResolvedSchema(DataTypeUtils.expandCompositeTypeToSchema(dataType));
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
}

@Override
- public DataType getProducedDataType() {
- return dataType;
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+ return SourceFunctionProvider.of(new ArrowSourceFunction(dataType, arrowData), true);
}
}
ArrowTableSourceFactory.java (new file, org.apache.flink.table.runtime.arrow.sources)
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.arrow.sources;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

import java.util.HashSet;
import java.util.Set;

/** Factory for creating configured instances of {@link ArrowTableSource}. */
public class ArrowTableSourceFactory implements DynamicTableSourceFactory {

public static final String IDENTIFIER = "python-arrow-source";

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(ArrowTableSourceOptions.DATA);
return options;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
return new HashSet<>();
}

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
ReadableConfig tableOptions = helper.getOptions();

String data = tableOptions.get(ArrowTableSourceOptions.DATA);
DataType dataType = context.getPhysicalRowDataType();
return new ArrowTableSource(dataType, data);
}
}
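For context, a descriptor using this factory's identifier is what the planner resolves back to an ArrowTableSource. A hedged sketch of such a descriptor (the column name and payload are placeholders, mirroring what ArrowUtils#createArrowTableSourceDesc assembles):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.TableDescriptor;

    // Sketch only: a descriptor the planner resolves to ArrowTableSourceFactory via the
    // "python-arrow-source" identifier. "f0" and the payload are placeholders; in the real
    // code path ArrowUtils#createArrowTableSourceDesc derives both from the Arrow file.
    public class ArrowDescriptorSketch {
        public static TableDescriptor arrowDescriptor(String base64Batches) {
            return TableDescriptor.forConnector("python-arrow-source")
                    .schema(Schema.newBuilder().column("f0", DataTypes.BIGINT()).build())
                    .option("data", base64Batches)
                    .build();
        }
    }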
ArrowTableSourceOptions.java (new file, org.apache.flink.table.runtime.arrow.sources)
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.arrow.sources;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Table options for the {@link ArrowTableSource}. */
public class ArrowTableSourceOptions {

public static final ConfigOption<String> DATA =
ConfigOptions.key("data")
.stringType()
.noDefaultValue()
.withDescription(
"This is the data serialized by Arrow with a byte two-dimensional array. "
+ "Note: The byte two-dimensional array is converted into a string using base64.");
}
META-INF/services factory registration file
@@ -14,3 +14,4 @@
# limitations under the License.

org.apache.flink.table.utils.python.PythonDynamicTableFactory
+ org.apache.flink.table.runtime.arrow.sources.ArrowTableSourceFactory
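Registering the factory in this ServiceLoader file is what makes the identifier discoverable at runtime; a test-style sketch (an assumption, not part of the commit) of how the registration could be checked:

    import org.apache.flink.table.factories.DynamicTableSourceFactory;
    import org.apache.flink.table.factories.FactoryUtil;

    // Verification sketch (assumption, not part of the commit): ServiceLoader-based discovery
    // should now find the new factory under its identifier.
    public class FactoryDiscoverySketch {
        public static void main(String[] args) {
            DynamicTableSourceFactory factory =
                    FactoryUtil.discoverFactory(
                            Thread.currentThread().getContextClassLoader(),
                            DynamicTableSourceFactory.class,
                            "python-arrow-source");
            System.out.println(factory.getClass().getName());
        }
    }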
