Flow run logs csv download (#14703)
pleek91 authored Jul 29, 2024
1 parent 85bea69 commit 8751f56
Showing 5 changed files with 226 additions and 1 deletion.
3 changes: 3 additions & 0 deletions docs/3.0rc/api-ref/rest-api/server/flow-runs/download-logs.mdx
@@ -0,0 +1,3 @@
---
openapi: get /api/flow_runs/{id}/logs
---
53 changes: 53 additions & 0 deletions docs/3.0rc/api-ref/rest-api/server/schema.json
@@ -1508,6 +1508,59 @@
}
}
},
"/api/flow_runs/{id}/download-logs-csv": {
"get": {
"tags": [
"Flow Runs"
],
"summary": "Download Logs",
"description": "Download all flow run logs as a CSV file, collecting all logs until there are no more logs to retrieve.",
"operationId": "download_logs_flow_runs__id__download_logs_csv_get",
"parameters": [
{
"name": "id",
"in": "path",
"required": true,
"schema": {
"type": "string",
"format": "uuid",
"description": "The flow run id",
"title": "Id"
},
"description": "The flow run id"
},
{
"name": "x-prefect-api-version",
"in": "header",
"required": false,
"schema": {
"type": "string",
"title": "X-Prefect-Api-Version"
}
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/api/task_runs/": {
"post": {
"tags": [
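Once the server exposes this route, any HTTP client can stream the CSV straight to disk. A minimal sketch using `httpx` (the base URL and flow run id below are placeholders, not values from this commit):

```python
import httpx

API_URL = "http://127.0.0.1:4200/api"  # assumed default local Prefect server address
FLOW_RUN_ID = "4f9a2d6e-0000-0000-0000-000000000000"  # hypothetical flow run id

# Stream the response body chunk by chunk so large log files never
# have to fit in memory on the client side.
with httpx.stream("GET", f"{API_URL}/flow_runs/{FLOW_RUN_ID}/download-logs-csv") as response:
    response.raise_for_status()
    with open("flow-run-logs.csv", "wb") as f:
        for chunk in response.iter_bytes():
            f.write(chunk)
```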
1 change: 1 addition & 0 deletions docs/mint.json
@@ -412,6 +412,7 @@
"3.0rc/api-ref/rest-api/server/flow-runs/read-flow-run-input",
"3.0rc/api-ref/rest-api/server/flow-runs/delete-flow-run-input",
"3.0rc/api-ref/rest-api/server/flow-runs/paginate-flow-runs",
"3.0rc/api-ref/rest-api/server/flow-runs/download-logs",
"3.0rc/api-ref/rest-api/server/flow-runs/read-flow-run-history",
"3.0rc/api-ref/rest-api/server/flow-runs/count-task-runs-by-flow-run"
]
73 changes: 72 additions & 1 deletion src/prefect/server/api/flow_runs.py
@@ -2,7 +2,9 @@
Routes for interacting with flow run objects.
"""

import csv
import datetime
import io
from typing import Any, Dict, List, Optional
from uuid import UUID

@@ -18,7 +20,7 @@
Response,
status,
)
from fastapi.responses import ORJSONResponse, PlainTextResponse
from fastapi.responses import ORJSONResponse, PlainTextResponse, StreamingResponse
from pydantic_extra_types.pendulum_dt import DateTime
from sqlalchemy.exc import IntegrityError

@@ -767,3 +769,72 @@ async def paginate_flow_runs(
).model_dump(mode="json")

return ORJSONResponse(content=response)


FLOW_RUN_LOGS_CSV_PAGE_LIMIT = 1000


@router.get("/{id}/download-logs-csv")
async def download_logs(
flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
db: PrefectDBInterface = Depends(provide_database_interface),
) -> StreamingResponse:
"""
Download all flow run logs as a CSV file, collecting all logs until there are no more logs to retrieve.
"""
async with db.session_context() as session:
flow_run = await models.flow_runs.read_flow_run(
session=session, flow_run_id=flow_run_id
)

if not flow_run:
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow run not found")

async def generate():
data = io.StringIO()
csv_writer = csv.writer(data)
csv_writer.writerow(
["timestamp", "level", "flow_run_id", "task_run_id", "message"]
)

offset = 0
limit = FLOW_RUN_LOGS_CSV_PAGE_LIMIT

while True:
results = await models.logs.read_logs(
session=session,
log_filter=schemas.filters.LogFilter(
flow_run_id={"any_": [flow_run_id]}
),
offset=offset,
limit=limit,
sort=schemas.sorting.LogSort.TIMESTAMP_ASC,
)

if not results:
break

offset += limit

for log in results:
csv_writer.writerow(
[
log.timestamp,
log.level,
log.flow_run_id,
log.task_run_id,
log.message,
]
)
data.seek(0)
yield data.read()
data.seek(0)
data.truncate(0)

return StreamingResponse(
generate(),
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename={flow_run.name}-logs.csv"
},
)
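The generator above recycles a single `StringIO` buffer: each page of rows is written, yielded, then the buffer is rewound and truncated before the next page begins. The same pattern in isolation, as a self-contained sketch (the `csv_pages` helper and its signature are illustrative, not part of this commit):

```python
import csv
import io
from typing import Iterable, Iterator, Sequence


def csv_pages(rows: Iterable[Sequence[str]], page_size: int = 1000) -> Iterator[str]:
    """Yield CSV text in page-sized chunks, reusing one in-memory buffer."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    count = 0
    for row in rows:
        writer.writerow(row)
        count += 1
        if count % page_size == 0:
            # Emit the completed page, then reset the buffer for the next one.
            buffer.seek(0)
            yield buffer.read()
            buffer.seek(0)
            buffer.truncate(0)
    # Flush whatever is left after the last full page.
    buffer.seek(0)
    remainder = buffer.read()
    if remainder:
        yield remainder
```

Recycling one buffer keeps peak memory bounded by the page size rather than the total log volume, which is the point of pairing the generator with `StreamingResponse`.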
97 changes: 97 additions & 0 deletions tests/server/orchestration/api/test_flow_runs.py
@@ -15,6 +15,7 @@
from prefect.input import RunInput, keyset_from_paused_state
from prefect.server import models, schemas
from prefect.server.schemas import core, responses
from prefect.server.schemas.actions import LogCreate
from prefect.server.schemas.core import TaskRunResult
from prefect.server.schemas.responses import FlowRunResponse, OrchestrationResult
from prefect.server.schemas.states import StateType
@@ -2693,3 +2694,99 @@ async def test_read_subflow_runs_non_existant(

assert response.status_code == status.HTTP_200_OK, response.text
assert len(response.json()["results"]) == 0


class TestDownloadFlowRunLogs:
@pytest.fixture
async def flow_run_1(self, session, flow):
model = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.actions.FlowRunCreate(flow_id=flow.id, flow_version="0.1"),
)

await session.commit()

return model

@pytest.fixture
async def flow_run_2(self, session, flow):
model = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.actions.FlowRunCreate(flow_id=flow.id, flow_version="0.1"),
)

await session.commit()

return model

@pytest.fixture
async def flow_run_1_logs(self, flow_run_1, session):
NOW = pendulum.now("UTC")

logs = [
LogCreate(
name="prefect.flow_run",
level=10,
message=f"Log message {i}",
timestamp=NOW,
flow_run_id=flow_run_1.id,
)
for i in range(10)
]

await models.logs.create_logs(session=session, logs=logs)

await session.commit()

return logs

@pytest.fixture
async def flow_run_2_logs(self, flow_run_2, session):
NOW = pendulum.now("UTC")

logs = [
LogCreate(
name="prefect.flow_run",
level=10,
message=f"Log message {i}",
timestamp=NOW,
flow_run_id=flow_run_2.id,
)
for i in range(10)
]

await models.logs.create_logs(session=session, logs=logs)

await session.commit()

return logs

async def test_download_flow_run_logs(
self,
client,
flow_run_1,
flow_run_2,
flow_run_1_logs,
flow_run_2_logs,
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setattr(
"prefect.server.api.flow_runs.FLOW_RUN_LOGS_CSV_PAGE_LIMIT", 3
)

async with client.stream(
"GET", f"/flow_runs/{flow_run_1.id}/download-logs-csv"
) as response:
response_body = [
str(chunk, "UTF-8") async for chunk in response.aiter_bytes()
]

lines = "".join(response_body).splitlines()
line_count = len(lines)

# number of logs generated plus 1 for the header row
expected_line_count = len(flow_run_1_logs) + 1

assert (
line_count == expected_line_count
), f"Expected {expected_line_count} lines, got {line_count}"
