Skip to content

Commit 072a56d

Browse files
ElenaStoevakibanamachineflash1293yngrdyn
authored andcommitted
[Streams] Add data quality badge to Streams table (#234946)
Closes #233966 ## Summary This PR adds a Data quality column to the Streams table. <img width="1167" height="666" alt="Screenshot 2025-09-12 at 17 06 26" src="https://github.com/user-attachments/assets/5d1102ce-0b5d-41fa-ba6d-7ba8eae1ab1e" /> --------- Co-authored-by: kibanamachine <[email protected]> Co-authored-by: Joe Reuter <[email protected]> Co-authored-by: Yngrid Coello <[email protected]>
1 parent 5f1a018 commit 072a56d

File tree

9 files changed

+239
-38
lines changed

9 files changed

+239
-38
lines changed

packages/kbn-optimizer/limits.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pageLoadAssetSize:
3030
dashboardMarkdown: 5134
3131
data: 453742
3232
dataQuality: 10387
33-
datasetQuality: 27203
33+
datasetQuality: 30094
3434
dataUsage: 8926
3535
dataViewEditor: 7735
3636
dataViewFieldEditor: 26024

x-pack/platform/plugins/shared/dataset_quality/common/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@
88
export type { DatasetQualityConfig } from './plugin_config';
99
export type { FetchOptions } from './fetch_options';
1010
export type { APIClientRequestParamsOf, APIReturnType } from './rest';
11-
export { indexNameToDataStreamParts } from './utils';
11+
export { indexNameToDataStreamParts, mapPercentageToQuality } from './utils';

x-pack/platform/plugins/shared/dataset_quality/public/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,6 @@ export type { IDataStreamsStatsClient } from './services/data_streams_stats/type
1616
export function plugin() {
1717
return new DatasetQualityPlugin();
1818
}
19+
20+
export { DatasetQualityIndicator } from './components/quality_indicator';
21+
export { calculatePercentage } from './utils/calculate_percentage';
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import React from 'react';
9+
import { mapPercentageToQuality } from '@kbn/dataset-quality-plugin/common';
10+
import { DatasetQualityIndicator, calculatePercentage } from '@kbn/dataset-quality-plugin/public';
11+
import useAsync from 'react-use/lib/useAsync';
12+
import { esqlResultToTimeseries } from '../../util/esql_result_to_timeseries';
13+
import type { StreamHistogramFetch } from '../../hooks/use_streams_histogram_fetch';
14+
15+
export function DataQualityColumn({
16+
histogramQueryFetch,
17+
}: {
18+
histogramQueryFetch: StreamHistogramFetch;
19+
}) {
20+
const histogramQueryResult = useAsync(() => histogramQueryFetch.docCount, [histogramQueryFetch]);
21+
const failedDocsResult = useAsync(
22+
() => histogramQueryFetch.failedDocCount,
23+
[histogramQueryFetch]
24+
);
25+
const degradedDocsResult = useAsync(
26+
() => histogramQueryFetch.degradedDocCount,
27+
[histogramQueryFetch]
28+
);
29+
30+
const allTimeseries = React.useMemo(
31+
() =>
32+
esqlResultToTimeseries({
33+
result: histogramQueryResult,
34+
metricNames: ['doc_count'],
35+
}),
36+
[histogramQueryResult]
37+
);
38+
39+
const docCount = React.useMemo(
40+
() =>
41+
allTimeseries.reduce(
42+
(acc, series) => acc + series.data.reduce((acc2, item) => acc2 + (item.doc_count || 0), 0),
43+
0
44+
),
45+
[allTimeseries]
46+
);
47+
48+
const degradedDocCount = degradedDocsResult?.value
49+
? Number(degradedDocsResult.value?.values?.[0]?.[0])
50+
: 0;
51+
const failedDocCount = failedDocsResult?.value
52+
? Number(failedDocsResult.value?.values?.[0]?.[0])
53+
: 0;
54+
55+
const degradedPercentage = calculatePercentage({
56+
totalDocs: docCount,
57+
count: degradedDocCount,
58+
});
59+
60+
const failedPercentage = calculatePercentage({
61+
totalDocs: docCount,
62+
count: failedDocCount,
63+
});
64+
65+
const quality = mapPercentageToQuality([degradedPercentage, failedPercentage]);
66+
67+
const isLoading =
68+
histogramQueryResult.loading || failedDocsResult?.loading || degradedDocsResult.loading;
69+
70+
return <DatasetQualityIndicator quality={quality} isLoading={isLoading} />;
71+
}

x-pack/platform/plugins/shared/streams_app/public/components/stream_list_view/documents_column.tsx

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -26,56 +26,36 @@ import {
2626
} from '@elastic/charts';
2727
import { useElasticChartsTheme } from '@kbn/charts-theme';
2828
import { i18n } from '@kbn/i18n';
29-
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
30-
import { useKibana } from '../../hooks/use_kibana';
29+
import useAsync from 'react-use/lib/useAsync';
3130
import { esqlResultToTimeseries } from '../../util/esql_result_to_timeseries';
32-
import { useTimefilter } from '../../hooks/use_timefilter';
31+
import type { useTimefilter } from '../../hooks/use_timefilter';
3332
import { TooltipOrPopoverIcon } from '../tooltip_popover_icon/tooltip_popover_icon';
3433
import { getFormattedError } from '../../util/errors';
34+
import type { StreamHistogramFetch } from '../../hooks/use_streams_histogram_fetch';
3535

3636
export function DocumentsColumn({
3737
indexPattern,
38+
histogramQueryFetch,
39+
timeState,
3840
numDataPoints,
3941
}: {
4042
indexPattern: string;
43+
histogramQueryFetch: StreamHistogramFetch;
44+
timeState: ReturnType<typeof useTimefilter>['timeState'];
4145
numDataPoints: number;
4246
}) {
43-
const { streamsRepositoryClient } = useKibana().dependencies.start.streams;
4447
const chartBaseTheme = useElasticChartsTheme();
4548
const { euiTheme } = useEuiTheme();
4649

47-
const { timeState } = useTimefilter();
48-
49-
const minInterval = Math.floor((timeState.end - timeState.start) / numDataPoints);
50-
51-
const histogramQueryFetch = useStreamsAppFetch(
52-
async ({ signal, timeState: { start, end } }) => {
53-
return streamsRepositoryClient.fetch('POST /internal/streams/esql', {
54-
params: {
55-
body: {
56-
operationName: 'get_doc_count_for_stream',
57-
query: `FROM ${indexPattern} | STATS doc_count = COUNT(*) BY @timestamp = BUCKET(@timestamp, ${minInterval} ms)`,
58-
start,
59-
end,
60-
},
61-
},
62-
signal,
63-
});
64-
},
65-
[streamsRepositoryClient, indexPattern, minInterval],
66-
{
67-
withTimeRange: true,
68-
disableToastOnError: true,
69-
}
70-
);
50+
const histogramQueryResult = useAsync(() => histogramQueryFetch.docCount, [histogramQueryFetch]);
7151

7252
const allTimeseries = React.useMemo(
7353
() =>
7454
esqlResultToTimeseries({
75-
result: histogramQueryFetch,
55+
result: histogramQueryResult,
7656
metricNames: ['doc_count'],
7757
}),
78-
[histogramQueryFetch]
58+
[histogramQueryResult]
7959
);
8060

8161
const docCount = React.useMemo(
@@ -90,14 +70,15 @@ export function DocumentsColumn({
9070
const hasData = docCount > 0;
9171

9272
const xFormatter = niceTimeFormatter([timeState.start, timeState.end]);
73+
const minInterval = Math.floor((timeState.end - timeState.start) / numDataPoints);
9374

94-
const noDocCountData = histogramQueryFetch.error ? '' : '-';
75+
const noDocCountData = histogramQueryResult.error ? '' : '-';
9576

96-
const noHistogramData = histogramQueryFetch.error ? (
77+
const noHistogramData = histogramQueryResult.error ? (
9778
<TooltipOrPopoverIcon
9879
dataTestSubj="streamsDocCount-error"
9980
icon="warning"
100-
title={getFormattedError(histogramQueryFetch.error).message}
81+
title={getFormattedError(histogramQueryResult.error).message}
10182
mode="popover"
10283
iconColor="danger"
10384
/>
@@ -126,7 +107,7 @@ export function DocumentsColumn({
126107
role="group"
127108
aria-label={cellAriaLabel}
128109
>
129-
{histogramQueryFetch.loading ? (
110+
{histogramQueryResult.loading ? (
130111
<LoadingPlaceholder />
131112
) : (
132113
<>

x-pack/platform/plugins/shared/streams_app/public/components/stream_list_view/translations.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ export const DOCUMENTS_COLUMN_HEADER = i18n.translate(
1616
{ defaultMessage: 'Documents' }
1717
);
1818

19+
export const DATA_QUALITY_COLUMN_HEADER = i18n.translate(
20+
'xpack.streams.streamsTreeTable.dataQualityColumnName',
21+
{ defaultMessage: 'Data Quality' }
22+
);
23+
1924
export const RETENTION_COLUMN_HEADER = i18n.translate(
2025
'xpack.streams.streamsTreeTable.retentionColumnName',
2126
{ defaultMessage: 'Retention' }

x-pack/platform/plugins/shared/streams_app/public/components/stream_list_view/tree_table.tsx

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ import type { TableRow, SortableField } from './utils';
2424
import { buildStreamRows, asTrees, enrichStream, shouldComposeTree } from './utils';
2525
import { StreamsAppSearchBar } from '../streams_app_search_bar';
2626
import { DocumentsColumn } from './documents_column';
27+
import { DataQualityColumn } from './data_quality_column';
2728
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
29+
import { useStreamHistogramFetch } from '../../hooks/use_streams_histogram_fetch';
30+
import { useTimefilter } from '../../hooks/use_timefilter';
2831
import { RetentionColumn } from './retention_column';
2932
import {
3033
NAME_COLUMN_HEADER,
@@ -33,6 +36,7 @@ import {
3336
STREAMS_TABLE_CAPTION_ARIA_LABEL,
3437
RETENTION_COLUMN_HEADER_ARIA_LABEL,
3538
NO_STREAMS_MESSAGE,
39+
DATA_QUALITY_COLUMN_HEADER,
3640
DOCUMENTS_COLUMN_HEADER,
3741
} from './translations';
3842
import { DiscoverBadgeButton } from '../stream_badges';
@@ -46,6 +50,7 @@ export function StreamsTreeTable({
4650
}) {
4751
const router = useStreamsAppRouter();
4852
const { euiTheme } = useEuiTheme();
53+
const { timeState } = useTimefilter();
4954

5055
const [searchQuery, setSearchQuery] = useState('');
5156
const [sortField, setSortField] = useState<SortableField>('nameSortKey');
@@ -77,6 +82,10 @@ export function StreamsTreeTable({
7782
}
7883
};
7984

85+
const numDataPoints = 25;
86+
87+
const { getStreamDocCounts } = useStreamHistogramFetch(numDataPoints);
88+
8089
const sorting = {
8190
sort: {
8291
field: sortField,
@@ -142,7 +151,23 @@ export function StreamsTreeTable({
142151
dataType: 'number',
143152
render: (_: unknown, item: TableRow) =>
144153
item.data_stream ? (
145-
<DocumentsColumn indexPattern={item.stream.name} numDataPoints={25} />
154+
<DocumentsColumn
155+
indexPattern={item.stream.name}
156+
histogramQueryFetch={getStreamDocCounts(item.stream.name)}
157+
timeState={timeState}
158+
numDataPoints={numDataPoints}
159+
/>
160+
) : null,
161+
},
162+
{
163+
field: 'dataQuality',
164+
name: DATA_QUALITY_COLUMN_HEADER,
165+
width: '150px',
166+
sortable: false,
167+
dataType: 'number',
168+
render: (_: unknown, item: TableRow) =>
169+
item.data_stream ? (
170+
<DataQualityColumn histogramQueryFetch={getStreamDocCounts(item.stream.name)} />
146171
) : null,
147172
},
148173
{
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import type { UnparsedEsqlResponse } from '@kbn/traced-es-client';
9+
import { useEffect, useRef } from 'react';
10+
import { useKibana } from './use_kibana';
11+
import { useTimefilter } from './use_timefilter';
12+
13+
export interface StreamHistogramFetch {
14+
docCount: Promise<UnparsedEsqlResponse>;
15+
failedDocCount: Promise<UnparsedEsqlResponse>;
16+
degradedDocCount: Promise<UnparsedEsqlResponse>;
17+
}
18+
19+
export function useStreamHistogramFetch(numDataPoints: number): {
20+
getStreamDocCounts(streamName: string): StreamHistogramFetch;
21+
} {
22+
const { timeState, timeState$ } = useTimefilter();
23+
const { streamsRepositoryClient } = useKibana().dependencies.start.streams;
24+
const promiseCache = useRef<Partial<Record<string, StreamHistogramFetch>>>({});
25+
const abortControllerRef = useRef<AbortController>();
26+
27+
if (!abortControllerRef.current) {
28+
abortControllerRef.current = new AbortController();
29+
}
30+
31+
useEffect(() => {
32+
return () => {
33+
abortControllerRef.current?.abort();
34+
};
35+
}, []);
36+
37+
useEffect(() => {
38+
const subscription = timeState$.subscribe({
39+
next: ({ kind }) => {
40+
const shouldRefresh = kind !== 'initial';
41+
42+
if (shouldRefresh) {
43+
promiseCache.current = {};
44+
abortControllerRef.current?.abort();
45+
abortControllerRef.current = new AbortController();
46+
}
47+
},
48+
});
49+
return () => {
50+
subscription.unsubscribe();
51+
};
52+
}, [timeState$]);
53+
54+
return {
55+
getStreamDocCounts(streamName: string) {
56+
if (promiseCache.current[streamName]) {
57+
return promiseCache.current[streamName] as StreamHistogramFetch;
58+
}
59+
60+
const abortController = abortControllerRef.current;
61+
62+
if (!abortController) {
63+
throw new Error('Abort controller not set');
64+
}
65+
66+
const minInterval = Math.floor((timeState.end - timeState.start) / numDataPoints);
67+
68+
const countPromise = streamsRepositoryClient.fetch('POST /internal/streams/esql', {
69+
params: {
70+
body: {
71+
operationName: 'get_doc_count_for_stream',
72+
query: `FROM ${streamName},${streamName}::failures | STATS doc_count = COUNT(*) BY @timestamp = BUCKET(@timestamp, ${minInterval} ms)`,
73+
start: timeState.start,
74+
end: timeState.end,
75+
},
76+
},
77+
signal: abortController.signal,
78+
});
79+
80+
const failedCountPromise = streamsRepositoryClient.fetch('POST /internal/streams/esql', {
81+
params: {
82+
body: {
83+
operationName: 'get_failed_doc_count_for_stream',
84+
query: `FROM ${streamName}::failures | STATS failed_doc_count = count(*)`,
85+
start: timeState.start,
86+
end: timeState.end,
87+
},
88+
},
89+
signal: abortController.signal,
90+
});
91+
92+
const degradedCountPromise = streamsRepositoryClient.fetch('POST /internal/streams/esql', {
93+
params: {
94+
body: {
95+
operationName: 'get_degraded_doc_count_for_stream',
96+
query: `FROM ${streamName} METADATA _ignored | WHERE _ignored IS NOT NULL | STATS degraded_doc_count = count(*)`,
97+
start: timeState.start,
98+
end: timeState.end,
99+
},
100+
},
101+
signal: abortController.signal,
102+
});
103+
104+
const histogramFetch = {
105+
docCount: countPromise,
106+
failedDocCount: failedCountPromise,
107+
degradedDocCount: degradedCountPromise,
108+
};
109+
110+
promiseCache.current[streamName] = histogramFetch;
111+
112+
return histogramFetch;
113+
},
114+
};
115+
}

x-pack/platform/plugins/shared/streams_app/public/util/esql_result_to_timeseries.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import type { AbortableAsyncState } from '@kbn/react-hooks';
88
import type { UnparsedEsqlResponse } from '@kbn/traced-es-client';
99
import { orderBy } from 'lodash';
10+
import type { AsyncState } from 'react-use/lib/useAsync';
1011

1112
interface Timeseries<T extends string> {
1213
id: string;
@@ -19,7 +20,7 @@ export function esqlResultToTimeseries<T extends string>({
1920
result,
2021
metricNames,
2122
}: {
22-
result: AbortableAsyncState<UnparsedEsqlResponse>;
23+
result: AsyncState<UnparsedEsqlResponse> | AbortableAsyncState<UnparsedEsqlResponse>;
2324
metricNames: T[];
2425
}): Array<Timeseries<T>> {
2526
const columns = result.value?.columns;

0 commit comments

Comments
 (0)