Skip to content

Commit cae8d86

Browse files
committed
Add task group detail across dag runs
1 parent ef2da6f commit cae8d86

File tree

9 files changed

+173
-34
lines changed

9 files changed

+173
-34
lines changed

airflow-core/src/airflow/ui/src/components/Graph/TaskLink.tsx

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,20 @@ type Props = {
2626
} & TaskNameProps;
2727

2828
export const TaskLink = forwardRef<HTMLAnchorElement, Props>(({ id, isGroup, isMapped, ...rest }, ref) => {
29-
const { dagId = "", runId, taskId } = useParams();
29+
const { dagId = "", groupId, runId, taskId } = useParams();
3030
const [searchParams] = useSearchParams();
3131

32-
if (isGroup && runId === undefined) {
33-
return undefined;
34-
}
35-
36-
const pathname = isGroup
37-
? `/dags/${dagId}/runs/${runId}/tasks/group/${id}`
38-
: `/dags/${dagId}/${runId === undefined ? "" : `runs/${runId}/`}${taskId === id ? "" : `tasks/${id}`}${isMapped && taskId !== id && runId !== undefined ? "/mapped" : ""}`;
32+
const basePath = `/dags/${dagId}${runId === undefined ? "" : `/runs/${runId}`}`;
33+
const taskPath = isGroup
34+
? groupId === id
35+
? ""
36+
: `/tasks/group/${id}`
37+
: taskId === id
38+
? ""
39+
: `/tasks/${id}${isMapped && taskId !== id && runId !== undefined ? "/mapped" : ""}`;
3940

4041
return (
41-
<RouterLink
42-
ref={ref}
43-
to={{
44-
pathname,
45-
search: searchParams.toString(),
46-
}}
47-
>
42+
<RouterLink ref={ref} to={{ pathname: basePath + taskPath, search: searchParams.toString() }}>
4843
<TaskName isGroup={isGroup} isMapped={isMapped} {...rest} />
4944
</RouterLink>
5045
);

airflow-core/src/airflow/ui/src/layouts/Details/DagBreadcrumb.tsx

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import { TogglePause } from "src/components/TogglePause";
3232
import { isStatePending, useAutoRefresh } from "src/utils";
3333

3434
export const DagBreadcrumb = () => {
35-
const { dagId = "", mapIndex = "-1", runId, taskId } = useParams();
35+
const { dagId = "", groupId, mapIndex = "-1", runId, taskId } = useParams();
3636
const refetchInterval = useAutoRefresh({ dagId });
3737

3838
const { data: dag } = useDagServiceGetDagDetails({
@@ -86,6 +86,23 @@ export const DagBreadcrumb = () => {
8686
});
8787
}
8888

89+
// Add group breadcrumb
90+
if (groupId !== undefined) {
91+
if (runId === undefined) {
92+
links.push({
93+
label: "All Runs",
94+
title: "Dag Run",
95+
value: `/dags/${dagId}/runs`,
96+
});
97+
}
98+
99+
links.push({
100+
label: groupId,
101+
title: "Group",
102+
value: `/dags/${dagId}/groups/${groupId}`,
103+
});
104+
}
105+
89106
// Add task breadcrumb
90107
if (runId !== undefined && taskId !== undefined) {
91108
if (task?.is_mapped) {

airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskNames.tsx

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,26 @@ export const TaskNames = ({ nodes }: Props) => {
6666
>
6767
{node.isGroup ? (
6868
<Flex>
69-
<TaskName
70-
display="inline"
71-
fontSize="sm"
72-
fontWeight="normal"
73-
isGroup={true}
74-
isMapped={Boolean(node.is_mapped)}
75-
label={node.label}
76-
paddingLeft={node.depth * 3 + 2}
77-
setupTeardownType={node.setup_teardown_type}
78-
/>
69+
<Link asChild data-testid={node.id}>
70+
<RouterLink
71+
replace
72+
to={{
73+
pathname: `/dags/${dagId}/tasks/group/${node.id}`,
74+
search: searchParams.toString(),
75+
}}
76+
>
77+
<TaskName
78+
display="inline"
79+
fontSize="sm"
80+
fontWeight="normal"
81+
isGroup={true}
82+
isMapped={Boolean(node.is_mapped)}
83+
label={node.label}
84+
paddingLeft={node.depth * 3 + 2}
85+
setupTeardownType={node.setup_teardown_type}
86+
/>
87+
</RouterLink>
88+
</Link>
7989
<chakra.button
8090
aria-label="Toggle group"
8191
display="inline"

airflow-core/src/airflow/ui/src/pages/Events/Events.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ const eventsColumn = ({ dagId, open, runId, taskId }: EventsColumn): Array<Colum
138138
];
139139

140140
export const Events = () => {
141-
const { dagId, runId, taskId } = useParams();
141+
const { dagId, groupId, runId, taskId } = useParams();
142142
const { setTableURLState, tableURLState } = useTableURLState();
143143
const { pagination, sorting } = tableURLState;
144144
const [sort] = sorting;
@@ -153,7 +153,7 @@ export const Events = () => {
153153
offset: pagination.pageIndex * pagination.pageSize,
154154
orderBy,
155155
runId,
156-
taskId,
156+
taskId: groupId ?? taskId,
157157
},
158158
undefined,
159159
{ enabled: !isNaN(pagination.pageSize) },
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*!
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
import type { NodeResponse } from "openapi/requests/types.gen";
20+
import { TaskIcon } from "src/assets/TaskIcon";
21+
import { HeaderCard } from "src/components/HeaderCard";
22+
23+
export const GroupTaskHeader = ({ groupTask }: { readonly groupTask: NodeResponse }) => (
24+
<HeaderCard
25+
icon={<TaskIcon />}
26+
stats={[{ label: "Operator", value: groupTask.operator ?? "Null" }]}
27+
title={groupTask.label}
28+
/>
29+
);

airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import { isStatePending, useAutoRefresh } from "src/utils";
3030
const defaultHour = "24";
3131

3232
export const Overview = () => {
33-
const { dagId = "", taskId } = useParams();
33+
const { dagId = "", groupId, taskId } = useParams();
3434

3535
const now = dayjs();
3636
const [startDate, setStartDate] = useState(now.subtract(Number(defaultHour), "hour").toISOString());
@@ -46,7 +46,8 @@ export const Overview = () => {
4646
runAfterGte: startDate,
4747
runAfterLte: endDate,
4848
state: ["failed"],
49-
taskId,
49+
taskDisplayNamePattern: groupId ?? undefined,
50+
taskId: Boolean(groupId) ? undefined : taskId,
5051
});
5152

5253
const { data: taskInstances, isLoading: isLoadingTaskInstances } = useTaskInstanceServiceGetTaskInstances(
@@ -55,7 +56,8 @@ export const Overview = () => {
5556
dagRunId: "~",
5657
limit: 14,
5758
orderBy: "-run_after",
58-
taskId,
59+
taskDisplayNamePattern: groupId ?? undefined,
60+
taskId: Boolean(groupId) ? undefined : taskId,
5961
},
6062
undefined,
6163
{

airflow-core/src/airflow/ui/src/pages/Task/Task.tsx

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import { LuChartColumn } from "react-icons/lu";
2121
import { MdOutlineEventNote, MdOutlineTask } from "react-icons/md";
2222
import { useParams } from "react-router-dom";
2323

24-
import { useDagServiceGetDagDetails, useTaskServiceGetTask } from "openapi/queries";
24+
import { useDagServiceGetDagDetails, useGridServiceGridData, useTaskServiceGetTask } from "openapi/queries";
2525
import { DetailsLayout } from "src/layouts/Details/DetailsLayout";
26+
import { getGroupTask } from "src/utils/groupTask";
2627

28+
import { GroupTaskHeader } from "./GroupTaskHeader";
2729
import { Header } from "./Header";
2830

2931
const tabs = [
@@ -33,9 +35,28 @@ const tabs = [
3335
];
3436

3537
export const Task = () => {
36-
const { dagId = "", taskId = "" } = useParams();
38+
const { dagId = "", groupId, taskId } = useParams();
3739

38-
const { data: task, error, isLoading } = useTaskServiceGetTask({ dagId, taskId });
40+
const {
41+
data: task,
42+
error,
43+
isLoading,
44+
} = useTaskServiceGetTask({ dagId, taskId: groupId ?? taskId }, undefined, {
45+
enabled: groupId === undefined,
46+
});
47+
48+
const { data: gridData } = useGridServiceGridData(
49+
{
50+
dagId,
51+
includeDownstream: true,
52+
includeUpstream: true,
53+
},
54+
undefined,
55+
{ enabled: groupId !== undefined },
56+
);
57+
58+
const groupTask =
59+
groupId === undefined ? undefined : getGroupTask(gridData?.structure.nodes ?? [], groupId);
3960

4061
const {
4162
data: dag,
@@ -49,6 +70,7 @@ export const Task = () => {
4970
<ReactFlowProvider>
5071
<DetailsLayout dag={dag} error={error ?? dagError} isLoading={isLoading || isDagLoading} tabs={tabs}>
5172
{task === undefined ? undefined : <Header task={task} />}
73+
{groupTask ? <GroupTaskHeader groupTask={groupTask} /> : undefined}
5274
</DetailsLayout>
5375
</ReactFlowProvider>
5476
);

airflow-core/src/airflow/ui/src/router.tsx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,15 @@ export const routerConfig = [
176176
element: <GroupTaskInstance />,
177177
path: "dags/:dagId/runs/:runId/tasks/group/:groupId",
178178
},
179+
{
180+
children: [
181+
{ element: <TaskOverview />, index: true },
182+
{ element: <TaskInstances />, path: "task_instances" },
183+
{ element: <Events />, path: "events" },
184+
],
185+
element: <Task />,
186+
path: "dags/:dagId/tasks/group/:groupId",
187+
},
179188
{
180189
children: taskInstanceRoutes,
181190
element: <TaskInstance />,
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*!
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
import type { NodeResponse } from "openapi/requests/types.gen";
20+
21+
/**
22+
* Finds a task node by its ID in a tree of nodes
23+
* @param nodes - Array of root nodes to search through
24+
* @param targetId - ID of the node to find
25+
* @returns The found node or undefined if not found
26+
*/
27+
export const getGroupTask = (nodes: Array<NodeResponse>, targetId: string): NodeResponse | undefined => {
28+
if (!nodes.length || !targetId) {
29+
return undefined;
30+
}
31+
32+
const queue: Array<NodeResponse> = [...nodes];
33+
const [root] = targetId.split(".");
34+
35+
while (queue.length > 0) {
36+
const node = queue.shift();
37+
38+
if (node) {
39+
if (node.id === targetId) {
40+
return node;
41+
}
42+
43+
if (node.children && node.children.length > 0) {
44+
const nextNode =
45+
node.id === root && targetId.includes(".")
46+
? node.children.find((child) => child.id === targetId)
47+
: undefined;
48+
49+
queue.unshift(...(nextNode ? [nextNode] : node.children));
50+
}
51+
}
52+
}
53+
54+
return undefined;
55+
};

0 commit comments

Comments
 (0)