Skip to content

Commit

Permalink
Merge branch 'main' into CLOUD-569
Browse files Browse the repository at this point in the history
  • Loading branch information
bunchesofdonald authored Nov 12, 2024
2 parents 5c05990 + 2b018ea commit 2bb6dda
Show file tree
Hide file tree
Showing 75 changed files with 13,292 additions and 8,219 deletions.
2 changes: 1 addition & 1 deletion .codespellrc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ skip = .git,*.pdf,*.svg,versioneer.py,package-lock.json,_vendor,*.css,.codespell
# from https://github.com/PrefectHQ/prefect/pull/10813#issuecomment-1732676130
ignore-regex = .*lazy=\"selectin\"|.*e import Bloc$|America/Nome

ignore-words-list = selectin,aci,wqs,aks,ines,dependant,fsspec,automations,nmme
ignore-words-list = selectin,aci,wqs,aks,ines,dependant,fsspec,automations,nmme,afterall

check-hidden = true
4 changes: 4 additions & 0 deletions .github/workflows/ui-v2-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,7 @@ jobs:
- name: Build UI
working-directory: ./ui-v2
run: npm run build

- name: Run tests
working-directory: ./ui-v2
run: npm run test
2 changes: 1 addition & 1 deletion docs/3.0/deploy/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description: Learn how to use deployments to trigger flow runs remotely.

Deployments allow you to run flows on a [schedule](/3.0/automate/add-schedules/) and trigger runs based on [events](/3.0/automate/events/automations-triggers/).

[Deployments](/3.0/deploy/infrastructure-examples/docker/) are server-side representations of flows.
Deployments are server-side representations of flows.
They store the crucial metadata for remote orchestration including when, where, and how a workflow should run.

In addition to manually triggering and managing flow runs, deploying a flow exposes an API and UI that allow you to:
Expand Down
17 changes: 13 additions & 4 deletions docs/3.0/develop/manage-states.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,13 @@ import FinalFlowState from '/snippets/final-flow-state.mdx'
State change hooks execute code in response to **_client side_** changes in flow or task run states, enabling you to define actions for
specific state transitions in a workflow.

A state hook must have the following signature:
State hooks have the following signature:

```python
def my_hook(obj: Task | Flow, run: TaskRun | FlowRun, state: State) -> None:
def my_task_state_hook(task: Task, run: TaskRun, state: State) -> None:
...

def my_flow_state_hook(flow: Flow, run: FlowRun, state: State) -> None:
...
```

Expand All @@ -137,7 +140,7 @@ from prefect import task, flow

# for type hints only
from prefect import Task
from prefect.context import TaskRun
from prefect.client.schemas.objects import TaskRun
from prefect.states import State


Expand All @@ -154,11 +157,17 @@ def nice_task(name: str):


# alternatively hooks can be specified via decorator
@my_nice_task.on_completion
@nice_task.on_completion
def second_hook(tsk: Task, run: TaskRun, state: State) -> None:
print('another hook')

nice_task(name='Marvin')
```

<Note>
To import a `TaskRun` or `FlowRun` for type hinting, you can import from `prefect.client.schemas.objects`.
</Note>

State change hooks are versatile, allowing you to specify multiple state change hooks for the same state transition,
or to use the same state change hook for different transitions:

Expand Down
2 changes: 1 addition & 1 deletion docs/3.0/get-started/quickstart.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ You can also choose from other [work pool types](https://docs.prefect.io/concept
## Deploy and schedule your flow
A [deployment](/3.0/deploy/infrastructure-examples/docker/) is used to determine when, where, and how a flow should run.
A [deployment](/3.0/deploy/) is used to determine when, where, and how a flow should run.
Deployments elevate flows to remotely configurable entities that have their own API.
1. Create a deployment in code:
Expand Down
46 changes: 34 additions & 12 deletions src/integrations/prefect-slack/prefect_slack/credentials.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""Credential classes use to store Slack credentials."""

from typing import Optional
from typing import Any, Optional, Union

from pydantic import Field, SecretStr
from slack_sdk.web.async_client import AsyncWebClient
from slack_sdk.webhook.async_client import AsyncWebhookClient
from slack_sdk.webhook.client import WebhookClient

from prefect._internal.compatibility.async_dispatch import async_dispatch
from prefect.blocks.core import Block
from prefect.blocks.notifications import NotificationBlock
from prefect.utilities.asyncutils import sync_compatible


class SlackCredentials(Block):
Expand Down Expand Up @@ -49,6 +50,14 @@ def get_client(self) -> AsyncWebClient:
return AsyncWebClient(token=self.token.get_secret_value())


async def _notify_async(obj: Any, body: str, subject: Optional[str] = None):
client = obj.get_client()

response = await client.send(text=body)

obj._raise_on_failure(response)


class SlackWebhook(NotificationBlock):
"""
Block holding a Slack webhook for use in tasks and flows.
Expand Down Expand Up @@ -90,22 +99,18 @@ class SlackWebhook(NotificationBlock):
examples=["https://hooks.slack.com/XXX"],
)

def get_client(self) -> AsyncWebhookClient:
def get_client(
self, sync_client: bool = False
) -> Union[AsyncWebhookClient, WebhookClient]:
"""
Returns an authenticated `AsyncWebhookClient` to interact with the configured
Slack webhook.
"""
if sync_client:
return WebhookClient(url=self.url.get_secret_value())
return AsyncWebhookClient(url=self.url.get_secret_value())

@sync_compatible
async def notify(self, body: str, subject: Optional[str] = None):
"""
Sends a message to the Slack channel.
"""
client = self.get_client()

response = await client.send(text=body)

def _raise_on_failure(self, response: Any):
# prefect>=2.17.2 added a means for notification blocks to raise errors on
# failures. This is not available in older versions, so we need to check if the
# private base class attribute exists before using it.
Expand All @@ -117,3 +122,20 @@ async def notify(self, body: str, subject: Optional[str] = None):

if response.status_code >= 400:
raise NotificationError(f"Failed to send message: {response.body}")

async def notify_async(self, body: str, subject: Optional[str] = None):
"""
Sends a message to the Slack channel.
"""
await _notify_async(self, body, subject)

@async_dispatch(_notify_async) # type: ignore
def notify(self, body: str, subject: Optional[str] = None):
"""
Sends a message to the Slack channel.
"""
client = self.get_client(sync_client=True)

response = client.send(text=body)

self._raise_on_failure(response)
11 changes: 3 additions & 8 deletions src/integrations/prefect-slack/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ classifiers = [
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries",
]
dependencies = [
"aiohttp",
"slack_sdk>=3.15.1",
"prefect>=3.0.0rc1",
]
dependencies = ["aiohttp", "slack_sdk>=3.15.1", "prefect>=3.0.0rc1"]
dynamic = ["version"]

[project.optional-dependencies]
Expand Down Expand Up @@ -74,7 +70,6 @@ fail_under = 80
show_missing = true

[tool.pytest.ini_options]
asyncio_default_fixture_loop_scope = "session"
asyncio_mode = "auto"
env = [
"PREFECT_TEST_MODE=1",
]
env = ["PREFECT_TEST_MODE=1"]
54 changes: 52 additions & 2 deletions src/integrations/prefect-slack/tests/test_credentials.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, MagicMock

import pytest
from prefect_slack import SlackCredentials, SlackWebhook
from slack_sdk.web.async_client import AsyncWebClient
from slack_sdk.webhook.async_client import AsyncWebhookClient, WebhookResponse
from slack_sdk.webhook.async_client import AsyncWebhookClient
from slack_sdk.webhook.webhook_response import WebhookResponse


def test_slack_credentials():
Expand Down Expand Up @@ -63,3 +64,52 @@ async def test_slack_webhook_block_handles_raise_on_failure(
with pytest.raises(NotificationError, match="Failed to send message: woops"):
with block.raise_on_failure():
await block.notify("hello", "world")


def test_slack_webhook_sync_notify(monkeypatch):
"""Test the sync notify path"""
mock_client = MagicMock()
mock_client.send.return_value = WebhookResponse(
url="http://test", status_code=200, body="ok", headers={}
)

webhook = SlackWebhook(url="http://test")
monkeypatch.setattr(webhook, "get_client", MagicMock(return_value=mock_client))

webhook.notify("test message")
mock_client.send.assert_called_once_with(text="test message")


async def test_slack_webhook_async_notify(monkeypatch):
"""Test the async notify path"""
mock_client = MagicMock()
mock_client.send = AsyncMock(
return_value=WebhookResponse(
url="http://test", status_code=200, body="ok", headers={}
)
)

webhook = SlackWebhook(url="http://test")
monkeypatch.setattr(webhook, "get_client", MagicMock(return_value=mock_client))

await webhook.notify_async("test message")
mock_client.send.assert_called_once_with(text="test message")


@pytest.mark.parametrize("message", ["test message 1", "test message 2"])
async def test_slack_webhook_notify_async_dispatch(monkeypatch, message):
"""Test that async_dispatch properly handles both sync and async contexts"""

mock_response = WebhookResponse(
url="http://test", status_code=200, body="ok", headers={}
)

mock_client = MagicMock()
mock_client.send = AsyncMock(return_value=mock_response)

webhook = SlackWebhook(url="http://test")
monkeypatch.setattr(webhook, "get_client", lambda sync_client=False: mock_client)

# Test notification
await webhook.notify(message)
mock_client.send.assert_called_once_with(text=message)
2 changes: 1 addition & 1 deletion src/prefect/client/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2613,7 +2613,7 @@ async def send_worker_heartbeat(
"heartbeat_interval_seconds": heartbeat_interval_seconds,
}
if worker_metadata:
params["worker_metadata"] = worker_metadata.model_dump(mode="json")
params["metadata"] = worker_metadata.model_dump(mode="json")
if get_worker_id:
params["return_id"] = get_worker_id

Expand Down
13 changes: 10 additions & 3 deletions src/prefect/server/api/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import List, Optional
from uuid import UUID

import sqlalchemy as sa
from fastapi import Body, Depends, HTTPException, Path, status
from sqlalchemy.ext.asyncio import AsyncSession

Expand Down Expand Up @@ -50,9 +51,15 @@ async def create_variable(
db: PrefectDBInterface = Depends(provide_database_interface),
) -> core.Variable:
async with db.session_context(begin_transaction=True) as session:
model = await models.variables.create_variable(
session=session, variable=variable
)
try:
model = await models.variables.create_variable(
session=session, variable=variable
)
except sa.exc.IntegrityError:
raise HTTPException(
status_code=409,
detail=f"A variable with the name {variable.name!r} already exists.",
)

return core.Variable.model_validate(model, from_attributes=True)

Expand Down
21 changes: 18 additions & 3 deletions src/prefect/utilities/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import ipaddress
import socket
import urllib.parse
from typing import TYPE_CHECKING, Any, Literal, Optional, Union
from string import Formatter
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union
from urllib.parse import urlparse
from uuid import UUID

Expand All @@ -22,7 +23,6 @@

# The following objects are excluded from UI URL generation because we lack a
# directly-addressable URL:
# worker
# artifact
# variable
# saved-search
Expand All @@ -38,6 +38,7 @@
"deployment": "deployments/deployment/{obj_id}",
"automation": "automations/automation/{obj_id}",
"received-event": "events/event/{occurred}/{obj_id}",
"worker": "work-pools/work-pool/{work_pool_name}/worker/{obj_id}",
}

# The following objects are excluded from API URL generation because we lack a
Expand Down Expand Up @@ -134,6 +135,7 @@ def url_for(
obj_id: Optional[Union[str, UUID]] = None,
url_type: URLType = "ui",
default_base_url: Optional[str] = None,
**additional_format_kwargs: Optional[Dict[str, Any]],
) -> Optional[str]:
"""
Returns the URL for a Prefect object.
Expand All @@ -149,6 +151,8 @@ def url_for(
Whether to return the URL for the UI (default) or API.
default_base_url (str, optional):
The default base URL to use if no URL is configured.
additional_format_kwargs (Dict[str, Any], optional):
Additional keyword arguments to pass to the URL format.
Returns:
Optional[str]: The URL for the given object or None if the object is not supported.
Expand Down Expand Up @@ -246,7 +250,18 @@ def url_for(
occurred=obj.occurred.strftime("%Y-%m-%d"), obj_id=obj_id
)
else:
url = url_format.format(obj_id=obj_id)
obj_keys = [
fname
for _, fname, _, _ in Formatter().parse(url_format)
if fname is not None and fname != "obj_id"
]

if not all(key in additional_format_kwargs for key in obj_keys):
raise ValueError(
f"Unable to generate URL for {name} because the following keys are missing: {', '.join(obj_keys)}"
)

url = url_format.format(obj_id=obj_id, **additional_format_kwargs)

if not base_url.endswith("/"):
base_url += "/"
Expand Down
16 changes: 11 additions & 5 deletions src/prefect/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,12 +885,18 @@ async def _submit_scheduled_flow_runs(
get_current_settings().experiments.worker_logging_to_api_enabled
and self.backend_id
):
worker_path = f"worker/{self.backend_id}"
base_url = url_for("work-pool", self._work_pool.name)
try:
worker_url = url_for(
"worker",
obj_id=self.backend_id,
work_pool_name=self._work_pool_name,
)

run_logger.info(
f"Running on worker id: {self.backend_id}. See worker logs here: {base_url}/{worker_path}"
)
run_logger.info(
f"Running on worker id: {self.backend_id}. See worker logs here: {worker_url}"
)
except ValueError as ve:
run_logger.warning(f"Failed to generate worker URL: {ve}")

self._submitting_flow_run_ids.add(flow_run.id)
self._runs_task_group.start_soon(
Expand Down
2 changes: 1 addition & 1 deletion tests/client/test_prefect_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2737,7 +2737,7 @@ async def test_worker_heartbeat_sends_metadata_if_passed(
assert mock_post.call_args[1]["json"] == {
"name": "test-worker",
"heartbeat_interval_seconds": 10,
"worker_metadata": {
"metadata": {
"integrations": [{"name": "prefect-aws", "version": "1.0.0"}]
},
}
Expand Down
Loading

0 comments on commit 2bb6dda

Please sign in to comment.