From a5217544deb5e9fcd7e7b00cd0284cfac924f0eb Mon Sep 17 00:00:00 2001 From: Alexander Sidorenko Date: Mon, 11 Nov 2024 16:53:16 +0300 Subject: [PATCH 1/3] feat: Add the ability to run serv() asynchronously Before executing serve() - when running configured deployments in bulk via a script - it is often necessary to perform an initialization function (create a block, create/reset a concurrency tag, and others). It is normal that it can be asynchronous (in addition to the fact that you want to have a completely asynchronous code, get_client c sync_client=True is not functionally equal to asynchronous get_client). The current implementation does not allow you to run the described case in main as asyncio.run(main()). --- src/prefect/flows.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/prefect/flows.py b/src/prefect/flows.py index e378e0d00161..6f6abdeeae42 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -1762,8 +1762,8 @@ def load_flow_from_entrypoint( return flow - -def serve( +@sync_compatible +async def serve( *args: "RunnerDeployment", pause_on_shutdown: bool = True, print_starting_message: bool = True, @@ -1817,9 +1817,20 @@ def my_other_flow(name): from prefect.runner import Runner + try: + loop = asyncio.get_running_loop() + except RuntimeError as exc: + if "no running event loop" in str(exc): + loop = None + else: + raise + runner = Runner(pause_on_shutdown=pause_on_shutdown, limit=limit, **kwargs) for deployment in args: - runner.add_deployment(deployment) + if loop.is_running(): + await runner.add_deployment(deployment) + else: + runner.add_deployment(deployment) if print_starting_message: help_message_top = ( @@ -1851,14 +1862,8 @@ def my_other_flow(name): ) try: - loop = asyncio.get_running_loop() - except RuntimeError as exc: - if "no running event loop" in str(exc): - loop = None - else: - raise - - try: + if loop.is_running(): + await runner.start() if loop is not None: loop.run_until_complete(runner.start()) else: From 15b27e671eebcd94dd28a51f13fd3b76f2a0e7ea Mon Sep 17 00:00:00 2001 From: Alexander Sidorenko Date: Mon, 11 Nov 2024 17:13:56 +0300 Subject: [PATCH 2/3] refactor: Run pre-commit --- src/prefect/flows.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/prefect/flows.py b/src/prefect/flows.py index 6f6abdeeae42..7e6e1090523b 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -1762,6 +1762,7 @@ def load_flow_from_entrypoint( return flow + @sync_compatible async def serve( *args: "RunnerDeployment", From c5fda6cc57fb374a56aff941a7ffbf6a269b79d7 Mon Sep 17 00:00:00 2001 From: Alexander Sidorenko Date: Tue, 12 Nov 2024 11:46:31 +0300 Subject: [PATCH 3/3] feat: Add aserve method --- src/prefect/__init__.py | 3 + src/prefect/flows.py | 148 +++++++++++++++++++++++++++++----------- src/prefect/main.py | 3 +- 3 files changed, 113 insertions(+), 41 deletions(-) diff --git a/src/prefect/__init__.py b/src/prefect/__init__.py index 705b9fcef35f..928c736f006c 100644 --- a/src/prefect/__init__.py +++ b/src/prefect/__init__.py @@ -38,6 +38,7 @@ Transaction, unmapped, serve, + aserve, deploy, pause_flow_run, resume_flow_run, @@ -66,6 +67,7 @@ "Transaction": (__spec__.parent, ".main"), "unmapped": (__spec__.parent, ".main"), "serve": (__spec__.parent, ".main"), + "aserve": (__spec__.parent, ".main"), "deploy": (__spec__.parent, ".main"), "pause_flow_run": (__spec__.parent, ".main"), "resume_flow_run": (__spec__.parent, ".main"), @@ -86,6 +88,7 @@ "Transaction", "unmapped", "serve", + "aserve", "deploy", "pause_flow_run", "resume_flow_run", diff --git a/src/prefect/flows.py b/src/prefect/flows.py index 7e6e1090523b..845a3b867dac 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -1763,8 +1763,7 @@ def load_flow_from_entrypoint( return flow -@sync_compatible -async def serve( +def serve( *args: "RunnerDeployment", pause_on_shutdown: bool = True, print_starting_message: bool = True, @@ -1813,11 +1812,16 @@ def my_other_flow(name): serve(hello_deploy, bye_deploy) ``` """ - from rich.console import Console, Group - from rich.table import Table from prefect.runner import Runner + runner = Runner(pause_on_shutdown=pause_on_shutdown, limit=limit, **kwargs) + for deployment in args: + runner.add_deployment(deployment) + + if print_starting_message: + display_start_message(*args) + try: loop = asyncio.get_running_loop() except RuntimeError as exc: @@ -1826,53 +1830,117 @@ def my_other_flow(name): else: raise + try: + if loop is not None: + loop.run_until_complete(runner.start()) + else: + asyncio.run(runner.start()) + except (KeyboardInterrupt, TerminationSignal) as exc: + logger.info(f"Received {type(exc).__name__}, shutting down...") + if loop is not None: + loop.stop() + + +async def aserve( + *args: "RunnerDeployment", + pause_on_shutdown: bool = True, + print_starting_message: bool = True, + limit: Optional[int] = None, + **kwargs, +): + """ + Serve the provided list of deployments with the possibility of asynchronous context initialization. + + Args: + *args: A list of deployments to serve. + pause_on_shutdown: A boolean for whether or not to automatically pause + deployment schedules on shutdown. + print_starting_message: Whether or not to print message to the console + on startup. + limit: The maximum number of runs that can be executed concurrently. + **kwargs: Additional keyword arguments to pass to the runner. + + Examples: + Prepare deployment and asynchronous initialization function and serve them: + + ```python + import asyncio + import datetime + + from prefect import flow, aserve, get_client + + + async def init(): + await set_concurrency_limit() + + + async def set_concurrency_limit(): + async with get_client() as client: + await client.create_concurrency_limit(tag='dev', concurrency_limit=3) + + + @flow + async def my_flow(name): + print(f"hello {name}") + + + async def main(): + # Initialization function + await init() + + # Run once a day + hello_deploy = await my_flow.to_deployment( + "hello", tags=["dev"], interval=datetime.timedelta(days=1) + ) + + await aserve(hello_deploy) + + + if __name__ == "__main__": + asyncio.run(main()) + """ + + from prefect.runner import Runner + runner = Runner(pause_on_shutdown=pause_on_shutdown, limit=limit, **kwargs) for deployment in args: - if loop.is_running(): - await runner.add_deployment(deployment) - else: - runner.add_deployment(deployment) + await runner.add_deployment(deployment) if print_starting_message: - help_message_top = ( - "[green]Your deployments are being served and polling for" - " scheduled runs!\n[/]" - ) + display_start_message(*args) - table = Table(title="Deployments", show_header=False) + await runner.start() - table.add_column(style="blue", no_wrap=True) - for deployment in args: - table.add_row(f"{deployment.flow_name}/{deployment.name}") +def display_start_message(*args: "RunnerDeployment"): + from rich.console import Console, Group + from rich.table import Table - help_message_bottom = ( - "\nTo trigger any of these deployments, use the" - " following command:\n[blue]\n\t$ prefect deployment run" - " [DEPLOYMENT_NAME]\n[/]" - ) - if PREFECT_UI_URL: - help_message_bottom += ( - "\nYou can also trigger your deployments via the Prefect UI:" - f" [blue]{PREFECT_UI_URL.value()}/deployments[/]\n" - ) + help_message_top = ( + "[green]Your deployments are being served and polling for" + " scheduled runs!\n[/]" + ) + + table = Table(title="Deployments", show_header=False) + + table.add_column(style="blue", no_wrap=True) + + for deployment in args: + table.add_row(f"{deployment.flow_name}/{deployment.name}") - console = Console() - console.print( - Group(help_message_top, table, help_message_bottom), soft_wrap=True + help_message_bottom = ( + "\nTo trigger any of these deployments, use the" + " following command:\n[blue]\n\t$ prefect deployment run" + " [DEPLOYMENT_NAME]\n[/]" + ) + if PREFECT_UI_URL: + help_message_bottom += ( + "\nYou can also trigger your deployments via the Prefect UI:" + f" [blue]{PREFECT_UI_URL.value()}/deployments[/]\n" ) - try: - if loop.is_running(): - await runner.start() - if loop is not None: - loop.run_until_complete(runner.start()) - else: - asyncio.run(runner.start()) - except (KeyboardInterrupt, TerminationSignal) as exc: - logger.info(f"Received {type(exc).__name__}, shutting down...") - if loop is not None: - loop.stop() + console = Console() + console.print(Group(help_message_top, table, help_message_bottom), soft_wrap=True) @client_injector diff --git a/src/prefect/main.py b/src/prefect/main.py index 46ffa2d3d180..4fea3999e2ad 100644 --- a/src/prefect/main.py +++ b/src/prefect/main.py @@ -2,7 +2,7 @@ from prefect.deployments import deploy from prefect.states import State from prefect.logging import get_run_logger -from prefect.flows import flow, Flow, serve +from prefect.flows import flow, Flow, serve, aserve from prefect.transactions import Transaction from prefect.tasks import task, Task from prefect.context import tags @@ -84,6 +84,7 @@ "Transaction", "unmapped", "serve", + "aserve", "deploy", "pause_flow_run", "resume_flow_run",