From f168d967b5a6252805ef733d19f4c162e5d04ff5 Mon Sep 17 00:00:00 2001 From: Prefect <41086007+marvin-robot@users.noreply.github.com> Date: Fri, 15 Nov 2024 10:01:48 -0600 Subject: [PATCH 1/3] Update @prefecthq/prefect-design to version 2.14.11 (#16028) Co-authored-by: marvin-robot --- ui/package-lock.json | 14 +++++++------- ui/package.json | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ui/package-lock.json b/ui/package-lock.json index dcc75f72bf90..949acd27e804 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -8,7 +8,7 @@ "name": "@prefecthq/ui", "version": "2.8.0", "dependencies": { - "@prefecthq/prefect-design": "2.14.10", + "@prefecthq/prefect-design": "2.14.11", "@prefecthq/prefect-ui-library": "3.11.18", "@prefecthq/vue-charts": "2.0.5", "@prefecthq/vue-compositions": "1.11.5", @@ -1037,9 +1037,9 @@ } }, "node_modules/@prefecthq/prefect-design": { - "version": "2.14.10", - "resolved": "https://registry.npmjs.org/@prefecthq/prefect-design/-/prefect-design-2.14.10.tgz", - "integrity": "sha512-sWy6GMWhY+AJze5p+Ve1gFC6cad1WVlp3eA/bLkkDgKBOyUH2Vv5dx7wqQdXLTDAZeCfeTAAdkdJTPRU7VFqDg==", + "version": "2.14.11", + "resolved": "https://registry.npmjs.org/@prefecthq/prefect-design/-/prefect-design-2.14.11.tgz", + "integrity": "sha512-3St5X5smttNQVAGsCrWlBZr8IDBHeayq7XIsMHdqCGrj2975Eciorc7VsXzFyzQbxQKLxecVOUkSiVUrPvHq1Q==", "dependencies": { "@fontsource-variable/inconsolata": "^5.0.18", "@fontsource-variable/inter": "^5.0.18", @@ -7627,9 +7627,9 @@ } }, "@prefecthq/prefect-design": { - "version": "2.14.10", - "resolved": "https://registry.npmjs.org/@prefecthq/prefect-design/-/prefect-design-2.14.10.tgz", - "integrity": "sha512-sWy6GMWhY+AJze5p+Ve1gFC6cad1WVlp3eA/bLkkDgKBOyUH2Vv5dx7wqQdXLTDAZeCfeTAAdkdJTPRU7VFqDg==", + "version": "2.14.11", + "resolved": "https://registry.npmjs.org/@prefecthq/prefect-design/-/prefect-design-2.14.11.tgz", + "integrity": "sha512-3St5X5smttNQVAGsCrWlBZr8IDBHeayq7XIsMHdqCGrj2975Eciorc7VsXzFyzQbxQKLxecVOUkSiVUrPvHq1Q==", "requires": { "@fontsource-variable/inconsolata": "^5.0.18", "@fontsource-variable/inter": "^5.0.18", diff --git a/ui/package.json b/ui/package.json index e51e5f227081..969bd6ff4937 100644 --- a/ui/package.json +++ b/ui/package.json @@ -10,7 +10,7 @@ "validate:types": "vue-tsc --noEmit" }, "dependencies": { - "@prefecthq/prefect-design": "2.14.10", + "@prefecthq/prefect-design": "2.14.11", "@prefecthq/prefect-ui-library": "3.11.18", "@prefecthq/vue-charts": "2.0.5", "@prefecthq/vue-compositions": "1.11.5", From 8dd8f3c4e57394c5081a82c9afd31141826cedab Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Fri, 15 Nov 2024 08:54:50 -0800 Subject: [PATCH 2/3] Remove experimental worker logs setting (#16024) Worker logs are no longer experimental, so this PR removes references to the setting. We still want to avoid sending worker logs to open-source API servers until they support worker logs, though, so we use the existence of a backend_id on worker logs as a proxy for whether or not the worker is connected to Cloud. (Only workers connected to Cloud emit logs with worker IDs attached.) --- .../infrastructure-concepts/workers.mdx | 13 +-- docs/3.0/develop/settings-ref.mdx | 12 --- schemas/settings.schema.json | 9 -- src/prefect/logging/handlers.py | 9 +- src/prefect/settings/models/experiments.py | 5 - src/prefect/utilities/urls.py | 4 +- src/prefect/workers/base.py | 11 +-- tests/test_logging.py | 91 +++++++++++++------ tests/test_settings.py | 1 - tests/workers/test_base_worker.py | 16 +--- 10 files changed, 76 insertions(+), 95 deletions(-) diff --git a/docs/3.0/deploy/infrastructure-concepts/workers.mdx b/docs/3.0/deploy/infrastructure-concepts/workers.mdx index 6124d9d8dbea..d46424755d92 100644 --- a/docs/3.0/deploy/infrastructure-concepts/workers.mdx +++ b/docs/3.0/deploy/infrastructure-concepts/workers.mdx @@ -103,17 +103,8 @@ If a worker misses three heartbeats, it is considered offline. By default, a wor after it stopped sending heartbeats, but you can configure the threshold with the `PREFECT_WORKER_HEARTBEAT_SECONDS` setting. ### Worker logs -This feature is experimental as of Prefect version 3.1.1 + Workers send logs to the Prefect Cloud API if you're connected to Prefect Cloud. - Workers can now send logs directly to Prefect Cloud. - -To enable this feature, run the command: -```bash -prefect config set PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED=true -``` -See our [profile settings](/3.0/manage/settings-and-profiles/) documentation for more information about changing settings. - -Once enabled: - All worker logs are automatically sent to the Prefect Cloud API - Logs are accessible through both the Prefect Cloud UI and API - Each flow run will include a link to its associated worker's logs @@ -126,8 +117,6 @@ Once enabled: - Installed Prefect integrations (e.g., `prefect-aws`, `prefect-gcp`) - Live worker logs (if worker logging is enabled) -Worker logging is an experimental feature as of Prefect version 3.1.1. If your worker is using an earlier version of Prefect or you have not opted in to the experiment, this page will not show worker logs. - Access a worker's details by clicking on the worker's name in the Work Pool list. ### Start a worker diff --git a/docs/3.0/develop/settings-ref.mdx b/docs/3.0/develop/settings-ref.mdx index 89fd5107aaa8..40f29b21b662 100644 --- a/docs/3.0/develop/settings-ref.mdx +++ b/docs/3.0/develop/settings-ref.mdx @@ -455,18 +455,6 @@ If `True`, warn on usage of experimental features. **Supported environment variables**: `PREFECT_EXPERIMENTS_WARN`, `PREFECT_EXPERIMENTAL_WARN` -### `worker_logging_to_api_enabled` -Enables the logging of worker logs to Prefect Cloud. - -**Type**: `boolean` - -**Default**: `False` - -**TOML dotted key path**: `experiments.worker_logging_to_api_enabled` - -**Supported environment variables**: -`PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED` - ### `telemetry_enabled` Enables sending telemetry to Prefect Cloud. diff --git a/schemas/settings.schema.json b/schemas/settings.schema.json index a1cae0b0b4f4..46ab12535ef7 100644 --- a/schemas/settings.schema.json +++ b/schemas/settings.schema.json @@ -319,15 +319,6 @@ "title": "Warn", "type": "boolean" }, - "worker_logging_to_api_enabled": { - "default": false, - "description": "Enables the logging of worker logs to Prefect Cloud.", - "supported_environment_variables": [ - "PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED" - ], - "title": "Worker Logging To Api Enabled", - "type": "boolean" - }, "telemetry_enabled": { "default": false, "description": "Enables sending telemetry to Prefect Cloud.", diff --git a/src/prefect/logging/handlers.py b/src/prefect/logging/handlers.py index 744a9d63a421..0e254a9e58a9 100644 --- a/src/prefect/logging/handlers.py +++ b/src/prefect/logging/handlers.py @@ -32,7 +32,6 @@ PREFECT_LOGGING_TO_API_BATCH_SIZE, PREFECT_LOGGING_TO_API_MAX_LOG_SIZE, PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW, - get_current_settings, ) @@ -241,10 +240,12 @@ def _get_payload_size(self, log: Dict[str, Any]) -> int: class WorkerAPILogHandler(APILogHandler): def emit(self, record: logging.LogRecord): - if get_current_settings().experiments.worker_logging_to_api_enabled: - super().emit(record) - else: + # Open-source API servers do not currently support worker logs, and + # worker logs only have an associated worker ID when connected to Cloud, + # so we won't send worker logs to the API unless they have a worker ID. + if not getattr(record, "worker_id", None): return + super().emit(record) def prepare(self, record: logging.LogRecord) -> Dict[str, Any]: """ diff --git a/src/prefect/settings/models/experiments.py b/src/prefect/settings/models/experiments.py index bb1344e1d985..218128c3dcf1 100644 --- a/src/prefect/settings/models/experiments.py +++ b/src/prefect/settings/models/experiments.py @@ -18,11 +18,6 @@ class ExperimentsSettings(PrefectBaseSettings): ), ) - worker_logging_to_api_enabled: bool = Field( - default=False, - description="Enables the logging of worker logs to Prefect Cloud.", - ) - telemetry_enabled: bool = Field( default=False, description="Enables sending telemetry to Prefect Cloud.", diff --git a/src/prefect/utilities/urls.py b/src/prefect/utilities/urls.py index df73deca36f7..7b99f645b648 100644 --- a/src/prefect/utilities/urls.py +++ b/src/prefect/utilities/urls.py @@ -3,7 +3,7 @@ import socket import urllib.parse from string import Formatter -from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union +from typing import TYPE_CHECKING, Any, Literal, Optional, Union from urllib.parse import urlparse from uuid import UUID @@ -135,7 +135,7 @@ def url_for( obj_id: Optional[Union[str, UUID]] = None, url_type: URLType = "ui", default_base_url: Optional[str] = None, - **additional_format_kwargs: Optional[Dict[str, Any]], + **additional_format_kwargs: Any, ) -> Optional[str]: """ Returns the URL for a Prefect object. diff --git a/src/prefect/workers/base.py b/src/prefect/workers/base.py index 01b0f139ee7f..f6b33022ed46 100644 --- a/src/prefect/workers/base.py +++ b/src/prefect/workers/base.py @@ -821,14 +821,14 @@ async def sync_with_backend(self): self._logger = get_worker_logger(self) self._logger.debug( - f"Worker synchronized with the Prefect API server. Remote ID: {self.backend_id}" + "Worker synchronized with the Prefect API server. " + + (f"Remote ID: {self.backend_id}" if self.backend_id else "") ) def _should_get_worker_id(self): """Determines if the worker should request an ID from the API server.""" return ( - get_current_settings().experiments.worker_logging_to_api_enabled - and self._client + self._client and self._client.server_type == ServerType.CLOUD and self.backend_id is None ) @@ -886,10 +886,7 @@ async def _submit_scheduled_flow_runs( run_logger.info( f"Worker '{self.name}' submitting flow run '{flow_run.id}'" ) - if ( - get_current_settings().experiments.worker_logging_to_api_enabled - and self.backend_id - ): + if self.backend_id: try: worker_url = url_for( "worker", diff --git a/tests/test_logging.py b/tests/test_logging.py index aa9d5874389d..d11cf2207d1e 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -53,7 +53,6 @@ from prefect.server.schemas.actions import LogCreate from prefect.settings import ( PREFECT_API_KEY, - PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED, PREFECT_LOGGING_COLORS, PREFECT_LOGGING_LEVEL, PREFECT_LOGGING_MARKUP, @@ -637,23 +636,37 @@ def test_handler_knows_how_large_logs_are(self): assert handler._get_payload_size(dict_log) == log_size +WORKER_ID = uuid.uuid4() + + class TestWorkerLogging: - class WorkerTestImpl(BaseWorker): - type: str = "logging_test" + class CloudWorkerTestImpl(BaseWorker): + type: str = "cloud_logging_test" job_configuration: Type[BaseJobConfiguration] = BaseJobConfiguration async def _send_worker_heartbeat(self, *_, **__): - return "test_backend_id" + """ + Workers only return an ID here if they're connected to Cloud, + so this simulates the worker being connected to Cloud. + """ + return WORKER_ID async def run(self, *_, **__): pass - @pytest.fixture - def experiment_enabled(self): - with temporary_settings( - updates={PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED: True} - ): - yield + class ServerWorkerTestImpl(BaseWorker): + type: str = "server_logging_test" + job_configuration: Type[BaseJobConfiguration] = BaseJobConfiguration + + async def run(self, *_, **__): + pass + + async def _send_worker_heartbeat(self, *_, **__): + """ + Workers only return an ID here if they're connected to Cloud, + so this simulates the worker not being connected to Cloud. + """ + return None @pytest.fixture def logging_to_api_enabled(self): @@ -672,24 +685,24 @@ def logger(self, worker_handler): yield logger logger.removeHandler(worker_handler) - async def test_get_worker_logger_works_with_no_backend_id(self, experiment_enabled): - async with self.WorkerTestImpl( + async def test_get_worker_logger_works_with_no_backend_id(self): + async with self.CloudWorkerTestImpl( name="test", work_pool_name="test-work-pool" ) as worker: logger = get_worker_logger(worker) - assert logger.name == "prefect.workers.logging_test.test" + assert logger.name == "prefect.workers.cloud_logging_test.test" - async def test_get_worker_logger_works_with_backend_id(self, experiment_enabled): - async with self.WorkerTestImpl( + async def test_get_worker_logger_works_with_backend_id(self): + async with self.CloudWorkerTestImpl( name="test", work_pool_name="test-work-pool" ) as worker: await worker.sync_with_backend() logger = get_worker_logger(worker) - assert logger.name == "prefect.workers.logging_test.test" - assert logger.extra["worker_id"] == "test_backend_id" + assert logger.name == "prefect.workers.cloud_logging_test.test" + assert logger.extra["worker_id"] == str(WORKER_ID) - async def test_worker_emits_logs_with_worker_id(self, caplog, experiment_enabled): - async with self.WorkerTestImpl( + async def test_worker_emits_logs_with_worker_id(self, caplog): + async with self.CloudWorkerTestImpl( name="test", work_pool_name="test-work-pool" ) as worker: await worker.sync_with_backend() @@ -700,21 +713,39 @@ async def test_worker_emits_logs_with_worker_id(self, caplog, experiment_enabled ] assert "testing_with_extras" in caplog.text - assert record_with_extras[0].worker_id == worker.backend_id - assert worker._logger.extra["worker_id"] == worker.backend_id + assert record_with_extras[0].worker_id == str(worker.backend_id) + assert worker._logger.extra["worker_id"] == str(worker.backend_id) - def test_worker_logger_sends_log_to_api_worker( - self, logger, mock_log_worker, experiment_enabled, logging_to_api_enabled + async def test_worker_logger_sends_log_to_api_worker_when_connected_to_cloud( + self, mock_log_worker, worker_handler, logging_to_api_enabled ): - logger.info("test-worker-log") + async with self.CloudWorkerTestImpl( + name="test", work_pool_name="test-work-pool" + ) as worker: + await worker.sync_with_backend() + worker._logger.debug("test-worker-log") + + log_statement = [ + log + for call in mock_log_worker.instance().send.call_args_list + for log in call.args + if log["name"] == worker._logger.name + and log["message"] == "test-worker-log" + ] - mock_log_worker.instance().send.assert_called_once() - assert len(mock_log_worker.instance().send.call_args_list) == 1 + assert len(log_statement) == 1 + assert log_statement[0]["worker_id"] == str(worker.backend_id) - log_statement = mock_log_worker.instance().send.call_args.args[0] - assert log_statement["name"] == logger.name - assert log_statement["level"] == 20 - assert log_statement["message"] == "test-worker-log" + async def test_worker_logger_does_not_send_logs_when_not_connected_to_cloud( + self, mock_log_worker, worker_handler, logging_to_api_enabled + ): + async with self.ServerWorkerTestImpl( + name="test", work_pool_name="test-work-pool" + ) as worker: + assert isinstance(worker._logger, logging.Logger) + worker._logger.debug("test-worker-log") + + mock_log_worker.instance().send.assert_not_called() class TestAPILogWorker: diff --git a/tests/test_settings.py b/tests/test_settings.py index ab998967b0ed..ff7163c0b481 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -231,7 +231,6 @@ "PREFECT_EXPERIMENTAL_WARN": {"test_value": True, "legacy": True}, "PREFECT_EXPERIMENTS_TELEMETRY_ENABLED": {"test_value": False}, "PREFECT_EXPERIMENTS_WARN": {"test_value": True}, - "PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED": {"test_value": False}, "PREFECT_FLOW_DEFAULT_RETRIES": {"test_value": 10, "legacy": True}, "PREFECT_FLOWS_DEFAULT_RETRIES": {"test_value": 10}, "PREFECT_FLOW_DEFAULT_RETRY_DELAY_SECONDS": {"test_value": 10, "legacy": True}, diff --git a/tests/workers/test_base_worker.py b/tests/workers/test_base_worker.py index 9839e0cf3029..a5c763fb0d26 100644 --- a/tests/workers/test_base_worker.py +++ b/tests/workers/test_base_worker.py @@ -28,7 +28,6 @@ from prefect.server.schemas.responses import DeploymentResponse from prefect.settings import ( PREFECT_API_URL, - PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED, PREFECT_TEST_MODE, PREFECT_WORKER_PREFETCH_SECONDS, get_current_settings, @@ -86,14 +85,6 @@ def no_api_url(): yield -@pytest.fixture -def experimental_logging_enabled(): - with temporary_settings( - updates={PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED: True} - ): - yield - - async def test_worker_requires_api_url_when_not_in_test_mode(no_api_url): with pytest.raises(ValueError, match="PREFECT_API_URL"): async with WorkerTestImpl( @@ -197,7 +188,7 @@ async def test_worker_sends_heartbeat_gets_id(respx_mock): assert worker.backend_id == test_worker_id -async def test_worker_sends_heartbeat_only_gets_id_once(experimental_logging_enabled): +async def test_worker_sends_heartbeat_only_gets_id_once(): async with WorkerTestImpl(name="test", work_pool_name="test-work-pool") as worker: worker._client.server_type = ServerType.CLOUD mock = AsyncMock(return_value="test") @@ -1567,7 +1558,6 @@ async def test_get_flow_run_logger_with_worker_id_set( prefect_client: PrefectClient, worker_deployment_wq1, work_pool, - experimental_logging_enabled, ): flow_run = await prefect_client.create_flow_run_from_deployment( worker_deployment_wq1.id @@ -1883,7 +1873,7 @@ async def test_env_merge_logic_is_deep( class TestBaseWorkerHeartbeat: async def test_worker_heartbeat_sends_integrations( - self, work_pool, hosted_api_server, experimental_logging_enabled + self, work_pool, hosted_api_server ): async with WorkerTestImpl(work_pool_name=work_pool.name) as worker: await worker.start(run_once=True) @@ -1926,7 +1916,7 @@ async def test_worker_heartbeat_sends_integrations( assert worker._worker_metadata_sent async def test_custom_worker_can_send_arbitrary_metadata( - self, work_pool, hosted_api_server, experimental_logging_enabled + self, work_pool, hosted_api_server ): class CustomWorker(BaseWorker): type: str = "test-custom-metadata" From be8fa7bca595b15d5e04d452f2c2be96c093091c Mon Sep 17 00:00:00 2001 From: Jordan Fisher <43887886+jdjfisher@users.noreply.github.com> Date: Fri, 15 Nov 2024 17:16:52 +0000 Subject: [PATCH 3/3] fix worker not releasing limit slot on failed propose pending state (#16012) --- src/prefect/workers/base.py | 16 ++++++++++++---- tests/workers/test_base_worker.py | 30 +++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/src/prefect/workers/base.py b/src/prefect/workers/base.py index f6b33022ed46..e593445b3f4b 100644 --- a/src/prefect/workers/base.py +++ b/src/prefect/workers/base.py @@ -973,10 +973,11 @@ async def _submit_run(self, flow_run: "FlowRun") -> None: else: # If the run is not ready to submit, release the concurrency slot - if self._limiter: - self._limiter.release_on_behalf_of(flow_run.id) + self._release_limit_slot(flow_run.id) self._submitting_flow_run_ids.remove(flow_run.id) + else: + self._release_limit_slot(flow_run.id) async def _submit_run_and_capture_errors( self, flow_run: "FlowRun", task_status: Optional[anyio.abc.TaskStatus] = None @@ -1011,8 +1012,7 @@ async def _submit_run_and_capture_errors( ) return exc finally: - if self._limiter: - self._limiter.release_on_behalf_of(flow_run.id) + self._release_limit_slot(flow_run.id) if not task_status._future.done(): run_logger.error( @@ -1037,6 +1037,14 @@ async def _submit_run_and_capture_errors( return result + def _release_limit_slot(self, flow_run_id: str) -> None: + """ + Frees up a slot taken by the given flow run id. + """ + if self._limiter: + self._limiter.release_on_behalf_of(flow_run_id) + self._logger.debug("Limit slot released for flow run '%s'", flow_run_id) + def get_status(self): """ Retrieves the status of the current worker including its name, current worker diff --git a/tests/workers/test_base_worker.py b/tests/workers/test_base_worker.py index a5c763fb0d26..9f303bfefd77 100644 --- a/tests/workers/test_base_worker.py +++ b/tests/workers/test_base_worker.py @@ -1,7 +1,7 @@ import uuid from typing import Any, Dict, Optional, Type from unittest import mock -from unittest.mock import MagicMock +from unittest.mock import MagicMock, Mock import httpx import pendulum @@ -329,6 +329,34 @@ def create_run_with_deployment_2(state): assert {flow_run.id for flow_run in submitted_flow_runs} == set(flow_run_ids[1:2]) +async def test_worker_releases_limit_slot_when_aborting_a_change_to_pending( + prefect_client: PrefectClient, worker_deployment_wq1, work_pool +): + """Regression test for https://github.com/PrefectHQ/prefect/issues/15952""" + + def create_run_with_deployment(state): + return prefect_client.create_flow_run_from_deployment( + worker_deployment_wq1.id, state=state + ) + + flow_run = await create_run_with_deployment( + Scheduled(scheduled_time=pendulum.now("utc").subtract(days=1)) + ) + + run_mock = AsyncMock() + release_mock = Mock() + + async with WorkerTestImpl(work_pool_name=work_pool.name, limit=1) as worker: + worker.run = run_mock + worker._propose_pending_state = AsyncMock(return_value=False) + worker._release_limit_slot = release_mock + + await worker.get_and_submit_flow_runs() + + run_mock.assert_not_called() + release_mock.assert_called_once_with(flow_run.id) + + async def test_worker_with_work_pool_and_limit( prefect_client: PrefectClient, worker_deployment_wq1, work_pool ):