Skip to content

Commit 26aa327

Browse files
committed
[FSTORE-1265] feature_group.materialization_job.run() fails with Spark… (#1234)
1 parent 9ee3569 commit 26aa327

File tree

7 files changed

+176
-180
lines changed

7 files changed

+176
-180
lines changed

python/hsfs/core/job.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
#
1616

1717
import humps
18-
from hsfs import engine
18+
import time
19+
20+
from hsfs import util
1921
from hsfs.client.exceptions import FeatureStoreException
2022
from hsfs.core import job_api
2123
from hsfs.core import job_schedule as js
@@ -58,7 +60,7 @@ def __init__(
5860
@classmethod
5961
def from_response_json(cls, json_dict):
6062
# Job config should not be decamelized when updated
61-
config = json_dict.pop("config")
63+
config = json_dict.pop("config", None)
6264
json_decamelized = humps.decamelize(json_dict)
6365
json_decamelized["config"] = config
6466
return cls(**json_decamelized)
@@ -116,10 +118,10 @@ def run(self, args: str = None, await_termination: bool = True):
116118
self._job_api.launch(self.name, args=args)
117119
print(
118120
"Job started successfully, you can follow the progress at \n{}".format(
119-
engine.get_instance().get_job_url(self.href)
121+
util.get_job_url(self.href)
120122
)
121123
)
122-
engine.get_instance().wait_for_job(self, await_termination=await_termination)
124+
self._wait_for_job(await_termination=await_termination)
123125

124126
def get_state(self):
125127
"""Get the state of the job.
@@ -223,3 +225,28 @@ def _update_schedule(self, job_schedule):
223225
self._name, job_schedule.to_dict()
224226
)
225227
return self._job_schedule
228+
229+
def _wait_for_job(self, await_termination=True):
230+
# If the user passed the wait_for_job option consider it,
231+
# otherwise use the default True
232+
while await_termination:
233+
executions = self._job_api.last_execution(self)
234+
if len(executions) > 0:
235+
execution = executions[0]
236+
else:
237+
return
238+
239+
if execution.final_status.lower() == "succeeded":
240+
return
241+
elif execution.final_status.lower() == "failed":
242+
raise FeatureStoreException(
243+
"The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
244+
)
245+
elif execution.final_status.lower() == "killed":
246+
raise FeatureStoreException("The Hopsworks Job was stopped")
247+
elif execution.state.lower() == "framework_failure":
248+
raise FeatureStoreException(
249+
"The Hopsworks Job monitoring failed, could not determine the final status"
250+
)
251+
252+
time.sleep(3)

python/hsfs/engine/python.py

Lines changed: 6 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import pandas as pd
1818
import numpy as np
1919
import boto3
20-
import time
2120
import re
2221
import ast
2322
import warnings
@@ -39,7 +38,6 @@
3938

4039
from io import BytesIO
4140
from pyhive import hive
42-
from urllib.parse import urlparse
4341
from typing import TypeVar, Optional, Dict, Any
4442
from confluent_kafka import Consumer, Producer, TopicPartition, KafkaError
4543
from tqdm.auto import tqdm
@@ -65,7 +63,7 @@
6563
)
6664
from hsfs.constructor import query
6765
from hsfs.training_dataset_split import TrainingDatasetSplit
68-
from hsfs.client import exceptions, hopsworks
66+
from hsfs.client import hopsworks
6967
from hsfs.feature_group import FeatureGroup
7068
from thrift.transport.TTransport import TTransportException
7169
from pyhive.exc import OperationalError
@@ -384,11 +382,11 @@ def profile_by_spark(self, metadata_instance):
384382
job = stat_api.compute(metadata_instance)
385383
print(
386384
"Statistics Job started successfully, you can follow the progress at \n{}".format(
387-
self.get_job_url(job.href)
385+
util.get_job_url(job.href)
388386
)
389387
)
390388

391-
self.wait_for_job(job)
389+
job._wait_for_job()
392390
return job
393391

394392
def profile(
@@ -807,15 +805,13 @@ def write_training_dataset(
807805
td_job = td_api.compute(training_dataset, td_app_conf)
808806
print(
809807
"Training dataset job started successfully, you can follow the progress at \n{}".format(
810-
self.get_job_url(td_job.href)
808+
util.get_job_url(td_job.href)
811809
)
812810
)
813811

814-
self.wait_for_job(
815-
td_job,
816-
await_termination=user_write_options.get("wait_for_job", True),
812+
td_job._wait_for_job(
813+
await_termination=user_write_options.get("wait_for_job", True)
817814
)
818-
819815
return td_job
820816

821817
def _create_hive_connection(self, feature_store, hive_config=None):
@@ -882,22 +878,6 @@ def save_empty_dataframe(self, feature_group):
882878
"""Wrapper around save_dataframe in order to provide no-op."""
883879
pass
884880

885-
def get_job_url(self, href: str):
886-
"""Use the endpoint returned by the API to construct the UI url for jobs
887-
888-
Args:
889-
href (str): the endpoint returned by the API
890-
"""
891-
url = urlparse(href)
892-
url_splits = url.path.split("/")
893-
project_id = url_splits[4]
894-
job_name = url_splits[6]
895-
ui_url = url._replace(
896-
path="p/{}/jobs/named/{}/executions".format(project_id, job_name)
897-
)
898-
ui_url = client.get_instance().replace_public_host(ui_url)
899-
return ui_url.geturl()
900-
901881
def _get_app_options(self, user_write_options={}):
902882
"""
903883
Generate the options that should be passed to the application doing the ingestion.
@@ -916,27 +896,6 @@ def _get_app_options(self, user_write_options={}):
916896
spark_job_configuration=spark_job_configuration,
917897
)
918898

919-
def wait_for_job(self, job, await_termination=True):
920-
# If the user passed the wait_for_job option consider it,
921-
# otherwise use the default True
922-
while await_termination:
923-
executions = self._job_api.last_execution(job)
924-
if len(executions) > 0:
925-
execution = executions[0]
926-
else:
927-
return
928-
929-
if execution.final_status.lower() == "succeeded":
930-
return
931-
elif execution.final_status.lower() == "failed":
932-
raise exceptions.FeatureStoreException(
933-
"The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
934-
)
935-
elif execution.final_status.lower() == "killed":
936-
raise exceptions.FeatureStoreException("The Hopsworks Job was stopped")
937-
938-
time.sleep(3)
939-
940899
def add_file(self, file):
941900
if not file:
942901
return file

python/hsfs/util.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,23 @@ def verify_attribute_key_names(feature_group_obj, external_feature_group=False):
335335
)
336336

337337

338+
def get_job_url(href: str):
339+
"""Use the endpoint returned by the API to construct the UI url for jobs
340+
341+
Args:
342+
href (str): the endpoint returned by the API
343+
"""
344+
url = urlparse(href)
345+
url_splits = url.path.split("/")
346+
project_id = url_splits[4]
347+
job_name = url_splits[6]
348+
ui_url = url._replace(
349+
path="p/{}/jobs/named/{}/executions".format(project_id, job_name)
350+
)
351+
ui_url = client.get_instance().replace_public_host(ui_url)
352+
return ui_url.geturl()
353+
354+
338355
def translate_legacy_spark_type(output_type):
339356
if output_type == "StringType()":
340357
return "STRING"

python/tests/core/test_job.py

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
#
1616

1717

18-
from hsfs.core import job
18+
import pytest
19+
20+
from hsfs.core import job, execution
21+
from hsfs.client import exceptions
1922

2023

2124
class TestJob:
@@ -44,3 +47,87 @@ def test_from_response_json_empty(self, backend_fixtures):
4447
assert j.name == "test_name"
4548
assert j.executions is None
4649
assert j.href is None
50+
51+
def test_wait_for_job(self, mocker, backend_fixtures):
52+
# Arrange
53+
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
54+
55+
json = backend_fixtures["job"]["get"]["response"]
56+
j = job.Job.from_response_json(json)
57+
58+
# Act
59+
j._wait_for_job()
60+
61+
# Assert
62+
assert mock_job_api.return_value.last_execution.call_count == 1
63+
64+
def test_wait_for_job_wait_for_job_false(self, mocker, backend_fixtures):
65+
# Arrange
66+
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
67+
68+
json = backend_fixtures["job"]["get"]["response"]
69+
j = job.Job.from_response_json(json)
70+
71+
# Act
72+
j._wait_for_job(False)
73+
74+
# Assert
75+
assert mock_job_api.return_value.last_execution.call_count == 0
76+
77+
def test_wait_for_job_final_status_succeeded(self, mocker, backend_fixtures):
78+
# Arrange
79+
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
80+
81+
json = backend_fixtures["job"]["get"]["response"]
82+
j = job.Job.from_response_json(json)
83+
84+
mock_job_api.return_value.last_execution.return_value = [
85+
execution.Execution(id=1, state=None, final_status="succeeded")
86+
]
87+
88+
# Act
89+
j._wait_for_job()
90+
91+
# Assert
92+
assert mock_job_api.return_value.last_execution.call_count == 1
93+
94+
def test_wait_for_job_final_status_failed(self, mocker, backend_fixtures):
95+
# Arrange
96+
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
97+
98+
json = backend_fixtures["job"]["get"]["response"]
99+
j = job.Job.from_response_json(json)
100+
101+
mock_job_api.return_value.last_execution.return_value = [
102+
execution.Execution(id=1, state=None, final_status="failed")
103+
]
104+
105+
# Act
106+
with pytest.raises(exceptions.FeatureStoreException) as e_info:
107+
j._wait_for_job()
108+
109+
# Assert
110+
assert mock_job_api.return_value.last_execution.call_count == 1
111+
assert (
112+
str(e_info.value)
113+
== "The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
114+
)
115+
116+
def test_wait_for_job_final_status_killed(self, mocker, backend_fixtures):
117+
# Arrange
118+
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
119+
120+
json = backend_fixtures["job"]["get"]["response"]
121+
j = job.Job.from_response_json(json)
122+
123+
mock_job_api.return_value.last_execution.return_value = [
124+
execution.Execution(id=1, state=None, final_status="killed")
125+
]
126+
127+
# Act
128+
with pytest.raises(exceptions.FeatureStoreException) as e_info:
129+
j._wait_for_job()
130+
131+
# Assert
132+
assert mock_job_api.return_value.last_execution.call_count == 1
133+
assert str(e_info.value) == "The Hopsworks Job was stopped"

0 commit comments

Comments (0)