Merged
37 commits
1831378
[Streams] Add data quality badge to Streams table
ElenaStoeva Sep 12, 2025
6847eb0
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 12, 2025
cee73e4
Handle failed requests
ElenaStoeva Sep 15, 2025
3c73798
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 15, 2025
f88420e
[CI] Auto-commit changed files from 'node scripts/yarn_deduplicate'
kibanamachine Sep 15, 2025
0d4cb5f
[CI] Auto-commit changed files from 'node scripts/eslint_all_files --…
kibanamachine Sep 15, 2025
7217c4b
Reuse badge component from Data quality
ElenaStoeva Sep 15, 2025
5636dd5
Address feedback
ElenaStoeva Sep 15, 2025
4d6ef15
Move indicator to package
ElenaStoeva Sep 15, 2025
9a34754
[CI] Auto-commit changed files from 'node scripts/eslint_all_files --…
kibanamachine Sep 15, 2025
32a4703
Fix ci errors
ElenaStoeva Sep 15, 2025
e54eab4
Merge branch 'streams/data-quality-badge' of https://github.com/Elena…
ElenaStoeva Sep 15, 2025
cd87ba5
Revert "Move indicator to package"
ElenaStoeva Sep 16, 2025
c8ab667
Address feedback
ElenaStoeva Sep 16, 2025
14efc9b
[CI] Auto-commit changed files from 'node scripts/yarn_deduplicate'
kibanamachine Sep 16, 2025
2de9eb8
[CI] Auto-commit changed files from 'node scripts/eslint_all_files --…
kibanamachine Sep 16, 2025
0c19f99
[CI] Auto-commit changed files from 'node scripts/yarn_deduplicate'
kibanamachine Sep 16, 2025
448f9d5
Remove types from package
ElenaStoeva Sep 16, 2025
d480e5f
Add failed docs to histogram query
ElenaStoeva Sep 16, 2025
27bce88
Revert exporting to package
ElenaStoeva Sep 16, 2025
1ed398b
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 16, 2025
f2599e6
Revert translation changes
ElenaStoeva Sep 16, 2025
e81687a
Remove constants from package
ElenaStoeva Sep 16, 2025
d0998b0
Update bundle size limit
ElenaStoeva Sep 16, 2025
878168b
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 16, 2025
ab1eda4
centralize fetching
flash1293 Sep 17, 2025
2eae0d7
remove debug thingy
flash1293 Sep 17, 2025
982f1ad
Update x-pack/platform/plugins/shared/streams_app/public/components/s…
ElenaStoeva Sep 17, 2025
f07c843
Update x-pack/platform/plugins/shared/streams_app/public/components/s…
ElenaStoeva Sep 17, 2025
ef94bb5
Update x-pack/platform/plugins/shared/streams_app/public/hooks/use_st…
ElenaStoeva Sep 17, 2025
3ed5d15
Update x-pack/platform/plugins/shared/streams_app/public/components/s…
ElenaStoeva Sep 17, 2025
54d2770
Address feedback
ElenaStoeva Sep 17, 2025
bef0633
fix type
flash1293 Sep 17, 2025
63a25fd
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 17, 2025
8c80cdd
Increase bundle size limit for datasetQuality
ElenaStoeva Sep 17, 2025
abfae36
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 17, 2025
b1db85e
Merge branch 'main' into streams/data-quality-badge
yngrdyn Sep 18, 2025
2 changes: 1 addition & 1 deletion packages/kbn-optimizer/limits.yml
@@ -30,7 +30,7 @@ pageLoadAssetSize:
dashboardMarkdown: 5134
data: 453742
dataQuality: 10387
datasetQuality: 27203
datasetQuality: 30094
dataUsage: 8926
dataViewEditor: 7735
dataViewFieldEditor: 26024
@@ -8,4 +8,4 @@
export type { DatasetQualityConfig } from './plugin_config';
export type { FetchOptions } from './fetch_options';
export type { APIClientRequestParamsOf, APIReturnType } from './rest';
export { indexNameToDataStreamParts } from './utils';
export { indexNameToDataStreamParts, mapPercentageToQuality } from './utils';
@@ -16,3 +16,6 @@ export type { IDataStreamsStatsClient } from './services/data_streams_stats/type
export function plugin() {
return new DatasetQualityPlugin();
}

export { DatasetQualityIndicator } from './components/quality_indicator';
export { calculatePercentage } from './utils/calculate_percentage';
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import React from 'react';
import { mapPercentageToQuality } from '@kbn/dataset-quality-plugin/common';
import { DatasetQualityIndicator, calculatePercentage } from '@kbn/dataset-quality-plugin/public';
import useAsync from 'react-use/lib/useAsync';
import { esqlResultToTimeseries } from '../../util/esql_result_to_timeseries';
import type { StreamHistogramFetch } from '../../hooks/use_streams_histogram_fetch';

export function DataQualityColumn({
histogramQueryFetch,
}: {
histogramQueryFetch: StreamHistogramFetch;
}) {
const histogramQueryResult = useAsync(() => histogramQueryFetch.docCount, [histogramQueryFetch]);
const failedDocsResult = useAsync(
() => histogramQueryFetch.failedDocCount,
[histogramQueryFetch]
);
const degradedDocsResult = useAsync(
() => histogramQueryFetch.degradedDocCount,
[histogramQueryFetch]
);

const allTimeseries = React.useMemo(
() =>
esqlResultToTimeseries({
result: histogramQueryResult,
metricNames: ['doc_count'],
}),
[histogramQueryResult]
);

const docCount = React.useMemo(
() =>
allTimeseries.reduce(
(acc, series) => acc + series.data.reduce((acc2, item) => acc2 + (item.doc_count || 0), 0),
0
),
[allTimeseries]
);

const degradedDocCount = degradedDocsResult?.value
? Number(degradedDocsResult.value?.values?.[0]?.[0])
: 0;
const failedDocCount = failedDocsResult?.value
? Number(failedDocsResult.value?.values?.[0]?.[0])
: 0;

const degradedPercentage = calculatePercentage({
totalDocs: docCount,
count: degradedDocCount,
});

const failedPercentage = calculatePercentage({
totalDocs: docCount,
count: failedDocCount,
});

const quality = mapPercentageToQuality([degradedPercentage, failedPercentage]);

const isLoading =
histogramQueryResult.loading || failedDocsResult?.loading || degradedDocsResult.loading;

return <DatasetQualityIndicator quality={quality} isLoading={isLoading} />;
}
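For reviewers, a minimal standalone sketch of how the badge value is derived from the counts above; the numbers are illustrative, and the exact quality buckets returned by mapPercentageToQuality are an assumption rather than something this diff spells out:

import React from 'react';
import { mapPercentageToQuality } from '@kbn/dataset-quality-plugin/common';
import {
  DatasetQualityIndicator,
  calculatePercentage,
} from '@kbn/dataset-quality-plugin/public';

export function ExampleQualityBadge() {
  // Illustrative counts: 10,000 docs total, 12 with ignored fields, none failed.
  const totalDocs = 10_000;
  const degradedPercentage = calculatePercentage({ totalDocs, count: 12 });
  const failedPercentage = calculatePercentage({ totalDocs, count: 0 });
  // mapPercentageToQuality reduces the two percentages to a single quality value
  // (presumably the worst bucket), which the shared indicator renders as a badge.
  const quality = mapPercentageToQuality([degradedPercentage, failedPercentage]);
  return <DatasetQualityIndicator quality={quality} isLoading={false} />;
}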
@@ -26,56 +26,36 @@ import {
} from '@elastic/charts';
import { useElasticChartsTheme } from '@kbn/charts-theme';
import { i18n } from '@kbn/i18n';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
import { useKibana } from '../../hooks/use_kibana';
import useAsync from 'react-use/lib/useAsync';
import { esqlResultToTimeseries } from '../../util/esql_result_to_timeseries';
import { useTimefilter } from '../../hooks/use_timefilter';
import type { useTimefilter } from '../../hooks/use_timefilter';
import { TooltipOrPopoverIcon } from '../tooltip_popover_icon/tooltip_popover_icon';
import { getFormattedError } from '../../util/errors';
import type { StreamHistogramFetch } from '../../hooks/use_streams_histogram_fetch';

export function DocumentsColumn({
indexPattern,
histogramQueryFetch,
timeState,
numDataPoints,
}: {
indexPattern: string;
histogramQueryFetch: StreamHistogramFetch;
timeState: ReturnType<typeof useTimefilter>['timeState'];
numDataPoints: number;
}) {
const { streamsRepositoryClient } = useKibana().dependencies.start.streams;
const chartBaseTheme = useElasticChartsTheme();
const { euiTheme } = useEuiTheme();

const { timeState } = useTimefilter();

const minInterval = Math.floor((timeState.end - timeState.start) / numDataPoints);

const histogramQueryFetch = useStreamsAppFetch(
async ({ signal, timeState: { start, end } }) => {
return streamsRepositoryClient.fetch('POST /internal/streams/esql', {
params: {
body: {
operationName: 'get_doc_count_for_stream',
query: `FROM ${indexPattern} | STATS doc_count = COUNT(*) BY @timestamp = BUCKET(@timestamp, ${minInterval} ms)`,
start,
end,
},
},
signal,
});
},
[streamsRepositoryClient, indexPattern, minInterval],
{
withTimeRange: true,
disableToastOnError: true,
}
);
const histogramQueryResult = useAsync(() => histogramQueryFetch.docCount, [histogramQueryFetch]);

const allTimeseries = React.useMemo(
() =>
esqlResultToTimeseries({
result: histogramQueryFetch,
result: histogramQueryResult,
metricNames: ['doc_count'],
}),
[histogramQueryFetch]
[histogramQueryResult]
);

const docCount = React.useMemo(
@@ -90,14 +70,15 @@ export function DocumentsColumn({
const hasData = docCount > 0;

const xFormatter = niceTimeFormatter([timeState.start, timeState.end]);
const minInterval = Math.floor((timeState.end - timeState.start) / numDataPoints);

const noDocCountData = histogramQueryFetch.error ? '' : '-';
const noDocCountData = histogramQueryResult.error ? '' : '-';

const noHistogramData = histogramQueryFetch.error ? (
const noHistogramData = histogramQueryResult.error ? (
<TooltipOrPopoverIcon
dataTestSubj="streamsDocCount-error"
icon="warning"
title={getFormattedError(histogramQueryFetch.error).message}
title={getFormattedError(histogramQueryResult.error).message}
mode="popover"
iconColor="danger"
/>
@@ -126,7 +107,7 @@
role="group"
aria-label={cellAriaLabel}
>
{histogramQueryFetch.loading ? (
{histogramQueryResult.loading ? (
<LoadingPlaceholder />
) : (
<>
@@ -16,6 +16,11 @@ export const DOCUMENTS_COLUMN_HEADER = i18n.translate(
{ defaultMessage: 'Documents' }
);

export const DATA_QUALITY_COLUMN_HEADER = i18n.translate(
'xpack.streams.streamsTreeTable.dataQualityColumnName',
{ defaultMessage: 'Data Quality' }
);

export const RETENTION_COLUMN_HEADER = i18n.translate(
'xpack.streams.streamsTreeTable.retentionColumnName',
{ defaultMessage: 'Retention' }
@@ -24,7 +24,10 @@ import type { TableRow, SortableField } from './utils';
import { buildStreamRows, asTrees, enrichStream, shouldComposeTree } from './utils';
import { StreamsAppSearchBar } from '../streams_app_search_bar';
import { DocumentsColumn } from './documents_column';
import { DataQualityColumn } from './data_quality_column';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { useStreamHistogramFetch } from '../../hooks/use_streams_histogram_fetch';
import { useTimefilter } from '../../hooks/use_timefilter';
import { RetentionColumn } from './retention_column';
import {
NAME_COLUMN_HEADER,
@@ -33,6 +36,7 @@ import {
STREAMS_TABLE_CAPTION_ARIA_LABEL,
RETENTION_COLUMN_HEADER_ARIA_LABEL,
NO_STREAMS_MESSAGE,
DATA_QUALITY_COLUMN_HEADER,
DOCUMENTS_COLUMN_HEADER,
} from './translations';
import { DiscoverBadgeButton } from '../stream_badges';
@@ -46,6 +50,7 @@ export function StreamsTreeTable({
}) {
const router = useStreamsAppRouter();
const { euiTheme } = useEuiTheme();
const { timeState } = useTimefilter();

const [searchQuery, setSearchQuery] = useState('');
const [sortField, setSortField] = useState<SortableField>('nameSortKey');
@@ -77,6 +82,10 @@
}
};

const numDataPoints = 25;

const { getStreamDocCounts } = useStreamHistogramFetch(numDataPoints);

const sorting = {
sort: {
field: sortField,
@@ -142,7 +151,23 @@
dataType: 'number',
render: (_: unknown, item: TableRow) =>
item.data_stream ? (
<DocumentsColumn indexPattern={item.stream.name} numDataPoints={25} />
<DocumentsColumn
indexPattern={item.stream.name}
histogramQueryFetch={getStreamDocCounts(item.stream.name)}
timeState={timeState}
numDataPoints={numDataPoints}
/>
) : null,
},
{
field: 'dataQuality',
name: DATA_QUALITY_COLUMN_HEADER,
width: '150px',
sortable: false,
dataType: 'number',
render: (_: unknown, item: TableRow) =>
item.data_stream ? (
<DataQualityColumn histogramQueryFetch={getStreamDocCounts(item.stream.name)} />
) : null,
},
{
@@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { UnparsedEsqlResponse } from '@kbn/traced-es-client';
import { useEffect, useRef } from 'react';
import { useKibana } from './use_kibana';
import { useTimefilter } from './use_timefilter';

export interface StreamHistogramFetch {
docCount: Promise<UnparsedEsqlResponse>;
failedDocCount: Promise<UnparsedEsqlResponse>;
degradedDocCount: Promise<UnparsedEsqlResponse>;
}

export function useStreamHistogramFetch(numDataPoints: number): {
getStreamDocCounts(streamName: string): StreamHistogramFetch;
} {
const { timeState, timeState$ } = useTimefilter();
const { streamsRepositoryClient } = useKibana().dependencies.start.streams;
const promiseCache = useRef<Partial<Record<string, StreamHistogramFetch>>>({});
const abortControllerRef = useRef<AbortController>();

if (!abortControllerRef.current) {
abortControllerRef.current = new AbortController();
}

useEffect(() => {
return () => {
abortControllerRef.current?.abort();
};
}, []);

useEffect(() => {
const subscription = timeState$.subscribe({
next: ({ kind }) => {
const shouldRefresh = kind !== 'initial';

if (shouldRefresh) {
promiseCache.current = {};
abortControllerRef.current?.abort();
abortControllerRef.current = new AbortController();
}
},
});
return () => {
subscription.unsubscribe();
};
}, [timeState$]);

return {
getStreamDocCounts(streamName: string) {
if (promiseCache.current[streamName]) {
return promiseCache.current[streamName] as StreamHistogramFetch;
}

const abortController = abortControllerRef.current;

if (!abortController) {
throw new Error('Abort controller not set');
}

const minInterval = Math.floor((timeState.end - timeState.start) / numDataPoints);

const countPromise = streamsRepositoryClient.fetch('POST /internal/streams/esql', {
params: {
body: {
operationName: 'get_doc_count_for_stream',
query: `FROM ${streamName},${streamName}::failures | STATS doc_count = COUNT(*) BY @timestamp = BUCKET(@timestamp, ${minInterval} ms)`,
start: timeState.start,
end: timeState.end,
},
},
signal: abortController.signal,
});

const failedCountPromise = streamsRepositoryClient.fetch('POST /internal/streams/esql', {
params: {
body: {
operationName: 'get_failed_doc_count_for_stream',
query: `FROM ${streamName}::failures | STATS failed_doc_count = count(*)`,
start: timeState.start,
end: timeState.end,
},
},
signal: abortController.signal,
});

const degradedCountPromise = streamsRepositoryClient.fetch('POST /internal/streams/esql', {
params: {
body: {
operationName: 'get_degraded_doc_count_for_stream',
query: `FROM ${streamName} METADATA _ignored | WHERE _ignored IS NOT NULL | STATS degraded_doc_count = count(*)`,
start: timeState.start,
end: timeState.end,
},
},
signal: abortController.signal,
});

const histogramFetch = {
docCount: countPromise,
failedDocCount: failedCountPromise,
degradedDocCount: degradedCountPromise,
};

promiseCache.current[streamName] = histogramFetch;

return histogramFetch;
},
};
}
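A minimal consumer sketch (component names are illustrative, not part of this PR): the table calls the hook once and hands getStreamDocCounts to each cell renderer, so the Documents and Data Quality cells for a given stream await the same cached promises until the time range changes:

import React from 'react';
import useAsync from 'react-use/lib/useAsync';
import {
  useStreamHistogramFetch,
  type StreamHistogramFetch,
} from './use_streams_histogram_fetch';

function ExampleDocCountCell({ fetch }: { fetch: StreamHistogramFetch }) {
  // useAsync only unwraps the promise created by the hook; it never re-issues the query.
  const docCountResult = useAsync(() => fetch.docCount, [fetch]);
  return <span>{docCountResult.loading ? 'Loading…' : 'Loaded'}</span>;
}

export function ExampleStreamsList({ streamNames }: { streamNames: string[] }) {
  const { getStreamDocCounts } = useStreamHistogramFetch(25);
  return (
    <ul>
      {streamNames.map((name) => {
        // Repeated calls for the same stream return the cached StreamHistogramFetch,
        // so re-renders reuse the in-flight requests until the cache is cleared.
        const fetch = getStreamDocCounts(name);
        return (
          <li key={name}>
            <ExampleDocCountCell fetch={fetch} />
          </li>
        );
      })}
    </ul>
  );
}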
@@ -7,6 +7,7 @@
import type { AbortableAsyncState } from '@kbn/react-hooks';
import type { UnparsedEsqlResponse } from '@kbn/traced-es-client';
import { orderBy } from 'lodash';
import type { AsyncState } from 'react-use/lib/useAsync';

interface Timeseries<T extends string> {
id: string;
@@ -19,7 +20,7 @@ export function esqlResultToTimeseries<T extends string>({
result,
metricNames,
}: {
result: AbortableAsyncState<UnparsedEsqlResponse>;
result: AsyncState<UnparsedEsqlResponse> | AbortableAsyncState<UnparsedEsqlResponse>;
metricNames: T[];
}): Array<Timeseries<T>> {
const columns = result.value?.columns;
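A small sketch, not part of the diff, of why widening the parameter type is safe: the helper only dereferences result.value (see result.value?.columns above), and both state shapes expose value alongside loading and error:

import type { AbortableAsyncState } from '@kbn/react-hooks';
import type { UnparsedEsqlResponse } from '@kbn/traced-es-client';
import type { AsyncState } from 'react-use/lib/useAsync';

type EsqlResultState =
  | AsyncState<UnparsedEsqlResponse>
  | AbortableAsyncState<UnparsedEsqlResponse>;

// Either variant can be handed to esqlResultToTimeseries; only `value` is read here.
function getEsqlColumns(result: EsqlResultState) {
  return result.value?.columns;
}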