diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java
index 3abbae5989..c118ad4b0a 100644
--- a/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java
+++ b/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java
@@ -49,6 +49,7 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
 import org.apache.spark.sql.streaming.DataStreamReader;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.StreamingQuery;
@@ -57,6 +58,7 @@
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.BooleanType;
 import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.DoubleType;
@@ -303,27 +305,43 @@ private Dataset[] timeSeriesSplit(TrainingDataset trainingDataset, Query qu
       int i = 0;
       for (Split split : splits) {
         if (dataset.count() > 0) {
+          String eventTime = query.getLeftFeatureGroup().getEventTime();
           String eventTimeType =
-              query.getLeftFeatureGroup().getFeature(query.getLeftFeatureGroup().getEventTime()).getType();
+              query.getLeftFeatureGroup().getFeature(eventTime).getType();
           if (BIGINT.getType().equals(eventTimeType)) {
+            String tmpEventTime = eventTime + "_hopsworks_tmp";
+            sparkSession.sqlContext()
+                .udf()
+                .register("checkEpochUDF", (Long input) -> {
+                  if (Long.toString(input).length() > 10) {
+                    input = input / 1000;
+                    return input.longValue();
+                  } else {
+                    return input;
+                  }
+                }, DataTypes.LongType);
+            dataset = dataset.withColumn(tmpEventTime, functions.callUDF(
+                "checkEpochUDF", dataset.col(eventTime)));
+            // event time is in seconds; `getTime()` returns milliseconds.
             datasetSplits[i] = dataset.filter(
                 String.format(
                     "%d/1000 <= `%s` and `%s` < %d/1000",
                     split.getStartTime().getTime(),
-                    query.getLeftFeatureGroup().getEventTime(),
-                    query.getLeftFeatureGroup().getEventTime(),
+                    tmpEventTime,
+                    tmpEventTime,
                     split.getEndTime().getTime()
                 )
-            );
+            ).drop(tmpEventTime);
           } else if (DATE.getType().equals(eventTimeType) || TIMESTAMP.getType().equals(eventTimeType)) {
             // unix_timestamp return in second. `getTime()` return in millisecond.
             datasetSplits[i] = dataset.filter(
                 String.format(
                     "%d/1000 <= unix_timestamp(`%s`) and unix_timestamp(`%s`) < %d/1000",
                     split.getStartTime().getTime(),
-                    query.getLeftFeatureGroup().getEventTime(),
-                    query.getLeftFeatureGroup().getEventTime(),
+                    eventTime,
+                    eventTime,
                     split.getEndTime().getTime()
                 )
             );
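For intuition, the `checkEpochUDF` lambda registered above is a digit-length heuristic: a BIGINT event time with more than 10 decimal digits is read as epoch milliseconds and scaled down to seconds. A minimal Python sketch of the same rule (the helper name is illustrative, not part of the patch):

    def normalize_epoch_to_seconds(ts: int) -> int:
        # More than 10 digits: assume epoch milliseconds, scale to seconds.
        # 10 digits or fewer: already seconds (10 digits covers 2001-2286).
        return ts // 1000 if len(str(ts)) > 10 else ts

    assert normalize_epoch_to_seconds(1488600000) == 1488600000     # seconds, unchanged
    assert normalize_epoch_to_seconds(1488600000000) == 1488600000  # milliseconds, scaled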
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 47db017e2b..7eb54466ec 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -39,7 +39,7 @@
     concat,
     col,
     from_json,
-    unix_timestamp,
+    udf,
 )
 from pyspark.sql.avro.functions import from_avro, to_avro
 from pyspark.sql.types import (
@@ -492,14 +492,13 @@ def _random_split(self, dataset, training_dataset):
     def _time_series_split(
         self, training_dataset, dataset, event_time, drop_event_time=False
     ):
-        result_dfs = {}
-        ts_type = dataset.select(event_time).dtypes[0][1]
-        ts_col = (
-            unix_timestamp(col(event_time)) * 1000
-            if ts_type in ["date", "timestamp"]
-            # jdbc supports timestamp precision up to second only.
-            else col(event_time) * 1000
+        # registering the UDF
+        _convert_event_time_to_timestamp = udf(
+            util.convert_event_time_to_timestamp, LongType()
         )
+
+        result_dfs = {}
+        ts_col = _convert_event_time_to_timestamp(col(event_time))
         for split in training_dataset.splits:
             result_df = dataset.filter(ts_col >= split.start_time).filter(
                 ts_col < split.end_time
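A self-contained sketch of what the rewritten `_time_series_split` now does: normalize the event-time column through a Python UDF, then filter start-inclusive / end-exclusive. It assumes a local Spark session and uses a stand-in for `util.convert_event_time_to_timestamp`:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import LongType

    def to_millis(ts):
        # stand-in for util.convert_event_time_to_timestamp (int branch only)
        return ts * 1000 if len(str(ts)) <= 10 else ts

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(1, 1000000000), (2, 2000000000)], ["id", "event_time"])

    ts_col = udf(to_millis, LongType())(col("event_time"))
    train = df.filter(ts_col >= 1000000000000).filter(ts_col < 2000000000000)
    assert [r.id for r in train.collect()] == [1]  # only the first row falls in [start, end)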
diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py
index 9ebafa3e16..b74c060e87 100644
--- a/python/hsfs/feature_view.py
+++ b/python/hsfs/feature_view.py
@@ -190,9 +190,9 @@ def get_batch_query(
         # Arguments
             start_time: Start event time for the batch query. Optional. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`,
-                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`.
+                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             end_time: End event time for the batch query. Optional. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`,
-                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`.
+                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
 
         # Returns
             `str`: batch query
@@ -272,10 +272,10 @@ def get_batch_data(
         # Arguments
             start_time: Start event time for the batch query. Optional. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             end_time: End event time for the batch query. Optional. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             read_options: User provided read options.
                 Defaults to `{}`.
@@ -336,10 +336,10 @@ def create_training_data(
         # Arguments
             start_time: Start event time for the training dataset query. Optional. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             end_time: End event time for the training dataset query. Optional. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             storage_connector: Storage connector defining the sink location for the
                 training dataset, defaults to `None`, and materializes training dataset
                 on HopsFS.
@@ -444,16 +444,16 @@ def create_train_test_split(
             test_size: size of test set.
             train_start: Start event time for the train split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             train_end: End event time for the train split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_start: Start event time for the test split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_end: End event time for the test split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             storage_connector: Storage connector defining the sink location for the
                 training dataset, defaults to `None`, and materializes training dataset
                 on HopsFS.
@@ -570,22 +570,22 @@ def create_train_validation_test_split(
             test_size: size of test set.
             train_start: Start event time for the train split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             train_end: End event time for the train split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             validation_start: Start event time for the validation split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             validation_end: End event time for the validation split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_start: Start event time for the test split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_end: End event time for the test split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             storage_connector: Storage connector defining the sink location for the
                 training dataset, defaults to `None`, and materializes training dataset
                 on HopsFS.
@@ -718,11 +718,11 @@ def training_data(
             start_time: Start event time for the training dataset query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             end_time: End event time for the training dataset query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             description: A string describing the contents of the training dataset to
                 improve discoverability for Data Scientists, defaults to empty string
                 `""`.
@@ -794,13 +794,13 @@ def train_test_split(
                 or `%Y-%m-%d %H:%M:%S.%f`.
             train_end: End event time for the train split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_start: Start event time for the test split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_end: End event time for the test split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             description: A string describing the contents of the training dataset to
                 improve discoverability for Data Scientists, defaults to empty string
                 `""`.
@@ -893,22 +893,22 @@ def train_validation_test_split(
             test_size: size of test set. Should be between 0 and 1.
             train_start: Start event time for the train split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             train_end: End event time for the train split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             validation_start: Start event time for the validation split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             validation_end: End event time for the validation split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_start: Start event time for the test split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_end: End event time for the test split query. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             description: A string describing the contents of the training dataset to
                 improve discoverability for Data Scientists, defaults to empty string
                 `""`.
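Given the clarified docstrings, the two calls below ought to be equivalent ways of bounding a batch read. The values are illustrative and `fv` is an assumed, already-retrieved feature view (1640995200 s = 2022-01-01 UTC, 1646092800 s = 2022-03-01 UTC):

    # strings are parsed using the formats listed in the docstring...
    df1 = fv.get_batch_data(start_time="2022-01-01", end_time="2022-03-01")

    # ...ints are read as Unix epoch *seconds*, not milliseconds
    df2 = fv.get_batch_data(start_time=1640995200, end_time=1646092800)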
diff --git a/python/hsfs/training_dataset.py b/python/hsfs/training_dataset.py
index 9e321fd38f..cba4400d55 100644
--- a/python/hsfs/training_dataset.py
+++ b/python/hsfs/training_dataset.py
@@ -238,8 +238,8 @@ def _append_time_split(
             TrainingDatasetSplit(
                 name=split_name,
                 split_type=TrainingDatasetSplit.TIME_SERIES_SPLIT,
-                start_time=util.convert_event_time_to_timestamp(start_time),
-                end_time=util.convert_event_time_to_timestamp(end_time),
+                start_time=start_time,
+                end_time=end_time,
             )
         )
diff --git a/python/hsfs/util.py b/python/hsfs/util.py
index 69dd16a0d7..c565ef09a4 100644
--- a/python/hsfs/util.py
+++ b/python/hsfs/util.py
@@ -180,7 +180,7 @@ def convert_event_time_to_timestamp(event_time):
         if event_time == 0:
             raise ValueError("Event time should be greater than 0.")
         # jdbc supports timestamp precision up to second only.
-        if len(str(event_time)) < 13:
+        if len(str(event_time)) <= 10:
             event_time = event_time * 1000
         return event_time
     else:
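The tightened `<= 10` check can be sanity-checked in isolation. The old `< 13` branch also multiplied 11- and 12-digit values, i.e. genuine millisecond timestamps from before September 2001, by 1000 a second time; keying the rule to the 10-digit width of an epoch-seconds value avoids that. A sketch of just the int branch (the real function also accepts `datetime`, `date`, and `str` inputs):

    def to_timestamp_ms(event_time: int) -> int:
        # int branch of convert_event_time_to_timestamp, new behaviour
        if event_time == 0:
            raise ValueError("Event time should be greater than 0.")
        if len(str(event_time)) <= 10:  # epoch seconds -> milliseconds
            event_time = event_time * 1000
        return event_time

    assert to_timestamp_ms(1640995200) == 1640995200000     # seconds in, ms out
    assert to_timestamp_ms(1640995200000) == 1640995200000  # ms in, unchanged
    assert to_timestamp_ms(10000000000) == 10000000000      # 11-digit ms value no longer rescaled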
diff --git a/python/tests/core/test_feature_view_engine.py b/python/tests/core/test_feature_view_engine.py
index 53da6a1568..09c7c7948e 100644
--- a/python/tests/core/test_feature_view_engine.py
+++ b/python/tests/core/test_feature_view_engine.py
@@ -134,7 +134,9 @@ def test_save_time_travel_query(self, mocker):
         )
 
         fv = feature_view.FeatureView(
-            name="fv_name", query=query.as_of(1000), featurestore_id=feature_store_id
+            name="fv_name",
+            query=query.as_of(1000000000),
+            featurestore_id=feature_store_id,
         )
 
         # Act
@@ -449,7 +451,10 @@ def test_get_batch_query(self, mocker):
 
         # Act
         fv_engine.get_batch_query(
-            feature_view_obj=fv, start_time=1, end_time=2, with_label=False
+            feature_view_obj=fv,
+            start_time=1000000000,
+            end_time=2000000000,
+            with_label=False,
         )
 
         # Assert
@@ -486,7 +491,7 @@ def test_get_batch_query_string(self, mocker):
 
         # Act
         result = fv_engine.get_batch_query_string(
-            feature_view_obj=fv, start_time=1, end_time=2
+            feature_view_obj=fv, start_time=1000000000, end_time=2000000000
         )
 
         # Assert
@@ -526,7 +531,7 @@ def test_get_batch_query_string_pit_query(self, mocker):
 
         # Act
         result = fv_engine.get_batch_query_string(
-            feature_view_obj=fv, start_time=1, end_time=2
+            feature_view_obj=fv, start_time=1000000000, end_time=2000000000
         )
 
         # Assert
diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py
index 216fb6905f..2694d93cee 100644
--- a/python/tests/engine/test_python.py
+++ b/python/tests/engine/test_python.py
@@ -1505,7 +1505,7 @@ def test_prepare_transform_split_df_time_split_td_features(self, mocker):
 
         python_engine = python.Engine()
 
-        d = {"col1": [1, 2], "col2": [3, 4], "event_time": [1, 2]}
+        d = {"col1": [1, 2], "col2": [3, 4], "event_time": [1000000000, 2000000000]}
         df = pd.DataFrame(data=d)
 
         td = training_dataset.TrainingDataset(
@@ -1516,9 +1516,9 @@
             splits={"col1": None, "col2": None},
             label=["f", "f_wrong"],
             id=10,
-            train_start=1,
-            train_end=2,
-            test_end=3,
+            train_start=1000000000,
+            train_end=2000000000,
+            test_end=3000000000,
         )
 
         f = feature.Feature(name="col1", type="str")
@@ -1570,7 +1570,7 @@ def test_prepare_transform_split_df_time_split_query_features(self, mocker):
 
         python_engine = python.Engine()
 
-        d = {"col1": [1, 2], "col2": [3, 4], "event_time": [1, 2]}
+        d = {"col1": [1, 2], "col2": [3, 4], "event_time": [1000000000, 2000000000]}
         df = pd.DataFrame(data=d)
 
         mock_python_engine_time_series_split.return_value = {
@@ -1586,9 +1586,9 @@
             splits={"col1": None, "col2": None},
             label=["f", "f_wrong"],
             id=10,
-            train_start=1,
-            train_end=2,
-            test_end=3,
+            train_start=1000000000,
+            train_end=2000000000,
+            test_end=3000000000,
         )
 
         fg = feature_group.FeatureGroup(
@@ -1693,9 +1693,9 @@ def test_time_series_split(self, mocker):
             splits={"col1": None, "col2": None},
             label=["f", "f_wrong"],
             id=10,
-            train_start=1,
-            train_end=2,
-            test_end=3,
+            train_start=1000000000,
+            train_end=2000000000,
+            test_end=3000000000,
         )
 
         expected = {"train": df.loc[df["col1"] == 1], "test": df.loc[df["col1"] == 2]}
@@ -1730,9 +1730,9 @@ def test_time_series_split_drop_event_time(self, mocker):
             splits={"col1": None, "col2": None},
             label=["f", "f_wrong"],
             id=10,
-            train_start=1,
-            train_end=2,
-            test_end=3,
+            train_start=1000000000,
+            train_end=2000000000,
+            test_end=3000000000,
         )
 
         expected = {"train": df.loc[df["col1"] == 1], "test": df.loc[df["col1"] == 2]}
@@ -1758,7 +1758,7 @@ def test_time_series_split_event_time(self, mocker):
 
         python_engine = python.Engine()
 
-        d = {"col1": [1, 2], "col2": [3, 4], "event_time": [1000, 2000]}
+        d = {"col1": [1, 2], "col2": [3, 4], "event_time": [1000000000, 2000000000]}
         df = pd.DataFrame(data=d)
 
         td = training_dataset.TrainingDataset(
@@ -1769,9 +1769,9 @@
             splits={"col1": None, "col2": None},
             label=["f", "f_wrong"],
             id=10,
-            train_start=1,
-            train_end=2,
-            test_end=3,
+            train_start=1000000000,
+            train_end=2000000000,
+            test_end=3000000000,
         )
 
         expected = {"train": df.loc[df["col1"] == 1], "test": df.loc[df["col1"] == 2]}
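The fixture values were bumped from tiny ints to realistic 10-digit epoch seconds so they exercise the seconds branch of the new heuristic. A quick check of what those constants denote, just arithmetic:

    from datetime import datetime, timezone

    assert len(str(1000000000)) == 10  # 10 digits -> treated as seconds
    assert datetime.fromtimestamp(1000000000, tz=timezone.utc).year == 2001
    assert datetime.fromtimestamp(2000000000, tz=timezone.utc).year == 2033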
splits={"col1": None, "col2": None}, label=["f", "f_wrong"], id=10, - train_start=1, - train_end=2, - test_end=3, + train_start=1000000000, + train_end=2000000000, + test_end=3000000000, ) fg = feature_group.FeatureGroup( @@ -1693,9 +1693,9 @@ def test_time_series_split(self, mocker): splits={"col1": None, "col2": None}, label=["f", "f_wrong"], id=10, - train_start=1, - train_end=2, - test_end=3, + train_start=1000000000, + train_end=2000000000, + test_end=3000000000, ) expected = {"train": df.loc[df["col1"] == 1], "test": df.loc[df["col1"] == 2]} @@ -1730,9 +1730,9 @@ def test_time_series_split_drop_event_time(self, mocker): splits={"col1": None, "col2": None}, label=["f", "f_wrong"], id=10, - train_start=1, - train_end=2, - test_end=3, + train_start=1000000000, + train_end=2000000000, + test_end=3000000000, ) expected = {"train": df.loc[df["col1"] == 1], "test": df.loc[df["col1"] == 2]} @@ -1758,7 +1758,7 @@ def test_time_series_split_event_time(self, mocker): python_engine = python.Engine() - d = {"col1": [1, 2], "col2": [3, 4], "event_time": [1000, 2000]} + d = {"col1": [1, 2], "col2": [3, 4], "event_time": [1000000000, 2000000000]} df = pd.DataFrame(data=d) td = training_dataset.TrainingDataset( @@ -1769,9 +1769,9 @@ def test_time_series_split_event_time(self, mocker): splits={"col1": None, "col2": None}, label=["f", "f_wrong"], id=10, - train_start=1, - train_end=2, - test_end=3, + train_start=1000000000, + train_end=2000000000, + test_end=3000000000, ) expected = {"train": df.loc[df["col1"] == 1], "test": df.loc[df["col1"] == 2]} diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py index 65d1d821e2..8a430690c3 100644 --- a/python/tests/engine/test_spark.py +++ b/python/tests/engine/test_spark.py @@ -1971,9 +1971,9 @@ def test_split_df_time_split_td_features(self, mocker): data_format="CSV", featurestore_id=99, splits={"col1": 1}, - train_start=1, - train_end=2, - test_end=3, + train_start=1000000000, + train_end=2000000000, + test_end=3000000000, ) f = feature.Feature(name="col1", type="str") @@ -2024,9 +2024,9 @@ def test_split_df_time_split_query_features(self, mocker): data_format="CSV", featurestore_id=99, splits={"col1": 1}, - train_start=1, - train_end=2, - test_end=3, + train_start=1000000000, + train_end=2000000000, + test_end=3000000000, ) f = feature.Feature(name="col1", type="str") @@ -2106,12 +2106,16 @@ def test_time_series_split(self, mocker): featurestore_id=99, splits={"col1": None, "col2": None}, id=10, - train_start=1, - train_end=2, - test_end=3, + train_start=1000000000, + train_end=2000000000, + test_end=3000000000, ) - d = {"col_0": [1, 2], "col_1": ["test_1", "test_2"], "event_time": [1000, 2000]} + d = { + "col_0": [1, 2], + "col_1": ["test_1", "test_2"], + "event_time": [1000000000, 2000000000], + } df = pd.DataFrame(data=d) spark_df = spark_engine._spark_session.createDataFrame(df) @@ -2153,7 +2157,7 @@ def test_time_series_split_date(self, mocker): featurestore_id=99, splits={"col1": None, "col2": None}, id=10, - train_start=1, + train_start=1000000000, train_end=1488600000, test_end=1488718800, ) @@ -2213,7 +2217,7 @@ def test_time_series_split_timestamp(self, mocker): featurestore_id=99, splits={"col1": None, "col2": None}, id=10, - train_start=1, + train_start=1000000000, train_end=1488600000, test_end=1488718800, ) @@ -2260,6 +2264,57 @@ def test_time_series_split_timestamp(self, mocker): assert result[column].schema == expected[column].schema assert result[column].collect() == expected[column].collect() + def 
diff --git a/python/tests/test_util.py b/python/tests/test_util.py
index de67f7ab9b..6ef2d1072b 100644
--- a/python/tests/test_util.py
+++ b/python/tests/test_util.py
@@ -26,7 +26,7 @@ def test_get_hudi_datestr_from_timestamp(self):
         assert dt == "20220101000000000"
 
     def test_convert_event_time_to_timestamp_timestamp(self):
-        dt = util.convert_event_time_to_timestamp(1640995200000)
+        dt = util.convert_event_time_to_timestamp(1640995200)
         assert dt == 1640995200000
 
     def test_convert_event_time_to_timestamp_datetime(self):
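One caveat, as an observation rather than part of the patch: the digit-length rule is a heuristic. Any value of 10 digits or fewer is read as seconds, so a genuine millisecond timestamp from before late April 1970 (at most 10 digits) would be scaled twice, while every seconds value up to the year 2286 stays in the seconds branch:

    def to_timestamp_ms(event_time: int) -> int:
        # same rule as the patched int branch of util.convert_event_time_to_timestamp
        return event_time * 1000 if len(str(event_time)) <= 10 else event_time

    assert to_timestamp_ms(9999999999) == 9999999999000  # ~2286-11-20, still read as seconds
    assert to_timestamp_ms(10000000000) == 10000000000   # 11-digit ms (~1970-04-26) kept as ms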