Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Webhook tasks using FlyteAgents #3058

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
4 changes: 4 additions & 0 deletions flytekit/extras/webhook/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .agent import WebhookAgent
from .task import WebhookTask

Check warning on line 2 in flytekit/extras/webhook/__init__.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/__init__.py#L1-L2

Added lines #L1 - L2 were not covered by tests

__all__ = ["WebhookTask", "WebhookAgent"]

Check warning on line 4 in flytekit/extras/webhook/__init__.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/__init__.py#L4

Added line #L4 was not covered by tests
81 changes: 81 additions & 0 deletions flytekit/extras/webhook/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import asyncio
import http
from typing import Optional

Check warning on line 3 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L1-L3

Added lines #L1 - L3 were not covered by tests

import aiohttp
from flyteidl.core.execution_pb2 import TaskExecution

Check warning on line 6 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L5-L6

Added lines #L5 - L6 were not covered by tests

from flytekit.extend.backend.base_agent import AgentRegistry, Resource, SyncAgentBase
from flytekit.interaction.string_literals import literal_map_string_repr
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate
from flytekit.utils.dict_formatter import format_dict

Check warning on line 12 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L8-L12

Added lines #L8 - L12 were not covered by tests

from .constants import BODY_KEY, HEADERS_KEY, METHOD_KEY, SHOW_BODY_KEY, SHOW_URL_KEY, TASK_TYPE, URL_KEY

Check warning on line 14 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L14

Added line #L14 was not covered by tests


class WebhookAgent(SyncAgentBase):
name = "Webhook Agent"

Check warning on line 18 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L17-L18

Added lines #L17 - L18 were not covered by tests

def __init__(self):
super().__init__(task_type_name=TASK_TYPE)
self._session = None
self._lock = asyncio.Lock()

Check warning on line 23 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L20-L23

Added lines #L20 - L23 were not covered by tests

async def _get_session(self) -> aiohttp.ClientSession:
async with self._lock:

Check warning on line 26 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L25-L26

Added lines #L25 - L26 were not covered by tests
if self._session is None:
self._session = aiohttp.ClientSession()
return self._session

Check warning on line 29 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L28-L29

Added lines #L28 - L29 were not covered by tests

async def do(

Check warning on line 31 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L31

Added line #L31 was not covered by tests
self, task_template: TaskTemplate, output_prefix: str, inputs: Optional[LiteralMap] = None, **kwargs
) -> Resource:
try:
custom_dict = task_template.custom
input_dict = {

Check warning on line 36 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L34-L36

Added lines #L34 - L36 were not covered by tests
"inputs": literal_map_string_repr(inputs),
}

final_dict = format_dict("test", custom_dict, input_dict)
url = final_dict.get(URL_KEY)
body = final_dict.get(BODY_KEY)
headers = final_dict.get(HEADERS_KEY)
method = final_dict.get(METHOD_KEY)
method = http.HTTPMethod(method)
show_body = final_dict.get(SHOW_BODY_KEY, False)
show_url = final_dict.get(SHOW_URL_KEY, False)

Check warning on line 47 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L40-L47

Added lines #L40 - L47 were not covered by tests

session = await self._get_session()

Check warning on line 49 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L49

Added line #L49 was not covered by tests

text = None

Check warning on line 51 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L51

Added line #L51 was not covered by tests
if method == http.HTTPMethod.GET:
response = await session.get(url, headers=headers)
text = await response.text()

Check warning on line 54 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L53-L54

Added lines #L53 - L54 were not covered by tests
else:
response = await session.post(url, json=body, headers=headers)
text = await response.text()

Check warning on line 57 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L56-L57

Added lines #L56 - L57 were not covered by tests
if response.status != 200:
return Resource(

Check warning on line 59 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L59

Added line #L59 was not covered by tests
phase=TaskExecution.FAILED,
message=f"Webhook failed with status code {response.status}, response: {text}",
)
final_response = {

Check warning on line 63 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L63

Added line #L63 was not covered by tests
"status_code": response.status,
"body": text,
}
if show_body:
final_response["input_body"] = body

Check warning on line 68 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L68

Added line #L68 was not covered by tests
if show_url:
final_response["url"] = url

Check warning on line 70 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L70

Added line #L70 was not covered by tests

return Resource(

Check warning on line 72 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L72

Added line #L72 was not covered by tests
phase=TaskExecution.SUCCEEDED,
outputs={"info": final_response},
message="Webhook was successfully invoked!",
)
except Exception as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too broad exception handling

Catching generic 'Exception' may hide bugs. Consider catching specific exceptions like 'aiohttp.ClientError'.

Code suggestion
Check the AI-generated fix before applying
Suggested change
except Exception as e:
except (aiohttp.ClientError, ValueError) as e:

Code Review Run #8c3d4c


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

return Resource(phase=TaskExecution.FAILED, message=str(e))

Check warning on line 78 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L77-L78

Added lines #L77 - L78 were not covered by tests
Copy link
Contributor

@flyte-bot flyte-bot Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider breaking down long method

The do method is quite long and handles multiple responsibilities including HTTP request handling, response processing, and error handling. Consider breaking it down into smaller, focused methods for better maintainability.

Code suggestion
Check the AI-generated fix before applying
Suggested change
url = final_dict.get(URL_KEY)
body = final_dict.get(BODY_KEY)
headers = final_dict.get(HEADERS_KEY)
method = final_dict.get(METHOD_KEY)
method = http.HTTPMethod(method)
show_body = final_dict.get(SHOW_BODY_KEY, False)
show_url = final_dict.get(SHOW_URL_KEY, False)
session = await self._get_session()
text = None
if method == http.HTTPMethod.GET:
response = await session.get(url, headers=headers)
text = await response.text()
else:
response = await session.post(url, json=body, headers=headers)
text = await response.text()
if response.status != 200:
return Resource(
phase=TaskExecution.FAILED,
message=f"Webhook failed with status code {response.status}, response: {text}",
)
final_response = {
"status_code": response.status,
"body": text,
}
if show_body:
final_response["input_body"] = body
if show_url:
final_response["url"] = url
return Resource(
phase=TaskExecution.SUCCEEDED,
outputs={"info": final_response},
message="Webhook was successfully invoked!",
)
except Exception as e:
return Resource(phase=TaskExecution.FAILED, message=str(e))
return await self._process_webhook(final_dict)
except Exception as e:
return Resource(phase=TaskExecution.FAILED, message=str(e))
async def _make_http_request(self, method: http.HTTPMethod, url: str, headers: dict, body: dict = None) -> tuple:
session = await self._get_session()
if method == http.HTTPMethod.GET:
response = await session.get(url, headers=headers)
else:
response = await session.post(url, json=body, headers=headers)
text = await response.text()
return response, text
def _build_response(self, response: aiohttp.ClientResponse, text: str, body: dict = None, url: str = None,
show_body: bool = False, show_url: bool = False) -> dict:
final_response = {
"status_code": response.status,
"body": text,
}
if show_body:
final_response["input_body"] = body
if show_url:
final_response["url"] = url
return final_response
async def _process_webhook(self, final_dict: dict) -> Resource:
url = final_dict.get(URL_KEY)
body = final_dict.get(BODY_KEY)
headers = final_dict.get(HEADERS_KEY)
method = http.HTTPMethod(final_dict.get(METHOD_KEY))
show_body = final_dict.get(SHOW_BODY_KEY, False)
show_url = final_dict.get(SHOW_URL_KEY, False)
response, text = await self._make_http_request(method, url, headers, body)
if response.status != 200:
return Resource(
phase=TaskExecution.FAILED,
message=f"Webhook failed with status code {response.status}, response: {text}",
)
final_response = self._build_response(response, text, body, url, show_body, show_url)
return Resource(
phase=TaskExecution.SUCCEEDED,
outputs={"info": final_response},
message="Webhook was successfully invoked!",
)

Code Review Run #8c3d4c

Consider validating timeout parameter value

Consider adding a timeout type check to ensure timeout is a positive integer before making the HTTP request. The current implementation assumes timeout is always valid.

Code suggestion
Check the AI-generated fix before applying
 @@ -53,8 +53,11 @@ class WebhookAgent(SyncAgentBase):
      async def _make_http_request(
          self, method: http.HTTPMethod, url: str, headers: dict, data: dict, timeout: int
      ) -> tuple:
 +        if not isinstance(timeout, int) or timeout <= 0:
 +            raise ValueError(f'timeout must be a positive integer, got {timeout}')
 +
          if method == http.HTTPMethod.GET:
              response = await self._client.get(url, headers=headers, params=data, timeout=timeout)

Code Review Run #49f39f


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged



AgentRegistry.register(WebhookAgent())

Check warning on line 81 in flytekit/extras/webhook/agent.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/agent.py#L81

Added line #L81 was not covered by tests
8 changes: 8 additions & 0 deletions flytekit/extras/webhook/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
TASK_TYPE = "webhook"

Check warning on line 1 in flytekit/extras/webhook/constants.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/constants.py#L1

Added line #L1 was not covered by tests

URL_KEY = "url"
METHOD_KEY = "method"
HEADERS_KEY = "headers"
BODY_KEY = "body"
SHOW_BODY_KEY = "show_body"
SHOW_URL_KEY = "show_url"

Check warning on line 8 in flytekit/extras/webhook/constants.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/constants.py#L3-L8

Added lines #L3 - L8 were not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding type hints to constants

Consider adding type hints to the constant declarations to improve code maintainability and IDE support. For example: TASK_TYPE: str = "webhook"

Code suggestion
Check the AI-generated fix before applying
Suggested change
TASK_TYPE = "webhook"
URL_KEY = "url"
METHOD_KEY = "method"
HEADERS_KEY = "headers"
BODY_KEY = "body"
SHOW_BODY_KEY = "show_body"
SHOW_URL_KEY = "show_url"
TASK_TYPE: str = "webhook"
URL_KEY: str = "url"
METHOD_KEY: str = "method"
HEADERS_KEY: str = "headers"
BODY_KEY: str = "body"
SHOW_BODY_KEY: str = "show_body"
SHOW_URL_KEY: str = "show_url"

Code Review Run #8c3d4c


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

110 changes: 110 additions & 0 deletions flytekit/extras/webhook/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import http
from typing import Any, Dict, Optional, Type

Check warning on line 2 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L1-L2

Added lines #L1 - L2 were not covered by tests

from flytekit import Documentation
from flytekit.configuration import SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.extend.backend.base_agent import SyncAgentExecutorMixin

Check warning on line 7 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L4-L7

Added lines #L4 - L7 were not covered by tests

from ...core.interface import Interface
from .constants import BODY_KEY, HEADERS_KEY, METHOD_KEY, SHOW_BODY_KEY, SHOW_URL_KEY, TASK_TYPE, URL_KEY

Check warning on line 10 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L9-L10

Added lines #L9 - L10 were not covered by tests


class WebhookTask(SyncAgentExecutorMixin, PythonTask):

Check warning on line 13 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L13

Added line #L13 was not covered by tests
"""
This is the simplest form of a BigQuery Task, that can be used even for tasks that do not produce any output.
"""

def __init__(

Check warning on line 18 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L18

Added line #L18 was not covered by tests
self,
name: str,
url: str,
method: http.HTTPMethod = http.HTTPMethod.POST,
headers: Optional[Dict[str, str]] = None,
body: Optional[Dict[str, Any]] = None,
dynamic_inputs: Optional[Dict[str, Type]] = None,
show_body: bool = False,
show_url: bool = False,
description: Optional[str] = None,
# secret_requests: Optional[List[Secret]] = None, TODO Secret support is coming soon
):
"""
This task is used to invoke a webhook. The webhook can be invoked with a POST or GET method.

All the parameters can be formatted using python format strings. The following parameters are available for
formatting:
- dynamic_inputs: These are the dynamic inputs to the task. The keys are the names of the inputs and the values
are the values of the inputs. All inputs are available under the prefix `inputs.`.
For example, if the inputs are {"input1": 10, "input2": "hello"}, then you can
use {inputs.input1} and {inputs.input2} in the URL and the body. Define the dynamic_inputs argument in the
constructor to use these inputs. The dynamic inputs should not be actual values, but the types of the inputs.

TODO Coming soon secrets support
- secrets: These are the secrets that are requested by the task. The keys are the names of the secrets and the
values are the values of the secrets. All secrets are available under the prefix `secrets.`.
For example, if the secret requested are Secret(name="secret1") and Secret(name="secret), then you can use
{secrets.secret1} and {secrets.secret2} in the URL and the body. Define the secret_requests argument in the
constructor to use these secrets. The secrets should not be actual values, but the types of the secrets.

:param name: Name of this task, should be unique in the project
:param url: The endpoint or URL to invoke for this webhook. This can be a static string or a python format string,
where the format arguments are the dynamic_inputs to the task, secrets etc. Refer to the description for more
details of available formatting parameters.
:param method: The HTTP method to use for the request. Default is POST.
:param headers: The headers to send with the request. This can be a static dictionary or a python format string,
where the format arguments are the dynamic_inputs to the task, secrets etc. Refer to the description for more
details of available formatting parameters.
:param body: The body to send with the request. This can be a static dictionary or a python format string,
where the format arguments are the dynamic_inputs to the task, secrets etc. Refer to the description for more
details of available formatting parameters.
:param dynamic_inputs: The dynamic inputs to the task. The keys are the names of the inputs and the values
are the types of the inputs. These inputs are available under the prefix `inputs.` to be used in the URL,
headers and body and other formatted fields.
:param secret_requests: The secrets that are requested by the task. (TODO not yet supported)
:param show_body: If True, the body of the request will be logged in the UI as the output of the task.
:param show_url: If True, the URL of the request will be logged in the UI as the output of the task.
:param description: Description of the task
"""
if method not in {http.HTTPMethod.GET, http.HTTPMethod.POST}:
raise ValueError(f"Method should be either GET or POST. Got {method}")

Check warning on line 69 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L69

Added line #L69 was not covered by tests
if method == http.HTTPMethod.GET:
if body:
raise ValueError("GET method cannot have a body")

Check warning on line 72 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L72

Added line #L72 was not covered by tests
if show_body:
raise ValueError("GET method cannot show body")
outputs = {

Check warning on line 75 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L74-L75

Added lines #L74 - L75 were not covered by tests
"status_code": int,
}
if show_body:
outputs["body"] = dict

Check warning on line 79 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L79

Added line #L79 was not covered by tests
if show_url:
outputs["url"] = bool

Check warning on line 81 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L81

Added line #L81 was not covered by tests

interface = Interface(

Check warning on line 83 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L83

Added line #L83 was not covered by tests
inputs=dynamic_inputs or {},
outputs={"info": dict},
)
super().__init__(

Check warning on line 87 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L87

Added line #L87 was not covered by tests
name=name,
interface=interface,
task_type=TASK_TYPE,
# secret_requests=secret_requests,
docs=Documentation(short_description=description) if description else None,
)
self._url = url
self._method = method
self._headers = headers
self._body = body
self._show_body = show_body
self._show_url = show_url

Check warning on line 99 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L94-L99

Added lines #L94 - L99 were not covered by tests

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
config = {

Check warning on line 102 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L101-L102

Added lines #L101 - L102 were not covered by tests
URL_KEY: self._url,
METHOD_KEY: self._method.value,
HEADERS_KEY: self._headers or {},
BODY_KEY: self._body or {},
SHOW_BODY_KEY: self._show_body,
SHOW_URL_KEY: self._show_url,
}
return config

Check warning on line 110 in flytekit/extras/webhook/task.py

View check run for this annotation

Codecov / codecov/patch

flytekit/extras/webhook/task.py#L110

Added line #L110 was not covered by tests
86 changes: 86 additions & 0 deletions flytekit/utils/dict_formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import re
from typing import Any, Dict, Optional

Check warning on line 2 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L1-L2

Added lines #L1 - L2 were not covered by tests


def get_nested_value(d: Dict[str, Any], keys: list[str]) -> Any:

Check warning on line 5 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L5

Added line #L5 was not covered by tests
"""
Retrieve the nested value from a dictionary based on a list of keys.
"""
for key in keys:
if key not in d:
raise ValueError(f"Could not find the key {key} in {d}.")
d = d[key]
return d

Check warning on line 13 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L11-L13

Added lines #L11 - L13 were not covered by tests


def replace_placeholder(

Check warning on line 16 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L16

Added line #L16 was not covered by tests
service: str,
original_dict: str,
placeholder: str,
replacement: str,
) -> str:
"""
Replace a placeholder in the original string and handle the specific logic for the sagemaker service and idempotence token.
"""
temp_dict = original_dict.replace(f"{{{placeholder}}}", replacement)

Check warning on line 25 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L25

Added line #L25 was not covered by tests
if service == "sagemaker" and placeholder in [
"inputs.idempotence_token",
"idempotence_token",
]:
if len(temp_dict) > 63:
truncated_token = replacement[: 63 - len(original_dict.replace(f"{{{placeholder}}}", ""))]
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
return original_dict.replace(f"{{{placeholder}}}", truncated_token)

Check warning on line 32 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L31-L32

Added lines #L31 - L32 were not covered by tests
else:
return temp_dict
return temp_dict

Check warning on line 35 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L34-L35

Added lines #L34 - L35 were not covered by tests


def format_dict(

Check warning on line 38 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L38

Added line #L38 was not covered by tests
service: str,
original_dict: Any,
update_dict: Dict[str, Any],
idempotence_token: Optional[str] = None,
) -> Any:
"""
Recursively update a dictionary with format strings with values from another dictionary where the keys match
the format string. This goes a little beyond regular python string formatting and uses `.` to denote nested keys.

For example, if original_dict is {"EndpointConfigName": "{endpoint_config_name}"},
and update_dict is {"endpoint_config_name": "my-endpoint-config"},
then the result will be {"EndpointConfigName": "my-endpoint-config"}.

For nested keys if the original_dict is {"EndpointConfigName": "{inputs.endpoint_config_name}"},
and update_dict is {"inputs": {"endpoint_config_name": "my-endpoint-config"}},
then the result will be {"EndpointConfigName": "my-endpoint-config"}.

:param service: The AWS service to use
:param original_dict: The dictionary to update (in place)
:param update_dict: The dictionary to use for updating
:param idempotence_token: Hash of config -- this is to ensure the execution ID is deterministic
:return: The updated dictionary
"""
if original_dict is None:
return None

Check warning on line 63 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L63

Added line #L63 was not covered by tests

if isinstance(original_dict, str) and "{" in original_dict and "}" in original_dict:
matches = re.findall(r"\{([^}]+)\}", original_dict)

Check warning on line 66 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L66

Added line #L66 was not covered by tests
for match in matches:
if "." in match:
keys = match.split(".")
nested_value = get_nested_value(update_dict, keys)

Check warning on line 70 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L69-L70

Added lines #L69 - L70 were not covered by tests
if f"{{{match}}}" == original_dict:
return nested_value

Check warning on line 72 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L72

Added line #L72 was not covered by tests
else:
original_dict = replace_placeholder(service, original_dict, match, nested_value)

Check warning on line 74 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L74

Added line #L74 was not covered by tests
elif match == "idempotence_token" and idempotence_token:
original_dict = replace_placeholder(service, original_dict, match, idempotence_token)
return original_dict

Check warning on line 77 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L76-L77

Added lines #L76 - L77 were not covered by tests

if isinstance(original_dict, list):
return [format_dict(service, item, update_dict, idempotence_token) for item in original_dict]

Check warning on line 80 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L80

Added line #L80 was not covered by tests

if isinstance(original_dict, dict):
for key, value in original_dict.items():
original_dict[key] = format_dict(service, value, update_dict, idempotence_token)

Check warning on line 84 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L84

Added line #L84 was not covered by tests

return original_dict

Check warning on line 86 in flytekit/utils/dict_formatter.py

View check run for this annotation

Codecov / codecov/patch

flytekit/utils/dict_formatter.py#L86

Added line #L86 was not covered by tests
Loading
Loading