Skip to content

Commit

Permalink
Allow blocks in deployment params (#14741)
Browse files Browse the repository at this point in the history
Co-authored-by: Ladislav Gál <[email protected]>
  • Loading branch information
GalLadislav and Ladislav Gál authored Jul 29, 2024
1 parent 8751f56 commit 544bd82
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 4 deletions.
132 changes: 129 additions & 3 deletions src/prefect/blocks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import html
import inspect
import sys
import uuid
import warnings
from abc import ABC
from functools import partial
Expand Down Expand Up @@ -790,6 +791,33 @@ async def _get_block_document(

return block_document, block_document_name

@classmethod
@sync_compatible
@inject_client
async def _get_block_document_by_id(
cls,
block_document_id: Union[str, uuid.UUID],
client: Optional["PrefectClient"] = None,
):
if isinstance(block_document_id, str):
try:
block_document_id = UUID(block_document_id)
except ValueError:
raise ValueError(
f"Block document ID {block_document_id!r} is not a valid UUID"
)

try:
block_document = await client.read_block_document(
block_document_id=block_document_id
)
except prefect.exceptions.ObjectNotFound:
raise ValueError(
f"Unable to find block document with ID {block_document_id!r}"
)

return block_document, block_document.name

@classmethod
@sync_compatible
@inject_client
Expand Down Expand Up @@ -876,25 +904,123 @@ class Custom(Block):
"""
block_document, block_document_name = await cls._get_block_document(name)

return cls._load_from_block_document(block_document, validate=validate)

@classmethod
@sync_compatible
@inject_client
async def load_from_ref(
cls,
ref: Union[str, UUID, Dict[str, Any]],
validate: bool = True,
client: Optional["PrefectClient"] = None,
) -> "Self":
"""
Retrieves data from the block document by given reference for the block type
that corresponds with the current class and returns an instantiated version of
the current class with the data stored in the block document.
Provided reference can be a block document ID, or a reference data in dictionary format.
Supported dictionary reference formats are:
- {"block_document_id": <block_document_id>}
- {"block_document_slug": <block_document_slug>}
If a block document for a given block type is saved with a different schema
than the current class calling `load`, a warning will be raised.
If the current class schema is a subset of the block document schema, the block
can be loaded as normal using the default `validate = True`.
If the current class schema is a superset of the block document schema, `load`
must be called with `validate` set to False to prevent a validation error. In
this case, the block attributes will default to `None` and must be set manually
and saved to a new block document before the block can be used as expected.
Args:
ref: The reference to the block document. This can be a block document ID,
or one of supported dictionary reference formats.
validate: If False, the block document will be loaded without Pydantic
validating the block schema. This is useful if the block schema has
changed client-side since the block document referred to by `name` was saved.
client: The client to use to load the block document. If not provided, the
default client will be injected.
Raises:
ValueError: If invalid reference format is provided.
ValueError: If the requested block document is not found.
Returns:
An instance of the current class hydrated with the data stored in the
block document with the specified name.
"""
block_document = None
if isinstance(ref, (str, UUID)):
block_document, _ = await cls._get_block_document_by_id(ref)
elif isinstance(ref, dict):
if block_document_id := ref.get("block_document_id"):
block_document, _ = await cls._get_block_document_by_id(
block_document_id
)
elif block_document_slug := ref.get("block_document_slug"):
block_document, _ = await cls._get_block_document(block_document_slug)

if not block_document:
raise ValueError(f"Invalid reference format {ref!r}.")

return cls._load_from_block_document(block_document, validate=validate)

@classmethod
def _load_from_block_document(
cls, block_document: BlockDocument, validate: bool = True
) -> "Self":
"""
Loads a block from a given block document.
If a block document for a given block type is saved with a different schema
than the current class calling `load`, a warning will be raised.
If the current class schema is a subset of the block document schema, the block
can be loaded as normal using the default `validate = True`.
If the current class schema is a superset of the block document schema, `load`
must be called with `validate` set to False to prevent a validation error. In
this case, the block attributes will default to `None` and must be set manually
and saved to a new block document before the block can be used as expected.
Args:
block_document: The block document used to instantiate a block.
validate: If False, the block document will be loaded without Pydantic
validating the block schema. This is useful if the block schema has
changed client-side since the block document referred to by `name` was saved.
Raises:
ValueError: If the requested block document is not found.
Returns:
An instance of the current class hydrated with the data stored in the
block document with the specified name.
"""
try:
return cls._from_block_document(block_document)
except ValidationError as e:
if not validate:
missing_fields = tuple(err["loc"][0] for err in e.errors())
missing_block_data = {field: None for field in missing_fields}
warnings.warn(
f"Could not fully load {block_document_name!r} of block type"
f"Could not fully load {block_document.name!r} of block type"
f" {cls.get_block_type_slug()!r} - this is likely because one or more"
" required fields were added to the schema for"
f" {cls.__name__!r} that did not exist on the class when this block"
" was last saved. Please specify values for new field(s):"
f" {listrepr(missing_fields)}, then run"
f' `{cls.__name__}.save("{block_document_name}", overwrite=True)`,'
f' `{cls.__name__}.save("{block_document.name}", overwrite=True)`,'
" and load this block again before attempting to use it."
)
return cls.model_construct(**block_document.data, **missing_block_data)
raise RuntimeError(
f"Unable to load {block_document_name!r} of block type"
f"Unable to load {block_document.name!r} of block type"
f" {cls.get_block_type_slug()!r} due to failed validation. To load without"
" validation, try loading again with `validate=False`."
) from e
Expand Down
17 changes: 16 additions & 1 deletion src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
parameters_to_args_kwargs,
raise_for_reserved_arguments,
)
from prefect.utilities.collections import listrepr
from prefect.utilities.collections import listrepr, visit_collection
from prefect.utilities.filesystem import relative_path_to_current_platform
from prefect.utilities.hashing import file_hash
from prefect.utilities.importtools import import_object, safe_load_namespace
Expand Down Expand Up @@ -535,6 +535,21 @@ def validate_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
Raises:
ParameterTypeError: if the provided parameters are not valid
"""

def resolve_block_reference(data: Any) -> Any:
if isinstance(data, dict) and "$ref" in data:
return Block.load_from_ref(data["$ref"])
return data

try:
parameters = visit_collection(
parameters, resolve_block_reference, return_data=True
)
except (ValueError, RuntimeError) as exc:
raise ParameterTypeError(
"Failed to resolve block references in parameters."
) from exc

args, kwargs = parameters_to_args_kwargs(self.fn, parameters)

with warnings.catch_warnings():
Expand Down
30 changes: 30 additions & 0 deletions src/prefect/utilities/schema_tools/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,5 +253,35 @@ def preprocess_schema(
process_properties(
definition["properties"], required_fields, allow_none_with_default
)
# Allow block types to be referenced by their id
if "block_type_slug" in definition:
schema["definitions"][definition["title"]] = {
"oneOf": [
definition,
{
"type": "object",
"properties": {
"$ref": {
"oneOf": [
{
"type": "string",
"format": "uuid",
},
{
"type": "object",
"additionalProperties": {
"type": "string",
},
"minProperties": 1,
},
]
}
},
"required": [
"$ref",
],
},
]
}

return schema
Loading

0 comments on commit 544bd82

Please sign in to comment.