Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1831378
[Streams] Add data quality badge to Streams table
ElenaStoeva Sep 12, 2025
6847eb0
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 12, 2025
cee73e4
Handle failed requests
ElenaStoeva Sep 15, 2025
3c73798
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 15, 2025
f88420e
[CI] Auto-commit changed files from 'node scripts/yarn_deduplicate'
kibanamachine Sep 15, 2025
0d4cb5f
[CI] Auto-commit changed files from 'node scripts/eslint_all_files --…
kibanamachine Sep 15, 2025
7217c4b
Reuse badge component from Data quality
ElenaStoeva Sep 15, 2025
5636dd5
Address feedback
ElenaStoeva Sep 15, 2025
4d6ef15
Move indicator to package
ElenaStoeva Sep 15, 2025
9a34754
[CI] Auto-commit changed files from 'node scripts/eslint_all_files --…
kibanamachine Sep 15, 2025
32a4703
Fix ci errors
ElenaStoeva Sep 15, 2025
e54eab4
Merge branch 'streams/data-quality-badge' of https://github.com/Elena…
ElenaStoeva Sep 15, 2025
cd87ba5
Revert "Move indicator to package"
ElenaStoeva Sep 16, 2025
c8ab667
Address feedback
ElenaStoeva Sep 16, 2025
14efc9b
[CI] Auto-commit changed files from 'node scripts/yarn_deduplicate'
kibanamachine Sep 16, 2025
2de9eb8
[CI] Auto-commit changed files from 'node scripts/eslint_all_files --…
kibanamachine Sep 16, 2025
0c19f99
[CI] Auto-commit changed files from 'node scripts/yarn_deduplicate'
kibanamachine Sep 16, 2025
448f9d5
Remove types from package
ElenaStoeva Sep 16, 2025
d480e5f
Add failed docs to histogram query
ElenaStoeva Sep 16, 2025
27bce88
Revert exporting to package
ElenaStoeva Sep 16, 2025
1ed398b
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 16, 2025
f2599e6
Revert translation changes
ElenaStoeva Sep 16, 2025
e81687a
Remove constants from package
ElenaStoeva Sep 16, 2025
d0998b0
Update bundle size limit
ElenaStoeva Sep 16, 2025
878168b
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 16, 2025
ab1eda4
centralize fetching
flash1293 Sep 17, 2025
2eae0d7
remove debug thingy
flash1293 Sep 17, 2025
982f1ad
Update x-pack/platform/plugins/shared/streams_app/public/components/s…
ElenaStoeva Sep 17, 2025
f07c843
Update x-pack/platform/plugins/shared/streams_app/public/components/s…
ElenaStoeva Sep 17, 2025
ef94bb5
Update x-pack/platform/plugins/shared/streams_app/public/hooks/use_st…
ElenaStoeva Sep 17, 2025
3ed5d15
Update x-pack/platform/plugins/shared/streams_app/public/components/s…
ElenaStoeva Sep 17, 2025
54d2770
Address feedback
ElenaStoeva Sep 17, 2025
bef0633
fix type
flash1293 Sep 17, 2025
63a25fd
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 17, 2025
8c80cdd
Increase bundle size limit for datasetQuality
ElenaStoeva Sep 17, 2025
abfae36
Merge branch 'main' into streams/data-quality-badge
ElenaStoeva Sep 17, 2025
b1db85e
Merge branch 'main' into streams/data-quality-badge
yngrdyn Sep 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/kbn-optimizer/limits.yml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pageLoadAssetSize:
stackAlerts: 31499
stackConnectors: 67939
streams: 8494
streamsApp: 12431
streamsApp: 13748
streamsAppWrapper: 5787
synthetics: 31571
telemetry: 25755
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
export type { DatasetQualityConfig } from './plugin_config';
export type { FetchOptions } from './fetch_options';
export type { APIClientRequestParamsOf, APIReturnType } from './rest';
export { indexNameToDataStreamParts } from './utils';
export { indexNameToDataStreamParts, mapPercentageToQuality } from './utils';
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ export type { IDataStreamsStatsClient } from './services/data_streams_stats/type
export function plugin() {
return new DatasetQualityPlugin();
}

export { DatasetQualityIndicator } from './components/quality_indicator';
export { calculatePercentage } from './utils/calculate_percentage';
2 changes: 2 additions & 0 deletions x-pack/platform/plugins/shared/streams/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export const STREAMS_UI_PRIVILEGES = {
show: 'show',
} as const;

export const FAILURE_STORE_PRIVILEGE = 'read_failure_store';

/**
* Tiered features
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import type { estypes } from '@elastic/elasticsearch';
import type { ClassicIngestStreamEffectiveLifecycle } from '@kbn/streams-schema';
import { Streams } from '@kbn/streams-schema';
import { processAsyncInChunks } from '../../../../utils/process_async_in_chunks';
import { STREAMS_API_PRIVILEGES } from '../../../../../common/constants';
import { STREAMS_API_PRIVILEGES, FAILURE_STORE_PRIVILEGE } from '../../../../../common/constants';
import { createServerRoute } from '../../../create_server_route';
import { getDataStreamLifecycle } from '../../../../lib/streams/stream_crud';

export interface ListStreamDetail {
stream: Streams.all.Definition;
effective_lifecycle?: ClassicIngestStreamEffectiveLifecycle;
data_stream?: estypes.IndicesDataStream;
can_read_failure_store?: boolean;
}

export const listStreamsRoute = createServerRoute({
Expand All @@ -38,6 +39,10 @@ export const listStreamsRoute = createServerRoute({

const streamNames = streams.filter(({ exists }) => exists).map(({ stream }) => stream.name);

const failureStorePrivileges = await scopedClusterClient.asCurrentUser.security.hasPrivileges({
index: [{ names: streamNames, privileges: [FAILURE_STORE_PRIVILEGE] }],
});

const dataStreams = await processAsyncInChunks(streamNames, (streamNamesChunk) =>
scopedClusterClient.asCurrentUser.indices.getDataStream({ name: streamNamesChunk })
);
Expand All @@ -53,6 +58,8 @@ export const listStreamsRoute = createServerRoute({
stream,
effective_lifecycle: getDataStreamLifecycle(match ?? null),
data_stream: match,
can_read_failure_store:
failureStorePrivileges.index?.[stream.name]?.[FAILURE_STORE_PRIVILEGE],
});
return acc;
}, []);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import React from 'react';
import { mapPercentageToQuality } from '@kbn/dataset-quality-plugin/common';
import { DatasetQualityIndicator, calculatePercentage } from '@kbn/dataset-quality-plugin/public';
import useAsync from 'react-use/lib/useAsync';
import { esqlResultToTimeseries } from '../../util/esql_result_to_timeseries';
import type { StreamHistogramFetch } from '../../hooks/use_streams_histogram_fetch';

export function DataQualityColumn({
indexPattern,
histogramQueryFetch,
considerFailedQuality,
}: {
indexPattern: string;
histogramQueryFetch: StreamHistogramFetch;
considerFailedQuality?: boolean;
}) {
const histogramQueryResult = useAsync(() => histogramQueryFetch.docCount, [histogramQueryFetch]);
const failedDocsResult = useAsync(
() => histogramQueryFetch.failedDocCount,
[histogramQueryFetch]
);
const degradedDocsResult = useAsync(
() => histogramQueryFetch.degradedDocCount,
[histogramQueryFetch]
);

const allTimeseries = React.useMemo(
() =>
esqlResultToTimeseries({
result: histogramQueryResult,
metricNames: ['doc_count'],
}),
[histogramQueryResult]
);

const docCount = React.useMemo(
() =>
allTimeseries.reduce(
(acc, series) => acc + series.data.reduce((acc2, item) => acc2 + (item.doc_count || 0), 0),
0
),
[allTimeseries]
);

const degradedDocCount = failedDocsResult?.value
? Number(failedDocsResult.value?.values?.[0]?.[0])
: 0;
const failedDocCount = degradedDocsResult?.value
? Number(degradedDocsResult.value.values?.[0]?.[0])
: 0;

const degradedPercentage = calculatePercentage({
totalDocs: docCount,
count: degradedDocCount,
});

const failedPercentage = calculatePercentage({
totalDocs: docCount,
count: failedDocCount,
});

const quality = considerFailedQuality
? mapPercentageToQuality([degradedPercentage, failedPercentage])
: mapPercentageToQuality([degradedPercentage]);

const isLoading =
histogramQueryResult.loading || failedDocsResult?.loading || degradedDocsResult.loading;

return <DatasetQualityIndicator quality={quality} isLoading={isLoading} />;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,56 +26,36 @@ import {
} from '@elastic/charts';
import { useElasticChartsTheme } from '@kbn/charts-theme';
import { i18n } from '@kbn/i18n';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';
import { useKibana } from '../../hooks/use_kibana';
import useAsync from 'react-use/lib/useAsync';
import { esqlResultToTimeseries } from '../../util/esql_result_to_timeseries';
import { useTimefilter } from '../../hooks/use_timefilter';
import type { useTimefilter } from '../../hooks/use_timefilter';
import { TooltipOrPopoverIcon } from '../tooltip_popover_icon/tooltip_popover_icon';
import { getFormattedError } from '../../util/errors';
import type { StreamHistogramFetch } from '../../hooks/use_streams_histogram_fetch';

export function DocumentsColumn({
indexPattern,
histogramQueryFetch,
timeState,
numDataPoints,
}: {
indexPattern: string;
histogramQueryFetch: StreamHistogramFetch;
timeState: ReturnType<typeof useTimefilter>['timeState'];
numDataPoints: number;
}) {
const { streamsRepositoryClient } = useKibana().dependencies.start.streams;
const chartBaseTheme = useElasticChartsTheme();
const { euiTheme } = useEuiTheme();

const { timeState } = useTimefilter();

const minInterval = Math.floor((timeState.end - timeState.start) / numDataPoints);

const histogramQueryFetch = useStreamsAppFetch(
async ({ signal, timeState: { start, end } }) => {
return streamsRepositoryClient.fetch('POST /internal/streams/esql', {
params: {
body: {
operationName: 'get_doc_count_for_stream',
query: `FROM ${indexPattern} | STATS doc_count = COUNT(*) BY @timestamp = BUCKET(@timestamp, ${minInterval} ms)`,
start,
end,
},
},
signal,
});
},
[streamsRepositoryClient, indexPattern, minInterval],
{
withTimeRange: true,
disableToastOnError: true,
}
);
const histogramQueryResult = useAsync(() => histogramQueryFetch.docCount, [histogramQueryFetch]);

const allTimeseries = React.useMemo(
() =>
esqlResultToTimeseries({
result: histogramQueryFetch,
result: histogramQueryResult,
metricNames: ['doc_count'],
}),
[histogramQueryFetch]
[histogramQueryResult]
);

const docCount = React.useMemo(
Expand All @@ -90,14 +70,15 @@ export function DocumentsColumn({
const hasData = docCount > 0;

const xFormatter = niceTimeFormatter([timeState.start, timeState.end]);
const minInterval = Math.floor((timeState.end - timeState.start) / numDataPoints);

const noDocCountData = histogramQueryFetch.error ? '' : '-';
const noDocCountData = histogramQueryResult.error ? '' : '-';

const noHistogramData = histogramQueryFetch.error ? (
const noHistogramData = histogramQueryResult.error ? (
<TooltipOrPopoverIcon
dataTestSubj="streamsDocCount-error"
icon="warning"
title={getFormattedError(histogramQueryFetch.error).message}
title={getFormattedError(histogramQueryResult.error).message}
mode="popover"
iconColor="danger"
/>
Expand Down Expand Up @@ -126,7 +107,7 @@ export function DocumentsColumn({
role="group"
aria-label={cellAriaLabel}
>
{histogramQueryFetch.loading ? (
{histogramQueryResult.loading ? (
<LoadingPlaceholder />
) : (
<>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ export const DOCUMENTS_COLUMN_HEADER = i18n.translate(
{ defaultMessage: 'Documents' }
);

export const DATA_QUALITY_COLUMN_HEADER = i18n.translate(
'xpack.streams.streamsTreeTable.dataQualityColumnName',
{ defaultMessage: 'Data Quality' }
);

export const RETENTION_COLUMN_HEADER = i18n.translate(
'xpack.streams.streamsTreeTable.retentionColumnName',
{ defaultMessage: 'Retention' }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import type { TableRow, SortableField } from './utils';
import { buildStreamRows, asTrees, enrichStream, shouldComposeTree } from './utils';
import { StreamsAppSearchBar } from '../streams_app_search_bar';
import { DocumentsColumn } from './documents_column';
import { DataQualityColumn } from './data_quality_column';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { useStreamHistogramFetch } from '../../hooks/use_streams_histogram_fetch';
import { useTimefilter } from '../../hooks/use_timefilter';
import { RetentionColumn } from './retention_column';
import {
NAME_COLUMN_HEADER,
Expand All @@ -33,6 +36,7 @@ import {
STREAMS_TABLE_CAPTION_ARIA_LABEL,
RETENTION_COLUMN_HEADER_ARIA_LABEL,
NO_STREAMS_MESSAGE,
DATA_QUALITY_COLUMN_HEADER,
DOCUMENTS_COLUMN_HEADER,
} from './translations';
import { DiscoverBadgeButton } from '../stream_badges';
Expand All @@ -46,6 +50,7 @@ export function StreamsTreeTable({
}) {
const router = useStreamsAppRouter();
const { euiTheme } = useEuiTheme();
const { timeState } = useTimefilter();

const [searchQuery, setSearchQuery] = useState('');
const [sortField, setSortField] = useState<SortableField>('nameSortKey');
Expand Down Expand Up @@ -77,6 +82,10 @@ export function StreamsTreeTable({
}
};

const numDataPoints = 25;

const { getStreamDocCounts } = useStreamHistogramFetch(numDataPoints);

const sorting = {
sort: {
field: sortField,
Expand Down Expand Up @@ -142,7 +151,29 @@ export function StreamsTreeTable({
dataType: 'number',
render: (_: unknown, item: TableRow) =>
item.data_stream ? (
<DocumentsColumn indexPattern={item.stream.name} numDataPoints={25} />
<DocumentsColumn
indexPattern={item.stream.name}
histogramQueryFetch={getStreamDocCounts(item.stream.name)}
timeState={timeState}
numDataPoints={numDataPoints}
/>
) : null,
},
{
field: 'dataQuality',
name: DATA_QUALITY_COLUMN_HEADER,
width: '150px',
sortable: false,
dataType: 'number',
render: (_: unknown, item: TableRow) =>
item.data_stream ? (
<DataQualityColumn
indexPattern={item.stream.name}
histogramQueryFetch={getStreamDocCounts(item.stream.name)}
considerFailedQuality={
item.can_read_failure_store && item.data_stream?.failure_store?.enabled
}
/>
) : null,
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ export const enrichStream = (node: StreamTree | ListStreamDetail): EnrichedStrea
stream: node.stream,
effective_lifecycle: node.effective_lifecycle,
data_stream: node.data_stream,
can_read_failure_store: node.can_read_failure_store,
nameSortKey,
documentsCount: 0,
retentionMs,
Expand Down
Loading