Skip to content

Commit

Permalink
JobConfiguration type
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks committed Jan 10, 2025
1 parent 131a2f1 commit c4b76e7
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 6 deletions.
14 changes: 12 additions & 2 deletions python/hsfs/core/deltastreamer_jobconf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,25 @@
from __future__ import annotations

import json
from typing import Dict, Optional, Union

from hsfs import util
from hsfs.core.job_configuration import JobConfiguration


class DeltaStreamerJobConf:
def __init__(self, options, spark_options, **kwargs):
def __init__(
self,
options,
spark_options: Optional[Union[JobConfiguration, Dict]],
**kwargs
):
self._options = options
self._spark_options = JobConfiguration(**spark_options if spark_options else {}).to_dict()

if isinstance(spark_options, JobConfiguration):
self._spark_options = spark_options
elif isinstance(spark_options, dict):
self._spark_options = JobConfiguration(**spark_options if spark_options else {}).to_dict()

def to_dict(self):
return {
Expand Down
12 changes: 10 additions & 2 deletions python/hsfs/core/ingestion_job_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,28 @@
from __future__ import annotations

import json
from typing import Dict, Optional, Union

from hsfs import util
from hsfs.core.job_configuration import JobConfiguration


class IngestionJobConf:
def __init__(
self, data_format, data_options, write_options, spark_job_configuration
self,
data_format,
data_options,
write_options,
spark_job_configuration: Optional[Union[JobConfiguration, Dict]]
):
self._data_format = data_format
self._data_options = data_options
self._write_options = write_options

self._spark_job_configuration = JobConfiguration(**spark_job_configuration if spark_job_configuration else {}).to_dict(),
if isinstance(spark_job_configuration, JobConfiguration):
self._spark_job_configuration = spark_job_configuration
elif isinstance(spark_job_configuration, dict):
self._spark_job_configuration = JobConfiguration(**spark_job_configuration if spark_job_configuration else {}).to_dict()

@property
def data_format(self):
Expand Down
14 changes: 12 additions & 2 deletions python/hsfs/core/training_dataset_job_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,28 @@
from __future__ import annotations

import json
from typing import Dict, Optional, Union

from hsfs import util
from hsfs.core.job_configuration import JobConfiguration


class TrainingDatasetJobConf:
def __init__(self, query, overwrite, write_options, spark_job_configuration):
def __init__(
self,
query,
overwrite,
write_options,
spark_job_configuration: Optional[Union[JobConfiguration, Dict]]
):
self._query = query
self._overwrite = overwrite
self._write_options = write_options

self._spark_job_configuration = JobConfiguration(**spark_job_configuration if spark_job_configuration else {}).to_dict(),
if isinstance(spark_job_configuration, JobConfiguration):
self._spark_job_configuration = spark_job_configuration
elif isinstance(spark_job_configuration, dict):
self._spark_job_configuration = JobConfiguration(**spark_job_configuration if spark_job_configuration else {}).to_dict()

@property
def query(self):
Expand Down

0 comments on commit c4b76e7

Please sign in to comment.