
Commit 4392eeb

Merge branch 'master' into FSTORE-1185
2 parents 7972716 + 8a54daa commit 4392eeb

16 files changed: +413 -194 lines changed

java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamEngine.java (+3 -1)

@@ -24,6 +24,7 @@
 import com.logicalclocks.hsfs.beam.StreamFeatureGroup;
 import com.logicalclocks.hsfs.metadata.DatasetApi;
 import com.logicalclocks.hsfs.engine.EngineBase;
+import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
 import com.logicalclocks.hsfs.metadata.HopsworksInternalClient;
 import org.apache.avro.Schema;

@@ -34,6 +35,7 @@

 public class BeamEngine extends EngineBase {
   private static BeamEngine INSTANCE = null;
+  private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

   public static synchronized BeamEngine getInstance() throws FeatureStoreException {
     if (INSTANCE == null) {
@@ -71,7 +73,7 @@ public String addFile(String filePath) throws IOException, FeatureStoreException
    }
    String targetPath = System.getProperty("java.io.tmpdir") + filePath.substring(filePath.lastIndexOf("/"));
    try (FileOutputStream outputStream = new FileOutputStream(targetPath)) {
-      outputStream.write(DatasetApi.readContent(filePath, "HIVEDB"));
+      outputStream.write(DatasetApi.readContent(filePath, featureGroupUtils.getDatasetType(filePath)));
    }
    return targetPath;
  }

java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java (+7)

@@ -240,4 +240,11 @@ private void checkListdiff(List<String> primaryPartitionKeyNames, List<String> f
  public Subject getSubject(FeatureGroupBase featureGroup) throws FeatureStoreException, IOException {
    return kafkaApi.getSubject(featureGroup.getFeatureStore(), getFgName(featureGroup));
  }
+
+  public String getDatasetType(String path) {
+    if (Pattern.compile("^(?:hdfs://|)/apps/hive/warehouse/*").matcher(path).find()) {
+      return "HIVEDB";
+    }
+    return "DATASET";
+  }
 }
TestFeatureGroupUtils.java (new file in com.logicalclocks.hsfs.engine, +76)

@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2024. Hopsworks AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and limitations under the License.
+ *
+ */
+
+package com.logicalclocks.hsfs.engine;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+public class TestFeatureGroupUtils {
+
+  private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();
+
+  @Test
+  void testGetDatasetTypeHIVEDB() throws JsonProcessingException {
+    // Arrange
+    String path = "/apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks";
+
+    // Act
+    String databaseType = featureGroupUtils.getDatasetType(path);
+
+    // Assert
+    Assert.assertEquals("HIVEDB", databaseType);
+  }
+
+  @Test
+  void testGetDatasetTypeHIVEDBWithDfs() throws JsonProcessingException {
+    // Arrange
+    String path = "hdfs:///apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks";
+
+    // Act
+    String databaseType = featureGroupUtils.getDatasetType(path);
+
+    // Assert
+    Assert.assertEquals("HIVEDB", databaseType);
+  }
+
+  @Test
+  void testGetDatasetTypeDATASET() throws JsonProcessingException {
+    // Arrange
+    String path = "/Projects/temp/Resources/kafka__tstore.jks";
+
+    // Act
+    String databaseType = featureGroupUtils.getDatasetType(path);
+
+    // Assert
+    Assert.assertEquals("DATASET", databaseType);
+  }
+
+  @Test
+  void testGetDatasetTypeDATASETWithDfs() throws JsonProcessingException {
+    // Arrange
+    String path = "hdfs:///Projects/temp/Resources/kafka__tstore.jks";
+
+    // Act
+    String databaseType = featureGroupUtils.getDatasetType(path);
+
+    // Assert
+    Assert.assertEquals("DATASET", databaseType);
+  }
+}
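
The Python engines in this commit make the same switch away from a hard-coded "HIVEDB" argument by calling util.get_dataset_type(file); that helper itself is not part of the diff. A rough, hypothetical sketch of the equivalent check on the Python side, assuming it mirrors the Java regex and the test paths above:

import re

# Hypothetical mirror of FeatureGroupUtils.getDatasetType(); the actual
# hsfs.util.get_dataset_type is not shown in this commit.
_HIVE_WAREHOUSE = re.compile(r"^(?:hdfs://|)/apps/hive/warehouse/*")


def get_dataset_type(path: str) -> str:
    # Paths under the Hive warehouse are read as HIVEDB, everything else as DATASET.
    return "HIVEDB" if _HIVE_WAREHOUSE.search(path) else "DATASET"


assert get_dataset_type("hdfs:///apps/hive/warehouse/temp_featurestore.db/x.jks") == "HIVEDB"
assert get_dataset_type("/Projects/temp/Resources/kafka__tstore.jks") == "DATASET"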

java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java (+4 -6)

@@ -123,7 +123,6 @@
 public class SparkEngine extends EngineBase {

   private final StorageConnectorUtils storageConnectorUtils = new StorageConnectorUtils();
-
   private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

   private static SparkEngine INSTANCE = null;
@@ -143,7 +142,6 @@ public static void setInstance(SparkEngine sparkEngine) {
  @Getter
  private SparkSession sparkSession;

-  private FeatureGroupUtils utils = new FeatureGroupUtils();
  private HudiEngine hudiEngine = new HudiEngine();

  private SparkEngine() {
@@ -657,7 +655,7 @@ private Dataset<Row> onlineFeatureGroupToAvro(FeatureGroupBase featureGroupBase,

  public void writeEmptyDataframe(FeatureGroupBase featureGroup)
      throws IOException, FeatureStoreException, ParseException {
-    String fgTableName = utils.getTableName(featureGroup);
+    String fgTableName = featureGroupUtils.getTableName(featureGroup);
    Dataset emptyDf = sparkSession.table(fgTableName).limit(0);
    writeOfflineDataframe(featureGroup, emptyDf, HudiOperationType.UPSERT, new HashMap<>(), null);
  }
@@ -681,8 +679,8 @@ private void writeSparkDataset(FeatureGroupBase featureGroup, Dataset<Row> datas
        .mode(SaveMode.Append)
        // write options cannot be null
        .options(writeOptions == null ? new HashMap<>() : writeOptions)
-        .partitionBy(utils.getPartitionColumns(featureGroup))
-        .saveAsTable(utils.getTableName(featureGroup));
+        .partitionBy(featureGroupUtils.getPartitionColumns(featureGroup))
+        .saveAsTable(featureGroupUtils.getTableName(featureGroup));
  }

  public String profile(Dataset<Row> df, List<String> restrictToColumns, Boolean correlation,
@@ -935,7 +933,7 @@ public String addFile(String filePath) throws FeatureStoreException {
    java.nio.file.Path targetPath = Paths.get(SparkFiles.getRootDirectory(), fileName);

    try (FileOutputStream outputStream = new FileOutputStream(targetPath.toString())) {
-      outputStream.write(DatasetApi.readContent(filePath, "HIVEDB"));
+      outputStream.write(DatasetApi.readContent(filePath, featureGroupUtils.getDatasetType(filePath)));
    } catch (IOException e) {
      throw new FeatureStoreException("Error setting up file: " + filePath, e);
    }

python/hsfs/core/job.py (+31 -4)

@@ -15,7 +15,9 @@
 #

 import humps
-from hsfs import engine
+import time
+
+from hsfs import util
 from hsfs.client.exceptions import FeatureStoreException
 from hsfs.core import job_api
 from hsfs.core import job_schedule as js
@@ -58,7 +60,7 @@ def __init__(
    @classmethod
    def from_response_json(cls, json_dict):
        # Job config should not be decamelized when updated
-        config = json_dict.pop("config")
+        config = json_dict.pop("config", None)
        json_decamelized = humps.decamelize(json_dict)
        json_decamelized["config"] = config
        return cls(**json_decamelized)
@@ -116,10 +118,10 @@ def run(self, args: str = None, await_termination: bool = True):
        self._job_api.launch(self.name, args=args)
        print(
            "Job started successfully, you can follow the progress at \n{}".format(
-                engine.get_instance().get_job_url(self.href)
+                util.get_job_url(self.href)
            )
        )
-        engine.get_instance().wait_for_job(self, await_termination=await_termination)
+        self._wait_for_job(await_termination=await_termination)

    def get_state(self):
        """Get the state of the job.
@@ -223,3 +225,28 @@ def _update_schedule(self, job_schedule):
            self._name, job_schedule.to_dict()
        )
        return self._job_schedule
+
+    def _wait_for_job(self, await_termination=True):
+        # If the user passed the wait_for_job option consider it,
+        # otherwise use the default True
+        while await_termination:
+            executions = self._job_api.last_execution(self)
+            if len(executions) > 0:
+                execution = executions[0]
+            else:
+                return
+
+            if execution.final_status.lower() == "succeeded":
+                return
+            elif execution.final_status.lower() == "failed":
+                raise FeatureStoreException(
+                    "The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
+                )
+            elif execution.final_status.lower() == "killed":
+                raise FeatureStoreException("The Hopsworks Job was stopped")
+            elif execution.state.lower() == "framework_failure":
+                raise FeatureStoreException(
+                    "The Hopsworks Job monitoring failed, could not determine the final status"
+                )
+
+            time.sleep(3)
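
With the polling moved onto Job itself, run() now blocks (or returns immediately) without going through the engine singleton, and it raises FeatureStoreException when the execution failed or was killed. A minimal caller sketch, assuming job is any hsfs.core.job.Job instance obtained through the usual feature store APIs:

from hsfs.client.exceptions import FeatureStoreException


def run_job(job, wait: bool = True):
    # With wait=True, Job.run() launches the job and then polls the last
    # execution every 3 seconds in Job._wait_for_job(); with wait=False it
    # returns right after launching.
    try:
        job.run(await_termination=wait)
    except FeatureStoreException as err:
        # Job logs are only available in the Hopsworks UI.
        print(f"Job '{job.name}' did not complete successfully: {err}")
        raise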

python/hsfs/engine/python.py (+9 -52)

@@ -17,7 +17,6 @@
 import pandas as pd
 import numpy as np
 import boto3
-import time
 import re
 import ast
 import warnings
@@ -39,7 +38,6 @@

 from io import BytesIO
 from pyhive import hive
-from urllib.parse import urlparse
 from typing import TypeVar, Optional, Dict, Any
 from confluent_kafka import Consumer, Producer, TopicPartition, KafkaError
 from tqdm.auto import tqdm
@@ -65,7 +63,7 @@
 )
 from hsfs.constructor import query
 from hsfs.training_dataset_split import TrainingDatasetSplit
-from hsfs.client import exceptions, hopsworks
+from hsfs.client import hopsworks
 from hsfs.feature_group import FeatureGroup
 from thrift.transport.TTransport import TTransportException
 from pyhive.exc import OperationalError
@@ -384,11 +382,11 @@ def profile_by_spark(self, metadata_instance):
        job = stat_api.compute(metadata_instance)
        print(
            "Statistics Job started successfully, you can follow the progress at \n{}".format(
-                self.get_job_url(job.href)
+                util.get_job_url(job.href)
            )
        )

-        self.wait_for_job(job)
+        job._wait_for_job()
        return job

    def profile(
@@ -807,15 +805,13 @@ def write_training_dataset(
        td_job = td_api.compute(training_dataset, td_app_conf)
        print(
            "Training dataset job started successfully, you can follow the progress at \n{}".format(
-                self.get_job_url(td_job.href)
+                util.get_job_url(td_job.href)
            )
        )

-        self.wait_for_job(
-            td_job,
-            await_termination=user_write_options.get("wait_for_job", True),
+        td_job._wait_for_job(
+            await_termination=user_write_options.get("wait_for_job", True)
        )
-
        return td_job

    def _create_hive_connection(self, feature_store, hive_config=None):
@@ -882,22 +878,6 @@ def save_empty_dataframe(self, feature_group):
        """Wrapper around save_dataframe in order to provide no-op."""
        pass

-    def get_job_url(self, href: str):
-        """Use the endpoint returned by the API to construct the UI url for jobs
-
-        Args:
-            href (str): the endpoint returned by the API
-        """
-        url = urlparse(href)
-        url_splits = url.path.split("/")
-        project_id = url_splits[4]
-        job_name = url_splits[6]
-        ui_url = url._replace(
-            path="p/{}/jobs/named/{}/executions".format(project_id, job_name)
-        )
-        ui_url = client.get_instance().replace_public_host(ui_url)
-        return ui_url.geturl()
-
    def _get_app_options(self, user_write_options={}):
        """
        Generate the options that should be passed to the application doing the ingestion.
@@ -916,31 +896,6 @@ def _get_app_options(self, user_write_options={}):
            spark_job_configuration=spark_job_configuration,
        )

-    def wait_for_job(self, job, await_termination=True):
-        # If the user passed the wait_for_job option consider it,
-        # otherwise use the default True
-        while await_termination:
-            executions = self._job_api.last_execution(job)
-            if len(executions) > 0:
-                execution = executions[0]
-            else:
-                return
-
-            if execution.final_status.lower() == "succeeded":
-                return
-            elif execution.final_status.lower() == "failed":
-                raise exceptions.FeatureStoreException(
-                    "The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
-                )
-            elif execution.final_status.lower() == "killed":
-                raise exceptions.FeatureStoreException("The Hopsworks Job was stopped")
-            elif execution.state.lower() == "framework_failure":
-                raise exceptions.FeatureStoreException(
-                    "The Hopsworks Job monitoring failed, could not determine the final status"
-                )
-
-            time.sleep(3)
-
    def add_file(self, file):
        if not file:
            return file
@@ -951,7 +906,9 @@ def add_file(self, file):

        local_file = os.path.join("/tmp", os.path.basename(file))
        if not os.path.exists(local_file):
-            content_stream = self._dataset_api.read_content(file, "HIVEDB")
+            content_stream = self._dataset_api.read_content(
+                file, util.get_dataset_type(file)
+            )
            bytesio_object = BytesIO(content_stream.content)
            # Write the stuff
            with open(local_file, "wb") as f:

python/hsfs/engine/spark.py (+1 -1)

@@ -792,7 +792,7 @@ def add_file(self, file):
        if isinstance(client.get_instance(), client.external.Client):
            tmp_file = os.path.join(SparkFiles.getRootDirectory(), file_name)
            print("Reading key file from storage connector.")
-            response = self._dataset_api.read_content(tmp_file, "HIVEDB")
+            response = self._dataset_api.read_content(file, util.get_dataset_type(file))

            with open(tmp_file, "wb") as f:
                f.write(response.content)

python/hsfs/serving_key.py (+7 -1)

@@ -64,7 +64,13 @@ def to_dict(self):
            "feature_name": self._feature_name,
            "join_index": self._join_index,
            "feature_group_id": (
-                self._feature_group["id"] if self._feature_group is not None else None
+                self._feature_group.id if self._feature_group is not None else None
+            ),
+            "feature_group_name": (
+                self._feature_group.name if self._feature_group is not None else None
+            ),
+            "feature_group_version": (
+                self._feature_group.version if self._feature_group is not None else None
            ),
            "required": self._required,
            "prefix": self._prefix,
