Skip to content

Commit

Permalink
Editing files to add resource requests and limits (#3202)
Browse files Browse the repository at this point in the history
Signed-off-by: Author Name [email protected]
Signed-off-by: Paloma Rebuelta <[email protected]>
  • Loading branch information
paloma-rebuelta authored Jan 12, 2024
1 parent 4ecea25 commit 0dae82a
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 19 deletions.
5 changes: 4 additions & 1 deletion docs/source/getting_started/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

A summary of new feature highlights is located on the [GitHub release page](https://github.com/elyra-ai/elyra/releases).

## Release 3.15.0 - 03/28/2023
## Release 4.0.0

- Add functionality to select resource limits from the GUI [#3202](https://github.com/elyra-ai/elyra/pull/3202)

## Release 3.15.0 - 03/28/2023

## Release 3.15.0rc0 - 03/28/2023

Expand Down
2 changes: 2 additions & 0 deletions elyra/pipeline/airflow/processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance
"image_pull_policy": image_pull_policy,
"cpu_request": operation.cpu,
"mem_request": operation.memory,
"cpu_limit": operation.cpu_limit,
"memory_limit": operation.memory_limit,
"gpu_limit": operation.gpu,
"operator_source": operation.filename,
}
Expand Down
5 changes: 5 additions & 0 deletions elyra/pipeline/kfp/processor_kfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,11 @@ def _generate_workflow_tasks(
"size": operation.memory,
"units": "G",
}
workflow_task["task_modifiers"]["cpu_limit"] = operation.cpu_limit
workflow_task["task_modifiers"]["memory_limit"] = {
"size": operation.memory_limit,
"units": "G",
}
gpu_vendor = "nvidia.com/gpu"
if operation.gpu_vendor:
gpu_vendor = operation.gpu_vendor
Expand Down
34 changes: 32 additions & 2 deletions elyra/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ def __init__(
outputs: List of files produced by this operation to be included in a child operation(s)
cpu: number of cpus requested to run the operation
memory: amount of memory requested to run the operation (in Gi)
cpu_limit: limit of number of cpus to run the operation
memory_limit: limit of amount of memory to run the operation (in Gi)
gpu: number of gpus requested to run the operation
parameters: a list of names of pipeline parameters that should be passed to this operation
gpu_vendor: gpu resource type, eg. nvidia.com/gpu, amd.com/gpu etc.
Expand All @@ -261,19 +263,39 @@ def __init__(
if not component_props.get("runtime_image"):
raise ValueError("Invalid pipeline operation: Missing field 'operation runtime image'.")
if component_props.get("cpu") and not self._validate_range(component_props.get("cpu"), min_value=1):
raise ValueError("Invalid pipeline operation: CPU must be a positive value or None")
raise ValueError("Invalid pipeline operation: CPU request must be a positive value or None")
if component_props.get("cpu_limit") and not self._validate_range(component_props.get("cpu_limit"), min_value=1):
raise ValueError("Invalid pipeline operation: CPU limit must be a positive value or None")
if component_props.get("gpu") and not self._validate_range(component_props.get("gpu"), min_value=0):
raise ValueError("Invalid pipeline operation: GPU must be a positive value or None")
if component_props.get("memory") and not self._validate_range(component_props.get("memory"), min_value=1):
raise ValueError("Invalid pipeline operation: Memory must be a positive value or None")
raise ValueError("Invalid pipeline operation: Memory request must be a positive value or None")
if component_props.get("memory_limit") and not self._validate_range(
component_props.get("memory_limit"), min_value=1
):
raise ValueError("Invalid pipeline operation: Memory limit must be a positive value or None")
if (
component_props.get("memory_limit")
and component_props.get("memory")
and component_props.get("memory_limit") < component_props.get("memory")
):
raise ValueError("Invalid pipeline operation: Memory limit must be equal or larger than memory request")
if (
component_props.get("cpu_limit")
and component_props.get("cpu")
and component_props.get("cpu_limit") < component_props.get("cpu")
):
raise ValueError("Invalid pipeline operation: CPU limit must be equal or larger than CPU request")

# Re-build object to include default values
self._component_props["filename"] = component_props.get("filename")
self._component_props["runtime_image"] = component_props.get("runtime_image")
self._component_props["dependencies"] = Operation._scrub_list(component_props.get("dependencies", []))
self._component_props["include_subdirectories"] = component_props.get("include_subdirectories", False)
self._component_props["cpu"] = component_props.get("cpu")
self._component_props["cpu_limit"] = component_props.get("cpu_limit")
self._component_props["memory"] = component_props.get("memory")
self._component_props["memory_limit"] = component_props.get("memory_limit")
self._component_props["gpu"] = component_props.get("gpu")
self._component_props["gpu_vendor"] = component_props.get("gpu_vendor")
self._component_props["parameters"] = component_props.get(PIPELINE_PARAMETERS, [])
Expand Down Expand Up @@ -316,10 +338,18 @@ def env_vars(self) -> ElyraPropertyList[EnvironmentVariable]:
def cpu(self) -> Optional[str]:
return self._component_props.get("cpu")

@property
def cpu_limit(self) -> Optional[str]:
return self._component_props.get("cpu_limit")

@property
def memory(self) -> Optional[str]:
return self._component_props.get("memory")

@property
def memory_limit(self) -> Optional[str]:
return self._component_props.get("memory_limit")

@property
def gpu(self) -> Optional[str]:
return self._component_props.get("gpu")
Expand Down
10 changes: 8 additions & 2 deletions elyra/pipeline/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,13 @@ async def _validate_generic_node_properties(
# If not running locally, we check resource and image name
if pipeline_runtime != "local":
self._validate_container_image_name(node.id, node_label, image_name, response=response)
for resource_name in ["cpu", "gpu", "memory"]:
for resource_name in [
"cpu",
"gpu",
"memory",
"cpu_limit",
"memory_limit",
]:
resource_value = node.get_component_parameter(resource_name)
if resource_value:
self._validate_resource_value(
Expand Down Expand Up @@ -638,7 +644,7 @@ def _validate_resource_value(
Validates the value for hardware resources requested
:param node_id: the unique ID of the node
:param node_label: the given node name or user customized name/label of the node
:param resource_name: the name of the resource e.g. cpu, gpu. memory
:param resource_name: the name of the resource e.g. cpu, cpu_limit, gpu, memory, memory_limit
:param resource_value: the value of the resource
:param response: ValidationResponse containing the issue list to be updated
"""
Expand Down
8 changes: 7 additions & 1 deletion elyra/templates/airflow/airflow_template.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,20 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n
arguments=["{{ operation.argument_list }}"],
task_id='{{ operation.notebook|regex_replace }}',
env_vars={{ operation.pipeline_envs }},
{% if operation.cpu_request or operation.mem_request or operation.gpu_limit %}
{% if operation.cpu_request or operation.mem_request or operation.cpu_limit or operation.memory_limit or operation.gpu_limit %}
resources = {
{% if operation.cpu_request %}
'request_cpu': '{{ operation.cpu_request }}',
{% endif %}
{% if operation.mem_request %}
'request_memory': '{{ operation.mem_request }}G',
{% endif %}
{% if operation.cpu_limit %}
'limit_cpu': '{{ operation.cpu_limit }}',
{% endif %}
{% if operation.memory_limit %}
'limit_memory': '{{ operation.memory_limit }}G',
{% endif %}
{% if operation.gpu_limit %}
'limit_gpu': '{{ operation.gpu_limit }}',
{% endif %}
Expand Down
20 changes: 16 additions & 4 deletions elyra/templates/components/generic_properties_template.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,26 @@
},
"cpu": {
"type": "integer",
"title": "CPU",
"description": "For CPU-intensive workloads, you can choose more than 1 CPU (e.g. 1.5).",
"title": "CPU request",
"description": "For CPU-intensive workloads, you can request more than 1 CPU (e.g. 1.5, this is optional).",
"minimum": 0
},
"cpu_limit": {
"type": "integer",
"title": "CPU limit",
"description": "The maximum CPU that can be allocated to this node. This should be equal or higher than the request",
"minimum": 0
},
"memory": {
"type": "integer",
"title": "RAM(GB)",
"description": "The total amount of RAM specified.",
"title": "RAM request (GB)",
"description": "The total amount of RAM requested (optional).",
"minimum": 0
},
"memory_limit": {
"type": "integer",
"title": "RAM limit (GB)",
"description": "The maximum amount of RAM allowed. This should be equal or higher than the request",
"minimum": 0
},
"gpu": {
Expand Down
6 changes: 6 additions & 0 deletions elyra/templates/kubeflow/v1/python_dsl_template.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ def generated_pipeline(
{% if workflow_task.task_modifiers.mem_request and workflow_task.task_modifiers.mem_request.size %}
{{ task_name }}.container.set_memory_request(memory="{{ workflow_task.task_modifiers.mem_request.size }}{{ workflow_task.task_modifiers.mem_request.units }}")
{% endif %}
{% if workflow_task.task_modifiers.cpu_limit %}
{{ task_name }}.container.set_cpu_limit(cpu="{{ workflow_task.task_modifiers.cpu_limit }}")
{% endif %}
{% if workflow_task.task_modifiers.memory_limit and workflow_task.task_modifiers.memory_limit.size %}
{{ task_name }}.container.set_memory_limit(memory="{{ workflow_task.task_modifiers.memory_limit.size }}{{ workflow_task.task_modifiers.memory_limit.units }}")
{% endif %}
{% if workflow_task.task_modifiers.gpu_limit and workflow_task.task_modifiers.gpu_limit.size %}
{{ task_name }}.container.add_resource_limit(resource_name="{{ workflow_task.task_modifiers.gpu_limit.vendor }}", value="{{ workflow_task.task_modifiers.gpu_limit.size }}")
{% endif %}
Expand Down
21 changes: 20 additions & 1 deletion elyra/tests/pipeline/kfp/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,14 @@ def metadata_dependencies(metadata_managers, request):
# Create a Pipeline object from the pipeline file, applying the customization
# options
customization_options = {}
for supported_option in ["with_cos_object_prefix", "resources_cpu", "resources_gpu", "resources_memory"]:
for supported_option in [
"with_cos_object_prefix",
"resources_cpu",
"resources_cpu_limit",
"resources_gpu",
"resources_memory",
"resources_memory_limit",
]:
customization_options[supported_option] = request.param.get(supported_option)
pipeline_object = get_pipeline_object(
pipeline_filename=request.param["pipeline_file"],
Expand Down Expand Up @@ -361,6 +368,8 @@ def get_pipeline_object(
- "resources_cpu" (number, applied to all generic nodes)
- "resources_gpu" (number, applied to all generic nodes)
- "resources_memory" (number, applied to all generic nodes)
- "resources_cpu_limit" (number, applied to all generic nodes)
- "resources_memory_limit" (number, applied to all generic nodes)
"""
assert pipeline_filename is not None, "A pipeline filename is required."

Expand Down Expand Up @@ -409,6 +418,10 @@ def get_pipeline_object(
node["app_data"]["component_parameters"]["cpu"] = customization_options["resources_cpu"]
else:
node["app_data"]["component_parameters"].pop("cpu", None)
if customization_options.get("resources_cpu_limit") is not None:
node["app_data"]["component_parameters"]["cpu_limit"] = customization_options["resources_cpu_limit"]
else:
node["app_data"]["component_parameters"].pop("cpu_limit", None)
if customization_options.get("resources_gpu") is not None:
node["app_data"]["component_parameters"]["gpu"] = customization_options["resources_gpu"]
else:
Expand All @@ -417,6 +430,12 @@ def get_pipeline_object(
node["app_data"]["component_parameters"]["memory"] = customization_options["resources_memory"]
else:
node["app_data"]["component_parameters"].pop("memory", None)
if customization_options.get("resources_memory_limit") is not None:
node["app_data"]["component_parameters"]["memory_limit"] = customization_options[
"resources_memory_limit"
]
else:
node["app_data"]["component_parameters"].pop("memory_limit", None)

# Parse JSON and return Pipeline instance
return PipelineParser().parse(pipeline_json=pipeline_json)
Expand Down
6 changes: 5 additions & 1 deletion elyra/tests/pipeline/kfp/test_processor_kfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,14 +943,18 @@ def test_generate_pipeline_dsl_compile_pipeline_dsl_one_generic_node_pipeline_te

op = list(pipeline.operations.values())[0]

if op.gpu or op.cpu or op.memory:
if op.gpu or op.cpu or op.memory or op.cpu_limit or op.memory_limit:
assert node_template["container"].get("resources") is not None
if op.gpu:
assert node_template["container"]["resources"]["limits"]["nvidia.com/gpu"] == str(op.gpu)
if op.cpu:
assert node_template["container"]["resources"]["requests"]["cpu"] == str(op.cpu)
if op.memory:
assert node_template["container"]["resources"]["requests"]["memory"] == f"{op.memory}G"
if op.cpu_limit:
assert node_template["container"]["resources"]["limits"]["cpu"] == str(op.cpu_limit)
if op.memory_limit:
assert node_template["container"]["resources"]["limits"]["memory"] == f"{op.memory_limit}G"


@pytest.fixture(autouse=False)
Expand Down
34 changes: 34 additions & 0 deletions elyra/tests/pipeline/test_pipeline_constructor.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,40 @@ def test_fail_creating_operation_with_negative_cpu_resources():
)


def test_fail_creating_operation_with_more_requests_than_limit_cpu():
    """Creating an operation whose CPU request exceeds its CPU limit must raise ValueError."""
    props = dict(
        filename="elyra/pipeline/tests/resources/archive/test.ipynb",
        cpu="4",
        cpu_limit="2",
        runtime_image="tensorflow/tensorflow:latest",
    )
    # The Operation constructor validates that cpu_limit >= cpu and rejects this combination.
    with pytest.raises(ValueError):
        GenericOperation(
            id="test-id",
            type="execution-node",
            classifier="execute-notebook-node",
            name="test",
            component_props=props,
        )


def test_fail_creating_operation_with_more_requests_than_limit_memory():
    """Creating an operation whose memory request exceeds its memory limit must raise ValueError."""
    props = dict(
        filename="elyra/pipeline/tests/resources/archive/test.ipynb",
        memory="4",
        memory_limit="2",
        runtime_image="tensorflow/tensorflow:latest",
    )
    # The Operation constructor validates that memory_limit >= memory and rejects this combination.
    with pytest.raises(ValueError):
        GenericOperation(
            id="test-id",
            type="execution-node",
            classifier="execute-notebook-node",
            name="test",
            component_props=props,
        )


def test_fail_creating_operation_with_0_memory_resources():
component_parameters = {
"filename": "elyra/pipeline/tests/resources/archive/test.ipynb",
Expand Down
32 changes: 27 additions & 5 deletions packages/pipeline-editor/src/FileSubmissionDialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,24 @@ export const FileSubmissionDialog: React.FC<IProps> = ({
<br />
<div className="elyra-resourcesWrapper">
<div className="elyra-resourceInput">
<label htmlFor="cpu"> CPU:</label>
<label htmlFor="cpu"> CPU request:</label>
<div className="elyra-resourceInputDescription" id="cpu-description">
For CPU-intensive workloads, you can choose more than 1 CPU (e.g.
1.5).
For CPU-intensive workloads, you can request more than 1 CPU (e.g.
1.5, this is optional).
</div>
<input id="cpu" type="number" name="cpu" />
</div>
<div className="elyra-resourceInput">
<label htmlFor="cpu_limit"> CPU limit:</label>
<div
className="elyra-resourceInputDescription"
id="cpu-limit-description"
>
The maximum CPU that can be allocated to this node. This should be
equal or higher than the request
</div>
<input id="cpu_limit" type="number" name="cpu_limit" />
</div>
<div className="elyra-resourceInput">
<label htmlFor="gpu"> GPU:</label>
<div className="elyra-resourceInputDescription" id="gpu-description">
Expand All @@ -102,15 +113,26 @@ export const FileSubmissionDialog: React.FC<IProps> = ({
<input id="gpu" type="number" name="gpu" />
</div>
<div className="elyra-resourceInput">
<label htmlFor="memory"> RAM (GB):</label>
<label htmlFor="memory"> RAM request (GB):</label>
<div
className="elyra-resourceInputDescription"
id="memory-description"
>
The total amount of RAM specified.
The total amount of RAM requested (optional).
</div>
<input id="memory" type="number" name="memory" />
</div>
<div className="elyra-resourceInput">
<label htmlFor="memory_limit"> RAM limit (GB):</label>
<div
className="elyra-resourceInputDescription"
id="memory-limit-description"
>
The maximum amount of RAM allowed. This should be equal or higher
than the request
</div>
<input id="memory_limit" type="number" name="memory_limit" />
</div>
</div>
<br />
<input
Expand Down
6 changes: 6 additions & 0 deletions packages/pipeline-editor/src/PipelineEditorWidget.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -777,9 +777,15 @@ const PipelineWrapper: React.FC<IProps> = ({
if (node.app_data.component_parameters.cpu === null) {
delete node.app_data.component_parameters.cpu;
}
if (node.app_data.component_parameters.cpu_limit === null) {
delete node.app_data.component_parameters.cpu_limit;
}
if (node.app_data.component_parameters.memory === null) {
delete node.app_data.component_parameters.memory;
}
if (node.app_data.component_parameters.memory_limit === null) {
delete node.app_data.component_parameters.memory_limit;
}
if (node.app_data.component_parameters.gpu === null) {
delete node.app_data.component_parameters.gpu;
}
Expand Down
6 changes: 5 additions & 1 deletion packages/pipeline-editor/src/SubmitFileButtonExtension.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,10 @@ export class SubmitFileButtonExtension<
runtime_config,
framework,
cpu,
cpu_limit,
gpu,
memory,
memory_limit,
dependency_include,
dependencies,
...envObject
Expand All @@ -145,8 +147,10 @@ export class SubmitFileButtonExtension<
dependency_include ? dependencies.split(',') : undefined,
envObject,
cpu,
cpu_limit,
gpu,
memory
memory,
memory_limit
);

PipelineService.submitPipeline(
Expand Down
Loading

0 comments on commit 0dae82a

Please sign in to comment.