
Commit 1c84c54

Merge branch 'master' into refactor-type-convert
2 parents: eeb9c25 + 8c0d994

File tree

8 files changed: +30 -6 lines changed

java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java

Lines changed: 6 additions & 1 deletion
@@ -1097,5 +1097,10 @@ protected String makeQueryName(String queryName, FeatureGroupBase featureGroup)
     }
     return queryName;
   }
-
+
+  public void closeSparkSession() {
+    if (getSparkSession() != null) {
+      getSparkSession().stop();
+    }
+  }
 }
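
For readers following along in Python: a rough PySpark analogue of the new closeSparkSession() guard (a sketch, not part of this commit) stops the session only when one actually exists, mirroring the null check in the Java version:

# Sketch only: a PySpark analogue of closeSparkSession(); not hsfs code.
from pyspark.sql import SparkSession

def close_spark_session() -> None:
    # getActiveSession() returns None when no session is running,
    # so stop() is guarded exactly like the Java method above.
    spark = SparkSession.getActiveSession()
    if spark is not None:
        spark.stop()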

python/hsfs/constructor/join.py

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ def __init__(
         self._on = util.parse_features(on)
         self._left_on = util.parse_features(left_on)
         self._right_on = util.parse_features(right_on)
-        self._join_type = join_type or self.INNER
+        self._join_type = join_type or self.LEFT
         self._prefix = prefix
 
     def to_dict(self) -> Dict[str, Any]:
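
The behavioral effect of this one-line change: a Join constructed without an explicit join_type now defaults to a left join rather than an inner join, so unmatched left-side rows are preserved. A minimal self-contained sketch (the class below is a stand-in, not the hsfs Join):

# Stand-in class illustrating the new default; not the hsfs implementation.
from typing import Optional

class Join:
    INNER = "INNER"
    LEFT = "LEFT"

    def __init__(self, join_type: Optional[str] = None) -> None:
        # After this commit, an unset join_type falls back to LEFT.
        self._join_type = join_type or self.LEFT

assert Join()._join_type == "LEFT"          # new default
assert Join("INNER")._join_type == "INNER"  # explicit type still respected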

python/hsfs/constructor/query.py

Lines changed: 14 additions & 2 deletions
@@ -239,7 +239,7 @@ def join(
         on: Optional[List[str]] = None,
         left_on: Optional[List[str]] = None,
         right_on: Optional[List[str]] = None,
-        join_type: Optional[str] = "inner",
+        join_type: Optional[str] = "left",
         prefix: Optional[str] = None,
     ) -> "Query":
         """Join Query with another Query.

@@ -769,7 +769,7 @@ def featuregroups(
         """List of feature groups used in the query"""
         featuregroups = {self._left_feature_group}
         for join_obj in self.joins:
-            featuregroups.add(join_obj.query._left_feature_group)
+            self._fg_rec_add(join_obj, featuregroups)
         return list(featuregroups)
 
     @property

@@ -809,6 +809,18 @@ def get_feature(self, feature_name: str) -> "Feature":
         """
         return self._get_feature_by_name(feature_name)[0]
 
+    def _fg_rec_add(self, join_object, featuregroups):
+        """
+        Recursively collect feature groups from nested joins and add them to the `featuregroups` set.
+
+        # Arguments
+            join_object: `Join` object.
+        """
+        if len(join_object.query.joins) > 0:
+            for nested_join in join_object.query.joins:
+                self._fg_rec_add(nested_join, featuregroups)
+        featuregroups.add(join_object.query._left_feature_group)
+
     def __getattr__(self, name: str) -> Any:
         try:
             return self.__getitem__(name)
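
The new _fg_rec_add helper fixes a real gap: the old loop looked only one level deep, so a query joining fg_a with (fg_b joined to fg_c) would omit fg_c from featuregroups. A self-contained sketch of the recursive traversal (Query and Join below are simplified stand-ins, not the hsfs classes):

# Simplified stand-ins; the real hsfs classes carry much more state.
from dataclasses import dataclass, field
from typing import List, Set

@dataclass
class Query:
    left_feature_group: str
    joins: List["Join"] = field(default_factory=list)

@dataclass
class Join:
    query: "Query"

def collect_feature_groups(query: Query) -> Set[str]:
    groups = {query.left_feature_group}

    def rec_add(join: Join) -> None:
        # Descend into nested joins first, then record this join's left side.
        for nested in join.query.joins:
            rec_add(nested)
        groups.add(join.query.left_feature_group)

    for join in query.joins:
        rec_add(join)
    return groups

# fg_a JOIN (fg_b JOIN fg_c): the pre-commit one-level loop missed fg_c.
nested = Query("fg_b", joins=[Join(Query("fg_c"))])
top = Query("fg_a", joins=[Join(nested)])
assert collect_feature_groups(top) == {"fg_a", "fg_b", "fg_c"}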

python/hsfs/engine/spark.py

Lines changed: 1 addition & 1 deletion
@@ -1190,7 +1190,7 @@ def add_cols_to_delta_table(self, feature_group, new_features):
             "spark.databricks.delta.schema.autoMerge.enabled", "true"
         ).save(feature_group.location)
 
-    def _apply_transformation_function(self, transformation_functions, dataset):
+    def _apply_transformation_function(self, transformation_functions, dataset, **kwargs):
         # generate transformation function expressions
         transformed_feature_names = []
         transformation_fn_expressions = []
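
The trailing **kwargs looks like a compatibility measure: callers (or sibling engine implementations) can pass extra keyword arguments without the Spark version raising a TypeError. A toy illustration under that assumption (names and bodies are illustrative, not the hsfs internals, which build Spark column expressions):

# Toy illustration of the widened signature; not the hsfs implementation.
def _apply_transformation_function(transformation_functions, dataset, **kwargs):
    # Unknown keyword arguments are accepted and simply ignored here, so a
    # caller written against a richer engine signature still works.
    for fn in transformation_functions:
        dataset = fn(dataset)
    return dataset

result = _apply_transformation_function(
    [lambda rows: [r * 2 for r in rows]],
    [1, 2, 3],
    online=False,  # hypothetical extra flag another engine might accept
)
assert result == [2, 4, 6]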

python/tests/constructor/test_join.py

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ def test_from_response_json_basic_info(self, mocker, backend_fixtures):
         assert len(j._on) == 0
         assert len(j._left_on) == 0
         assert len(j._right_on) == 0
-        assert j._join_type == "INNER"
+        assert j._join_type == "LEFT"
         assert j._prefix is None
 
     def test_from_response_json_left_join(self, mocker, backend_fixtures):

utils/java/src/main/java/com/logicalclocks/utils/MainClass.java

Lines changed: 4 additions & 0 deletions
@@ -118,5 +118,9 @@ public static void main(String[] args) throws Exception {
     if (op.equals("offline_fg_materialization") || op.equals("offline_fg_backfill")) {
       SparkEngine.getInstance().streamToHudiTable(streamFeatureGroup, writeOptions);
     }
+
+    LOGGER.info("Closing spark session...");
+    SparkEngine.getInstance().closeSparkSession();
+    System.exit(0);
   }
 }
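
The explicit System.exit(0) after stopping the session is a common pattern in Spark batch entry points: without it, lingering non-daemon threads can keep the driver JVM alive after the work is done. A hedged Python rendering of the same shutdown sequence (logger name and helper are illustrative):

# Illustrative Python version of the shutdown sequence; not part of this commit.
import logging
import sys

from pyspark.sql import SparkSession

LOGGER = logging.getLogger(__name__)

def finish_job() -> None:
    LOGGER.info("Closing spark session...")
    spark = SparkSession.getActiveSession()
    if spark is not None:
        spark.stop()
    # Exit explicitly so stray non-daemon threads cannot hold the process open.
    sys.exit(0)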

utils/python/hdfs-file-operations.py

Whitespace-only changes.

utils/python/hsfs_utils.py

Lines changed: 3 additions & 0 deletions
@@ -292,3 +292,6 @@ def parse_isoformat_date(da: str) -> datetime:
         import_fg(job_conf)
     elif args.op == "run_feature_monitoring":
         run_feature_monitoring(job_conf)
+
+    if spark is not None:
+        spark.stop()
