diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py index 4ccf5d6945832..bb9a5b45c7393 100644 --- a/flink-python/pyflink/table/catalog.py +++ b/flink-python/pyflink/table/catalog.py @@ -28,6 +28,7 @@ from pyflink.util.java_utils import to_jarray from typing import Dict, List, Optional, Union from abc import ABCMeta, abstractmethod +from pyflink.util.api_stability_decorators import PublicEvolving __all__ = ['Catalog', 'CatalogDatabase', 'CatalogBaseTable', 'CatalogPartition', 'CatalogFunction', 'Procedure', 'ObjectPath', 'CatalogPartitionSpec', 'CatalogTableStatistics', @@ -36,6 +37,7 @@ 'Constraint', 'UniqueConstraint', 'ResolvedSchema'] +@PublicEvolving() class Catalog(object): """ Catalog is responsible for reading and writing metadata such as database/table/views/UDFs @@ -733,6 +735,7 @@ def alter_partition_column_statistics(self, ignore_if_not_exists) +@PublicEvolving() class CatalogDatabase(object): """ Represents a database object in a catalog. @@ -809,6 +812,7 @@ def get_detailed_description(self) -> Optional[str]: return None +@PublicEvolving() class CatalogBaseTable(object): """ CatalogBaseTable is the common parent of table and view. It has a map of @@ -943,6 +947,7 @@ def get_detailed_description(self) -> Optional[str]: return None +@PublicEvolving() class CatalogPartition(object): """ Represents a partition object in catalog. @@ -1022,6 +1027,7 @@ def get_comment(self) -> str: return self._j_catalog_partition.getComment() +@PublicEvolving() class CatalogFunction(object): """ Interface for a function in a catalog. @@ -1113,6 +1119,7 @@ def get_function_language(self): return self._j_catalog_function.getFunctionLanguage() +@PublicEvolving() class CatalogModel(object): """ Interface for a model in a catalog. @@ -1176,6 +1183,7 @@ def get_options(self): return dict(self._j_catalog_model.getOptions()) +@PublicEvolving() class Procedure(object): """ Interface for a procedure in a catalog. @@ -1189,6 +1197,7 @@ def _get(j_procedure): return Procedure(j_procedure) +@PublicEvolving() class ObjectPath(object): """ A database name and object (table/view/function) name combo in a catalog. @@ -1226,6 +1235,7 @@ def from_string(full_name: str) -> 'ObjectPath': return ObjectPath(j_object_path=gateway.jvm.ObjectPath.fromString(full_name)) +@PublicEvolving() class CatalogPartitionSpec(object): """ Represents a partition spec object in catalog. @@ -1259,6 +1269,7 @@ def get_partition_spec(self) -> Dict[str, str]: return dict(self._j_catalog_partition_spec.getPartitionSpec()) +@PublicEvolving() class CatalogTableStatistics(object): """ Statistics for a non-partitioned table or a partition of a partitioned table. @@ -1313,6 +1324,7 @@ def copy(self) -> 'CatalogTableStatistics': j_catalog_table_statistics=self._j_catalog_table_statistics.copy()) +@PublicEvolving() class CatalogColumnStatistics(object): """ Column statistics of a table or partition. @@ -1379,6 +1391,7 @@ def __init__(self, catalog_name: str, default_database: str, username: str, pwd: super(JdbcCatalog, self).__init__(j_jdbc_catalog) +@PublicEvolving() class CatalogDescriptor: """ Describes a catalog with the catalog name and configuration. @@ -1483,6 +1496,7 @@ def as_summary_string(self) -> str: return self._j_object_identifier.asSummaryString() +@PublicEvolving() class Column(metaclass=ABCMeta): """ Representation of a column in a :class:`ResolvedSchema`. 
@@ -1642,6 +1656,7 @@ def rename(self, new_name: str) -> "Column": pass +@PublicEvolving() class PhysicalColumn(Column): """ Representation of a physical column. @@ -1670,6 +1685,7 @@ def rename(self, new_name: str) -> Column: return self._j_physical_column.rename(new_name) +@PublicEvolving() class ComputedColumn(Column): """ Representation of a computed column. @@ -1710,6 +1726,7 @@ def rename(self, new_name: str) -> Column: return self._j_computed_column.rename(new_name) +@PublicEvolving() class MetadataColumn(Column): """ Representation of a metadata column. @@ -1754,6 +1771,7 @@ def rename(self, new_name: str) -> Column: return self._j_metadata_column.rename(new_name) +@PublicEvolving() class WatermarkSpec: """ Representation of a watermark specification in :class:`ResolvedSchema`. @@ -1811,6 +1829,7 @@ def as_summary_string(self) -> str: return self._j_watermark_spec.asSummaryString() +@PublicEvolving() class Constraint(metaclass=ABCMeta): """ Integrity constraints, generally referred to simply as constraints, define the valid states of @@ -1849,6 +1868,7 @@ def as_summary_string(self) -> str: """ return self._j_constraint.asSummaryString() + @PublicEvolving() class ConstraintType(Enum): """ Type of the constraint. @@ -1866,6 +1886,7 @@ class ConstraintType(Enum): UNIQUE_KEY = 1 +@PublicEvolving() class UniqueConstraint(Constraint): """ A unique key constraint. It can be declared also as a PRIMARY KEY. @@ -1899,6 +1920,7 @@ def get_type_string(self) -> str: return self._j_unique_constraint.getTypeString() +@PublicEvolving() class ResolvedSchema(object): """ Schema of a table or view consisting of columns, constraints, and watermark specifications. diff --git a/flink-python/pyflink/table/changelog_mode.py b/flink-python/pyflink/table/changelog_mode.py index bd63e1d851fce..a6e31f1c11a25 100644 --- a/flink-python/pyflink/table/changelog_mode.py +++ b/flink-python/pyflink/table/changelog_mode.py @@ -16,10 +16,12 @@ # limitations under the License. ################################################################################ from pyflink.java_gateway import get_gateway +from pyflink.util.api_stability_decorators import PublicEvolving __all__ = ['ChangelogMode'] +@PublicEvolving() class ChangelogMode(object): """ The set of changes contained in a changelog. @@ -30,18 +32,28 @@ def __init__(self, j_changelog_mode): @staticmethod def insert_only(): + """ + Shortcut for a simple :attr:`~pyflink.common.RowKind.INSERT`-only changelog. + """ gateway = get_gateway() return ChangelogMode( gateway.jvm.org.apache.flink.table.connector.ChangelogMode.insertOnly()) @staticmethod def upsert(): + """ + Shortcut for an upsert changelog that describes idempotent updates on a key and thus + does not contain :attr:`~pyflink.common.RowKind.UPDATE_BEFORE` rows. + """ gateway = get_gateway() return ChangelogMode( gateway.jvm.org.apache.flink.table.connector.ChangelogMode.upsert()) @staticmethod def all(): + """ + Shortcut for a changelog that can contain all :class:`~pyflink.common.RowKind`.
+ """ gateway = get_gateway() return ChangelogMode( gateway.jvm.org.apache.flink.table.connector.ChangelogMode.all()) diff --git a/flink-python/pyflink/table/data_view.py b/flink-python/pyflink/table/data_view.py index 424cae8b2eeb6..05d11476411d5 100644 --- a/flink-python/pyflink/table/data_view.py +++ b/flink-python/pyflink/table/data_view.py @@ -18,6 +18,8 @@ from abc import ABC, abstractmethod from typing import TypeVar, Generic, Iterable, List, Any, Iterator, Dict, Tuple +from pyflink.util.api_stability_decorators import PublicEvolving + T = TypeVar('T') K = TypeVar('K') V = TypeVar('V') @@ -25,6 +27,7 @@ __all__ = ['DataView', 'ListView', 'MapView'] +@PublicEvolving() class DataView(ABC): """ A DataView is a collection type that can be used in the accumulator of an user defined @@ -40,6 +43,7 @@ def clear(self) -> None: pass +@PublicEvolving() class ListView(DataView, Generic[T]): """ A :class:`DataView` that provides list-like functionality in the accumulator of an @@ -102,6 +106,7 @@ def __iter__(self) -> Iterator[T]: return iter(self.get()) +@PublicEvolving() class MapView(Generic[K, V]): """ A :class:`DataView` that provides dict-like functionality in the accumulator of an diff --git a/flink-python/pyflink/table/environment_settings.py b/flink-python/pyflink/table/environment_settings.py index 9ed2e4a973adb..051a7a8766133 100644 --- a/flink-python/pyflink/table/environment_settings.py +++ b/flink-python/pyflink/table/environment_settings.py @@ -17,6 +17,7 @@ ################################################################################ from pyflink.java_gateway import get_gateway +from pyflink.util.api_stability_decorators import PublicEvolving from pyflink.util.java_utils import create_url_class_loader from pyflink.common import Configuration @@ -24,6 +25,7 @@ __all__ = ['EnvironmentSettings'] +@PublicEvolving() class EnvironmentSettings(object): """ Defines all parameters that initialize a table environment. Those parameters are used only diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py index 3b1e417a16d1f..0cdfa0c1e8d9a 100644 --- a/flink-python/pyflink/table/explain_detail.py +++ b/flink-python/pyflink/table/explain_detail.py @@ -18,7 +18,10 @@ __all__ = ['ExplainDetail'] +from pyflink.util.api_stability_decorators import PublicEvolving + +@PublicEvolving() class ExplainDetail(object): """ ExplainDetail defines the types of details for explain result. diff --git a/flink-python/pyflink/table/module.py b/flink-python/pyflink/table/module.py index f4cdc2e922dfe..c343b4d470d1b 100644 --- a/flink-python/pyflink/table/module.py +++ b/flink-python/pyflink/table/module.py @@ -19,7 +19,10 @@ __all__ = ['HiveModule', 'Module', 'ModuleEntry'] +from pyflink.util.api_stability_decorators import PublicEvolving + +@PublicEvolving() class Module(object): """ Modules define a set of metadata, including functions, user defined types, operators, rules, @@ -34,6 +37,7 @@ def __init__(self, j_module): self._j_module = j_module +@PublicEvolving() class HiveModule(Module): """ Module to provide Hive built-in metadata. @@ -51,6 +55,7 @@ def __init__(self, hive_version: str = None): super(HiveModule, self).__init__(j_hive_module) +@PublicEvolving() class ModuleEntry(object): """ A POJO to represent a module's name and use status. 
diff --git a/flink-python/pyflink/table/resolved_expression.py b/flink-python/pyflink/table/resolved_expression.py index b5953e4e4d519..122cca989a46c 100644 --- a/flink-python/pyflink/table/resolved_expression.py +++ b/flink-python/pyflink/table/resolved_expression.py @@ -19,10 +19,12 @@ from pyflink.table import Expression from pyflink.table.types import DataType, _from_java_data_type +from pyflink.util.api_stability_decorators import PublicEvolving __all__ = ["ResolvedExpression"] +@PublicEvolving() class ResolvedExpression(Expression): """ Expression that has been fully resolved and validated. diff --git a/flink-python/pyflink/table/result_kind.py b/flink-python/pyflink/table/result_kind.py index 16c425fb76fda..ef06600a092b4 100644 --- a/flink-python/pyflink/table/result_kind.py +++ b/flink-python/pyflink/table/result_kind.py @@ -19,7 +19,10 @@ __all__ = ['ResultKind'] +from pyflink.util.api_stability_decorators import PublicEvolving + +@PublicEvolving() class ResultKind(object): """ ResultKind defines the types of the result. diff --git a/flink-python/pyflink/table/schema.py b/flink-python/pyflink/table/schema.py index c70a8c01fee69..c5fdaa96fc2a6 100644 --- a/flink-python/pyflink/table/schema.py +++ b/flink-python/pyflink/table/schema.py @@ -21,6 +21,7 @@ from pyflink.table import Expression from pyflink.table.expression import _get_java_expression from pyflink.table.types import DataType, _to_java_data_type +from pyflink.util.api_stability_decorators import PublicEvolving from pyflink.util.java_utils import to_jarray if TYPE_CHECKING: @@ -31,6 +32,7 @@ __all__ = ['Schema'] +@PublicEvolving() class Schema(object): """ Schema of a table or view. @@ -67,6 +69,7 @@ def __eq__(self, other): def __hash__(self): return self._j_schema.hashCode() + @PublicEvolving() class Builder(object): """ A builder for constructing an immutable but still unresolved Schema. diff --git a/flink-python/pyflink/table/sql_dialect.py b/flink-python/pyflink/table/sql_dialect.py index b78e1e536f64d..b82166d7ba14e 100644 --- a/flink-python/pyflink/table/sql_dialect.py +++ b/flink-python/pyflink/table/sql_dialect.py @@ -16,10 +16,12 @@ # limitations under the License. ################################################################################ from pyflink.java_gateway import get_gateway +from pyflink.util.api_stability_decorators import PublicEvolving __all__ = ['SqlDialect'] +@PublicEvolving() class SqlDialect(object): """ Enumeration of valid SQL compatibility modes. diff --git a/flink-python/pyflink/table/statement_set.py b/flink-python/pyflink/table/statement_set.py index 07ece0a56b116..cd40c82ff9ac7 100644 --- a/flink-python/pyflink/table/statement_set.py +++ b/flink-python/pyflink/table/statement_set.py @@ -22,11 +22,13 @@ from pyflink.table.table_descriptor import TableDescriptor from pyflink.table.table_pipeline import TablePipeline from pyflink.table.table_result import TableResult +from pyflink.util.api_stability_decorators import PublicEvolving from pyflink.util.java_utils import to_j_explain_detail_arr __all__ = ['StatementSet'] +@PublicEvolving() class StatementSet(object): """ A :class:`~StatementSet` accepts pipelines defined by DML statements or :class:`~Table` objects. 
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py index 8726d3d52b034..6cc0cdb9e4cf4 100644 --- a/flink-python/pyflink/table/table.py +++ b/flink-python/pyflink/table/table.py @@ -35,12 +35,15 @@ from pyflink.table.utils import tz_convert_from_internal, to_expression_jarray from pyflink.table.window import OverWindow, GroupWindow +from pyflink.util.api_stability_decorators import Deprecated, PublicEvolving + from pyflink.util.java_utils import to_jarray from pyflink.util.java_utils import to_j_explain_detail_arr __all__ = ['Table', 'GroupedTable', 'GroupWindowedTable', 'OverWindowedTable', 'WindowGroupedTable'] +@PublicEvolving() class Table(object): """ A :class:`~pyflink.table.Table` object is the core abstraction of the Table API. @@ -959,14 +962,12 @@ def to_pandas(self): import pandas as pd return pd.DataFrame.from_records([], columns=self.get_schema().get_field_names()) + @Deprecated(since="2.1.0", detail="Use :func:`Table.get_resolved_schema` instead.") def get_schema(self) -> TableSchema: """ Returns the :class:`~pyflink.table.TableSchema` of this table. :return: The schema of this table. - - .. deprecated:: 2.1.0 - Use :func:`Table.get_resolved_schema` instead. """ return TableSchema(j_table_schema=self._j_table.getSchema()) @@ -1175,6 +1176,7 @@ def insert_into( ) +@PublicEvolving() class GroupedTable(object): """ A table that has been grouped on a set of grouping keys. @@ -1299,6 +1301,7 @@ def flat_aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWra return FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env) +@PublicEvolving() class GroupWindowedTable(object): """ A table that has been windowed for :class:`~pyflink.table.GroupWindow`. @@ -1338,6 +1341,7 @@ def group_by(self, *fields: Expression) -> 'WindowGroupedTable': self._j_table.groupBy(to_expression_jarray(fields)), self._t_env) +@PublicEvolving() class WindowGroupedTable(object): """ A table that has been windowed and grouped for :class:`~pyflink.table.window.GroupWindow`. @@ -1423,6 +1427,7 @@ def _to_expr(self, func: UserDefinedAggregateFunctionWrapper) -> Expression: return func_expression +@PublicEvolving() class OverWindowedTable(object): """ A table that has been windowed for :class:`~pyflink.table.window.OverWindow`. @@ -1455,6 +1460,7 @@ def select(self, *fields: Expression) -> 'Table': return Table(self._j_table.select(to_expression_jarray(fields)), self._t_env) +@PublicEvolving() class AggregatedTable(object): """ A table that has been performed on the aggregate function.
@@ -1493,6 +1499,7 @@ def select(self, *fields: Expression) -> 'Table': return Table(self._j_table.select(to_expression_jarray(fields)), self._t_env) +@PublicEvolving() class FlatAggregateTable(object): """ A table that performs flatAggregate on a :class:`~pyflink.table.Table`, a diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py index 9c80e59a49a75..e8b32ef9c2a2e 100644 --- a/flink-python/pyflink/table/table_config.py +++ b/flink-python/pyflink/table/table_config.py @@ -22,12 +22,14 @@ from pyflink.common.configuration import Configuration from pyflink.java_gateway import get_gateway from pyflink.table.sql_dialect import SqlDialect +from pyflink.util.api_stability_decorators import PublicEvolving, Internal __all__ = ['TableConfig'] from pyflink.util.java_utils import add_jars_to_context_class_loader +@PublicEvolving() class TableConfig(object): """ Configuration for the current :class:`TableEnvironment` session to adjust Table & SQL API @@ -207,6 +209,7 @@ def add_configuration(self, configuration: Configuration): """ self._j_table_config.addConfiguration(configuration._j_configuration) + @Internal() def to_map(self) -> dict: """ Calls the toMap method of the underlying Java TableConfig to get the configuration map. diff --git a/flink-python/pyflink/table/table_descriptor.py b/flink-python/pyflink/table/table_descriptor.py index 1ac96bdaadb89..01ab4572a32ee 100644 --- a/flink-python/pyflink/table/table_descriptor.py +++ b/flink-python/pyflink/table/table_descriptor.py @@ -20,11 +20,13 @@ from pyflink.common.config_options import ConfigOption from pyflink.java_gateway import get_gateway from pyflink.table.schema import Schema +from pyflink.util.api_stability_decorators import PublicEvolving from pyflink.util.java_utils import to_jarray __all__ = ['TableDescriptor', 'FormatDescriptor'] +@PublicEvolving() class TableDescriptor(object): """ Describes a CatalogTable representing a source or sink. @@ -83,6 +85,7 @@ def __eq__(self, other): def __hash__(self): return self._j_table_descriptor.hashCode() + @PublicEvolving() class Builder(object): """ Builder for TableDescriptor. diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 245cf1ea14815..b927b87b01370 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -45,6 +45,7 @@ from pyflink.table.udf import UserDefinedFunctionWrapper, AggregateFunction, udaf, \ udtaf, TableAggregateFunction from pyflink.table.utils import to_expression_jarray +from pyflink.util.api_stability_decorators import PublicEvolving, Deprecated from pyflink.util.java_utils import get_j_env_configuration, is_local_deployment, load_java_class, \ to_j_explain_detail_arr, to_jarray, get_field @@ -54,6 +55,7 @@ ] +@PublicEvolving() class TableEnvironment(object): """ A table environment is the base class, entry point, and central context for creating Table @@ -128,6 +130,11 @@ def create_catalog(self, catalog_name: str, catalog_descriptor: CatalogDescripto """ self._j_tenv.createCatalog(catalog_name, catalog_descriptor._j_catalog_descriptor) + @Deprecated(since="2.1.0", detail=""" + Use :func:`create_catalog` instead. The new method uses a + :class:`~pyflink.table.catalog.CatalogDescriptor` to initialize the catalog instance and store + the `~pyflink.table.catalog.CatalogDescriptor` in the catalog store. 
+ """) def register_catalog(self, catalog_name: str, catalog: Catalog): """ Registers a :class:`~pyflink.table.catalog.Catalog` under a unique name. diff --git a/flink-python/pyflink/table/table_pipeline.py b/flink-python/pyflink/table/table_pipeline.py index e2c82ac2ab461..912dd7a8d0bd9 100644 --- a/flink-python/pyflink/table/table_pipeline.py +++ b/flink-python/pyflink/table/table_pipeline.py @@ -21,11 +21,13 @@ from pyflink.table import ExplainDetail from pyflink.table.catalog import ObjectIdentifier from pyflink.table.table_result import TableResult +from pyflink.util.api_stability_decorators import PublicEvolving from pyflink.util.java_utils import to_j_explain_detail_arr __all__ = ["TablePipeline"] +@PublicEvolving() class TablePipeline(object): """ Describes a complete pipeline from one or more source tables to a sink table. diff --git a/flink-python/pyflink/table/table_result.py b/flink-python/pyflink/table/table_result.py index 3c842c3d19e96..2073701417a35 100644 --- a/flink-python/pyflink/table/table_result.py +++ b/flink-python/pyflink/table/table_result.py @@ -28,10 +28,12 @@ from pyflink.table.table_schema import TableSchema from pyflink.table.types import _from_java_data_type from pyflink.table.utils import pickled_bytes_to_python_converter +from pyflink.util.api_stability_decorators import PublicEvolving, Deprecated __all__ = ['TableResult', 'CloseableIterator'] +@PublicEvolving() class TableResult(object): """ A :class:`~pyflink.table.TableResult` is the representation of the statement execution result. @@ -76,6 +78,14 @@ def wait(self, timeout_ms: int = None): else: get_method(self._j_table_result, "await")() + @Deprecated(since="2.1.0", detail=""" + This function has been deprecated as part of FLIP-164. + :class:`~pyflink.table.table_schema.TableSchema` has been replaced by two more + dedicated classes :class:`~pyflink.table.Schema` and + :class:`~pyflink.table.catalog.ResolvedSchema`. Use :class:`~pyflink.table.Schema` for + declaration in APIs. :class:`~pyflink.table.catalog.ResolvedSchema` is offered by the + framework after resolution and validation. + """) def get_table_schema(self) -> TableSchema: """ Returns the schema of the result. @@ -84,13 +94,6 @@ def get_table_schema(self) -> TableSchema: :rtype: pyflink.table.TableSchema .. versionadded:: 1.11.0 - .. deprecated:: 2.1.0 - This function has been deprecated as part of FLIP-164. - :class:`~pyflink.table.table_schema.TableSchema` has been replaced by two more - dedicated classes :class:`~pyflink.table.Schema` and - :class:`~pyflink.table.catalog.ResolvedSchema`. Use :class:`~pyflink.table.Schema` for - declaration in APIs. :class:`~pyflink.table.catalog.ResolvedSchema` is offered by the - framework after resolution and validation. """ return TableSchema(j_table_schema=self._get_java_table_schema()) diff --git a/flink-python/pyflink/table/table_schema.py b/flink-python/pyflink/table/table_schema.py index fd2dbd941fd64..878d688531ea8 100644 --- a/flink-python/pyflink/table/table_schema.py +++ b/flink-python/pyflink/table/table_schema.py @@ -19,16 +19,23 @@ from pyflink.java_gateway import get_gateway from pyflink.table.types import DataType, RowType, _to_java_data_type, _from_java_data_type +from pyflink.util.api_stability_decorators import Deprecated from pyflink.util.java_utils import to_jarray __all__ = ['TableSchema'] +@Deprecated(since="2.1.0", detail=""" +This class has been deprecated as part of FLIP-164. 
It has been replaced by two more dedicated +classes :class:`~pyflink.table.Schema` and :class:`~pyflink.table.catalog.ResolvedSchema`. +Use :class:`~pyflink.table.Schema` for declaration in APIs. +:class:`~pyflink.table.catalog.ResolvedSchema` is offered by the framework after resolution and +validation. +""") class TableSchema(object): """ A table schema that represents a table's structure with field names and data types. """ - def __init__(self, field_names: List[str] = None, data_types: List[DataType] = None, j_table_schema=None): if j_table_schema is None: diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py index 99e6676d6875b..d2f7547152615 100644 --- a/flink-python/pyflink/table/types.py +++ b/flink-python/pyflink/table/types.py @@ -32,6 +32,7 @@ from typing import List, Union from pyflink.common.types import _create_row +from pyflink.util.api_stability_decorators import PublicEvolving from pyflink.util.java_utils import to_jarray, is_instance_of from pyflink.java_gateway import get_gateway from pyflink.common import Row, RowKind @@ -39,6 +40,7 @@ __all__ = ['DataTypes', 'UserDefinedType', 'Row', 'RowKind'] +@PublicEvolving() class DataType(object): """ Describes the data type of a value in the table ecosystem. Instances of this class can be used diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py index 90e49653b60a1..5b63cda8e94af 100644 --- a/flink-python/pyflink/table/udf.py +++ b/flink-python/pyflink/table/udf.py @@ -25,11 +25,13 @@ from pyflink.table import Expression from pyflink.table.types import DataType, _to_java_data_type from pyflink.util import java_utils +from pyflink.util.api_stability_decorators import PublicEvolving, Internal __all__ = ['FunctionContext', 'AggregateFunction', 'ScalarFunction', 'TableFunction', 'TableAggregateFunction', 'udf', 'udtf', 'udaf', 'udtaf'] +@PublicEvolving() class FunctionContext(object): """ Used to obtain global runtime information about the context in which the @@ -65,6 +67,7 @@ def get_job_parameter(self, key: str, default_value: str) -> str: return self._job_parameters[key] if key in self._job_parameters else default_value +@PublicEvolving() class UserDefinedFunction(abc.ABC): """ Base interface for user-defined function. @@ -102,6 +105,7 @@ def is_deterministic(self) -> bool: return True +@PublicEvolving() class ScalarFunction(UserDefinedFunction): """ Base interface for user-defined scalar function. A user-defined scalar functions maps zero, one, @@ -118,6 +122,7 @@ def eval(self, *args): pass +@PublicEvolving() class TableFunction(UserDefinedFunction): """ Base interface for user-defined table function. A user-defined table function creates zero, one, @@ -138,6 +143,7 @@ def eval(self, *args): ACC = TypeVar('ACC') +@PublicEvolving() class ImperativeAggregateFunction(UserDefinedFunction, Generic[T, ACC]): """ Base interface for user-defined aggregate function and table aggregate function. @@ -211,6 +217,7 @@ def get_accumulator_type(self) -> Union[DataType, str]: raise RuntimeError("Method get_accumulator_type is not implemented") +@PublicEvolving() class AggregateFunction(ImperativeAggregateFunction): """ Base interface for user-defined aggregate function. A user-defined aggregate function maps @@ -232,6 +239,7 @@ def get_value(self, accumulator: ACC) -> T: # type: ignore[type-var] pass +@PublicEvolving() class TableAggregateFunction(ImperativeAggregateFunction): """ Base class for a user-defined table aggregate function. 
A user-defined table aggregate function @@ -255,6 +263,7 @@ def emit_value(self, accumulator: ACC) -> Iterable[T]: pass +@Internal() class DelegatingScalarFunction(ScalarFunction): """ Helper scalar function implementation for lambda expression and python function. It's for @@ -268,6 +277,7 @@ def eval(self, *args): return self.func(*args) +@Internal() class DelegationTableFunction(TableFunction): """ Helper table function implementation for lambda expression and python function. It's for @@ -281,6 +291,7 @@ def eval(self, *args): return self.func(*args) +@Internal() class DelegatingPandasAggregateFunction(AggregateFunction): """ Helper pandas aggregate function implementation for lambda expression and python function. @@ -319,6 +330,7 @@ def close(self): self.func.close() +@Internal() class UserDefinedFunctionWrapper(object): """ Base Wrapper for Python user-defined function. It handles things like converting lambda diff --git a/flink-python/pyflink/table/window.py b/flink-python/pyflink/table/window.py index 4b35bd4fd88e8..1ceb1399c5677 100644 --- a/flink-python/pyflink/table/window.py +++ b/flink-python/pyflink/table/window.py @@ -20,6 +20,7 @@ from pyflink.java_gateway import get_gateway from pyflink.table import Expression from pyflink.table.expression import _get_java_expression +from pyflink.util.api_stability_decorators import PublicEvolving __all__ = [ 'Tumble', @@ -33,6 +34,7 @@ from pyflink.table.utils import to_expression_jarray +@PublicEvolving() class GroupWindow(object): """ A group window specification. @@ -51,6 +53,7 @@ def __init__(self, java_window): self._java_window = java_window +@PublicEvolving() class Tumble(object): """ Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping @@ -79,6 +82,7 @@ def over(cls, size: Expression) -> 'TumbleWithSize': return TumbleWithSize(get_gateway().jvm.Tumble.over(_get_java_expression(size))) +@PublicEvolving() class TumbleWithSize(object): """ Tumbling window. @@ -106,6 +110,7 @@ def on(self, time_field: Expression) -> 'TumbleWithSizeOnTime': return TumbleWithSizeOnTime(self._java_window.on(_get_java_expression(time_field))) +@PublicEvolving() class TumbleWithSizeOnTime(object): """ Tumbling window on time. You need to assign an alias for the window. @@ -128,6 +133,7 @@ def alias(self, alias: str) -> 'GroupWindow': return GroupWindow(get_method(self._java_window, "as")(alias)) +@PublicEvolving() class Session(object): """ Helper class for creating a session window. The boundary of session windows are defined by @@ -157,6 +163,7 @@ def with_gap(cls, gap: Expression) -> 'SessionWithGap': return SessionWithGap(get_gateway().jvm.Session.withGap(_get_java_expression(gap))) +@PublicEvolving() class SessionWithGap(object): """ Session window. @@ -184,6 +191,7 @@ def on(self, time_field: Expression) -> 'SessionWithGapOnTime': return SessionWithGapOnTime(self._java_window.on(_get_java_expression(time_field))) +@PublicEvolving() class SessionWithGapOnTime(object): """ Session window on time. You need to assign an alias for the window. @@ -206,6 +214,7 @@ def alias(self, alias: str) -> 'GroupWindow': return GroupWindow(get_method(self._java_window, "as")(alias)) +@PublicEvolving() class Slide(object): """ Helper class for creating a sliding window. 
Sliding windows have a fixed size and slide by @@ -243,6 +252,7 @@ def over(cls, size: Expression) -> 'SlideWithSize': return SlideWithSize(get_gateway().jvm.Slide.over(_get_java_expression(size))) +@PublicEvolving() class SlideWithSize(object): """ Partially specified sliding window. The size of the window either as time or row-count @@ -269,6 +279,7 @@ def every(self, slide: Expression) -> 'SlideWithSizeAndSlide': return SlideWithSizeAndSlide(self._java_window.every(_get_java_expression(slide))) +@PublicEvolving() class SlideWithSizeAndSlide(object): """ Sliding window. The size of the window either as time or row-count interval. @@ -293,6 +304,7 @@ def on(self, time_field: Expression) -> 'SlideWithSizeAndSlideOnTime': return SlideWithSizeAndSlideOnTime(self._java_window.on(_get_java_expression(time_field))) +@PublicEvolving() class SlideWithSizeAndSlideOnTime(object): """ Sliding window on time. You need to assign an alias for the window. @@ -315,6 +327,7 @@ def alias(self, alias: str) -> 'GroupWindow': return GroupWindow(get_method(self._java_window, "as")(alias)) +@PublicEvolving() class Over(object): """ Helper class for creating an over window. Similar to SQL, over window aggregates compute an @@ -361,6 +374,7 @@ def partition_by(cls, *partition_by: Expression) -> 'OverWindowPartitioned': to_expression_jarray(partition_by))) +@PublicEvolving() class OverWindowPartitionedOrdered(object): """ Partially defined over window with (optional) partitioning and order. @@ -389,6 +403,7 @@ def preceding(self, preceding: Expression) -> 'OverWindowPartitionedOrderedPrece self._java_over_window.preceding(_get_java_expression(preceding))) +@PublicEvolving() class OverWindowPartitionedOrderedPreceding(object): """ Partially defined over window with (optional) partitioning, order, and preceding. @@ -418,6 +433,7 @@ def following(self, following: Expression) -> 'OverWindowPartitionedOrderedPrece self._java_over_window.following(_get_java_expression(following))) +@PublicEvolving() class OverWindowPartitioned(object): """ Partially defined over window with partitioning. @@ -442,6 +458,7 @@ def order_by(self, order_by: Expression) -> 'OverWindowPartitionedOrdered': _get_java_expression(order_by))) +@PublicEvolving() class OverWindow(object): """ An over window specification. diff --git a/flink-python/pyflink/util/api_stability_decorators.py b/flink-python/pyflink/util/api_stability_decorators.py new file mode 100644 index 0000000000000..3b891e97fa1df --- /dev/null +++ b/flink-python/pyflink/util/api_stability_decorators.py @@ -0,0 +1,215 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +from inspect import getmembers, isfunction, isclass +from typing import TypeVar, Callable, Any, Union, Type, Optional +from abc import ABCMeta, abstractmethod +import warnings +from typing_extensions import override +from textwrap import dedent, indent + +__all__ = ["Deprecated", "Experimental", "Internal", "PublicEvolving"] + +# TypeVar for anything callable (function or class) +T = TypeVar("T", bound=Union[Callable[..., Any], Type[Any]]) + + +class BaseAPIStabilityDecorator(metaclass=ABCMeta): + """ + Base class for implementing API stability decorators. + + This abstract base class provides the foundation for creating decorators that + mark API elements (functions or classes) with stability indicators. It handles + the mechanics of applying documentation directives to both standalone functions + and entire classes, including their public methods. + """ + + @abstractmethod + def get_directive(self, func_or_cls: T) -> str: + """ + Returns the Sphinx directive that should be appended to the docs of the function/class + for the given decorator. + """ + pass + + @staticmethod + def _get_element_type_name(func_or_cls: T) -> str: + """ + Returns a string representation of the API element's type. + """ + if isfunction(func_or_cls): + return "function" + elif isclass(func_or_cls): + return "class" + else: + return "API" + + def __call__(self, func_or_cls: T) -> T: + """ + Appends a directive to the docstring of the given function or class. + If a class, it also appends the directive to the docstrings of the public functions + of that class. + """ + directive = dedent(self.get_directive(func_or_cls)) + + docstring = func_or_cls.__doc__ or "" + + # Class/Function docstrings can be at an arbitrary level of indentation depending on the + # depth. We should dedent the docstring here so that we can insert the directive at the + # correct indentation. + docstring = dedent(docstring) + + # Avoid duplicating directives if already present in the docstring. + if directive not in docstring: + func_or_cls.__doc__ = f"{docstring}\n{directive}" + + if isclass(func_or_cls): + for name, method in getmembers(func_or_cls, isfunction): + if not name.startswith("_"): + method_docstring = method.__doc__ or "" + method_docstring = dedent(method_docstring) + + if directive not in method_docstring: + method.__doc__ = f"{method_docstring}\n{directive}" + + return func_or_cls + + +class Deprecated(BaseAPIStabilityDecorator): + """ + Decorator to mark classes and functions as deprecated since a certain version, with an + optional extra-details parameter. + + Example: + + .. code-block:: python + + @Deprecated(since="1.2.3", detail="Use :class:`MyNewClass` instead") + class MyClass: + + @Deprecated(since="1.0.0") + def func(self): + pass + + :param str since: The version that this class/function was deprecated in. + :param str detail: Optional explanatory detail for the deprecation. + """ + + def __init__(self, since: str, detail: Optional[str] = None): + self.since = since + self.detail = detail + + def get_directive(self, func_or_cls: T) -> str: + # detail is optional, so fall back to an empty string before dedenting. + return f".. deprecated:: {self.since}\n{indent(dedent(self.detail or ''), ' ')}" + + @override + def __call__(self, func_or_cls: T) -> T: + """ + Emit a warning on the deprecation of the given function/class. Then call the base class + for docstring modification. + """ + msg = f"{func_or_cls.__qualname__} has been deprecated since version {self.since}."
+ if self.detail is not None: + msg = f"{msg} {self.detail}" + + warnings.warn(msg, category=DeprecationWarning, stacklevel=2) + return super().__call__(func_or_cls) + + +class Experimental(BaseAPIStabilityDecorator): + """ + Decorator to mark classes and functions for experimental use. + + Classes and functions with this decorator are neither battle-tested nor stable, and may be + changed or removed in future versions. + + Example: + + .. code-block:: python + + @Experimental() + class MyClass: + + @Experimental() + def func(self): + pass + + """ + + def get_directive(self, func_or_cls: T) -> str: + return f""" +.. warning:: This *{self._get_element_type_name(func_or_cls)}* is marked as **experimental**. It + is neither battle-tested nor stable, and may be changed or removed in future + versions. + """ + + +class Internal(BaseAPIStabilityDecorator): + """ + Decorator to mark classes and functions within stable, public APIs as an internal developer API. + + Developer APIs are stable but internal to Flink and might change across releases. + + Example: + + .. code-block:: python + + @Internal() + class MyClass: + + @Internal() + def func(self): + pass + + """ + + def get_directive(self, func_or_cls: T) -> str: + return f""" +.. caution:: This *{self._get_element_type_name(func_or_cls)}* is marked as **internal**. + It is an internal developer API, which is stable but internal to Flink and + might change across versions. + """ + + +class PublicEvolving(BaseAPIStabilityDecorator): + """ + Decorator to mark classes and functions for public use, but with evolving interfaces. + + Classes and functions with this decorator are intended for public use and have stable behaviour. + However, their interfaces and signatures are not considered to be stable and might be changed + across versions. + + Example: + + .. code-block:: python + + @PublicEvolving() + class MyClass: + + @PublicEvolving() + def func(self): + pass + + """ + + def get_directive(self, func_or_cls: T) -> str: + return f""" +.. note:: This *{self._get_element_type_name(func_or_cls)}* is marked as **evolving**. It is + intended for public use and has stable behaviour. However, its interface/signature is + not considered to be stable and might be changed across versions. + """
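A minimal usage sketch (illustration only, not part of the patch) of how the decorators introduced above behave: the directive returned by get_directive is appended to the decorated class's docstring and to the docstrings of its public methods at decoration time, so the stability marker shows up in the generated Sphinx docs without further changes. The names MyTableSource, open and legacy_helper below are hypothetical and exist only for this example.

.. code-block:: python

    from pyflink.util.api_stability_decorators import Deprecated, PublicEvolving

    @PublicEvolving()
    class MyTableSource:  # hypothetical class, used only to illustrate the decorator
        """Reads rows from an external system."""

        def open(self):
            """Opens the connection."""

    @Deprecated(since="2.1.0", detail="Use :class:`MyTableSource` instead.")
    def legacy_helper():  # hypothetical function; decorating it emits a DeprecationWarning
        pass

    # Both docstrings now end with the ".. note:: ... marked as **evolving** ..." directive,
    # which Sphinx renders as the stability marker for the class and its public method.
    print(MyTableSource.__doc__)
    print(MyTableSource.open.__doc__)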