diff --git a/python/hsfs/core/constants.py b/python/hsfs/core/constants.py index d6af380185..74f14aca78 100644 --- a/python/hsfs/core/constants.py +++ b/python/hsfs/core/constants.py @@ -27,8 +27,10 @@ ) initialise_expectation_suite_for_single_expectation_api_message = "Initialize Expectation Suite by attaching to a Feature Group to enable single expectation API" -# Numpy +HAS_ARROW: bool = importlib.util.find_spec("pyarrow") is not None +HAS_PANDAS: bool = importlib.util.find_spec("pandas") is not None HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None +HAS_POLARS: bool = importlib.util.find_spec("polars") is not None # SQL packages HAS_SQLALCHEMY: bool = importlib.util.find_spec("sqlalchemy") is not None diff --git a/python/hsfs/core/type_systems.py b/python/hsfs/core/type_systems.py new file mode 100644 index 0000000000..99fcc16f6c --- /dev/null +++ b/python/hsfs/core/type_systems.py @@ -0,0 +1,385 @@ +# +# Copyright 2024 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +import ast +import datetime +import decimal +from typing import TYPE_CHECKING, Literal, Union + +import pytz +from hsfs.core.constants import HAS_ARROW, HAS_PANDAS, HAS_POLARS + + +if TYPE_CHECKING: + import numpy as np + import pandas as pd + import polars as pl + +if HAS_ARROW: + import pyarrow as pa + + # Decimal types are currently not supported + _INT_TYPES = [pa.uint8(), pa.uint16(), pa.int8(), pa.int16(), pa.int32()] + _BIG_INT_TYPES = [pa.uint32(), pa.int64()] + _FLOAT_TYPES = [pa.float16(), pa.float32()] + _DOUBLE_TYPES = [pa.float64()] + _TIMESTAMP_UNIT = ["ns", "us", "ms", "s"] + _BOOLEAN_TYPES = [pa.bool_()] + _STRING_TYPES = [pa.string(), pa.large_string()] + _DATE_TYPES = [pa.date32(), pa.date64()] + _BINARY_TYPES = [pa.binary(), pa.large_binary()] + + PYARROW_HOPSWORKS_DTYPE_MAPPING = { + **dict.fromkeys(_INT_TYPES, "int"), + **dict.fromkeys(_BIG_INT_TYPES, "bigint"), + **dict.fromkeys(_FLOAT_TYPES, "float"), + **dict.fromkeys(_DOUBLE_TYPES, "double"), + **dict.fromkeys( + [ + *[pa.timestamp(unit) for unit in _TIMESTAMP_UNIT], + *[ + pa.timestamp(unit, tz=tz) + for unit in _TIMESTAMP_UNIT + for tz in pytz.all_timezones + ], + ], + "timestamp", + ), + **dict.fromkeys(_BOOLEAN_TYPES, "boolean"), + **dict.fromkeys( + [ + *_STRING_TYPES, + # The pandas category type is stored as a dictionary in pyarrow + *[ + pa.dictionary( + value_type=value_type, index_type=index_type, ordered=ordered + ) + for value_type in _STRING_TYPES + for index_type in _INT_TYPES + _BIG_INT_TYPES + for ordered in [True, False] + ], + ], + "string", + ), + **dict.fromkeys(_DATE_TYPES, "date"), + **dict.fromkeys(_BINARY_TYPES, "binary"), + } +else: + PYARROW_HOPSWORKS_DTYPE_MAPPING = {} + +# Mappings used by the Python engine to cast columns to their offline type +if HAS_POLARS: + import polars as pl + + polars_offline_dtype_mapping = { + "bigint": pl.Int64, + "int": pl.Int32, + "smallint": pl.Int16, + "tinyint": pl.Int8, + "float": pl.Float32, + "double": pl.Float64, + } + + _polars_online_dtype_mapping = { + 
"bigint": pl.Int64, + "int": pl.Int32, + "smallint": pl.Int16, + "tinyint": pl.Int8, + "float": pl.Float32, + "double": pl.Float64, + } + +if HAS_PANDAS: + import numpy as np + import pandas as pd + + pandas_offline_dtype_mapping = { + "bigint": pd.Int64Dtype(), + "int": pd.Int32Dtype(), + "smallint": pd.Int16Dtype(), + "tinyint": pd.Int8Dtype(), + "float": np.dtype("float32"), + "double": np.dtype("float64"), + } + + pandas_online_dtype_mapping = { + "bigint": pd.Int64Dtype(), + "int": pd.Int32Dtype(), + "smallint": pd.Int16Dtype(), + "tinyint": pd.Int8Dtype(), + "float": np.dtype("float32"), + "double": np.dtype("float64"), + } + + +def convert_pandas_dtype_to_offline_type(arrow_type: str) -> str: + # This is a simple type conversion between pandas dtypes and pyspark (hive) types, + # using pyarrow types obatined from pandas dataframe to convert pandas typed fields, + # A recurisive function "convert_pandas_object_type_to_offline_type" is used to convert complex types like lists and structures + # "_onvert_simple_pandas_dtype_to_offline_type" is used to convert simple types + # In the backend, the types specified here will also be used for mapping to Avro types. + if ( + pa.types.is_list(arrow_type) + or pa.types.is_large_list(arrow_type) + or pa.types.is_struct(arrow_type) + ): + return convert_pandas_object_type_to_offline_type(arrow_type) + + return convert_simple_pandas_dtype_to_offline_type(arrow_type) + + +def convert_pandas_object_type_to_offline_type(arrow_type: str) -> str: + if pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type): + # figure out sub type + sub_arrow_type = arrow_type.value_type + subtype = convert_pandas_dtype_to_offline_type(sub_arrow_type) + return "array<{}>".format(subtype) + if pa.types.is_struct(arrow_type): + struct_schema = {} + for index in range(arrow_type.num_fields): + struct_schema[arrow_type.field(index).name] = ( + convert_pandas_dtype_to_offline_type(arrow_type.field(index).type) + ) + return ( + "struct<" + + ",".join([f"{key}:{value}" for key, value in struct_schema.items()]) + + ">" + ) + + raise ValueError(f"dtype 'O' (arrow_type '{str(arrow_type)}') not supported") + + +def cast_pandas_column_to_offline_type( + feature_column: pd.Series, offline_type: str +) -> pd.Series: + offline_type = offline_type.lower() + if offline_type == "timestamp": + return pd.to_datetime(feature_column, utc=True).dt.tz_localize(None) + elif offline_type == "date": + return pd.to_datetime(feature_column, utc=True).dt.date + elif ( + offline_type.startswith("array<") + or offline_type.startswith("struct<") + or offline_type == "boolean" + ): + return feature_column.apply( + lambda x: (ast.literal_eval(x) if isinstance(x, str) else x) + if (x is not None and x != "") + else None + ) + elif offline_type == "string": + return feature_column.apply(lambda x: str(x) if x is not None else None) + elif offline_type.startswith("decimal"): + return feature_column.apply( + lambda x: decimal.Decimal(x) if (x is not None) else None + ) + else: + if offline_type in pandas_offline_dtype_mapping: + return feature_column.astype(pandas_offline_dtype_mapping[offline_type]) + else: + return feature_column # handle gracefully, just return the column as-is + + +def cast_polars_column_to_offline_type( + feature_column: pl.Series, offline_type: str +) -> pl.Series: + offline_type = offline_type.lower() + if offline_type == "timestamp": + # convert (if tz!=UTC) to utc, then make timezone unaware + return feature_column.cast(pl.Datetime(time_zone=None)) + elif offline_type == 
"date": + return feature_column.cast(pl.Date) + elif ( + offline_type.startswith("array<") + or offline_type.startswith("struct<") + or offline_type == "boolean" + ): + return feature_column.map_elements( + lambda x: (ast.literal_eval(x) if isinstance(x, str) else x) + if (x is not None and x != "") + else None + ) + elif offline_type == "string": + return feature_column.map_elements(lambda x: str(x) if x is not None else None) + elif offline_type.startswith("decimal"): + return feature_column.map_elements( + lambda x: decimal.Decimal(x) if (x is not None) else None + ) + else: + if offline_type in polars_offline_dtype_mapping: + return feature_column.cast(polars_offline_dtype_mapping[offline_type]) + else: + return feature_column # handle gracefully, just return the column as-is + + +def cast_column_to_offline_type( + feature_column: Union[pd.Series, pl.Series], offline_type: str +) -> pd.Series: + if isinstance(feature_column, pd.Series): + return cast_pandas_column_to_offline_type(feature_column, offline_type.lower()) + elif isinstance(feature_column, pl.Series): + return cast_polars_column_to_offline_type(feature_column, offline_type.lower()) + + +def cast_column_to_online_type( + feature_column: pd.Series, online_type: str +) -> pd.Series: + online_type = online_type.lower() + if online_type == "timestamp": + # convert (if tz!=UTC) to utc, then make timezone unaware + return pd.to_datetime(feature_column, utc=True).dt.tz_localize(None) + elif online_type == "date": + return pd.to_datetime(feature_column, utc=True).dt.date + elif online_type.startswith("varchar") or online_type == "text": + return feature_column.apply(lambda x: str(x) if x is not None else None) + elif online_type == "boolean": + return feature_column.apply( + lambda x: (ast.literal_eval(x) if isinstance(x, str) else x) + if (x is not None and x != "") + else None + ) + elif online_type.startswith("decimal"): + return feature_column.apply( + lambda x: decimal.Decimal(x) if (x is not None) else None + ) + else: + if online_type in pandas_online_dtype_mapping: + casted_feature = feature_column.astype( + pandas_online_dtype_mapping[online_type] + ) + return casted_feature + else: + return feature_column # handle gracefully, just return the column as-is + + +def convert_simple_pandas_dtype_to_offline_type(arrow_type: str) -> str: + try: + return PYARROW_HOPSWORKS_DTYPE_MAPPING[arrow_type] + except KeyError as err: + raise ValueError(f"dtype '{arrow_type}' not supported") from err + + +def translate_legacy_spark_type( + output_type: str, +) -> Literal[ + "STRING", + "BINARY", + "BYTE", + "SHORT", + "INT", + "LONG", + "FLOAT", + "DOUBLE", + "TIMESTAMP", + "DATE", + "BOOLEAN", +]: + if output_type == "StringType()": + return "STRING" + elif output_type == "BinaryType()": + return "BINARY" + elif output_type == "ByteType()": + return "BYTE" + elif output_type == "ShortType()": + return "SHORT" + elif output_type == "IntegerType()": + return "INT" + elif output_type == "LongType()": + return "LONG" + elif output_type == "FloatType()": + return "FLOAT" + elif output_type == "DoubleType()": + return "DOUBLE" + elif output_type == "TimestampType()": + return "TIMESTAMP" + elif output_type == "DateType()": + return "DATE" + elif output_type == "BooleanType()": + return "BOOLEAN" + else: + return "STRING" # handle gracefully, and return STRING type, the default for spark udfs + + +def convert_spark_type_to_offline_type(spark_type_string: str) -> str: + if spark_type_string.endswith("Type()"): + spark_type_string = 
translate_legacy_spark_type(spark_type_string) + if spark_type_string == "STRING": + return "STRING" + elif spark_type_string == "BINARY": + return "BINARY" + elif spark_type_string == "BYTE": + return "INT" + elif spark_type_string == "SHORT": + return "INT" + elif spark_type_string == "INT": + return "INT" + elif spark_type_string == "LONG": + return "BIGINT" + elif spark_type_string == "FLOAT": + return "FLOAT" + elif spark_type_string == "DOUBLE": + return "DOUBLE" + elif spark_type_string == "TIMESTAMP": + return "TIMESTAMP" + elif spark_type_string == "DATE": + return "DATE" + elif spark_type_string == "BOOLEAN": + return "BOOLEAN" + else: + raise ValueError( + f"Return type {spark_type_string} not supported for transformation functions." + ) + + +def infer_spark_type(output_type): + if not output_type: + return "STRING" # STRING is default type for spark udfs + + if isinstance(output_type, str): + if output_type.endswith("Type()"): + return translate_legacy_spark_type(output_type) + output_type = output_type.lower() + + if output_type in (str, "str", "string"): + return "STRING" + elif output_type in (bytes, "binary"): + return "BINARY" + elif output_type in (np.int8, "int8", "byte", "tinyint"): + return "BYTE" + elif output_type in (np.int16, "int16", "short", "smallint"): + return "SHORT" + elif output_type in (int, "int", "integer", np.int32): + return "INT" + elif output_type in (np.int64, "int64", "long", "bigint"): + return "LONG" + elif output_type in (float, "float"): + return "FLOAT" + elif output_type in (np.float64, "float64", "double"): + return "DOUBLE" + elif output_type in ( + datetime.datetime, + np.datetime64, + "datetime", + "timestamp", + ): + return "TIMESTAMP" + elif output_type in (datetime.date, "date"): + return "DATE" + elif output_type in (bool, "boolean", "bool"): + return "BOOLEAN" + else: + raise TypeError("Not supported type %s." 
% output_type) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 034a724a72..6e06af0588 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -15,8 +15,6 @@ # from __future__ import annotations -import ast -import decimal import json import math import numbers @@ -40,6 +38,11 @@ Union, ) +from hsfs.core.type_systems import ( + cast_column_to_offline_type, + cast_column_to_online_type, +) + if TYPE_CHECKING: import great_expectations @@ -50,7 +53,6 @@ import pandas as pd import polars as pl import pyarrow as pa -import pytz from botocore.response import StreamingBody from hsfs import ( client, @@ -78,7 +80,13 @@ training_dataset_job_conf, transformation_function_engine, ) -from hsfs.core.constants import HAS_AIOMYSQL, HAS_GREAT_EXPECTATIONS, HAS_SQLALCHEMY +from hsfs.core.constants import ( + HAS_AIOMYSQL, + HAS_ARROW, + HAS_GREAT_EXPECTATIONS, + HAS_PANDAS, + HAS_SQLALCHEMY, +) from hsfs.core.feature_view_engine import FeatureViewEngine from hsfs.core.vector_db_client import VectorDbClient from hsfs.decorators import uses_great_expectations @@ -91,71 +99,16 @@ if HAS_GREAT_EXPECTATIONS: import great_expectations +if HAS_ARROW: + from hsfs.core.type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING if HAS_AIOMYSQL and HAS_SQLALCHEMY: from hsfs.core import util_sql if HAS_SQLALCHEMY: from sqlalchemy import sql - -PYARROW_EXTENSION_ENABLE = False -try: - import pandas as pd - from packaging.version import Version - - if Version(pd.__version__) > Version("2.0"): - PYARROW_EXTENSION_ENABLE = True - else: - PYARROW_EXTENSION_ENABLE = False -except Exception: - PYARROW_EXTENSION_ENABLE = False # Set PYARROW_EXTENSION_ENABLE to false if pyarrow or pandas cannot be imported - -# Decimal types are currently not supported -_INT_TYPES = [pa.uint8(), pa.uint16(), pa.int8(), pa.int16(), pa.int32()] -_BIG_INT_TYPES = [pa.uint32(), pa.int64()] -_FLOAT_TYPES = [pa.float16(), pa.float32()] -_DOUBLE_TYPES = [pa.float64()] -_TIMESTAMP_UNIT = ["ns", "us", "ms", "s"] -_BOOLEAN_TYPES = [pa.bool_()] -_STRING_TYPES = [pa.string(), pa.large_string()] -_DATE_TYPES = [pa.date32(), pa.date64()] -_BINARY_TYPES = [pa.binary(), pa.large_binary()] - -PYARROW_HOPSWORKS_DTYPE_MAPPING = { - **dict.fromkeys(_INT_TYPES, "int"), - **dict.fromkeys(_BIG_INT_TYPES, "bigint"), - **dict.fromkeys(_FLOAT_TYPES, "float"), - **dict.fromkeys(_DOUBLE_TYPES, "double"), - **dict.fromkeys( - [ - *[pa.timestamp(unit) for unit in _TIMESTAMP_UNIT], - *[ - pa.timestamp(unit, tz=tz) - for unit in _TIMESTAMP_UNIT - for tz in pytz.all_timezones - ], - ], - "timestamp", - ), - **dict.fromkeys(_BOOLEAN_TYPES, "boolean"), - **dict.fromkeys( - [ - *_STRING_TYPES, - # Category type in pandas stored as dictinoary in pyarrow - *[ - pa.dictionary( - value_type=value_type, index_type=index_type, ordered=ordered - ) - for value_type in _STRING_TYPES - for index_type in _INT_TYPES + _BIG_INT_TYPES - for ordered in [True, False] - ], - ], - "string", - ), - **dict.fromkeys(_DATE_TYPES, "date"), - **dict.fromkeys(_BINARY_TYPES, "binary"), -} +if HAS_PANDAS: + from hsfs.core.type_systems import convert_pandas_dtype_to_offline_type class Engine: @@ -792,7 +745,7 @@ def parse_schema_feature_group( for feat_name in arrow_schema.names: name = util.autofix_feature_name(feat_name) try: - converted_type = self._convert_pandas_dtype_to_offline_type( + converted_type = convert_pandas_dtype_to_offline_type( arrow_schema.field(feat_name).type ) except ValueError as e: @@ -826,7 +779,7 @@ def save_dataframe( ) if ( - 
isinstance(feature_group, ExternalFeatureGroup) + hasattr(feature_group, "EXTERNAL_FEATURE_GROUP") and feature_group.online_enabled ) or feature_group.stream: return self._write_dataframe_kafka( @@ -1273,7 +1226,7 @@ def _apply_transformation_function( dataset, pl.dataframe.frame.DataFrame ): # Converting polars dataframe to pandas because currently we support only pandas UDF's as transformation functions. - if PYARROW_EXTENSION_ENABLE: + if HAS_ARROW: dataset = dataset.to_pandas( use_pyarrow_extension_array=True ) # Zero copy if pyarrow extension can be used. @@ -1433,215 +1386,15 @@ def _write_dataframe_kafka( ) return feature_group.materialization_job - @staticmethod - def _convert_pandas_dtype_to_offline_type(arrow_type: str) -> str: - # This is a simple type conversion between pandas dtypes and pyspark (hive) types, - # using pyarrow types obatined from pandas dataframe to convert pandas typed fields, - # A recurisive function "_convert_pandas_object_type_to_offline_type" is used to convert complex types like lists and structures - # "_convert_simple_pandas_dtype_to_offline_type" is used to convert simple types - # In the backend, the types specified here will also be used for mapping to Avro types. - - if ( - pa.types.is_list(arrow_type) - or pa.types.is_large_list(arrow_type) - or pa.types.is_struct(arrow_type) - ): - return Engine._convert_pandas_object_type_to_offline_type(arrow_type) - - return Engine._convert_simple_pandas_dtype_to_offline_type(arrow_type) - - @staticmethod - def convert_spark_type_to_offline_type(spark_type_string: str) -> str: - if spark_type_string.endswith("Type()"): - spark_type_string = util.translate_legacy_spark_type(spark_type_string) - if spark_type_string == "STRING": - return "STRING" - elif spark_type_string == "BINARY": - return "BINARY" - elif spark_type_string == "BYTE": - return "INT" - elif spark_type_string == "SHORT": - return "INT" - elif spark_type_string == "INT": - return "INT" - elif spark_type_string == "LONG": - return "BIGINT" - elif spark_type_string == "FLOAT": - return "FLOAT" - elif spark_type_string == "DOUBLE": - return "DOUBLE" - elif spark_type_string == "TIMESTAMP": - return "TIMESTAMP" - elif spark_type_string == "DATE": - return "DATE" - elif spark_type_string == "BOOLEAN": - return "BOOLEAN" - else: - raise ValueError( - f"Return type {spark_type_string} not supported for transformation functions." 
- ) - - @staticmethod - def _convert_simple_pandas_dtype_to_offline_type(arrow_type: str) -> str: - try: - return PYARROW_HOPSWORKS_DTYPE_MAPPING[arrow_type] - except KeyError as err: - raise ValueError(f"dtype '{arrow_type}' not supported") from err - - @staticmethod - def _convert_pandas_object_type_to_offline_type(arrow_type: str) -> str: - if pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type): - # figure out sub type - sub_arrow_type = arrow_type.value_type - subtype = Engine._convert_pandas_dtype_to_offline_type(sub_arrow_type) - return "array<{}>".format(subtype) - if pa.types.is_struct(arrow_type): - struct_schema = {} - for index in range(arrow_type.num_fields): - struct_schema[arrow_type.field(index).name] = ( - Engine._convert_pandas_dtype_to_offline_type( - arrow_type.field(index).type - ) - ) - return ( - "struct<" - + ",".join([f"{key}:{value}" for key, value in struct_schema.items()]) - + ">" - ) - - raise ValueError(f"dtype 'O' (arrow_type '{str(arrow_type)}') not supported") - - @staticmethod - def _cast_column_to_offline_type( - feature_column: pd.Series, offline_type: str - ) -> pd.Series: - offline_type = offline_type.lower() - if offline_type == "timestamp": - # convert (if tz!=UTC) to utc, then make timezone unaware - if isinstance(feature_column, pl.Series): - return feature_column.cast(pl.Datetime(time_zone=None)) - else: - return pd.to_datetime(feature_column, utc=True).dt.tz_localize(None) - elif offline_type == "date": - if isinstance(feature_column, pl.Series): - return feature_column.cast(pl.Date) - else: - return pd.to_datetime(feature_column, utc=True).dt.date - elif ( - offline_type.startswith("array<") - or offline_type.startswith("struct<") - or offline_type == "boolean" - ): - if isinstance(feature_column, pl.Series): - return feature_column.map_elements( - lambda x: (ast.literal_eval(x) if isinstance(x, str) else x) - if (x is not None and x != "") - else None - ) - else: - return feature_column.apply( - lambda x: (ast.literal_eval(x) if isinstance(x, str) else x) - if (x is not None and x != "") - else None - ) - elif offline_type == "string": - if isinstance(feature_column, pl.Series): - return feature_column.map_elements( - lambda x: str(x) if x is not None else None - ) - else: - return feature_column.apply(lambda x: str(x) if x is not None else None) - elif offline_type.startswith("decimal"): - if isinstance(feature_column, pl.Series): - return feature_column.map_elements( - lambda x: decimal.Decimal(x) if (x is not None) else None - ) - else: - return feature_column.apply( - lambda x: decimal.Decimal(x) if (x is not None) else None - ) - else: - if isinstance(feature_column, pl.Series): - offline_dtype_mapping = { - "bigint": pl.Int64, - "int": pl.Int32, - "smallint": pl.Int16, - "tinyint": pl.Int8, - "float": pl.Float32, - "double": pl.Float64, - } - else: - offline_dtype_mapping = { - "bigint": pd.Int64Dtype(), - "int": pd.Int32Dtype(), - "smallint": pd.Int16Dtype(), - "tinyint": pd.Int8Dtype(), - "float": np.dtype("float32"), - "double": np.dtype("float64"), - } - if offline_type in offline_dtype_mapping: - if isinstance(feature_column, pl.Series): - casted_feature = feature_column.cast( - offline_dtype_mapping[offline_type] - ) - else: - casted_feature = feature_column.astype( - offline_dtype_mapping[offline_type] - ) - return casted_feature - else: - return feature_column # handle gracefully, just return the column as-is - - @staticmethod - def _cast_column_to_online_type( - feature_column: pd.Series, online_type: str - ) -> 
pd.Series: - online_type = online_type.lower() - if online_type == "timestamp": - # convert (if tz!=UTC) to utc, then make timezone unaware - return pd.to_datetime(feature_column, utc=True).dt.tz_localize(None) - elif online_type == "date": - return pd.to_datetime(feature_column, utc=True).dt.date - elif online_type.startswith("varchar") or online_type == "text": - return feature_column.apply(lambda x: str(x) if x is not None else None) - elif online_type == "boolean": - return feature_column.apply( - lambda x: (ast.literal_eval(x) if isinstance(x, str) else x) - if (x is not None and x != "") - else None - ) - elif online_type.startswith("decimal"): - return feature_column.apply( - lambda x: decimal.Decimal(x) if (x is not None) else None - ) - else: - online_dtype_mapping = { - "bigint": pd.Int64Dtype(), - "int": pd.Int32Dtype(), - "smallint": pd.Int16Dtype(), - "tinyint": pd.Int8Dtype(), - "float": np.dtype("float32"), - "double": np.dtype("float64"), - } - if online_type in online_dtype_mapping: - casted_feature = feature_column.astype( - online_dtype_mapping[online_type] - ) - return casted_feature - else: - return feature_column # handle gracefully, just return the column as-is - @staticmethod def cast_columns( - df: pd.DataFrame, schema: List["feature.Feature"], online: bool = False + df: pd.DataFrame, schema: List[feature.Feature], online: bool = False ) -> pd.DataFrame: for _feat in schema: if not online: - df[_feat.name] = Engine._cast_column_to_offline_type( - df[_feat.name], _feat.type - ) + df[_feat.name] = cast_column_to_offline_type(df[_feat.name], _feat.type) else: - df[_feat.name] = Engine._cast_column_to_online_type( + df[_feat.name] = cast_column_to_online_type( df[_feat.name], _feat.online_type ) return df diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index ae41382eac..4e1f67c7d0 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -1157,7 +1157,7 @@ def parse_schema_feature_group( for feat in dataframe.schema: name = util.autofix_feature_name(feat.name) try: - converted_type = Engine.convert_spark_type_to_offline_type( + converted_type = Engine._convert_spark_type_to_offline_type( feat.dataType, using_hudi ) except ValueError as e: @@ -1396,7 +1396,7 @@ def get_unique_values(feature_dataframe, feature_name): return [field[feature_name] for field in unique_values] @staticmethod - def convert_spark_type_to_offline_type(spark_type, using_hudi): + def _convert_spark_type_to_offline_type(spark_type, using_hudi): # The HiveSyncTool is strict and does not support schema evolution from tinyint/short to # int. Avro, on the other hand, does not support tinyint/short and delivers them as int # to Hive. Therefore, we need to force Hive to create int-typed columns in the first place. 
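[Reviewer note] With this refactor the type-conversion helpers live in hsfs.core.type_systems rather than on the engine classes. A minimal sketch of how the relocated pandas-schema converter can be exercised, assuming pyarrow and pandas are installed (the example DataFrame is made up for illustration):

    import pandas as pd
    import pyarrow as pa

    from hsfs.core import type_systems

    # Hypothetical frame with one simple and one nested column.
    df = pd.DataFrame({"id": pd.Series([1, 2], dtype="int32"), "tags": [["a"], ["b", "c"]]})
    schema = pa.Schema.from_pandas(df)

    for name in schema.names:
        # Maps each pyarrow type to the Hopsworks offline (Hive) type string.
        print(name, type_systems.convert_pandas_dtype_to_offline_type(schema.field(name).type))
    # expected: id -> int, tags -> array<string>
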
diff --git a/python/hsfs/util.py b/python/hsfs/util.py index 10aa3d733a..da65392f43 100644 --- a/python/hsfs/util.py +++ b/python/hsfs/util.py @@ -354,47 +354,6 @@ def get_job_url(href: str) -> str: return ui_url.geturl() -def translate_legacy_spark_type( - output_type: str, -) -> Literal[ - "STRING", - "BINARY", - "BYTE", - "SHORT", - "INT", - "LONG", - "FLOAT", - "DOUBLE", - "TIMESTAMP", - "DATE", - "BOOLEAN", -]: - if output_type == "StringType()": - return "STRING" - elif output_type == "BinaryType()": - return "BINARY" - elif output_type == "ByteType()": - return "BYTE" - elif output_type == "ShortType()": - return "SHORT" - elif output_type == "IntegerType()": - return "INT" - elif output_type == "LongType()": - return "LONG" - elif output_type == "FloatType()": - return "FLOAT" - elif output_type == "DoubleType()": - return "DOUBLE" - elif output_type == "TimestampType()": - return "TIMESTAMP" - elif output_type == "DateType()": - return "DATE" - elif output_type == "BooleanType()": - return "BOOLEAN" - else: - return "STRING" # handle gracefully, and return STRING type, the default for spark udfs - - def _loading_animation(message: str, stop_event: threading.Event) -> None: for char in itertools.cycle([".", "..", "...", ""]): if stop_event.is_set(): diff --git a/python/tests/core/test_type_systems.py b/python/tests/core/test_type_systems.py new file mode 100644 index 0000000000..1c6c26b081 --- /dev/null +++ b/python/tests/core/test_type_systems.py @@ -0,0 +1,749 @@ +# +# Copyright 2024 Hopsworks AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import datetime + +import pytest +from hsfs.core import type_systems +from hsfs.core.constants import HAS_ARROW, HAS_PANDAS + + +if HAS_ARROW: + import pyarrow as pa + +if HAS_PANDAS: + import numpy as np + import pandas as pd + + rng_engine = np.random.default_rng(42) + + +class TestTypeSystems: + @pytest.mark.skipif( + not HAS_ARROW or not HAS_PANDAS, reason="Arrow or Pandas are not installed" + ) + def test_infer_type_pyarrow_list(self): + # Act + result = type_systems.convert_pandas_object_type_to_offline_type( + arrow_type=pa.list_(pa.int8()) + ) + + # Assert + assert result == "array<int>" + + def test_infer_type_pyarrow_large_list(self): + # Act + result = type_systems.convert_pandas_object_type_to_offline_type( + arrow_type=pa.large_list(pa.int8()) + ) + + # Assert + assert result == "array<int>" + + def test_infer_type_pyarrow_struct(self): + # Act + result = type_systems.convert_pandas_object_type_to_offline_type( + arrow_type=pa.struct([pa.field("f1", pa.int32())]) + ) + + # Assert + assert result == "struct<f1:int>" + + def test_infer_type_pyarrow_date32(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.date32() + ) + + # Assert + assert result == "date" + + def test_infer_type_pyarrow_date64(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.date64() + ) + + # Assert + assert result == "date" + + def test_infer_type_pyarrow_binary(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.binary() + ) + + # Assert + assert result == "binary" + + def test_infer_type_pyarrow_large_binary(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.large_binary() + ) + + # Assert + assert result == "binary" + + def test_infer_type_pyarrow_string(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.string() + ) + + # Assert + assert result == "string" + + def test_infer_type_pyarrow_large_string(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.large_string() + ) + + # Assert + assert result == "string" + + def test_infer_type_pyarrow_utf8(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.utf8() + ) + + # Assert + assert result == "string" + + def test_infer_type_pyarrow_other(self): + # Act + with pytest.raises(ValueError) as e_info: + type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.time32("s") + ) + + # Assert + assert str(e_info.value) == "dtype 'time32[s]' not supported" + + def test_infer_type_pyarrow_struct_with_decimal_fields(self): + # Arrange + mapping = {f"user{i}": 2.0 for i in range(2)} + pdf = pd.DataFrame( + data=zip(list(range(1, 2)), [mapping] * 2), + columns=["id", "mapping"], + ) + arrow_schema = pa.Schema.from_pandas(pdf) + + # Act + arrow_type = type_systems.convert_pandas_object_type_to_offline_type( + arrow_schema.field("mapping").type + ) + + # Assert + assert arrow_type == "struct<user0:double,user1:double>" + + def test_infer_type_pyarrow_struct_with_decimal_and_string_fields(self): + # Arrange + mapping = {"user0": 2.0, "user1": "test"} + pdf = pd.DataFrame( + data=zip(list(range(1, 2)), [mapping] * 2), + columns=["id", "mapping"], + ) + arrow_schema = pa.Schema.from_pandas(pdf) + + # Act + arrow_type = type_systems.convert_pandas_object_type_to_offline_type( + arrow_schema.field("mapping").type + ) + + # Assert + assert arrow_type == "struct<user0:double,user1:string>" + + 
def test_infer_type_pyarrow_struct_with_list_fields(self): + # Arrange + mapping = {"user0": list(rng_engine.normal(size=5)), "user1": ["test", "test"]} + pdf = pd.DataFrame( + data=zip(list(range(1, 2)), [mapping] * 2), + columns=["id", "mapping"], + ) + arrow_schema = pa.Schema.from_pandas(pdf) + + # Act + arrow_type = type_systems.convert_pandas_object_type_to_offline_type( + arrow_schema.field("mapping").type + ) + + # Assert + assert arrow_type == "struct<user0:array<double>,user1:array<string>>" + + def test_infer_type_pyarrow_struct_with_string_fields(self): + # Arrange + mapping = {f"user{i}": "test" for i in range(2)} + pdf = pd.DataFrame( + data=zip(list(range(1, 2)), [mapping] * 2), + columns=["id", "mapping"], + ) + arrow_schema = pa.Schema.from_pandas(pdf) + + # Act + arrow_type = type_systems.convert_pandas_object_type_to_offline_type( + arrow_schema.field("mapping").type + ) + + # Assert + assert arrow_type == "struct<user0:string,user1:string>" + + def test_infer_type_pyarrow_struct_with_struct_fields(self): + # Arrange + mapping = {f"user{i}": {"value": "test"} for i in range(2)} + pdf = pd.DataFrame( + data=zip(list(range(1, 2)), [mapping] * 2), + columns=["id", "mapping"], + ) + arrow_schema = pa.Schema.from_pandas(pdf) + + # Act + arrow_type = type_systems.convert_pandas_object_type_to_offline_type( + arrow_schema.field("mapping").type + ) + + # Assert + assert ( + arrow_type + == "struct<user0:struct<value:string>,user1:struct<value:string>>" + ) + + def test_infer_type_pyarrow_struct_with_struct_fields_with_list_values(self): + # Arrange + mapping = { + f"user{i}": {"value": list(rng_engine.normal(size=5))} for i in range(2) + } + pdf = pd.DataFrame( + data=zip(list(range(1, 2)), [mapping] * 2), + columns=["id", "mapping"], + ) + arrow_schema = pa.Schema.from_pandas(pdf) + + # Act + arrow_type = type_systems.convert_pandas_object_type_to_offline_type( + arrow_schema.field("mapping").type + ) + + # Assert + assert ( + arrow_type + == "struct<user0:struct<value:array<double>>,user1:struct<value:array<double>>>" + ) + + def test_infer_type_pyarrow_struct_with_nested_struct_fields(self): + # Arrange + mapping = {f"user{i}": {"value": {"value": "test"}} for i in range(2)} + pdf = pd.DataFrame( + data=zip(list(range(1, 2)), [mapping] * 2), + columns=["id", "mapping"], + ) + arrow_schema = pa.Schema.from_pandas(pdf) + + # Act + arrow_type = type_systems.convert_pandas_object_type_to_offline_type( + arrow_schema.field("mapping").type + ) + + # Assert + assert ( + arrow_type + == "struct<user0:struct<value:struct<value:string>>,user1:struct<value:struct<value:string>>>" + ) + + def test_infer_type_pyarrow_list_of_struct_fields(self): + # Arrange + mapping = [{"value": rng_engine.normal(size=5)}] + pdf = pd.DataFrame( + data=zip(list(range(1, 2)), [mapping] * 2), + columns=["id", "mapping"], + ) + arrow_schema = pa.Schema.from_pandas(pdf) + + # Act + arrow_type = type_systems.convert_pandas_object_type_to_offline_type( + arrow_schema.field("mapping").type + ) + + # Assert + assert arrow_type == "array<struct<value:array<double>>>" + + def test_infer_type_pyarrow_struct_with_list_of_struct_fields(self): + # Arrange + mapping = {f"user{i}": [{"value": rng_engine.normal(size=5)}] for i in range(2)} + pdf = pd.DataFrame( + data=zip(list(range(1, 2)), [mapping] * 2), + columns=["id", "mapping"], + ) + arrow_schema = pa.Schema.from_pandas(pdf) + + # Act + arrow_type = type_systems.convert_pandas_object_type_to_offline_type( + arrow_schema.field("mapping").type + ) + + # Assert + assert ( + arrow_type + == "struct<user0:array<struct<value:array<double>>>,user1:array<struct<value:array<double>>>>" + ) + + def test_convert_simple_pandas_type_uint8(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.uint8() + ) + + # Assert + assert result == 
"int" + + def test_convert_simple_pandas_type_uint16(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.uint16() + ) + + # Assert + assert result == "int" + + def test_convert_simple_pandas_type_int8(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.int8() + ) + + # Assert + assert result == "int" + + def test_convert_simple_pandas_type_int16(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.int16() + ) + + # Assert + assert result == "int" + + def test_convert_simple_pandas_type_int32(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.int32() + ) + + # Assert + assert result == "int" + + def test_convert_simple_pandas_type_uint32(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.uint32() + ) + + # Assert + assert result == "bigint" + + def test_convert_simple_pandas_type_int64(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.int64() + ) + + # Assert + assert result == "bigint" + + def test_convert_simple_pandas_type_float16(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.float16() + ) + + # Assert + assert result == "float" + + def test_convert_simple_pandas_type_float32(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.float32() + ) + + # Assert + assert result == "float" + + def test_convert_simple_pandas_type_float64(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.float64() + ) + + # Assert + assert result == "double" + + def test_convert_simple_pandas_type_datetime64ns(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.timestamp(unit="ns") + ) + + # Assert + assert result == "timestamp" + + def test_convert_simple_pandas_type_datetime64nstz(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.timestamp(unit="ns", tz="UTC") + ) + + # Assert + assert result == "timestamp" + + def test_convert_simple_pandas_type_datetime64us(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.timestamp(unit="us") + ) + + # Assert + assert result == "timestamp" + + def test_convert_simple_pandas_type_datetime64ustz(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.timestamp(unit="us", tz="UTC") + ) + + # Assert + assert result == "timestamp" + + def test_convert_simple_pandas_type_datetime64ms(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.timestamp(unit="ms") + ) + + # Assert + assert result == "timestamp" + + def test_convert_simple_pandas_type_datetime64mstz(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.timestamp(unit="ms", tz="UTC") + ) + + # Assert + assert result == "timestamp" + + def test_convert_simple_pandas_type_datetime64s(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.timestamp(unit="s") + ) + + # Assert + assert result == "timestamp" + + def test_convert_simple_pandas_type_datetime64stz(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + 
arrow_type=pa.timestamp(unit="s", tz="UTC") + ) + + # Assert + assert result == "timestamp" + + def test_convert_simple_pandas_type_bool(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.bool_() + ) + + # Assert + assert result == "boolean" + + def test_convert_simple_pandas_type_category_unordered(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.dictionary( + value_type=pa.string(), index_type=pa.int8(), ordered=False + ) + ) + + # Assert + assert result == "string" + + def test_convert_simple_pandas_type_large_string_category_unordered(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.dictionary( + value_type=pa.large_string(), index_type=pa.int64(), ordered=False + ) + ) + + # Assert + assert result == "string" + + def test_convert_simple_pandas_type_large_string_category_ordered(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.dictionary( + value_type=pa.large_string(), index_type=pa.int64(), ordered=True + ) + ) + + # Assert + assert result == "string" + + def test_convert_simple_pandas_type_category_ordered(self): + # Act + result = type_systems.convert_simple_pandas_dtype_to_offline_type( + arrow_type=pa.dictionary( + value_type=pa.string(), index_type=pa.int8(), ordered=True + ) + ) + + # Assert + assert result == "string" + + def test_convert_simple_pandas_type_other(self): + # Act + with pytest.raises(ValueError) as e_info: + type_systems.convert_simple_pandas_dtype_to_offline_type(arrow_type="other") + + # Assert + assert str(e_info.value) == "dtype 'other' not supported" + + def test_infer_spark_type_string_type_1(self): + # Act + result = type_systems.infer_spark_type(str) + + # Assert + assert result == "STRING" + + def test_infer_spark_type_string_type_2(self): + # Act + result = type_systems.infer_spark_type("str") + + # Assert + assert result == "STRING" + + def test_infer_spark_type_string_type_3(self): + # Act + result = type_systems.infer_spark_type("string") + + # Assert + assert result == "STRING" + + def test_infer_spark_type_byte_type_1(self): + # Act + result = type_systems.infer_spark_type(bytes) + result1 = type_systems.infer_spark_type("BinaryType()") + + # Assert + assert result == "BINARY" + assert result1 == "BINARY" + + def test_infer_spark_type_int8_type_1(self): + # Act + result = type_systems.infer_spark_type(np.int8) + + # Assert + assert result == "BYTE" + + def test_infer_spark_type_int8_type_2(self): + # Act + result = type_systems.infer_spark_type("int8") + + # Assert + assert result == "BYTE" + + def test_infer_spark_type_int8_type_3(self): + # Act + result = type_systems.infer_spark_type("byte") + result1 = type_systems.infer_spark_type("ByteType()") + + # Assert + assert result == "BYTE" + assert result1 == "BYTE" + + def test_infer_spark_type_int16_type_1(self): + # Act + result = type_systems.infer_spark_type(np.int16) + + # Assert + assert result == "SHORT" + + def test_infer_spark_type_int16_type_2(self): + # Act + result = type_systems.infer_spark_type("int16") + + # Assert + assert result == "SHORT" + + def test_infer_spark_type_int16_type_3(self): + # Act + result = type_systems.infer_spark_type("short") + result1 = type_systems.infer_spark_type("ShortType()") + + # Assert + assert result == "SHORT" + assert result1 == "SHORT" + + def test_infer_spark_type_int_type_1(self): + # Act + result = type_systems.infer_spark_type(int) + + # Assert + 
assert result == "INT" + + def test_infer_spark_type_int_type_2(self): + # Act + result = type_systems.infer_spark_type("int") + + # Assert + assert result == "INT" + + def test_infer_spark_type_int_type_3(self): + # Act + result = type_systems.infer_spark_type(np.int32) + result1 = type_systems.infer_spark_type("IntegerType()") + + # Assert + assert result == "INT" + assert result1 == "INT" + + def test_infer_spark_type_int64_type_1(self): + # Act + result = type_systems.infer_spark_type(np.int64) + + # Assert + assert result == "LONG" + + def test_infer_spark_type_int64_type_2(self): + # Act + result = type_systems.infer_spark_type("int64") + + # Assert + assert result == "LONG" + + def test_infer_spark_type_int64_type_3(self): + # Act + result = type_systems.infer_spark_type("long") + + # Assert + assert result == "LONG" + + def test_infer_spark_type_int64_type_4(self): + # Act + result = type_systems.infer_spark_type("bigint") + result1 = type_systems.infer_spark_type("LongType()") + + # Assert + assert result == "LONG" + assert result1 == "LONG" + + def test_infer_spark_type_float_type_1(self): + # Act + result = type_systems.infer_spark_type(float) + + # Assert + assert result == "FLOAT" + + def test_infer_spark_type_float_type_2(self): + # Act + result = type_systems.infer_spark_type("float") + result1 = type_systems.infer_spark_type("FloatType()") + + # Assert + assert result == "FLOAT" + assert result1 == "FLOAT" + + def test_infer_spark_type_double_type_1(self): + # Act + result = type_systems.infer_spark_type(np.float64) + + # Assert + assert result == "DOUBLE" + + def test_infer_spark_type_double_type_2(self): + # Act + result = type_systems.infer_spark_type("float64") + + # Assert + assert result == "DOUBLE" + + def test_infer_spark_type_double_type_3(self): + # Act + result = type_systems.infer_spark_type("double") + result1 = type_systems.infer_spark_type("DoubleType()") + + # Assert + assert result == "DOUBLE" + assert result1 == "DOUBLE" + + def test_infer_spark_type_timestamp_type_1(self): + # Act + result = type_systems.infer_spark_type(datetime.datetime) + + # Assert + assert result == "TIMESTAMP" + + def test_infer_spark_type_timestamp_type_2(self): + # Act + result = type_systems.infer_spark_type(np.datetime64) + result1 = type_systems.infer_spark_type("TimestampType()") + + # Assert + assert result == "TIMESTAMP" + assert result1 == "TIMESTAMP" + + def test_infer_spark_type_date_type_1(self): + # Act + result = type_systems.infer_spark_type(datetime.date) + result1 = type_systems.infer_spark_type("DateType()") + + # Assert + assert result == "DATE" + assert result1 == "DATE" + + def test_infer_spark_type_bool_type_1(self): + # Act + result = type_systems.infer_spark_type(bool) + + # Assert + assert result == "BOOLEAN" + + def test_infer_spark_type_bool_type_2(self): + # Act + result = type_systems.infer_spark_type("boolean") + + # Assert + assert result == "BOOLEAN" + + def test_infer_spark_type_bool_type_3(self): + # Act + result = type_systems.infer_spark_type("bool") + result1 = type_systems.infer_spark_type("BooleanType()") + + # Assert + assert result == "BOOLEAN" + assert result1 == "BOOLEAN" + + def test_infer_spark_type_wrong_type(self): + # Act + with pytest.raises(TypeError) as e_info: + type_systems.infer_spark_type("wrong") + + # Assert + assert str(e_info.value) == "Not supported type wrong." 
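[Reviewer note] The casting helpers that moved out of Engine keep their graceful fallback (an unknown type string returns the column unchanged). A small usage sketch with made-up data, assuming pandas is installed:

    import pandas as pd

    from hsfs.core.type_systems import cast_column_to_offline_type

    # "bigint" casts to the nullable Int64 extension dtype; NaN becomes pd.NA.
    print(cast_column_to_offline_type(pd.Series([1.0, 2.0, None]), "bigint").dtype)  # Int64

    # "timestamp" normalizes to UTC and then drops the timezone.
    print(cast_column_to_offline_type(pd.Series(["2024-01-01T12:00:00+01:00"]), "timestamp"))
    # -> 2024-01-01 11:00:00
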
diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index a08c6e8ec4..1ee9c8a0d1 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -1406,7 +1406,7 @@ def test_convert_to_default_dataframe_polars(self, mocker): def test_parse_schema_feature_group_pandas(self, mocker): # Arrange - mocker.patch("hsfs.engine.python.Engine._convert_pandas_dtype_to_offline_type") + mocker.patch("hsfs.core.type_systems.convert_pandas_dtype_to_offline_type") python_engine = python.Engine() @@ -1425,7 +1425,7 @@ def test_parse_schema_feature_group_pandas(self, mocker): def test_parse_schema_feature_group_polars(self, mocker): # Arrange - mocker.patch("hsfs.engine.python.Engine._convert_pandas_dtype_to_offline_type") + mocker.patch("hsfs.core.type_systems.convert_pandas_dtype_to_offline_type") python_engine = python.Engine() @@ -1469,623 +1469,6 @@ def test_parse_schema_training_dataset(self): == "Training dataset creation from Dataframes is not supported in Python environment. Use HSFS Query object instead." ) - def test_convert_simple_pandas_type_uint8(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.uint8() - ) - - # Assert - assert result == "int" - - def test_convert_simple_pandas_type_uint16(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.uint16() - ) - - # Assert - assert result == "int" - - def test_convert_simple_pandas_type_int8(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.int8() - ) - - # Assert - assert result == "int" - - def test_convert_simple_pandas_type_int16(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.int16() - ) - - # Assert - assert result == "int" - - def test_convert_simple_pandas_type_int32(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.int32() - ) - - # Assert - assert result == "int" - - def test_convert_simple_pandas_type_uint32(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.uint32() - ) - - # Assert - assert result == "bigint" - - def test_convert_simple_pandas_type_int64(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.int64() - ) - - # Assert - assert result == "bigint" - - def test_convert_simple_pandas_type_float16(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.float16() - ) - - # Assert - assert result == "float" - - def test_convert_simple_pandas_type_float32(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.float32() - ) - - # Assert - assert result == "float" - - def test_convert_simple_pandas_type_float64(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.float64() - ) - - # Assert - assert result == 
"double" - - def test_convert_simple_pandas_type_datetime64ns(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.timestamp(unit="ns") - ) - - # Assert - assert result == "timestamp" - - def test_convert_simple_pandas_type_datetime64nstz(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.timestamp(unit="ns", tz="UTC") - ) - - # Assert - assert result == "timestamp" - - def test_convert_simple_pandas_type_datetime64us(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.timestamp(unit="us") - ) - - # Assert - assert result == "timestamp" - - def test_convert_simple_pandas_type_datetime64ustz(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.timestamp(unit="us", tz="UTC") - ) - - # Assert - assert result == "timestamp" - - def test_convert_simple_pandas_type_datetime64ms(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.timestamp(unit="ms") - ) - - # Assert - assert result == "timestamp" - - def test_convert_simple_pandas_type_datetime64mstz(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.timestamp(unit="ms", tz="UTC") - ) - - # Assert - assert result == "timestamp" - - def test_convert_simple_pandas_type_datetime64s(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.timestamp(unit="s") - ) - - # Assert - assert result == "timestamp" - - def test_convert_simple_pandas_type_datetime64stz(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.timestamp(unit="s", tz="UTC") - ) - - # Assert - assert result == "timestamp" - - def test_convert_simple_pandas_type_bool(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.bool_() - ) - - # Assert - assert result == "boolean" - - def test_convert_simple_pandas_type_category_unordered(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.dictionary( - value_type=pa.string(), index_type=pa.int8(), ordered=False - ) - ) - - # Assert - assert result == "string" - - def test_convert_simple_pandas_type_large_string_category_unordered(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.dictionary( - value_type=pa.large_string(), index_type=pa.int64(), ordered=False - ) - ) - - # Assert - assert result == "string" - - def test_convert_simple_pandas_type_large_string_category_ordered(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.dictionary( - value_type=pa.large_string(), index_type=pa.int64(), ordered=True - ) - ) - - # Assert - assert result == "string" - - def 
test_convert_simple_pandas_type_category_ordered(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.dictionary( - value_type=pa.string(), index_type=pa.int8(), ordered=True - ) - ) - - # Assert - assert result == "string" - - def test_convert_simple_pandas_type_other(self): - # Arrange - python_engine = python.Engine() - - # Act - with pytest.raises(ValueError) as e_info: - python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type="other" - ) - - # Assert - assert str(e_info.value) == "dtype 'other' not supported" - - def test_infer_type_pyarrow_list(self): - # Arrange - - python_engine = python.Engine() - - # Act - result = python_engine._convert_pandas_object_type_to_offline_type( - arrow_type=pa.list_(pa.int8()) - ) - - # Assert - assert result == "array<int>" - - def test_infer_type_pyarrow_large_list(self): - # Arrange - - python_engine = python.Engine() - - # Act - result = python_engine._convert_pandas_object_type_to_offline_type( - arrow_type=pa.large_list(pa.int8()) - ) - - # Assert - assert result == "array<int>" - - def test_infer_type_pyarrow_struct(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_pandas_object_type_to_offline_type( - arrow_type=pa.struct([pa.field("f1", pa.int32())]) - ) - - # Assert - assert result == "struct<f1:int>" - - def test_infer_type_pyarrow_date32(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.date32() - ) - - # Assert - assert result == "date" - - def test_infer_type_pyarrow_date64(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.date64() - ) - - # Assert - assert result == "date" - - def test_infer_type_pyarrow_binary(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.binary() - ) - - # Assert - assert result == "binary" - - def test_infer_type_pyarrow_large_binary(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.large_binary() - ) - - # Assert - assert result == "binary" - - def test_infer_type_pyarrow_string(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.string() - ) - - # Assert - assert result == "string" - - def test_infer_type_pyarrow_large_string(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.large_string() - ) - - # Assert - assert result == "string" - - def test_infer_type_pyarrow_utf8(self): - # Arrange - python_engine = python.Engine() - - # Act - result = python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.utf8() - ) - - # Assert - assert result == "string" - - def test_infer_type_pyarrow_other(self): - # Arrange - python_engine = python.Engine() - - # Act - with pytest.raises(ValueError) as e_info: - python_engine._convert_simple_pandas_dtype_to_offline_type( - arrow_type=pa.time32("s") - ) - - # Assert - assert str(e_info.value) == "dtype 'time32[s]' not supported" - - def test_infer_type_pyarrow_struct_with_decimal_fields(self): - # Arrange - mapping = {f"user{i}": 2.0 
for i in range(2)} - pdf = pd.DataFrame( - data=zip(list(range(1, 2)), [mapping] * 2), - columns=["id", "mapping"], - ) - arrow_schema = pa.Schema.from_pandas(pdf) - - python_engine = python.Engine() - - # Act - arrow_type = python_engine._convert_pandas_object_type_to_offline_type( - arrow_schema.field("mapping").type - ) - - # Assert - assert arrow_type == "struct<user0:double,user1:double>" - - def test_infer_type_pyarrow_struct_with_decimal_and_string_fields(self): - # Arrange - mapping = {"user0": 2.0, "user1": "test"} - pdf = pd.DataFrame( - data=zip(list(range(1, 2)), [mapping] * 2), - columns=["id", "mapping"], - ) - arrow_schema = pa.Schema.from_pandas(pdf) - - python_engine = python.Engine() - - # Act - arrow_type = python_engine._convert_pandas_object_type_to_offline_type( - arrow_schema.field("mapping").type - ) - - # Assert - assert arrow_type == "struct<user0:double,user1:string>" - - def test_infer_type_pyarrow_struct_with_list_fields(self): - # Arrange - mapping = {"user0": list(np.random.normal(size=5)), "user1": ["test", "test"]} - pdf = pd.DataFrame( - data=zip(list(range(1, 2)), [mapping] * 2), - columns=["id", "mapping"], - ) - arrow_schema = pa.Schema.from_pandas(pdf) - - python_engine = python.Engine() - - # Act - arrow_type = python_engine._convert_pandas_object_type_to_offline_type( - arrow_schema.field("mapping").type - ) - - # Assert - assert arrow_type == "struct<user0:array<double>,user1:array<string>>" - - def test_infer_type_pyarrow_struct_with_string_fields(self): - # Arrange - mapping = {f"user{i}": "test" for i in range(2)} - pdf = pd.DataFrame( - data=zip(list(range(1, 2)), [mapping] * 2), - columns=["id", "mapping"], - ) - arrow_schema = pa.Schema.from_pandas(pdf) - - python_engine = python.Engine() - - # Act - arrow_type = python_engine._convert_pandas_object_type_to_offline_type( - arrow_schema.field("mapping").type - ) - - # Assert - assert arrow_type == "struct<user0:string,user1:string>" - - def test_infer_type_pyarrow_struct_with_struct_fields(self): - # Arrange - mapping = {f"user{i}": {"value": "test"} for i in range(2)} - pdf = pd.DataFrame( - data=zip(list(range(1, 2)), [mapping] * 2), - columns=["id", "mapping"], - ) - arrow_schema = pa.Schema.from_pandas(pdf) - - python_engine = python.Engine() - - # Act - arrow_type = python_engine._convert_pandas_object_type_to_offline_type( - arrow_schema.field("mapping").type - ) - - # Assert - assert ( - arrow_type - == "struct<user0:struct<value:string>,user1:struct<value:string>>" - ) - - def test_infer_type_pyarrow_struct_with_struct_fields_with_list_values(self): - # Arrange - mapping = { - f"user{i}": {"value": list(np.random.normal(size=5))} for i in range(2) - } - pdf = pd.DataFrame( - data=zip(list(range(1, 2)), [mapping] * 2), - columns=["id", "mapping"], - ) - arrow_schema = pa.Schema.from_pandas(pdf) - - python_engine = python.Engine() - - # Act - arrow_type = python_engine._convert_pandas_object_type_to_offline_type( - arrow_schema.field("mapping").type - ) - - # Assert - assert ( - arrow_type - == "struct<user0:struct<value:array<double>>,user1:struct<value:array<double>>>" - ) - - def test_infer_type_pyarrow_struct_with_nested_struct_fields(self): - # Arrange - mapping = {f"user{i}": {"value": {"value": "test"}} for i in range(2)} - pdf = pd.DataFrame( - data=zip(list(range(1, 2)), [mapping] * 2), - columns=["id", "mapping"], - ) - arrow_schema = pa.Schema.from_pandas(pdf) - - python_engine = python.Engine() - - # Act - arrow_type = python_engine._convert_pandas_object_type_to_offline_type( - arrow_schema.field("mapping").type - ) - - # Assert - assert ( - arrow_type - == "struct<user0:struct<value:struct<value:string>>,user1:struct<value:struct<value:string>>>" - ) - - def test_infer_type_pyarrow_list_of_struct_fields(self): - # Arrange - mapping = 
[{"value": np.random.normal(size=5)}] - pdf = pd.DataFrame( - data=zip(list(range(1, 2)), [mapping] * 2), - columns=["id", "mapping"], - ) - arrow_schema = pa.Schema.from_pandas(pdf) - - python_engine = python.Engine() - - # Act - arrow_type = python_engine._convert_pandas_object_type_to_offline_type( - arrow_schema.field("mapping").type - ) - - # Assert - assert arrow_type == "array>>" - - def test_infer_type_pyarrow_struct_with_list_of_struct_fields(self): - # Arrange - mapping = {f"user{i}": [{"value": np.random.normal(size=5)}] for i in range(2)} - pdf = pd.DataFrame( - data=zip(list(range(1, 2)), [mapping] * 2), - columns=["id", "mapping"], - ) - arrow_schema = pa.Schema.from_pandas(pdf) - - python_engine = python.Engine() - - # Act - arrow_type = python_engine._convert_pandas_object_type_to_offline_type( - arrow_schema.field("mapping").type - ) - - # Assert - assert ( - arrow_type - == "struct>>,user1:array>>>" - ) - def test_save_dataframe(self, mocker): # Arrange mock_python_engine_write_dataframe_kafka = mocker.patch( diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 9b4ff6b2de..9dade8ec68 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -3867,7 +3867,7 @@ def test_read_options_provided_options_tsv(self): def test_parse_schema_feature_group(self, mocker): # Arrange mock_spark_engine_convert_spark_type = mocker.patch( - "hsfs.engine.spark.Engine.convert_spark_type_to_offline_type" + "hsfs.engine.spark.Engine._convert_spark_type_to_offline_type" ) spark_engine = spark.Engine() @@ -3892,7 +3892,7 @@ def test_parse_schema_feature_group(self, mocker): def test_parse_schema_feature_group_hudi(self, mocker): # Arrange mock_spark_engine_convert_spark_type = mocker.patch( - "hsfs.engine.spark.Engine.convert_spark_type_to_offline_type" + "hsfs.engine.spark.Engine._convert_spark_type_to_offline_type" ) spark_engine = spark.Engine() @@ -3917,7 +3917,7 @@ def test_parse_schema_feature_group_hudi(self, mocker): def test_parse_schema_feature_group_value_error(self, mocker): # Arrange mock_spark_engine_convert_spark_type = mocker.patch( - "hsfs.engine.spark.Engine.convert_spark_type_to_offline_type" + "hsfs.engine.spark.Engine._convert_spark_type_to_offline_type" ) spark_engine = spark.Engine() @@ -3964,7 +3964,7 @@ def test_convert_spark_type(self): spark_engine = spark.Engine() # Act - result = spark_engine.convert_spark_type_to_offline_type( + result = spark_engine._convert_spark_type_to_offline_type( spark_type=IntegerType(), using_hudi=False, ) @@ -4043,7 +4043,7 @@ def test_convert_spark_type_using_hudi_byte_type(self): spark_engine = spark.Engine() # Act - result = spark_engine.convert_spark_type_to_offline_type( + result = spark_engine._convert_spark_type_to_offline_type( spark_type=ByteType(), using_hudi=True, ) @@ -4056,7 +4056,7 @@ def test_convert_spark_type_using_hudi_short_type(self): spark_engine = spark.Engine() # Act - result = spark_engine.convert_spark_type_to_offline_type( + result = spark_engine._convert_spark_type_to_offline_type( spark_type=ShortType(), using_hudi=True, ) @@ -4069,7 +4069,7 @@ def test_convert_spark_type_using_hudi_bool_type(self): spark_engine = spark.Engine() # Act - result = spark_engine.convert_spark_type_to_offline_type( + result = spark_engine._convert_spark_type_to_offline_type( spark_type=BooleanType(), using_hudi=True, ) @@ -4082,7 +4082,7 @@ def test_convert_spark_type_using_hudi_int_type(self): spark_engine = spark.Engine() # Act - result = 
+        result = spark_engine._convert_spark_type_to_offline_type(
             spark_type=IntegerType(),
             using_hudi=True,
         )
@@ -4095,7 +4095,7 @@ def test_convert_spark_type_using_hudi_long_type(self):
         spark_engine = spark.Engine()
 
         # Act
-        result = spark_engine.convert_spark_type_to_offline_type(
+        result = spark_engine._convert_spark_type_to_offline_type(
             spark_type=LongType(),
             using_hudi=True,
         )
@@ -4108,7 +4108,7 @@ def test_convert_spark_type_using_hudi_float_type(self):
         spark_engine = spark.Engine()
 
         # Act
-        result = spark_engine.convert_spark_type_to_offline_type(
+        result = spark_engine._convert_spark_type_to_offline_type(
             spark_type=FloatType(),
             using_hudi=True,
         )
@@ -4121,7 +4121,7 @@ def test_convert_spark_type_using_hudi_double_type(self):
         spark_engine = spark.Engine()
 
         # Act
-        result = spark_engine.convert_spark_type_to_offline_type(
+        result = spark_engine._convert_spark_type_to_offline_type(
             spark_type=DoubleType(),
             using_hudi=True,
         )
@@ -4134,7 +4134,7 @@ def test_convert_spark_type_using_hudi_decimal_type(self):
         spark_engine = spark.Engine()
 
         # Act
-        result = spark_engine.convert_spark_type_to_offline_type(
+        result = spark_engine._convert_spark_type_to_offline_type(
             spark_type=DecimalType(),
             using_hudi=True,
         )
@@ -4147,7 +4147,7 @@ def test_convert_spark_type_using_hudi_timestamp_type(self):
         spark_engine = spark.Engine()
 
         # Act
-        result = spark_engine.convert_spark_type_to_offline_type(
+        result = spark_engine._convert_spark_type_to_offline_type(
             spark_type=TimestampType(),
             using_hudi=True,
         )
@@ -4160,7 +4160,7 @@ def test_convert_spark_type_using_hudi_date_type(self):
         spark_engine = spark.Engine()
 
         # Act
-        result = spark_engine.convert_spark_type_to_offline_type(
+        result = spark_engine._convert_spark_type_to_offline_type(
             spark_type=DateType(),
             using_hudi=True,
         )
@@ -4173,7 +4173,7 @@ def test_convert_spark_type_using_hudi_string_type(self):
         spark_engine = spark.Engine()
 
         # Act
-        result = spark_engine.convert_spark_type_to_offline_type(
+        result = spark_engine._convert_spark_type_to_offline_type(
             spark_type=StringType(),
             using_hudi=True,
         )
@@ -4186,7 +4186,7 @@ def test_convert_spark_type_using_hudi_struct_type(self):
         spark_engine = spark.Engine()
 
         # Act
-        result = spark_engine.convert_spark_type_to_offline_type(
+        result = spark_engine._convert_spark_type_to_offline_type(
             spark_type=StructType(),
             using_hudi=True,
         )
@@ -4199,7 +4199,7 @@ def test_convert_spark_type_using_hudi_binary_type(self):
         spark_engine = spark.Engine()
 
         # Act
-        result = spark_engine.convert_spark_type_to_offline_type(
+        result = spark_engine._convert_spark_type_to_offline_type(
             spark_type=BinaryType(),
             using_hudi=True,
         )
@@ -4213,7 +4213,7 @@ def test_convert_spark_type_using_hudi_map_type(self):
 
         # Act
         with pytest.raises(ValueError) as e_info:
-            spark_engine.convert_spark_type_to_offline_type(
+            spark_engine._convert_spark_type_to_offline_type(
                 spark_type=MapType(StringType(), StringType()),
                 using_hudi=True,
             )