Skip to content

Commit

Permalink
Merge branch 'main' into jean/cloud-565-task-run-instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanluciano authored Nov 15, 2024
2 parents 5553d21 + be8fa7b commit 3dc228e
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 108 deletions.
13 changes: 1 addition & 12 deletions docs/3.0/deploy/infrastructure-concepts/workers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,8 @@ If a worker misses three heartbeats, it is considered offline. By default, a wor
after it stopped sending heartbeats, but you can configure the threshold with the `PREFECT_WORKER_HEARTBEAT_SECONDS` setting.

### Worker logs
<Warning>This feature is experimental as of Prefect version 3.1.1</Warning>
<span class="badge cloud"></span> Workers send logs to the Prefect Cloud API if you're connected to Prefect Cloud.

<span class="badge cloud"></span> Workers can now send logs directly to Prefect Cloud.

To enable this feature, run the command:
```bash
prefect config set PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED=true
```
See our [profile settings](/3.0/manage/settings-and-profiles/) documentation for more information about changing settings.

Once enabled:
- All worker logs are automatically sent to the Prefect Cloud API
- Logs are accessible through both the Prefect Cloud UI and API
- Each flow run will include a link to its associated worker's logs
Expand All @@ -126,8 +117,6 @@ Once enabled:
- Installed Prefect integrations (e.g., `prefect-aws`, `prefect-gcp`)
- Live worker logs (if worker logging is enabled)

<Warning>Worker logging is an experimental feature as of Prefect version 3.1.1.</Warning> If your worker is using an earlier version of Prefect or you have not opted in to the experiment, this page will not show worker logs.

Access a worker's details by clicking on the worker's name in the Work Pool list.

### Start a worker
Expand Down
12 changes: 0 additions & 12 deletions docs/3.0/develop/settings-ref.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -455,18 +455,6 @@ If `True`, warn on usage of experimental features.
**Supported environment variables**:
`PREFECT_EXPERIMENTS_WARN`, `PREFECT_EXPERIMENTAL_WARN`

### `worker_logging_to_api_enabled`
Enables the logging of worker logs to Prefect Cloud.

**Type**: `boolean`

**Default**: `False`

**TOML dotted key path**: `experiments.worker_logging_to_api_enabled`

**Supported environment variables**:
`PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED`

### `telemetry_enabled`
Enables sending telemetry to Prefect Cloud.

Expand Down
9 changes: 0 additions & 9 deletions schemas/settings.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,6 @@
"title": "Warn",
"type": "boolean"
},
"worker_logging_to_api_enabled": {
"default": false,
"description": "Enables the logging of worker logs to Prefect Cloud.",
"supported_environment_variables": [
"PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED"
],
"title": "Worker Logging To Api Enabled",
"type": "boolean"
},
"telemetry_enabled": {
"default": false,
"description": "Enables sending telemetry to Prefect Cloud.",
Expand Down
9 changes: 5 additions & 4 deletions src/prefect/logging/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
PREFECT_LOGGING_TO_API_BATCH_SIZE,
PREFECT_LOGGING_TO_API_MAX_LOG_SIZE,
PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW,
get_current_settings,
)


Expand Down Expand Up @@ -241,10 +240,12 @@ def _get_payload_size(self, log: Dict[str, Any]) -> int:

class WorkerAPILogHandler(APILogHandler):
def emit(self, record: logging.LogRecord):
if get_current_settings().experiments.worker_logging_to_api_enabled:
super().emit(record)
else:
# Open-source API servers do not currently support worker logs, and
# worker logs only have an associated worker ID when connected to Cloud,
# so we won't send worker logs to the API unless they have a worker ID.
if not getattr(record, "worker_id", None):
return
super().emit(record)

def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
"""
Expand Down
5 changes: 0 additions & 5 deletions src/prefect/settings/models/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ class ExperimentsSettings(PrefectBaseSettings):
),
)

worker_logging_to_api_enabled: bool = Field(
default=False,
description="Enables the logging of worker logs to Prefect Cloud.",
)

telemetry_enabled: bool = Field(
default=False,
description="Enables sending telemetry to Prefect Cloud.",
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/utilities/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import socket
import urllib.parse
from string import Formatter
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union
from typing import TYPE_CHECKING, Any, Literal, Optional, Union
from urllib.parse import urlparse
from uuid import UUID

Expand Down Expand Up @@ -135,7 +135,7 @@ def url_for(
obj_id: Optional[Union[str, UUID]] = None,
url_type: URLType = "ui",
default_base_url: Optional[str] = None,
**additional_format_kwargs: Optional[Dict[str, Any]],
**additional_format_kwargs: Any,
) -> Optional[str]:
"""
Returns the URL for a Prefect object.
Expand Down
27 changes: 16 additions & 11 deletions src/prefect/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,14 +821,14 @@ async def sync_with_backend(self):
self._logger = get_worker_logger(self)

self._logger.debug(
f"Worker synchronized with the Prefect API server. Remote ID: {self.backend_id}"
"Worker synchronized with the Prefect API server. "
+ (f"Remote ID: {self.backend_id}" if self.backend_id else "")
)

def _should_get_worker_id(self):
"""Determines if the worker should request an ID from the API server."""
return (
get_current_settings().experiments.worker_logging_to_api_enabled
and self._client
self._client
and self._client.server_type == ServerType.CLOUD
and self.backend_id is None
)
Expand Down Expand Up @@ -886,10 +886,7 @@ async def _submit_scheduled_flow_runs(
run_logger.info(
f"Worker '{self.name}' submitting flow run '{flow_run.id}'"
)
if (
get_current_settings().experiments.worker_logging_to_api_enabled
and self.backend_id
):
if self.backend_id:
try:
worker_url = url_for(
"worker",
Expand Down Expand Up @@ -976,10 +973,11 @@ async def _submit_run(self, flow_run: "FlowRun") -> None:

else:
# If the run is not ready to submit, release the concurrency slot
if self._limiter:
self._limiter.release_on_behalf_of(flow_run.id)
self._release_limit_slot(flow_run.id)

self._submitting_flow_run_ids.remove(flow_run.id)
else:
self._release_limit_slot(flow_run.id)

async def _submit_run_and_capture_errors(
self, flow_run: "FlowRun", task_status: Optional[anyio.abc.TaskStatus] = None
Expand Down Expand Up @@ -1014,8 +1012,7 @@ async def _submit_run_and_capture_errors(
)
return exc
finally:
if self._limiter:
self._limiter.release_on_behalf_of(flow_run.id)
self._release_limit_slot(flow_run.id)

if not task_status._future.done():
run_logger.error(
Expand All @@ -1040,6 +1037,14 @@ async def _submit_run_and_capture_errors(

return result

def _release_limit_slot(self, flow_run_id: str) -> None:
    """
    Release the limiter slot held on behalf of the given flow run.

    Does nothing when `self._limiter` is falsy. Otherwise the slot is released
    through the limiter's `release_on_behalf_of` and a debug message naming
    the flow run is written to the worker's logger.

    Args:
        flow_run_id: Identifier of the flow run whose slot should be freed.
    """
    limiter = self._limiter
    if not limiter:
        # No limiter in use, so there is no slot to give back.
        return

    limiter.release_on_behalf_of(flow_run_id)
    self._logger.debug("Limit slot released for flow run '%s'", flow_run_id)

def get_status(self):
"""
Retrieves the status of the current worker including its name, current worker
Expand Down
91 changes: 61 additions & 30 deletions tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
from prefect.server.schemas.actions import LogCreate
from prefect.settings import (
PREFECT_API_KEY,
PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED,
PREFECT_LOGGING_COLORS,
PREFECT_LOGGING_LEVEL,
PREFECT_LOGGING_MARKUP,
Expand Down Expand Up @@ -637,23 +636,37 @@ def test_handler_knows_how_large_logs_are(self):
assert handler._get_payload_size(dict_log) == log_size


WORKER_ID = uuid.uuid4()


class TestWorkerLogging:
class WorkerTestImpl(BaseWorker):
type: str = "logging_test"
class CloudWorkerTestImpl(BaseWorker):
type: str = "cloud_logging_test"
job_configuration: Type[BaseJobConfiguration] = BaseJobConfiguration

async def _send_worker_heartbeat(self, *_, **__):
return "test_backend_id"
"""
Workers only return an ID here if they're connected to Cloud,
so this simulates the worker being connected to Cloud.
"""
return WORKER_ID

async def run(self, *_, **__):
pass

@pytest.fixture
def experiment_enabled(self):
with temporary_settings(
updates={PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED: True}
):
yield
class ServerWorkerTestImpl(BaseWorker):
    # Test stub: a worker whose heartbeat never yields a worker ID.
    type: str = "server_logging_test"
    job_configuration: Type[BaseJobConfiguration] = BaseJobConfiguration

    async def _send_worker_heartbeat(self, *_, **__):
        """
        Simulate a worker that is not connected to Cloud.

        A heartbeat only returns a worker ID when the worker is connected to
        Cloud, so this stub always returns `None`.
        """
        return None

    async def run(self, *_, **__):
        # Intentionally a no-op.
        pass

@pytest.fixture
def logging_to_api_enabled(self):
Expand All @@ -672,24 +685,24 @@ def logger(self, worker_handler):
yield logger
logger.removeHandler(worker_handler)

async def test_get_worker_logger_works_with_no_backend_id(self, experiment_enabled):
async with self.WorkerTestImpl(
async def test_get_worker_logger_works_with_no_backend_id(self):
async with self.CloudWorkerTestImpl(
name="test", work_pool_name="test-work-pool"
) as worker:
logger = get_worker_logger(worker)
assert logger.name == "prefect.workers.logging_test.test"
assert logger.name == "prefect.workers.cloud_logging_test.test"

async def test_get_worker_logger_works_with_backend_id(self, experiment_enabled):
async with self.WorkerTestImpl(
async def test_get_worker_logger_works_with_backend_id(self):
async with self.CloudWorkerTestImpl(
name="test", work_pool_name="test-work-pool"
) as worker:
await worker.sync_with_backend()
logger = get_worker_logger(worker)
assert logger.name == "prefect.workers.logging_test.test"
assert logger.extra["worker_id"] == "test_backend_id"
assert logger.name == "prefect.workers.cloud_logging_test.test"
assert logger.extra["worker_id"] == str(WORKER_ID)

async def test_worker_emits_logs_with_worker_id(self, caplog, experiment_enabled):
async with self.WorkerTestImpl(
async def test_worker_emits_logs_with_worker_id(self, caplog):
async with self.CloudWorkerTestImpl(
name="test", work_pool_name="test-work-pool"
) as worker:
await worker.sync_with_backend()
Expand All @@ -700,21 +713,39 @@ async def test_worker_emits_logs_with_worker_id(self, caplog, experiment_enabled
]

assert "testing_with_extras" in caplog.text
assert record_with_extras[0].worker_id == worker.backend_id
assert worker._logger.extra["worker_id"] == worker.backend_id
assert record_with_extras[0].worker_id == str(worker.backend_id)
assert worker._logger.extra["worker_id"] == str(worker.backend_id)

def test_worker_logger_sends_log_to_api_worker(
self, logger, mock_log_worker, experiment_enabled, logging_to_api_enabled
async def test_worker_logger_sends_log_to_api_worker_when_connected_to_cloud(
self, mock_log_worker, worker_handler, logging_to_api_enabled
):
logger.info("test-worker-log")
async with self.CloudWorkerTestImpl(
name="test", work_pool_name="test-work-pool"
) as worker:
await worker.sync_with_backend()
worker._logger.debug("test-worker-log")

log_statement = [
log
for call in mock_log_worker.instance().send.call_args_list
for log in call.args
if log["name"] == worker._logger.name
and log["message"] == "test-worker-log"
]

mock_log_worker.instance().send.assert_called_once()
assert len(mock_log_worker.instance().send.call_args_list) == 1
assert len(log_statement) == 1
assert log_statement[0]["worker_id"] == str(worker.backend_id)

log_statement = mock_log_worker.instance().send.call_args.args[0]
assert log_statement["name"] == logger.name
assert log_statement["level"] == 20
assert log_statement["message"] == "test-worker-log"
# Uses ServerWorkerTestImpl, whose heartbeat returns no worker ID, and checks
# that a message logged by that worker is never handed to the API log worker.
async def test_worker_logger_does_not_send_logs_when_not_connected_to_cloud(
self, mock_log_worker, worker_handler, logging_to_api_enabled
):
async with self.ServerWorkerTestImpl(
name="test", work_pool_name="test-work-pool"
) as worker:
# The worker's logger is expected to be a plain `logging.Logger` here.
assert isinstance(worker._logger, logging.Logger)
worker._logger.debug("test-worker-log")

# Nothing may have been sent through the mocked API log worker.
mock_log_worker.instance().send.assert_not_called()


class TestAPILogWorker:
Expand Down
1 change: 0 additions & 1 deletion tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@
"PREFECT_EXPERIMENTAL_WARN": {"test_value": True, "legacy": True},
"PREFECT_EXPERIMENTS_TELEMETRY_ENABLED": {"test_value": False},
"PREFECT_EXPERIMENTS_WARN": {"test_value": True},
"PREFECT_EXPERIMENTS_WORKER_LOGGING_TO_API_ENABLED": {"test_value": False},
"PREFECT_FLOW_DEFAULT_RETRIES": {"test_value": 10, "legacy": True},
"PREFECT_FLOWS_DEFAULT_RETRIES": {"test_value": 10},
"PREFECT_FLOW_DEFAULT_RETRY_DELAY_SECONDS": {"test_value": 10, "legacy": True},
Expand Down
Loading

0 comments on commit 3dc228e

Please sign in to comment.