33 changes: 30 additions & 3 deletions python/hsfs/core/job.py
@@ -15,7 +15,9 @@
 #
 
 import humps
-from hsfs import engine
+import time
 
+from hsfs import util
+from hsfs.client.exceptions import FeatureStoreException
 from hsfs.core import job_api
 from hsfs.core import job_schedule as js
@@ -116,10 +118,10 @@ def run(self, args: str = None, await_termination: bool = True):
         self._job_api.launch(self.name, args=args)
         print(
             "Job started successfully, you can follow the progress at \n{}".format(
-                engine.get_instance().get_job_url(self.href)
+                util.get_job_url(self.href)
             )
         )
-        engine.get_instance().wait_for_job(self, await_termination=await_termination)
+        self._wait_for_job(await_termination=await_termination)
 
     def get_state(self):
         """Get the state of the job.
@@ -223,3 +225,28 @@ def _update_schedule(self, job_schedule):
             self._name, job_schedule.to_dict()
         )
         return self._job_schedule
+
+    def _wait_for_job(self, await_termination=True):
+        # If the user passed the wait_for_job option consider it,
+        # otherwise use the default True
+        while await_termination:
+            executions = self._job_api.last_execution(self)
+            if len(executions) > 0:
+                execution = executions[0]
+            else:
+                return
+
+            if execution.final_status.lower() == "succeeded":
+                return
+            elif execution.final_status.lower() == "failed":
+                raise FeatureStoreException(
+                    "The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
+                )
+            elif execution.final_status.lower() == "killed":
+                raise FeatureStoreException("The Hopsworks Job was stopped")
+            elif execution.state.lower() == "framework_failure":
+                raise FeatureStoreException(
+                    "The Hopsworks Job monitoring failed, could not determine the final status"
+                )
+
+            time.sleep(3)
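
For reviewers who want the shape of the relocated `_wait_for_job` at a glance: it is a plain poll-until-terminal loop. Below is a minimal, self-contained sketch of the same pattern; the `fetch_status` callable and `JobFailed` exception are illustrative stand-ins rather than hsfs names, and the 3-second interval matches the code above.

```python
import time


class JobFailed(Exception):
    """Illustrative stand-in for hsfs' FeatureStoreException."""


def wait_until_terminal(fetch_status, interval=3):
    """Poll fetch_status() until the job reaches a terminal state."""
    while True:
        status = fetch_status()  # e.g. "succeeded", "failed", "killed", or a running state
        if status == "succeeded":
            return
        if status in ("failed", "killed"):
            raise JobFailed("job ended in state: {}".format(status))
        time.sleep(interval)  # still running; back off before the next poll
```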
49 changes: 3 additions & 46 deletions python/hsfs/engine/python.py
@@ -17,7 +17,6 @@
 import pandas as pd
 import numpy as np
 import boto3
-import time
 import re
 import ast
 import warnings
@@ -39,7 +38,6 @@
 
 from io import BytesIO
 from pyhive import hive
-from urllib.parse import urlparse
 from typing import TypeVar, Optional, Dict, Any
 from confluent_kafka import Consumer, Producer, TopicPartition, KafkaError
 from tqdm.auto import tqdm
@@ -65,7 +63,7 @@
 )
 from hsfs.constructor import query
 from hsfs.training_dataset_split import TrainingDatasetSplit
-from hsfs.client import exceptions, hopsworks
+from hsfs.client import hopsworks
 from hsfs.feature_group import FeatureGroup
 from thrift.transport.TTransport import TTransportException
 from pyhive.exc import OperationalError
@@ -384,7 +382,7 @@ def profile_by_spark(self, metadata_instance):
         job = stat_api.compute(metadata_instance)
         print(
             "Statistics Job started successfully, you can follow the progress at \n{}".format(
-                self.get_job_url(job.href)
+                util.get_job_url(job.href)
             )
         )
 
@@ -807,7 +805,7 @@ def write_training_dataset(
         td_job = td_api.compute(training_dataset, td_app_conf)
         print(
             "Training dataset job started successfully, you can follow the progress at \n{}".format(
-                self.get_job_url(td_job.href)
+                util.get_job_url(td_job.href)
             )
         )
 
@@ -882,22 +880,6 @@ def save_empty_dataframe(self, feature_group):
         """Wrapper around save_dataframe in order to provide no-op."""
         pass
 
-    def get_job_url(self, href: str):
-        """Use the endpoint returned by the API to construct the UI url for jobs
-
-        Args:
-            href (str): the endpoint returned by the API
-        """
-        url = urlparse(href)
-        url_splits = url.path.split("/")
-        project_id = url_splits[4]
-        job_name = url_splits[6]
-        ui_url = url._replace(
-            path="p/{}/jobs/named/{}/executions".format(project_id, job_name)
-        )
-        ui_url = client.get_instance().replace_public_host(ui_url)
-        return ui_url.geturl()
-
     def _get_app_options(self, user_write_options={}):
         """
         Generate the options that should be passed to the application doing the ingestion.
@@ -916,31 +898,6 @@ def _get_app_options(self, user_write_options={}):
             spark_job_configuration=spark_job_configuration,
         )
 
-    def wait_for_job(self, job, await_termination=True):
-        # If the user passed the wait_for_job option consider it,
-        # otherwise use the default True
-        while await_termination:
-            executions = self._job_api.last_execution(job)
-            if len(executions) > 0:
-                execution = executions[0]
-            else:
-                return
-
-            if execution.final_status.lower() == "succeeded":
-                return
-            elif execution.final_status.lower() == "failed":
-                raise exceptions.FeatureStoreException(
-                    "The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
-                )
-            elif execution.final_status.lower() == "killed":
-                raise exceptions.FeatureStoreException("The Hopsworks Job was stopped")
-            elif execution.state.lower() == "framework_failure":
-                raise exceptions.FeatureStoreException(
-                    "The Hopsworks Job monitoring failed, could not determine the final status"
-                )
-
-            time.sleep(3)
-
     def add_file(self, file):
         if not file:
             return file
17 changes: 17 additions & 0 deletions python/hsfs/util.py
@@ -335,6 +335,23 @@ def verify_attribute_key_names(feature_group_obj, external_feature_group=False):
         )
 
 
+def get_job_url(href: str):
+    """Use the endpoint returned by the API to construct the UI url for jobs
+
+    Args:
+        href (str): the endpoint returned by the API
+    """
+    url = urlparse(href)
+    url_splits = url.path.split("/")
+    project_id = url_splits[4]
+    job_name = url_splits[6]
+    ui_url = url._replace(
+        path="p/{}/jobs/named/{}/executions".format(project_id, job_name)
+    )
+    ui_url = client.get_instance().replace_public_host(ui_url)
+    return ui_url.geturl()
+
+
 def translate_legacy_spark_type(output_type):
     if output_type == "StringType()":
         return "STRING"
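
As a sanity check on the path indexing in `get_job_url`, here is the synthetic href from the tests worked through with plain `urllib.parse`, no hsfs client involved; the href value is the one the tests below use, not a real Hopsworks endpoint.

```python
from urllib.parse import urlparse

# After splitting the path on "/", index 4 is taken as the project id
# and index 6 as the job name.
url = urlparse("1/2/3/4/5/6/7/8")
parts = url.path.split("/")  # ['1', '2', '3', '4', '5', '6', '7', '8']
ui_path = "p/{}/jobs/named/{}/executions".format(parts[4], parts[6])
print(ui_path)  # p/5/jobs/named/7/executions
```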
17 changes: 0 additions & 17 deletions python/tests/engine/test_python.py
@@ -2552,23 +2552,6 @@ def test_save_empty_dataframe(self):
         # Assert
         assert result is None
 
-    def test_get_job_url(self, mocker):
-        # Arrange
-        mock_client_get_instance = mocker.patch("hsfs.client.get_instance")
-
-        python_engine = python.Engine()
-
-        # Act
-        python_engine.get_job_url(href="1/2/3/4/5/6/7/8")
-
-        # Assert
-        assert (
-            mock_client_get_instance.return_value.replace_public_host.call_args[0][
-                0
-            ].path
-            == "p/5/jobs/named/7/executions"
-        )
-
     def test_get_app_options(self, mocker):
         # Arrange
         mock_ingestion_job_conf = mocker.patch(
15 changes: 15 additions & 0 deletions python/tests/test_util.py
@@ -86,3 +86,18 @@ def test_convert_event_time_to_timestamp_yyyy_mm_dd_hh_mm_ss_error3(self):
     def test_convert_hudi_commit_time_to_timestamp(self):
         timestamp = util.get_timestamp_from_date_string("20221118095233099")
         assert timestamp == 1668765153099
+
+    def test_get_job_url(self, mocker):
+        # Arrange
+        mock_client_get_instance = mocker.patch("hsfs.client.get_instance")
+
+        # Act
+        util.get_job_url(href="1/2/3/4/5/6/7/8")
+
+        # Assert
+        assert (
+            mock_client_get_instance.return_value.replace_public_host.call_args[0][
+                0
+            ].path
+            == "p/5/jobs/named/7/executions"
+        )