35 changes: 31 additions & 4 deletions python/hsfs/core/job.py
@@ -15,7 +15,9 @@
#

import humps
from hsfs import engine
import time

from hsfs import util
from hsfs.client.exceptions import FeatureStoreException
from hsfs.core import job_api
from hsfs.core import job_schedule as js
@@ -58,7 +60,7 @@ def __init__(
@classmethod
def from_response_json(cls, json_dict):
# Job config should not be decamelized when updated
config = json_dict.pop("config")
config = json_dict.pop("config", None)
json_decamelized = humps.decamelize(json_dict)
json_decamelized["config"] = config
return cls(**json_decamelized)
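
For illustration (not part of this diff): the new `None` default means payloads without a "config" key no longer raise `KeyError`, while a present config is still kept out of the decamelization step. A minimal sketch with a hypothetical payload:

import humps

json_dict = {"id": 1, "name": "test_name"}      # hypothetical payload without "config"
config = json_dict.pop("config", None)          # None instead of raising KeyError
json_decamelized = humps.decamelize(json_dict)  # config never goes through decamelize
json_decamelized["config"] = config             # re-attached untouched (here: None)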
@@ -116,10 +118,10 @@ def run(self, args: str = None, await_termination: bool = True):
self._job_api.launch(self.name, args=args)
print(
"Job started successfully, you can follow the progress at \n{}".format(
engine.get_instance().get_job_url(self.href)
util.get_job_url(self.href)
)
)
engine.get_instance().wait_for_job(self, await_termination=await_termination)
self._wait_for_job(await_termination=await_termination)

def get_state(self):
"""Get the state of the job.
@@ -223,3 +225,28 @@ def _update_schedule(self, job_schedule):
self._name, job_schedule.to_dict()
)
return self._job_schedule

def _wait_for_job(self, await_termination=True):
# Honor the user's wait_for_job option if given; by default (True)
# poll until the job reaches a terminal state
while await_termination:
executions = self._job_api.last_execution(self)
if len(executions) > 0:
execution = executions[0]
else:
return

if execution.final_status.lower() == "succeeded":
return
elif execution.final_status.lower() == "failed":
raise FeatureStoreException(
"The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
)
elif execution.final_status.lower() == "killed":
raise FeatureStoreException("The Hopsworks Job was stopped")
elif execution.state.lower() == "framework_failure":
raise FeatureStoreException(
"The Hopsworks Job monitoring failed, could not determine the final status"
)

time.sleep(3)
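
For illustration (not part of this diff), the polling contract of `_wait_for_job` as a standalone sketch; `fetch_last_execution` is a hypothetical stand-in for `JobApi.last_execution`, and the `framework_failure` state check is omitted for brevity:

import time

def wait_for_terminal_state(fetch_last_execution, await_termination=True, poll_seconds=3):
    # Mirrors Job._wait_for_job: return on success, raise on failure, otherwise poll.
    while await_termination:
        executions = fetch_last_execution()
        if not executions:
            return  # no execution has started yet, nothing to wait on
        final_status = executions[0].final_status.lower()
        if final_status == "succeeded":
            return
        if final_status in ("failed", "killed"):
            raise RuntimeError("job ended with status: " + final_status)
        time.sleep(poll_seconds)

class _Exec:  # minimal stand-in for hsfs.core.execution.Execution
    def __init__(self, final_status):
        self.final_status = final_status

wait_for_terminal_state(lambda: [_Exec("SUCCEEDED")])  # returns immediately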
57 changes: 6 additions & 51 deletions python/hsfs/engine/python.py
@@ -17,7 +17,6 @@
import pandas as pd
import numpy as np
import boto3
import time
import re
import ast
import warnings
@@ -39,7 +38,6 @@

from io import BytesIO
from pyhive import hive
from urllib.parse import urlparse
from typing import TypeVar, Optional, Dict, Any
from confluent_kafka import Consumer, Producer, TopicPartition, KafkaError
from tqdm.auto import tqdm
@@ -65,7 +63,7 @@
)
from hsfs.constructor import query
from hsfs.training_dataset_split import TrainingDatasetSplit
from hsfs.client import exceptions, hopsworks
from hsfs.client import hopsworks
from hsfs.feature_group import FeatureGroup
from thrift.transport.TTransport import TTransportException
from pyhive.exc import OperationalError
@@ -384,11 +382,11 @@ def profile_by_spark(self, metadata_instance):
job = stat_api.compute(metadata_instance)
print(
"Statistics Job started successfully, you can follow the progress at \n{}".format(
self.get_job_url(job.href)
util.get_job_url(job.href)
)
)

self.wait_for_job(job)
job._wait_for_job()
return job

def profile(
@@ -807,15 +805,13 @@ def write_training_dataset(
td_job = td_api.compute(training_dataset, td_app_conf)
print(
"Training dataset job started successfully, you can follow the progress at \n{}".format(
self.get_job_url(td_job.href)
util.get_job_url(td_job.href)
)
)

self.wait_for_job(
td_job,
await_termination=user_write_options.get("wait_for_job", True),
td_job._wait_for_job(
await_termination=user_write_options.get("wait_for_job", True)
)

return td_job

def _create_hive_connection(self, feature_store, hive_config=None):
Expand Down Expand Up @@ -882,22 +878,6 @@ def save_empty_dataframe(self, feature_group):
"""Wrapper around save_dataframe in order to provide no-op."""
pass

def get_job_url(self, href: str):
"""Use the endpoint returned by the API to construct the UI url for jobs

Args:
href (str): the endpoint returned by the API
"""
url = urlparse(href)
url_splits = url.path.split("/")
project_id = url_splits[4]
job_name = url_splits[6]
ui_url = url._replace(
path="p/{}/jobs/named/{}/executions".format(project_id, job_name)
)
ui_url = client.get_instance().replace_public_host(ui_url)
return ui_url.geturl()

def _get_app_options(self, user_write_options={}):
"""
Generate the options that should be passed to the application doing the ingestion.
@@ -916,31 +896,6 @@ def _get_app_options(self, user_write_options={}):
spark_job_configuration=spark_job_configuration,
)

def wait_for_job(self, job, await_termination=True):
# If the user passed the wait_for_job option consider it,
# otherwise use the default True
while await_termination:
executions = self._job_api.last_execution(job)
if len(executions) > 0:
execution = executions[0]
else:
return

if execution.final_status.lower() == "succeeded":
return
elif execution.final_status.lower() == "failed":
raise exceptions.FeatureStoreException(
"The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
)
elif execution.final_status.lower() == "killed":
raise exceptions.FeatureStoreException("The Hopsworks Job was stopped")
elif execution.state.lower() == "framework_failure":
raise exceptions.FeatureStoreException(
"The Hopsworks Job monitoring failed, could not determine the final status"
)

time.sleep(3)

def add_file(self, file):
if not file:
return file
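
The call-site change in the Python engine, summarized schematically (not part of this diff; `td_job` is an hsfs Job, `user_write_options` the caller's options dict):

# before: the engine owned both helpers
#   self.get_job_url(td_job.href)
#   self.wait_for_job(td_job, await_termination=user_write_options.get("wait_for_job", True))
# after: URL construction lives in util, monitoring on the Job itself
#   util.get_job_url(td_job.href)
#   td_job._wait_for_job(await_termination=user_write_options.get("wait_for_job", True))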
17 changes: 17 additions & 0 deletions python/hsfs/util.py
@@ -335,6 +335,23 @@ def verify_attribute_key_names(feature_group_obj, external_feature_group=False):
)


def get_job_url(href: str):
"""Use the endpoint returned by the API to construct the UI url for jobs

Args:
href (str): the endpoint returned by the API
"""
url = urlparse(href)
url_splits = url.path.split("/")
project_id = url_splits[4]
job_name = url_splits[6]
ui_url = url._replace(
path="p/{}/jobs/named/{}/executions".format(project_id, job_name)
)
ui_url = client.get_instance().replace_public_host(ui_url)
return ui_url.geturl()


def translate_legacy_spark_type(output_type):
if output_type == "StringType()":
return "STRING"
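
A worked example of the path rewrite (not part of this diff), assuming the usual Hopsworks href shape where path segments 4 and 6 hold the project id and job name; the final replace_public_host step needs a live client and is skipped here:

from urllib.parse import urlparse

href = "https://hopsworks.example.com/hopsworks-api/api/project/119/jobs/my_job"  # assumed shape
url = urlparse(href)
parts = url.path.split("/")  # ['', 'hopsworks-api', 'api', 'project', '119', 'jobs', 'my_job']
ui_url = url._replace(path="p/{}/jobs/named/{}/executions".format(parts[4], parts[6]))
print(ui_url.geturl())       # https://hopsworks.example.com/p/119/jobs/named/my_job/executions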
89 changes: 88 additions & 1 deletion python/tests/core/test_job.py
@@ -15,7 +15,10 @@
#


from hsfs.core import job
import pytest

from hsfs.core import job, execution
from hsfs.client import exceptions


class TestJob:
@@ -44,3 +47,87 @@ def test_from_response_json_empty(self, backend_fixtures):
assert j.name == "test_name"
assert j.executions is None
assert j.href is None

def test_wait_for_job(self, mocker, backend_fixtures):
# Arrange
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")

json = backend_fixtures["job"]["get"]["response"]
j = job.Job.from_response_json(json)

# Act
j._wait_for_job()

# Assert
assert mock_job_api.return_value.last_execution.call_count == 1

def test_wait_for_job_wait_for_job_false(self, mocker, backend_fixtures):
# Arrange
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")

json = backend_fixtures["job"]["get"]["response"]
j = job.Job.from_response_json(json)

# Act
j._wait_for_job(False)

# Assert
assert mock_job_api.return_value.last_execution.call_count == 0

def test_wait_for_job_final_status_succeeded(self, mocker, backend_fixtures):
# Arrange
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")

json = backend_fixtures["job"]["get"]["response"]
j = job.Job.from_response_json(json)

mock_job_api.return_value.last_execution.return_value = [
execution.Execution(id=1, state=None, final_status="succeeded")
]

# Act
j._wait_for_job()

# Assert
assert mock_job_api.return_value.last_execution.call_count == 1

def test_wait_for_job_final_status_failed(self, mocker, backend_fixtures):
# Arrange
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")

json = backend_fixtures["job"]["get"]["response"]
j = job.Job.from_response_json(json)

mock_job_api.return_value.last_execution.return_value = [
execution.Execution(id=1, state=None, final_status="failed")
]

# Act
with pytest.raises(exceptions.FeatureStoreException) as e_info:
j._wait_for_job()

# Assert
assert mock_job_api.return_value.last_execution.call_count == 1
assert (
str(e_info.value)
== "The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
)

def test_wait_for_job_final_status_killed(self, mocker, backend_fixtures):
# Arrange
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")

json = backend_fixtures["job"]["get"]["response"]
j = job.Job.from_response_json(json)

mock_job_api.return_value.last_execution.return_value = [
execution.Execution(id=1, state=None, final_status="killed")
]

# Act
with pytest.raises(exceptions.FeatureStoreException) as e_info:
j._wait_for_job()

# Assert
assert mock_job_api.return_value.last_execution.call_count == 1
assert str(e_info.value) == "The Hopsworks Job was stopped"
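
A note on the mocking pattern above (not part of this diff): patching `hsfs.core.job_api.JobApi` replaces the class itself, so the instance created inside `Job.__init__` is `mock_job_api.return_value`, whose `last_execution` each test stubs. A standalone sketch with unittest.mock instead of pytest-mock:

from unittest import mock

with mock.patch("hsfs.core.job_api.JobApi") as mock_job_api:
    # any JobApi() constructed inside Job.__init__ is mock_job_api.return_value
    mock_job_api.return_value.last_execution.return_value = []  # no executions yet
    # a Job built here would return immediately from _wait_for_job()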