[FSTORE-414] feature_view.create_train_test_split returns empty df (#854)

Bug fix: feature_view.create_train_test_split no longer returns an empty df.
davitbzh authored Nov 15, 2022
1 parent 23e1fb4 commit 6c70614
Showing 9 changed files with 164 additions and 83 deletions.
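Judging by the diff, the bug was a unit mismatch: training-dataset split boundaries are converted to millisecond epochs, while the time-series split filters compared them against event-time columns whose precision (seconds vs. milliseconds) was guessed from the column type. When the guess was wrong, no row satisfied the filter and the returned split was empty. A minimal sketch of that failure mode, with hypothetical data (not from this repository):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    # Event times stored as second-precision Unix epochs in a bigint column.
    df = spark.createDataFrame(
        [(1, 1660000000), (2, 1661000000)], ["id", "event_time"]
    )

    # Split boundaries arrive as millisecond epochs; comparing seconds
    # against milliseconds matches nothing.
    start_ms, end_ms = 1659000000000, 1662000000000
    broken = df.filter((col("event_time") >= start_ms) & (col("event_time") < end_ms))
    print(broken.count())  # 0 -> the empty df of FSTORE-414

    # Normalizing the column to the boundary unit repairs the filter,
    # which is what the changes below do on both the Java and Python paths.
    ts_col = col("event_time") * 1000
    fixed = df.filter((ts_col >= start_ms) & (ts_col < end_ms))
    print(fixed.count())  # 2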
30 changes: 24 additions & 6 deletions java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java
@@ -49,6 +49,7 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
 import org.apache.spark.sql.streaming.DataStreamReader;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.StreamingQuery;
@@ -57,6 +58,7 @@
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.BooleanType;
 import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.DoubleType;
@@ -303,27 +305,43 @@ private Dataset<Row>[] timeSeriesSplit(TrainingDataset trainingDataset, Query qu
     int i = 0;
     for (Split split : splits) {
       if (dataset.count() > 0) {
+        String eventTime = query.getLeftFeatureGroup().getEventTime();
         String eventTimeType =
-            query.getLeftFeatureGroup().getFeature(query.getLeftFeatureGroup().getEventTime()).getType();
+            query.getLeftFeatureGroup().getFeature(eventTime).getType();

         if (BIGINT.getType().equals(eventTimeType)) {
+          String tmpEventTime = eventTime + "_hopsworks_tmp";
+          sparkSession.sqlContext()
+              .udf()
+              .register("checkEpochUDF", (Long input) -> {
+                if (Long.toString(input).length() > 10) {
+                  input = input / 1000;
+                  return input.longValue();
+                } else {
+                  return input;
+                }
+              }, DataTypes.LongType);
+          dataset = dataset.withColumn(tmpEventTime, functions.callUDF(
+              "checkEpochUDF", dataset.col(eventTime)));
+
           // event time is in seconds; `getTime()` returns milliseconds.
           datasetSplits[i] = dataset.filter(
               String.format(
                   "%d/1000 <= `%s` and `%s` < %d/1000",
                   split.getStartTime().getTime(),
-                  query.getLeftFeatureGroup().getEventTime(),
-                  query.getLeftFeatureGroup().getEventTime(),
+                  tmpEventTime,
+                  tmpEventTime,
                   split.getEndTime().getTime()
               )
-          );
+          ).drop(tmpEventTime);
         } else if (DATE.getType().equals(eventTimeType) || TIMESTAMP.getType().equals(eventTimeType)) {
           // unix_timestamp returns seconds; `getTime()` returns milliseconds.
           datasetSplits[i] = dataset.filter(
               String.format(
                   "%d/1000 <= unix_timestamp(`%s`) and unix_timestamp(`%s`) < %d/1000",
                   split.getStartTime().getTime(),
-                  query.getLeftFeatureGroup().getEventTime(),
-                  query.getLeftFeatureGroup().getEventTime(),
+                  eventTime,
+                  eventTime,
                   split.getEndTime().getTime()
               )
           );
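In short, the Java engine now resolves the event-time column once and, for bigint columns, registers a temporary checkEpochUDF that normalizes epochs to seconds (values longer than ten digits are treated as milliseconds and divided by 1000) into a throwaway `<event time>_hopsworks_tmp` column. The split filter then compares that column against `getTime()/1000` second boundaries and drops the helper column afterwards.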
15 changes: 7 additions & 8 deletions python/hsfs/engine/spark.py
@@ -39,7 +39,7 @@
     concat,
     col,
     from_json,
-    unix_timestamp,
+    udf,
 )
 from pyspark.sql.avro.functions import from_avro, to_avro
 from pyspark.sql.types import (
@@ -492,14 +492,13 @@ def _random_split(self, dataset, training_dataset):
     def _time_series_split(
         self, training_dataset, dataset, event_time, drop_event_time=False
     ):
-        result_dfs = {}
-        ts_type = dataset.select(event_time).dtypes[0][1]
-        ts_col = (
-            unix_timestamp(col(event_time)) * 1000
-            if ts_type in ["date", "timestamp"]
-            # jdbc supports timestamp precision up to second only.
-            else col(event_time) * 1000
-        )
+        # registering the UDF
+        _convert_event_time_to_timestamp = udf(
+            util.convert_event_time_to_timestamp, LongType()
+        )
+
+        result_dfs = {}
+        ts_col = _convert_event_time_to_timestamp(col(event_time))
         for split in training_dataset.splits:
             result_df = dataset.filter(ts_col >= split.start_time).filter(
                 ts_col < split.end_time
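After this change, the Python engine funnels every event-time value, whatever its dtype, through the same `util.convert_event_time_to_timestamp` helper that produced the split boundaries, so both sides of the comparison are millisecond epochs. A rough standalone sketch of the pattern (simplified stand-in for the helper; hypothetical data):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import LongType

    def to_millis(event_time):
        # Simplified stand-in for util.convert_event_time_to_timestamp:
        # epochs of up to ten digits are seconds and get scaled to milliseconds.
        return event_time * 1000 if len(str(event_time)) <= 10 else event_time

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(1, 1660000000), (2, 1661000000)], ["id", "event_time"])

    ts_col = udf(to_millis, LongType())(col("event_time"))

    # Boundaries are millisecond epochs, so the comparison is now unit-safe.
    train = df.filter(ts_col >= 1659000000000).filter(ts_col < 1660500000000)
    print(train.count())  # 1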
54 changes: 27 additions & 27 deletions python/hsfs/feature_view.py
@@ -190,9 +190,9 @@ def get_batch_query(
         # Arguments
             start_time: Start event time for the batch query. Optional. Strings should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`,
-                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`.
+                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             end_time: End event time for the batch query. Optional. Strings should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`,
-                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`.
+                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
         # Returns
             `str`: batch query

@@ -272,10 +272,10 @@ def get_batch_data(
         # Arguments
             start_time: Start event time for the batch query. Optional. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             end_time: End event time for the batch query. Optional. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             read_options: User provided read options. Defaults to `{}`.
         """

@@ -336,10 +336,10 @@ def create_training_data(
         # Arguments
             start_time: Start event time for the training dataset query. Optional. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             end_time: End event time for the training dataset query. Optional. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             storage_connector: Storage connector defining the sink location for the
                 training dataset, defaults to `None`, and materializes training dataset
                 on HopsFS.

@@ -444,16 +444,16 @@ def create_train_test_split(
             test_size: size of test set.
             train_start: Start event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             train_end: End event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             test_start: Start event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             test_end: End event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             storage_connector: Storage connector defining the sink location for the
                 training dataset, defaults to `None`, and materializes training dataset
                 on HopsFS.

@@ -570,22 +570,22 @@ def create_train_validation_test_split(
             test_size: size of test set.
             train_start: Start event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             train_end: End event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             validation_start: Start event time for the validation split query. Strings
                 should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             validation_end: End event time for the validation split query. Strings
                 should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             test_start: Start event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             test_end: End event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             storage_connector: Storage connector defining the sink location for the
                 training dataset, defaults to `None`, and materializes training dataset
                 on HopsFS.

@@ -718,11 +718,11 @@ def training_data(
             start_time: Start event time for the training dataset query. Strings should
                 be formatted in one of the following
                 formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             end_time: End event time for the training dataset query. Strings should be
                 formatted in one of the following
                 formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             description: A string describing the contents of the training dataset to
                 improve discoverability for Data Scientists, defaults to empty string
                 `""`.

@@ -794,13 +794,13 @@ def train_test_split(
                 or `%Y-%m-%d %H:%M:%S.%f`.
             train_end: End event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             test_start: Start event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             test_end: End event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             description: A string describing the contents of the training dataset to
                 improve discoverability for Data Scientists, defaults to empty string
                 `""`.

@@ -893,22 +893,22 @@ def train_validation_test_split(
             test_size: size of test set. Should be between 0 and 1.
             train_start: Start event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             train_end: End event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             validation_start: Start event time for the validation split query. Strings
                 should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             validation_end: End event time for the validation split query. Strings
                 should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             test_start: Start event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             test_end: End event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch, should be in seconds.
             description: A string describing the contents of the training dataset to
                 improve discoverability for Data Scientists, defaults to empty string
                 `""`.
4 changes: 2 additions & 2 deletions python/hsfs/training_dataset.py
@@ -238,8 +238,8 @@ def _append_time_split(
             TrainingDatasetSplit(
                 name=split_name,
                 split_type=TrainingDatasetSplit.TIME_SERIES_SPLIT,
-                start_time=util.convert_event_time_to_timestamp(start_time),
-                end_time=util.convert_event_time_to_timestamp(end_time),
+                start_time=start_time,
+                end_time=end_time,
             )
         )
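Dropping the conversion at split construction avoids scaling the boundaries twice; presumably the remaining engine changes in this commit (not rendered below) apply `util.convert_event_time_to_timestamp` once, at the point where the splits are consumed.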
2 changes: 1 addition & 1 deletion python/hsfs/util.py
@@ -180,7 +180,7 @@ def convert_event_time_to_timestamp(event_time):
         if event_time == 0:
             raise ValueError("Event time should be greater than 0.")
         # jdbc supports timestamp precision up to second only.
-        if len(str(event_time)) < 13:
+        if len(str(event_time)) <= 10:
             event_time = event_time * 1000
             return event_time
         else:
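The tightened check matters for 11- and 12-digit values: those are already millisecond epochs (from the early 1970s onward), yet the old `< 13` test would have multiplied them by 1000 again. A standalone re-implementation of the int branch to illustrate the boundary:

    def convert_event_time_to_timestamp(event_time: int) -> int:
        # Epochs of up to ten digits are second precision -> scale to ms;
        # longer values are assumed to already be milliseconds.
        if event_time == 0:
            raise ValueError("Event time should be greater than 0.")
        if len(str(event_time)) <= 10:
            return event_time * 1000
        return event_time

    assert convert_event_time_to_timestamp(1660000000) == 1660000000000     # seconds -> ms
    assert convert_event_time_to_timestamp(1660000000000) == 1660000000000  # ms unchanged
    # A 12-digit ms epoch (year 2001) is now left alone; the old `< 13` rule
    # would have produced 999999999999000, three orders of magnitude too large.
    assert convert_event_time_to_timestamp(999999999999) == 999999999999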
13 changes: 9 additions & 4 deletions python/tests/core/test_feature_view_engine.py
@@ -134,7 +134,9 @@ def test_save_time_travel_query(self, mocker):
         )

         fv = feature_view.FeatureView(
-            name="fv_name", query=query.as_of(1000), featurestore_id=feature_store_id
+            name="fv_name",
+            query=query.as_of(1000000000),
+            featurestore_id=feature_store_id,
         )

         # Act

@@ -449,7 +451,10 @@ def test_get_batch_query(self, mocker):

         # Act
         fv_engine.get_batch_query(
-            feature_view_obj=fv, start_time=1, end_time=2, with_label=False
+            feature_view_obj=fv,
+            start_time=1000000000,
+            end_time=2000000000,
+            with_label=False,
         )

         # Assert

@@ -486,7 +491,7 @@ def test_get_batch_query_string(self, mocker):

         # Act
         result = fv_engine.get_batch_query_string(
-            feature_view_obj=fv, start_time=1, end_time=2
+            feature_view_obj=fv, start_time=1000000000, end_time=2000000000
         )

         # Assert

@@ -526,7 +531,7 @@ def test_get_batch_query_string_pit_query(self, mocker):

         # Act
         result = fv_engine.get_batch_query_string(
-            feature_view_obj=fv, start_time=1, end_time=2
+            feature_view_obj=fv, start_time=1000000000, end_time=2000000000
        )

         # Assert
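The test fixtures switch from toy values like `start_time=1` to realistic ten-digit epochs (`1000000000`, i.e. September 2001), presumably because under the new heuristic any value of up to ten digits is multiplied by 1000, so second-precision epochs keep the expected converted values unambiguous.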
(Diffs for the remaining three changed files are not shown.)
