From 06a1696f91e0bbd866c2c68db0c1bc734a0b5222 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Wed, 24 Jul 2024 17:49:48 +0200 Subject: [PATCH 01/16] Implement server schema validation for blocks defined by reference ID --- .../utilities/schema_tools/validation.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/prefect/utilities/schema_tools/validation.py b/src/prefect/utilities/schema_tools/validation.py index 6e67d7e1065f..f66e023dfe34 100644 --- a/src/prefect/utilities/schema_tools/validation.py +++ b/src/prefect/utilities/schema_tools/validation.py @@ -253,5 +253,24 @@ def preprocess_schema( process_properties( definition["properties"], required_fields, allow_none_with_default ) + # Allow block types to be referenced by their id + if "block_type_slug" in definition: + schema["definitions"][definition["title"]] = { + "oneOf": [ + definition, + { + "type": "object", + "properties": { + "$ref": { + "type": "string", + "format": "uuid", + }, + }, + "required": [ + "$ref", + ], + }, + ] + } return schema From 316fcb2eb7d4989952884c8da975029c88945aa7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Wed, 24 Jul 2024 17:50:11 +0200 Subject: [PATCH 02/16] Allow Blocks to be initialized using their reference ID --- src/prefect/blocks/core.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/prefect/blocks/core.py b/src/prefect/blocks/core.py index 620d87e80b70..467293de7f3d 100644 --- a/src/prefect/blocks/core.py +++ b/src/prefect/blocks/core.py @@ -2,6 +2,7 @@ import html import inspect import sys +import uuid import warnings from abc import ABC from functools import partial @@ -272,6 +273,12 @@ class Block(BaseModel, ABC): ) def __init__(self, *args, **kwargs): + block_document_id = kwargs.pop("$ref", None) + if block_document_id: + block_document, block_document_name = self._get_block_document_by_id( + block_document_id + ) + kwargs.update(block_document.data) super().__init__(*args, **kwargs) self.block_initialization() @@ -790,6 +797,33 @@ async def _get_block_document( return block_document, block_document_name + @classmethod + @sync_compatible + @inject_client + async def _get_block_document_by_id( + cls, + block_document_id: str | uuid.UUID, + client: Optional["PrefectClient"] = None, + ): + if isinstance(block_document_id, str): + try: + block_document_id = UUID(block_document_id) + except ValueError as e: + raise ValueError( + f"Block document ID {block_document_id!r} is not a valid UUID" + ) from e + + try: + block_document = await client.read_block_document( + block_document_id=block_document_id + ) + except prefect.exceptions.ObjectNotFound as e: + raise ValueError( + f"Unable to find block document with ID {block_document_id!r}" + ) from e + + return block_document, block_document.name + @classmethod @sync_compatible @inject_client From 318ccd7055f8931bb7cef648ce34d2a893f36aca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Wed, 24 Jul 2024 18:26:48 +0200 Subject: [PATCH 03/16] Allow kwargs to overload referenced Block data in initialization --- src/prefect/blocks/core.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/prefect/blocks/core.py b/src/prefect/blocks/core.py index 467293de7f3d..54ea90f3944d 100644 --- a/src/prefect/blocks/core.py +++ b/src/prefect/blocks/core.py @@ -278,7 +278,10 @@ def __init__(self, *args, **kwargs): block_document, block_document_name = self._get_block_document_by_id( block_document_id ) - kwargs.update(block_document.data) + kwargs = { + **block_document.data, + **kwargs, + } super().__init__(*args, **kwargs) self.block_initialization() From d01b2ab13933a79e9b95060dc4a60f65685b461b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Wed, 24 Jul 2024 20:52:35 +0200 Subject: [PATCH 04/16] Fix typing incompatibility with python3.9 --- src/prefect/blocks/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/blocks/core.py b/src/prefect/blocks/core.py index 54ea90f3944d..35def3fc9a1d 100644 --- a/src/prefect/blocks/core.py +++ b/src/prefect/blocks/core.py @@ -805,7 +805,7 @@ async def _get_block_document( @inject_client async def _get_block_document_by_id( cls, - block_document_id: str | uuid.UUID, + block_document_id: Union[str, uuid.UUID], client: Optional["PrefectClient"] = None, ): if isinstance(block_document_id, str): From 202c178fd647a2d2dcc5109265aad4df5f4421d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Wed, 24 Jul 2024 23:58:30 +0200 Subject: [PATCH 05/16] Remove unnecessary exception chaining --- src/prefect/blocks/core.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/prefect/blocks/core.py b/src/prefect/blocks/core.py index 35def3fc9a1d..da65bada44cc 100644 --- a/src/prefect/blocks/core.py +++ b/src/prefect/blocks/core.py @@ -811,19 +811,19 @@ async def _get_block_document_by_id( if isinstance(block_document_id, str): try: block_document_id = UUID(block_document_id) - except ValueError as e: + except ValueError: raise ValueError( f"Block document ID {block_document_id!r} is not a valid UUID" - ) from e + ) try: block_document = await client.read_block_document( block_document_id=block_document_id ) - except prefect.exceptions.ObjectNotFound as e: + except prefect.exceptions.ObjectNotFound: raise ValueError( f"Unable to find block document with ID {block_document_id!r}" - ) from e + ) return block_document, block_document.name From 200cd31d864fb19dfe91c9616b62c2a82fd54492 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Wed, 24 Jul 2024 23:59:52 +0200 Subject: [PATCH 06/16] Add _sync=True to _get_block_document_by_id() call --- src/prefect/blocks/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/blocks/core.py b/src/prefect/blocks/core.py index da65bada44cc..75581f495ab5 100644 --- a/src/prefect/blocks/core.py +++ b/src/prefect/blocks/core.py @@ -276,7 +276,7 @@ def __init__(self, *args, **kwargs): block_document_id = kwargs.pop("$ref", None) if block_document_id: block_document, block_document_name = self._get_block_document_by_id( - block_document_id + block_document_id, _sync=True ) kwargs = { **block_document.data, From a3a1f67a3356a030c21a6c2a94bd3d0eacc2783d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Thu, 25 Jul 2024 01:45:00 +0200 Subject: [PATCH 07/16] Add block document validation for block initialization --- src/prefect/blocks/core.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/prefect/blocks/core.py b/src/prefect/blocks/core.py index 75581f495ab5..db4221e2134e 100644 --- a/src/prefect/blocks/core.py +++ b/src/prefect/blocks/core.py @@ -278,10 +278,14 @@ def __init__(self, *args, **kwargs): block_document, block_document_name = self._get_block_document_by_id( block_document_id, _sync=True ) + + self._validate_block_reference(block_document) + kwargs = { **block_document.data, **kwargs, } + super().__init__(*args, **kwargs) self.block_initialization() @@ -295,6 +299,35 @@ def __repr_args__(self): (key, value) for key, value in repr_args if key is None or key in data_keys ] + def _validate_block_reference(self, block_document: BlockDocument) -> None: + """ + Validates that the provided block document matches the block schema of the current block. + + Args: + block_document: The referenced block document to validate. + + Raises: + TypeError: If the block instantiation is attempted from the base Block class. + ValueError: If the block reference type or slug is invalid. + """ + if self.__class__ == Block: + raise TypeError( + "Block class cannot be instantiated directly from block reference." + ) + block_type_name = self.get_block_type_name() + block_type_slug = self.get_block_type_slug() + + if block_document.block_type_name != block_type_name: + raise ValueError( + f"Invalid Block reference type {block_document.block_type_name!r} " + f"for block type {block_type_name!r} initialization" + ) + if block_document.block_type.slug != block_type_slug: + raise ValueError( + f"Invalid Block reference slug {block_document.block_type.slug!r} " + f"for block slug {block_type_slug!r} initialization" + ) + def block_initialization(self) -> None: pass From a089d9c91217d038b5bbbc13886a6804e7fd1024 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Thu, 25 Jul 2024 01:45:14 +0200 Subject: [PATCH 08/16] Add unit testing for block reference --- tests/blocks/test_block_reference.py | 131 +++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 tests/blocks/test_block_reference.py diff --git a/tests/blocks/test_block_reference.py b/tests/blocks/test_block_reference.py new file mode 100644 index 000000000000..390ce090c90f --- /dev/null +++ b/tests/blocks/test_block_reference.py @@ -0,0 +1,131 @@ +import warnings +from typing import Dict, Type +from uuid import uuid4 + +import pytest +from pydantic import ValidationError + +from prefect.blocks.core import Block +from prefect.exceptions import ParameterTypeError +from prefect.flows import flow + + +class TestBlockReference: + class ReferencedBlock(Block): + a: int + b: str + + class OtherReferencedBlock(Block): + a: int + b: str + + @pytest.fixture + def block_reference(self, prefect_client) -> Dict[str, str]: + block = self.ReferencedBlock(a=1, b="foo") + block.save("block-reference", client=prefect_client) + return {"$ref": str(block._block_document_id)} + + def test_block_initialization_from_reference( + self, + block_reference: Dict[str, str], + ): + block = self.ReferencedBlock(**block_reference) + assert block.a == 1 + assert block.b == "foo" + + def test_block_initialization_from_reference_with_kwargs( + self, + block_reference: Dict[str, str], + ): + block = self.ReferencedBlock(**block_reference, a=2) + assert block.a == 2 + assert block.b == "foo" + + def test_block_initialization_from_bad_reference(self): + with pytest.raises(ValueError, match="is not a valid UUID"): + self.ReferencedBlock(**{"$ref": "non-valid-uuid"}) + + with pytest.raises(ValueError, match="Unable to find block document with ID"): + self.ReferencedBlock(**{"$ref": str(uuid4())}) + + def test_block_initialization_from_invalid_block_reference_type(self): + block = self.OtherReferencedBlock(a=1, b="foo") + block.save("other-block") + + with pytest.raises(ValueError, match="Invalid Block reference type"): + self.ReferencedBlock(**{"$ref": str(block._block_document_id)}) + + def test_block_validation_from_reference( + self, + block_reference: Dict[str, str], + ): + block = self.ReferencedBlock.model_validate(block_reference) + assert block.a == 1 + assert block.b == "foo" + + def test_block_validation_from_bad_reference( + self, + block_reference: Dict[str, str], + ): + with pytest.raises(ValidationError): + self.ReferencedBlock.model_validate({"$ref": "non-valid-uuid"}) + + with pytest.raises(ValidationError): + self.ReferencedBlock.model_validate({"$ref": str(uuid4())}) + + def test_block_validation_from_invalid_block_reference_type(self): + block = self.OtherReferencedBlock(a=1, b="foo") + block.save("other-block") + + with pytest.raises(ValidationError): + self.ReferencedBlock.model_validate({"$ref": str(block._block_document_id)}) + + +class TestFlowWithBlockParam: + @pytest.fixture + def ParamBlock(self) -> Type[Block]: + # Ignore warning caused by matching key in registry due to block fixture + warnings.filterwarnings("ignore", category=UserWarning) + + class ParamBlock(Block): + a: int + b: str + + return ParamBlock + + @pytest.fixture + def OtherParamBlock(self) -> Type[Block]: + # Ignore warning caused by matching key in registry due to block fixture + warnings.filterwarnings("ignore", category=UserWarning) + + class OtherParamBlock(Block): + a: int + b: str + + return OtherParamBlock + + def test_flow_with_block_params(self, ParamBlock): + ref_block = ParamBlock(a=10, b="foo") + ref_block.save("param-block") + + @flow + def flow_with_block_param(block: ParamBlock) -> int: + return block.a + + assert ( + flow_with_block_param({"$ref": str(ref_block._block_document_id)}) + == ref_block.a + ) + + def test_flow_with_invalid_block_param_type(self, ParamBlock, OtherParamBlock): + ref_block = OtherParamBlock(a=10, b="foo") + ref_block.save("other-param-block") + + @flow + def flow_with_block_param(block: ParamBlock) -> int: + return block.a + + with pytest.raises( + ParameterTypeError, match="Flow run received invalid parameters" + ): + flow_with_block_param({"$ref": str(ref_block._block_document_id)}) From 49ad754ef6a39c65e7bf11096d7be884a3c31622 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Sat, 27 Jul 2024 00:12:24 +0200 Subject: [PATCH 09/16] Refactor block reference resolution to be executed in Flow.validate_parameters --- src/prefect/blocks/core.py | 111 +++++++++++++++++++++++-------------- src/prefect/flows.py | 28 +++++++++- 2 files changed, 96 insertions(+), 43 deletions(-) diff --git a/src/prefect/blocks/core.py b/src/prefect/blocks/core.py index db4221e2134e..bcd948f2dd5b 100644 --- a/src/prefect/blocks/core.py +++ b/src/prefect/blocks/core.py @@ -273,19 +273,6 @@ class Block(BaseModel, ABC): ) def __init__(self, *args, **kwargs): - block_document_id = kwargs.pop("$ref", None) - if block_document_id: - block_document, block_document_name = self._get_block_document_by_id( - block_document_id, _sync=True - ) - - self._validate_block_reference(block_document) - - kwargs = { - **block_document.data, - **kwargs, - } - super().__init__(*args, **kwargs) self.block_initialization() @@ -299,35 +286,6 @@ def __repr_args__(self): (key, value) for key, value in repr_args if key is None or key in data_keys ] - def _validate_block_reference(self, block_document: BlockDocument) -> None: - """ - Validates that the provided block document matches the block schema of the current block. - - Args: - block_document: The referenced block document to validate. - - Raises: - TypeError: If the block instantiation is attempted from the base Block class. - ValueError: If the block reference type or slug is invalid. - """ - if self.__class__ == Block: - raise TypeError( - "Block class cannot be instantiated directly from block reference." - ) - block_type_name = self.get_block_type_name() - block_type_slug = self.get_block_type_slug() - - if block_document.block_type_name != block_type_name: - raise ValueError( - f"Invalid Block reference type {block_document.block_type_name!r} " - f"for block type {block_type_name!r} initialization" - ) - if block_document.block_type.slug != block_type_slug: - raise ValueError( - f"Invalid Block reference slug {block_document.block_type.slug!r} " - f"for block slug {block_type_slug!r} initialization" - ) - def block_initialization(self) -> None: pass @@ -969,6 +927,75 @@ class Custom(Block): " validation, try loading again with `validate=False`." ) from e + @classmethod + @sync_compatible + @inject_client + async def load_from_ref( + cls, + id: Union[str, UUID], + validate: bool = True, + client: Optional["PrefectClient"] = None, + ) -> "Self": + """ + Retrieves data from the block document with the given ID for the block type + that corresponds with the current class and returns an instantiated version of + the current class with the data stored in the block document. + + If a block document for a given block type is saved with a different schema + than the current class calling `load`, a warning will be raised. + + If the current class schema is a subset of the block document schema, the block + can be loaded as normal using the default `validate = True`. + + If the current class schema is a superset of the block document schema, `load` + must be called with `validate` set to False to prevent a validation error. In + this case, the block attributes will default to `None` and must be set manually + and saved to a new block document before the block can be used as expected. + + Args: + id: The ID of the block document. + validate: If False, the block document will be loaded without Pydantic + validating the block schema. This is useful if the block schema has + changed client-side since the block document referred to by `name` was saved. + client: The client to use to load the block document. If not provided, the + default client will be injected. + + Raises: + ValueError: If the requested block document is not found. + + Returns: + An instance of the current class hydrated with the data stored in the + block document with the specified name. + + Examples: + ... TBD + + """ + block_document, block_document_name = await cls._get_block_document_by_id(id) + + try: + return cls._from_block_document(block_document) + except ValidationError as e: + if not validate: + missing_fields = tuple(err["loc"][0] for err in e.errors()) + missing_block_data = {field: None for field in missing_fields} + warnings.warn( + f"Could not fully load {block_document_name!r} of block type" + f" {cls.get_block_type_slug()!r} - this is likely because one or more" + " required fields were added to the schema for" + f" {cls.__name__!r} that did not exist on the class when this block" + " was last saved. Please specify values for new field(s):" + f" {listrepr(missing_fields)}, then run" + f' `{cls.__name__}.save("{block_document_name}", overwrite=True)`,' + " and load this block again before attempting to use it." + ) + return cls.model_construct(**block_document.data, **missing_block_data) + raise RuntimeError( + f"Unable to load {block_document_name!r} of block type" + f" {cls.get_block_type_slug()!r} due to failed validation. To load without" + " validation, try loading again with `validate=False`." + ) from e + @staticmethod def is_block_class(block) -> bool: return _is_subclass(block, Block) diff --git a/src/prefect/flows.py b/src/prefect/flows.py index 4d76f3584885..f5a2a5fd38e1 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -95,7 +95,7 @@ parameters_to_args_kwargs, raise_for_reserved_arguments, ) -from prefect.utilities.collections import listrepr +from prefect.utilities.collections import listrepr, visit_collection from prefect.utilities.filesystem import relative_path_to_current_platform from prefect.utilities.hashing import file_hash from prefect.utilities.importtools import import_object, safe_load_namespace @@ -535,6 +535,32 @@ def validate_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]: Raises: ParameterTypeError: if the provided parameters are not valid """ + + def resolve_block_reference(data: Any) -> Any: + if isinstance(data, dict) and "$ref" in data: + if isinstance(data["$ref"], (str, UUID)): + # This is format used by Deployment Create Run form + block_document_id = data["$ref"] + elif ( + isinstance(data["$ref"], dict) + and "block_document_id" in data["$ref"] + ): + block_document_id = data["$ref"]["block_document_id"] + else: + raise ValueError(f"Invalid reference format: {data}") + + return Block.load_from_ref(block_document_id) + return data + + try: + parameters = visit_collection( + parameters, resolve_block_reference, return_data=True + ) + except (ValueError, RuntimeError) as exc: + raise ParameterTypeError( + "Failed to resolve block references in parameters." + ) from exc + args, kwargs = parameters_to_args_kwargs(self.fn, parameters) with warnings.catch_warnings(): From 52c9a5d1591a78f4781994c4eafe7ed8644439d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Sat, 27 Jul 2024 00:26:52 +0200 Subject: [PATCH 10/16] Update unittests --- tests/blocks/test_block_reference.py | 71 +++++++++++++--------------- 1 file changed, 32 insertions(+), 39 deletions(-) diff --git a/tests/blocks/test_block_reference.py b/tests/blocks/test_block_reference.py index 390ce090c90f..6366a3979a2f 100644 --- a/tests/blocks/test_block_reference.py +++ b/tests/blocks/test_block_reference.py @@ -1,6 +1,6 @@ import warnings from typing import Dict, Type -from uuid import uuid4 +from uuid import uuid4, UUID import pytest from pydantic import ValidationError @@ -15,70 +15,57 @@ class ReferencedBlock(Block): a: int b: str - class OtherReferencedBlock(Block): + class SimilarReferencedBlock(Block): a: int b: str + class OtherReferencedBlock(Block): + c: int + d: str + @pytest.fixture - def block_reference(self, prefect_client) -> Dict[str, str]: + def block_document_id(self, prefect_client) -> UUID: block = self.ReferencedBlock(a=1, b="foo") block.save("block-reference", client=prefect_client) - return {"$ref": str(block._block_document_id)} + return block._block_document_id - def test_block_initialization_from_reference( + def test_block_load_from_reference( self, - block_reference: Dict[str, str], + block_document_id: UUID, ): - block = self.ReferencedBlock(**block_reference) + block = self.ReferencedBlock.load_from_ref(block_document_id) assert block.a == 1 assert block.b == "foo" - def test_block_initialization_from_reference_with_kwargs( + def test_block_load_from_reference_string( self, - block_reference: Dict[str, str], + block_document_id: UUID, ): - block = self.ReferencedBlock(**block_reference, a=2) - assert block.a == 2 + block = self.ReferencedBlock.load_from_ref(str(block_document_id)) + assert block.a == 1 assert block.b == "foo" - def test_block_initialization_from_bad_reference(self): + def test_block_load_from_bad_reference(self): with pytest.raises(ValueError, match="is not a valid UUID"): - self.ReferencedBlock(**{"$ref": "non-valid-uuid"}) + self.ReferencedBlock.load_from_ref("non-valid-uuid") with pytest.raises(ValueError, match="Unable to find block document with ID"): - self.ReferencedBlock(**{"$ref": str(uuid4())}) + self.ReferencedBlock.load_from_ref(uuid4()) - def test_block_initialization_from_invalid_block_reference_type(self): - block = self.OtherReferencedBlock(a=1, b="foo") + def test_block_load_from_similar_block_reference_type(self): + block = self.SimilarReferencedBlock(a=1, b="foo") block.save("other-block") - with pytest.raises(ValueError, match="Invalid Block reference type"): - self.ReferencedBlock(**{"$ref": str(block._block_document_id)}) - - def test_block_validation_from_reference( - self, - block_reference: Dict[str, str], - ): - block = self.ReferencedBlock.model_validate(block_reference) + block = self.ReferencedBlock.load_from_ref(block._block_document_id) assert block.a == 1 assert block.b == "foo" - def test_block_validation_from_bad_reference( - self, - block_reference: Dict[str, str], - ): - with pytest.raises(ValidationError): - self.ReferencedBlock.model_validate({"$ref": "non-valid-uuid"}) - - with pytest.raises(ValidationError): - self.ReferencedBlock.model_validate({"$ref": str(uuid4())}) - - def test_block_validation_from_invalid_block_reference_type(self): - block = self.OtherReferencedBlock(a=1, b="foo") + def test_block_load_from_invalid_block_reference_type(self): + block = self.OtherReferencedBlock(c=1, d="foo") block.save("other-block") - with pytest.raises(ValidationError): - self.ReferencedBlock.model_validate({"$ref": str(block._block_document_id)}) + with pytest.raises(RuntimeError): + self.ReferencedBlock.load_from_ref(block._block_document_id) class TestFlowWithBlockParam: @@ -116,9 +103,15 @@ def flow_with_block_param(block: ParamBlock) -> int: flow_with_block_param({"$ref": str(ref_block._block_document_id)}) == ref_block.a ) + assert ( + flow_with_block_param( + {"$ref": {"block_document_id": str(ref_block._block_document_id)}} + ) + == ref_block.a + ) def test_flow_with_invalid_block_param_type(self, ParamBlock, OtherParamBlock): - ref_block = OtherParamBlock(a=10, b="foo") + ref_block = OtherParamBlock(c=10, d="foo") ref_block.save("other-param-block") @flow From b91a9d935917898d7b32649352404c27fac4b99b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Sat, 27 Jul 2024 00:55:34 +0200 Subject: [PATCH 11/16] Remove duplicate code by creating _load_from_block_document class method --- src/prefect/blocks/core.py | 65 +++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/src/prefect/blocks/core.py b/src/prefect/blocks/core.py index bcd948f2dd5b..0b79d7994f8c 100644 --- a/src/prefect/blocks/core.py +++ b/src/prefect/blocks/core.py @@ -904,28 +904,7 @@ class Custom(Block): """ block_document, block_document_name = await cls._get_block_document(name) - try: - return cls._from_block_document(block_document) - except ValidationError as e: - if not validate: - missing_fields = tuple(err["loc"][0] for err in e.errors()) - missing_block_data = {field: None for field in missing_fields} - warnings.warn( - f"Could not fully load {block_document_name!r} of block type" - f" {cls.get_block_type_slug()!r} - this is likely because one or more" - " required fields were added to the schema for" - f" {cls.__name__!r} that did not exist on the class when this block" - " was last saved. Please specify values for new field(s):" - f" {listrepr(missing_fields)}, then run" - f' `{cls.__name__}.save("{block_document_name}", overwrite=True)`,' - " and load this block again before attempting to use it." - ) - return cls.model_construct(**block_document.data, **missing_block_data) - raise RuntimeError( - f"Unable to load {block_document_name!r} of block type" - f" {cls.get_block_type_slug()!r} due to failed validation. To load without" - " validation, try loading again with `validate=False`." - ) from e + return cls._load_from_block_document(block_document, validate=validate) @classmethod @sync_compatible @@ -971,8 +950,42 @@ async def load_from_ref( ... TBD """ - block_document, block_document_name = await cls._get_block_document_by_id(id) + block_document, _ = await cls._get_block_document_by_id(id) + return cls._load_from_block_document(block_document, validate=validate) + + @classmethod + def _load_from_block_document( + cls, block_document: BlockDocument, validate: bool = True + ) -> "Self": + """ + Loads a block from a given block document. + + If a block document for a given block type is saved with a different schema + than the current class calling `load`, a warning will be raised. + + If the current class schema is a subset of the block document schema, the block + can be loaded as normal using the default `validate = True`. + + If the current class schema is a superset of the block document schema, `load` + must be called with `validate` set to False to prevent a validation error. In + this case, the block attributes will default to `None` and must be set manually + and saved to a new block document before the block can be used as expected. + + Args: + block_document: The block document used to instantiate a block. + validate: If False, the block document will be loaded without Pydantic + validating the block schema. This is useful if the block schema has + changed client-side since the block document referred to by `name` was saved. + + Raises: + ValueError: If the requested block document is not found. + + Returns: + An instance of the current class hydrated with the data stored in the + block document with the specified name. + + """ try: return cls._from_block_document(block_document) except ValidationError as e: @@ -980,18 +993,18 @@ async def load_from_ref( missing_fields = tuple(err["loc"][0] for err in e.errors()) missing_block_data = {field: None for field in missing_fields} warnings.warn( - f"Could not fully load {block_document_name!r} of block type" + f"Could not fully load {block_document.name!r} of block type" f" {cls.get_block_type_slug()!r} - this is likely because one or more" " required fields were added to the schema for" f" {cls.__name__!r} that did not exist on the class when this block" " was last saved. Please specify values for new field(s):" f" {listrepr(missing_fields)}, then run" - f' `{cls.__name__}.save("{block_document_name}", overwrite=True)`,' + f' `{cls.__name__}.save("{block_document.name}", overwrite=True)`,' " and load this block again before attempting to use it." ) return cls.model_construct(**block_document.data, **missing_block_data) raise RuntimeError( - f"Unable to load {block_document_name!r} of block type" + f"Unable to load {block_document.name!r} of block type" f" {cls.get_block_type_slug()!r} due to failed validation. To load without" " validation, try loading again with `validate=False`." ) from e From 76157ca5df11ddb73a6efdc92885917d9bcae057 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Sat, 27 Jul 2024 01:05:25 +0200 Subject: [PATCH 12/16] Add additional schemas for block reference to validation API --- src/prefect/utilities/schema_tools/validation.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/prefect/utilities/schema_tools/validation.py b/src/prefect/utilities/schema_tools/validation.py index f66e023dfe34..5955691e66c5 100644 --- a/src/prefect/utilities/schema_tools/validation.py +++ b/src/prefect/utilities/schema_tools/validation.py @@ -262,9 +262,19 @@ def preprocess_schema( "type": "object", "properties": { "$ref": { - "type": "string", - "format": "uuid", - }, + "oneOf": [ + { + "type": "string", + "format": "uuid", + }, + { + "type": "object", + "additionalProperties": { + "type": "string", + }, + }, + ] + } }, "required": [ "$ref", From 30b6a516c5b3615935f4aaf555ef5f1d3ec182d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Sat, 27 Jul 2024 01:07:27 +0200 Subject: [PATCH 13/16] Add minProperties restriction to added reference validation schema --- src/prefect/utilities/schema_tools/validation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/prefect/utilities/schema_tools/validation.py b/src/prefect/utilities/schema_tools/validation.py index 5955691e66c5..e94204331125 100644 --- a/src/prefect/utilities/schema_tools/validation.py +++ b/src/prefect/utilities/schema_tools/validation.py @@ -272,6 +272,7 @@ def preprocess_schema( "additionalProperties": { "type": "string", }, + "minProperties": 1, }, ] } From 05040703e4e54681525dd25d5ead169c0d08eb45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Sat, 27 Jul 2024 07:12:52 +0200 Subject: [PATCH 14/16] Improve load_from_ref to handle reference formats --- src/prefect/blocks/core.py | 30 +++++++++++++++++++++++------- src/prefect/flows.py | 13 +------------ 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/prefect/blocks/core.py b/src/prefect/blocks/core.py index 0b79d7994f8c..59097417e46a 100644 --- a/src/prefect/blocks/core.py +++ b/src/prefect/blocks/core.py @@ -911,15 +911,20 @@ class Custom(Block): @inject_client async def load_from_ref( cls, - id: Union[str, UUID], + ref: Union[str, UUID, Dict[str, Any]], validate: bool = True, client: Optional["PrefectClient"] = None, ) -> "Self": """ - Retrieves data from the block document with the given ID for the block type + Retrieves data from the block document by given reference for the block type that corresponds with the current class and returns an instantiated version of the current class with the data stored in the block document. + Provided reference can be a block document ID, or a reference data in dictionary format. + Supported dictionary reference formats are: + - {"block_document_id": } + - {"block_document_slug": } + If a block document for a given block type is saved with a different schema than the current class calling `load`, a warning will be raised. @@ -932,7 +937,8 @@ async def load_from_ref( and saved to a new block document before the block can be used as expected. Args: - id: The ID of the block document. + ref: The reference to the block document. This can be a block document ID, + or one of supported dictionary reference formats. validate: If False, the block document will be loaded without Pydantic validating the block schema. This is useful if the block schema has changed client-side since the block document referred to by `name` was saved. @@ -940,17 +946,27 @@ async def load_from_ref( default client will be injected. Raises: + ValueError: If invalid reference format is provided. ValueError: If the requested block document is not found. Returns: An instance of the current class hydrated with the data stored in the block document with the specified name. - Examples: - ... TBD - """ - block_document, _ = await cls._get_block_document_by_id(id) + block_document = None + if isinstance(ref, (str, UUID)): + block_document, _ = await cls._get_block_document_by_id(ref) + elif isinstance(ref, dict): + if block_document_id := ref.get("block_document_id"): + block_document, _ = await cls._get_block_document_by_id( + block_document_id + ) + elif block_document_slug := ref.get("block_document_slug"): + block_document, _ = await cls._get_block_document(block_document_slug) + + if not block_document: + raise ValueError(f"Invalid reference format {ref!r}.") return cls._load_from_block_document(block_document, validate=validate) diff --git a/src/prefect/flows.py b/src/prefect/flows.py index f5a2a5fd38e1..a865b29a1f90 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -538,18 +538,7 @@ def validate_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]: def resolve_block_reference(data: Any) -> Any: if isinstance(data, dict) and "$ref" in data: - if isinstance(data["$ref"], (str, UUID)): - # This is format used by Deployment Create Run form - block_document_id = data["$ref"] - elif ( - isinstance(data["$ref"], dict) - and "block_document_id" in data["$ref"] - ): - block_document_id = data["$ref"]["block_document_id"] - else: - raise ValueError(f"Invalid reference format: {data}") - - return Block.load_from_ref(block_document_id) + return Block.load_from_ref(data["$ref"]) return data try: From 1108a40cb8386d2db12a11c5a06fba340c0a57a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Sat, 27 Jul 2024 07:15:52 +0200 Subject: [PATCH 15/16] Unittest fixup --- tests/blocks/test_block_reference.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/blocks/test_block_reference.py b/tests/blocks/test_block_reference.py index 6366a3979a2f..28da7e7b79ac 100644 --- a/tests/blocks/test_block_reference.py +++ b/tests/blocks/test_block_reference.py @@ -111,7 +111,7 @@ def flow_with_block_param(block: ParamBlock) -> int: ) def test_flow_with_invalid_block_param_type(self, ParamBlock, OtherParamBlock): - ref_block = OtherParamBlock(c=10, d="foo") + ref_block = OtherParamBlock(a=10, b="foo") ref_block.save("other-param-block") @flow From 0a12e51a17dcb8d72109b63d95452e8cef6044fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ladislav=20G=C3=A1l?= Date: Sat, 27 Jul 2024 22:27:39 +0200 Subject: [PATCH 16/16] Add more tests for nested block references --- tests/blocks/test_block_reference.py | 73 ++++++++++++++++++++++++++-- 1 file changed, 70 insertions(+), 3 deletions(-) diff --git a/tests/blocks/test_block_reference.py b/tests/blocks/test_block_reference.py index 28da7e7b79ac..06de8143685c 100644 --- a/tests/blocks/test_block_reference.py +++ b/tests/blocks/test_block_reference.py @@ -1,9 +1,9 @@ import warnings -from typing import Dict, Type -from uuid import uuid4, UUID +from typing import Type +from uuid import UUID, uuid4 +import pydantic import pytest -from pydantic import ValidationError from prefect.blocks.core import Block from prefect.exceptions import ParameterTypeError @@ -37,6 +37,15 @@ def test_block_load_from_reference( assert block.a == 1 assert block.b == "foo" + def test_base_block_load_from_reference( + self, + block_document_id: UUID, + ): + block = Block.load_from_ref(block_document_id) + assert isinstance(block, self.ReferencedBlock) + assert block.a == 1 + assert block.b == "foo" + def test_block_load_from_reference_string( self, block_document_id: UUID, @@ -67,6 +76,22 @@ def test_block_load_from_invalid_block_reference_type(self): with pytest.raises(RuntimeError): self.ReferencedBlock.load_from_ref(block._block_document_id) + def test_block_load_from_nested_block_reference(self): + ReferencedBlock = self.ReferencedBlock + + class NestedReferencedBlock(Block): + inner_block: ReferencedBlock + + nested_block = NestedReferencedBlock(inner_block=ReferencedBlock(a=1, b="foo")) + nested_block.save("nested-block") + + loaded_block = NestedReferencedBlock.load_from_ref( + nested_block._block_document_id + ) + assert getattr(loaded_block, "inner_block", None) is not None + assert loaded_block.inner_block.a == 1 + assert loaded_block.inner_block.b == "foo" + class TestFlowWithBlockParam: @pytest.fixture @@ -122,3 +147,45 @@ def flow_with_block_param(block: ParamBlock) -> int: ParameterTypeError, match="Flow run received invalid parameters" ): flow_with_block_param({"$ref": str(ref_block._block_document_id)}) + + def test_flow_with_nested_block_params(self, ParamBlock): + class NestedParamBlock(Block): + inner_block: ParamBlock + + nested_block = NestedParamBlock(inner_block=ParamBlock(a=12, b="foo")) + nested_block.save("nested-block") + + @flow + def flow_with_nested_block_param(block: NestedParamBlock): + return block.inner_block.a + + assert ( + flow_with_nested_block_param( + {"$ref": {"block_document_id": str(nested_block._block_document_id)}} + ) + == nested_block.inner_block.a + ) + + def test_flow_with_block_param_in_basemodel(self, ParamBlock): + class ParamModel(pydantic.BaseModel): + block: ParamBlock + + param_block = ParamBlock(a=12, b="foo") + param_block.save("param-block") + + @flow + def flow_with_block_param_in_basemodel(param: ParamModel): + return param.block.a + + assert ( + flow_with_block_param_in_basemodel( + { + "block": { + "$ref": { + "block_document_id": str(param_block._block_document_id) + } + } + } + ) + == param_block.a + )