Skip to content

Commit cec82fc

Browse files
committed
Enable streaming log
1 parent 7f1fe81 commit cec82fc

File tree

3 files changed

+25
-3
lines changed

3 files changed

+25
-3
lines changed

airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.test.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,6 @@ describe("Task log grouping", () => {
9898

9999
fireEvent.click(collapseItem);
100100

101-
expect(screen.getByText(/Marking task as SUCCESS/iu)).toBeVisible();
101+
await waitFor(() => expect(screen.queryByText(/Marking task as SUCCESS/iu)).toBeVisible());
102102
}, 10_000);
103103
});

airflow-core/src/airflow/ui/src/queries/useLogs.tsx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import type { TaskInstanceResponse, TaskInstancesLogResponse } from "openapi/req
2828
import { renderStructuredLog } from "src/components/renderStructuredLog";
2929
import { isStatePending, useAutoRefresh } from "src/utils";
3030
import { getTaskInstanceLink } from "src/utils/links";
31+
import { parseStreamingLogContent } from "src/utils/logs";
3132

3233
type Props = {
3334
accept?: "*/*" | "application/json" | "application/x-ndjson";
@@ -178,7 +179,7 @@ const parseLogs = ({
178179

179180
export const useLogs = (
180181
{
181-
accept = "application/json",
182+
accept = "application/x-ndjson",
182183
dagId,
183184
expanded,
184185
logLevelFilters,
@@ -215,7 +216,7 @@ export const useLogs = (
215216
);
216217

217218
const parsedData = parseLogs({
218-
data: data?.content ?? [],
219+
data: parseStreamingLogContent(data),
219220
expanded,
220221
logLevelFilters,
221222
showSource,

airflow-core/src/airflow/ui/src/utils/logs.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
/* eslint-disable perfectionist/sort-objects */
2323
import { createListCollection } from "@chakra-ui/react";
2424

25+
import type { TaskInstancesLogResponse } from "openapi/requests/types.gen";
26+
2527
export enum LogLevel {
2628
DEBUG = "debug",
2729
INFO = "info",
@@ -51,3 +53,22 @@ export const logLevelOptions = createListCollection<{
5153
{ label: "dag:logs.critical", value: LogLevel.CRITICAL },
5254
],
5355
});
56+
57+
export const parseStreamingLogContent = (
58+
data: TaskInstancesLogResponse | undefined,
59+
): TaskInstancesLogResponse["content"] => {
60+
if (!data?.content) {
61+
const content = data as unknown as string;
62+
63+
try {
64+
return content
65+
.split("\n")
66+
.filter((line) => line.trim() !== "")
67+
.map((line) => JSON.parse(line) as string);
68+
} catch {
69+
return [];
70+
}
71+
}
72+
73+
return data.content;
74+
};

0 commit comments

Comments
 (0)