From d4d29df8f3420e4a47c8e8049c68c231fb944880 Mon Sep 17 00:00:00 2001 From: Kevin Delemme Date: Wed, 3 Dec 2025 14:09:19 -0500 Subject: [PATCH 1/6] Remove unused params --- .../observability/plugins/slo/server/plugin.ts | 2 +- .../tasks/bulk_delete/bulk_delete_task.ts | 18 +++++------------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/x-pack/solutions/observability/plugins/slo/server/plugin.ts b/x-pack/solutions/observability/plugins/slo/server/plugin.ts index e43060478c9e3..32c7ef119f3f0 100644 --- a/x-pack/solutions/observability/plugins/slo/server/plugin.ts +++ b/x-pack/solutions/observability/plugins/slo/server/plugin.ts @@ -242,7 +242,7 @@ export class SLOPlugin new BulkDeleteTask({ core, - plugins: mappedPlugins, + taskManager: plugins.taskManager, logFactory: this.initContext.logger, }); diff --git a/x-pack/solutions/observability/plugins/slo/server/services/tasks/bulk_delete/bulk_delete_task.ts b/x-pack/solutions/observability/plugins/slo/server/services/tasks/bulk_delete/bulk_delete_task.ts index 6ac45b5c41580..bf0860fdf0d47 100644 --- a/x-pack/solutions/observability/plugins/slo/server/services/tasks/bulk_delete/bulk_delete_task.ts +++ b/x-pack/solutions/observability/plugins/slo/server/services/tasks/bulk_delete/bulk_delete_task.ts @@ -7,9 +7,9 @@ import { type CoreSetup, type Logger, type LoggerFactory } from '@kbn/core/server'; import type { BulkDeleteParams, BulkDeleteStatusResponse } from '@kbn/slo-schema'; -import type { RunContext } from '@kbn/task-manager-plugin/server'; +import type { RunContext, TaskManagerSetupContract } from '@kbn/task-manager-plugin/server'; import type { IndicatorTypes } from '../../../domain/models'; -import type { SLOPluginSetupDependencies, SLOPluginStartDependencies } from '../../../types'; +import type { SLOPluginStartDependencies } from '../../../types'; import { DeleteSLO } from '../../delete_slo'; import { KibanaSavedObjectsSLORepository } from '../../slo_repository'; import { DefaultSummaryTransformGenerator } from '../../summary_transform_generator/summary_transform_generator'; @@ -23,27 +23,19 @@ export const TYPE = 'slo:bulk-delete-task'; interface TaskSetupContract { core: CoreSetup; logFactory: LoggerFactory; - plugins: { - [key in keyof SLOPluginSetupDependencies]: { - setup: Required[key]; - }; - } & { - [key in keyof SLOPluginStartDependencies]: { - start: () => Promise[key]>; - }; - }; + taskManager: TaskManagerSetupContract; } export class BulkDeleteTask { private logger: Logger; constructor(setupContract: TaskSetupContract) { - const { core, plugins, logFactory } = setupContract; + const { core, taskManager, logFactory } = setupContract; this.logger = logFactory.get(TYPE); this.logger.debug('Registering task with [10m] timeout'); - plugins.taskManager.setup.registerTaskDefinitions({ + taskManager.registerTaskDefinitions({ [TYPE]: { title: 'SLO bulk delete', timeout: '10m', From bee140d3724b4bdfb4708266d7870c959ed50b74 Mon Sep 17 00:00:00 2001 From: Kevin Delemme Date: Wed, 3 Dec 2025 14:14:50 -0500 Subject: [PATCH 2/6] Move temp summary cleanup task --- x-pack/solutions/observability/plugins/slo/server/plugin.ts | 2 +- .../__snapshots__/clean_up_temp_summary.test.ts.snap | 0 .../temp_summary_cleanup_task}/clean_up_temp_summary.test.ts | 0 .../temp_summary_cleanup_task}/clean_up_temp_summary.ts | 2 +- .../temp_summary_cleanup_task.ts | 4 ++-- 5 files changed, 4 insertions(+), 4 deletions(-) rename x-pack/solutions/observability/plugins/slo/server/services/{management => tasks/temp_summary_cleanup_task}/__snapshots__/clean_up_temp_summary.test.ts.snap (100%) rename x-pack/solutions/observability/plugins/slo/server/services/{management => tasks/temp_summary_cleanup_task}/clean_up_temp_summary.test.ts (100%) rename x-pack/solutions/observability/plugins/slo/server/services/{management => tasks/temp_summary_cleanup_task}/clean_up_temp_summary.ts (99%) rename x-pack/solutions/observability/plugins/slo/server/services/tasks/{ => temp_summary_cleanup_task}/temp_summary_cleanup_task.ts (97%) diff --git a/x-pack/solutions/observability/plugins/slo/server/plugin.ts b/x-pack/solutions/observability/plugins/slo/server/plugin.ts index 32c7ef119f3f0..17902526166b6 100644 --- a/x-pack/solutions/observability/plugins/slo/server/plugin.ts +++ b/x-pack/solutions/observability/plugins/slo/server/plugin.ts @@ -37,7 +37,7 @@ import { import { DefaultSummaryTransformGenerator } from './services/summary_transform_generator/summary_transform_generator'; import { BulkDeleteTask } from './services/tasks/bulk_delete/bulk_delete_task'; import { SloOrphanSummaryCleanupTask } from './services/tasks/orphan_summary_cleanup_task'; -import { TempSummaryCleanupTask } from './services/tasks/temp_summary_cleanup_task'; +import { TempSummaryCleanupTask } from './services/tasks/temp_summary_cleanup_task/temp_summary_cleanup_task'; import { createTransformGenerators } from './services/transform_generators'; import type { SLOConfig, diff --git a/x-pack/solutions/observability/plugins/slo/server/services/management/__snapshots__/clean_up_temp_summary.test.ts.snap b/x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task/__snapshots__/clean_up_temp_summary.test.ts.snap similarity index 100% rename from x-pack/solutions/observability/plugins/slo/server/services/management/__snapshots__/clean_up_temp_summary.test.ts.snap rename to x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task/__snapshots__/clean_up_temp_summary.test.ts.snap diff --git a/x-pack/solutions/observability/plugins/slo/server/services/management/clean_up_temp_summary.test.ts b/x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task/clean_up_temp_summary.test.ts similarity index 100% rename from x-pack/solutions/observability/plugins/slo/server/services/management/clean_up_temp_summary.test.ts rename to x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task/clean_up_temp_summary.test.ts diff --git a/x-pack/solutions/observability/plugins/slo/server/services/management/clean_up_temp_summary.ts b/x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task/clean_up_temp_summary.ts similarity index 99% rename from x-pack/solutions/observability/plugins/slo/server/services/management/clean_up_temp_summary.ts rename to x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task/clean_up_temp_summary.ts index c9a09681b2f09..e0765991358d0 100644 --- a/x-pack/solutions/observability/plugins/slo/server/services/management/clean_up_temp_summary.ts +++ b/x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task/clean_up_temp_summary.ts @@ -10,7 +10,7 @@ import type { AggregationsCompositeAggregateKey } from '@elastic/elasticsearch/l import { SUMMARY_DESTINATION_INDEX_PATTERN, SUMMARY_TEMP_INDEX_NAME, -} from '../../../common/constants'; +} from '../../../../common/constants'; interface AggBucketKey extends AggregationsCompositeAggregateKey { spaceId: string; diff --git a/x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task.ts b/x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task/temp_summary_cleanup_task.ts similarity index 97% rename from x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task.ts rename to x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task/temp_summary_cleanup_task.ts index f8529272a386d..d2f119d746e64 100644 --- a/x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task.ts +++ b/x-pack/solutions/observability/plugins/slo/server/services/tasks/temp_summary_cleanup_task/temp_summary_cleanup_task.ts @@ -12,8 +12,8 @@ import type { TaskManagerSetupContract, } from '@kbn/task-manager-plugin/server'; import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task'; -import type { SLOConfig, SLOPluginStartDependencies } from '../../types'; -import { CleanUpTempSummary } from '../management/clean_up_temp_summary'; +import type { SLOConfig, SLOPluginStartDependencies } from '../../../types'; +import { CleanUpTempSummary } from './clean_up_temp_summary'; export const TYPE = 'slo:temp-summary-cleanup-task'; export const VERSION = '1.0.0'; From 446e8c3ed60e1fc823541fab1444494bd9fdb270 Mon Sep 17 00:00:00 2001 From: Kevin Delemme Date: Wed, 3 Dec 2025 14:16:21 -0500 Subject: [PATCH 3/6] Move orphan summary cleanup task files --- .../solutions/observability/plugins/slo/server/plugin.ts | 2 +- .../orphan_summary_cleanup_task.test.ts | 2 +- .../orphan_summary_cleanup_task.ts | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) rename x-pack/solutions/observability/plugins/slo/server/services/tasks/{ => orphan_summary_cleanup_task}/orphan_summary_cleanup_task.test.ts (99%) rename x-pack/solutions/observability/plugins/slo/server/services/tasks/{ => orphan_summary_cleanup_task}/orphan_summary_cleanup_task.ts (96%) diff --git a/x-pack/solutions/observability/plugins/slo/server/plugin.ts b/x-pack/solutions/observability/plugins/slo/server/plugin.ts index 17902526166b6..146c6548e9252 100644 --- a/x-pack/solutions/observability/plugins/slo/server/plugin.ts +++ b/x-pack/solutions/observability/plugins/slo/server/plugin.ts @@ -36,7 +36,7 @@ import { } from './services'; import { DefaultSummaryTransformGenerator } from './services/summary_transform_generator/summary_transform_generator'; import { BulkDeleteTask } from './services/tasks/bulk_delete/bulk_delete_task'; -import { SloOrphanSummaryCleanupTask } from './services/tasks/orphan_summary_cleanup_task'; +import { SloOrphanSummaryCleanupTask } from './services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task'; import { TempSummaryCleanupTask } from './services/tasks/temp_summary_cleanup_task/temp_summary_cleanup_task'; import { createTransformGenerators } from './services/transform_generators'; import type { diff --git a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task.test.ts b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.test.ts similarity index 99% rename from x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task.test.ts rename to x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.test.ts index 783c6d503dd3a..edb6353682a16 100644 --- a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task.test.ts +++ b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.test.ts @@ -10,7 +10,7 @@ import { savedObjectsClientMock } from '@kbn/core-saved-objects-api-server-mocks import { loggerMock } from '@kbn/logging-mocks'; import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks'; import { times } from 'lodash'; -import { SUMMARY_DESTINATION_INDEX_PATTERN } from '../../../common/constants'; +import { SUMMARY_DESTINATION_INDEX_PATTERN } from '../../../../common/constants'; import { getDeleteQueryFilter, SloOrphanSummaryCleanupTask } from './orphan_summary_cleanup_task'; const taskManagerSetup = taskManagerMock.createSetup(); diff --git a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task.ts b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts similarity index 96% rename from x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task.ts rename to x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts index 3e9993e81b103..45688381d0604 100644 --- a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task.ts +++ b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts @@ -13,10 +13,10 @@ import type { } from '@kbn/task-manager-plugin/server'; import type { AggregationsCompositeAggregateKey } from '@elastic/elasticsearch/lib/api/types'; import { ALL_SPACES_ID } from '@kbn/spaces-plugin/common/constants'; -import type { StoredSLODefinition } from '../../domain/models'; -import { SO_SLO_TYPE } from '../../saved_objects'; -import { SUMMARY_DESTINATION_INDEX_PATTERN } from '../../../common/constants'; -import type { SLOConfig } from '../../types'; +import type { StoredSLODefinition } from '../../../domain/models'; +import { SO_SLO_TYPE } from '../../../saved_objects'; +import { SUMMARY_DESTINATION_INDEX_PATTERN } from '../../../../common/constants'; +import type { SLOConfig } from '../../../types'; export const TASK_TYPE = 'SLO:ORPHAN_SUMMARIES-CLEANUP-TASK'; From b556acd6703baae997d48e38698f081259563717 Mon Sep 17 00:00:00 2001 From: Kevin Delemme Date: Wed, 3 Dec 2025 15:29:57 -0500 Subject: [PATCH 4/6] Complete refactor of orphan summary cleanup task --- .../plugins/slo/server/plugin.ts | 27 +- .../cleanup_orphan_summary.ts | 190 ++++++++++++ .../orphan_summary_cleanup_task.test.ts | 14 +- .../orphan_summary_cleanup_task.ts | 283 ++++++------------ 4 files changed, 295 insertions(+), 219 deletions(-) create mode 100644 x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/cleanup_orphan_summary.ts diff --git a/x-pack/solutions/observability/plugins/slo/server/plugin.ts b/x-pack/solutions/observability/plugins/slo/server/plugin.ts index 146c6548e9252..cd51a3e1a816a 100644 --- a/x-pack/solutions/observability/plugins/slo/server/plugin.ts +++ b/x-pack/solutions/observability/plugins/slo/server/plugin.ts @@ -16,10 +16,11 @@ import type { } from '@kbn/core/server'; import { DEFAULT_APP_CATEGORIES, SavedObjectsClient } from '@kbn/core/server'; import { i18n } from '@kbn/i18n'; +import { LockAcquisitionError, LockManagerService } from '@kbn/lock-manager'; import { AlertsLocatorDefinition, sloFeatureId } from '@kbn/observability-plugin/common'; -import { SLO_BURN_RATE_RULE_TYPE_ID, DEPRECATED_ALERTING_CONSUMERS } from '@kbn/rule-data-utils'; +import { DEPRECATED_ALERTING_CONSUMERS, SLO_BURN_RATE_RULE_TYPE_ID } from '@kbn/rule-data-utils'; import { mapValues } from 'lodash'; -import { LockAcquisitionError, LockManagerService } from '@kbn/lock-manager'; +import { LOCK_ID_RESOURCE_INSTALLER } from '../common/constants'; import { getSloClientWithRequest } from './client'; import { registerSloUsageCollector } from './lib/collectors/register'; import { registerBurnRateRule } from './lib/rules/register_burn_rate_rule'; @@ -36,7 +37,7 @@ import { } from './services'; import { DefaultSummaryTransformGenerator } from './services/summary_transform_generator/summary_transform_generator'; import { BulkDeleteTask } from './services/tasks/bulk_delete/bulk_delete_task'; -import { SloOrphanSummaryCleanupTask } from './services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task'; +import { OrphanSummaryCleanupTask } from './services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task'; import { TempSummaryCleanupTask } from './services/tasks/temp_summary_cleanup_task/temp_summary_cleanup_task'; import { createTransformGenerators } from './services/transform_generators'; import type { @@ -46,7 +47,6 @@ import type { SLOServerSetup, SLOServerStart, } from './types'; -import { LOCK_ID_RESOURCE_INSTALLER } from '../common/constants'; const sloRuleTypes = [SLO_BURN_RATE_RULE_TYPE_ID]; @@ -58,7 +58,7 @@ export class SLOPlugin private readonly config: SLOConfig; private readonly isServerless: boolean; private readonly isDev: boolean; - private sloOrphanCleanupTask?: SloOrphanSummaryCleanupTask; + private orphanSummaryCleanupTask?: OrphanSummaryCleanupTask; private tempSummaryCleanupTask?: TempSummaryCleanupTask; constructor(private readonly initContext: PluginInitializerContext) { @@ -227,11 +227,12 @@ export class SLOPlugin } }); - this.sloOrphanCleanupTask = new SloOrphanSummaryCleanupTask( - plugins.taskManager, - this.logger, - this.config - ); + this.orphanSummaryCleanupTask = new OrphanSummaryCleanupTask({ + core, + taskManager: plugins.taskManager, + logFactory: this.initContext.logger, + config: this.config, + }); this.tempSummaryCleanupTask = new TempSummaryCleanupTask({ core, @@ -250,13 +251,9 @@ export class SLOPlugin } public start(core: CoreStart, plugins: SLOPluginStartDependencies): SLOServerStart { - const internalSoClient = new SavedObjectsClient(core.savedObjects.createInternalRepository()); const internalEsClient = core.elasticsearch.client.asInternalUser; - this.sloOrphanCleanupTask - ?.start(plugins.taskManager, internalSoClient, internalEsClient) - .catch(() => {}); - + this.orphanSummaryCleanupTask?.start(plugins).catch(() => {}); this.tempSummaryCleanupTask?.start(plugins).catch(() => {}); return { diff --git a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/cleanup_orphan_summary.ts b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/cleanup_orphan_summary.ts new file mode 100644 index 0000000000000..46c9520823ac2 --- /dev/null +++ b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/cleanup_orphan_summary.ts @@ -0,0 +1,190 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { AggregationsCompositeAggregateKey } from '@elastic/elasticsearch/lib/api/types'; +import type { ElasticsearchClient, Logger, SavedObjectsClient } from '@kbn/core/server'; +import { ALL_SPACES_ID } from '@kbn/spaces-plugin/common/constants'; +import { SUMMARY_DESTINATION_INDEX_PATTERN } from '../../../../common/constants'; +import type { StoredSLODefinition } from '../../../domain/models'; +import { SO_SLO_TYPE } from '../../../saved_objects'; + +interface Dependencies { + esClient: ElasticsearchClient; + soClient: SavedObjectsClient; + logger: Logger; + abortController: AbortController; +} + +interface SLO { + id: string; + revision: number; +} + +type SLOKey = `${SLO['id']}:::${SLO['revision']}`; + +const CHUNK_SIZE = 1000; + +export async function cleanupOrphanSummaries(dependencies: Dependencies) { + const { esClient, logger } = dependencies; + let searchAfter: AggregationsCompositeAggregateKey | undefined; + + do { + const { list, nextSearchAfter } = await fetchUniqueSloFromSummary(searchAfter, dependencies); + searchAfter = nextSearchAfter; + + if (list.length === 0) { + logger.debug(`No more SLO summary items to process`); + return; + } + + const existingDefinitionSet = await findSloDefinitionSet(list, dependencies); + + const nextDelete = list.filter((item) => !existingDefinitionSet.has(getKey(item))); + + if (nextDelete.length > 0) { + logger.debug( + `Deleting ${nextDelete.length} SLO ids from the summary index (including all their instances)` + ); + + await esClient.deleteByQuery({ + index: SUMMARY_DESTINATION_INDEX_PATTERN, + wait_for_completion: false, + conflicts: 'proceed', + slices: 'auto', + query: { + bool: { + should: nextDelete.map(({ id, revision }) => { + return { + bool: { + must: [ + { + term: { + 'slo.id': id, + }, + }, + { + term: { + 'slo.revision': revision, + }, + }, + ], + }, + }; + }), + }, + }, + }); + } + } while (searchAfter); +} + +async function fetchUniqueSloFromSummary( + searchAfter: AggregationsCompositeAggregateKey | undefined, + { logger, esClient, abortController }: Dependencies +): Promise<{ + nextSearchAfter: AggregationsCompositeAggregateKey | undefined; + list: Array; +}> { + logger.debug(`Fetching unique SLO (id, revision) tuples from summary index after ${searchAfter}`); + + const result = await esClient.search< + unknown, + { + id_revision: { + after_key: AggregationsCompositeAggregateKey; + buckets: Array<{ + key: { + id: string; + revision: number; + }; + }>; + }; + } + >( + { + size: 0, + index: SUMMARY_DESTINATION_INDEX_PATTERN, + aggs: { + id_revision: { + composite: { + size: CHUNK_SIZE, + sources: [ + { + id: { + terms: { + field: 'slo.id', + }, + }, + }, + { + revision: { + terms: { + field: 'slo.revision', + }, + }, + }, + ], + after: searchAfter, + }, + }, + }, + }, + { signal: abortController.signal } + ); + + const buckets = result.aggregations?.id_revision.buckets ?? []; + if (buckets.length === 0) { + return { + nextSearchAfter: undefined, + list: [], + }; + } + + return { + nextSearchAfter: + buckets.length < CHUNK_SIZE ? undefined : result.aggregations?.id_revision.after_key, + list: buckets.map(({ key }) => ({ + id: String(key.id), + revision: Number(key.revision), + })), + }; +} + +async function findSloDefinitionSet( + list: Array, + { logger, soClient }: Dependencies +): Promise> { + const response = await soClient.find>({ + type: SO_SLO_TYPE, + page: 1, + perPage: list.length, + filter: `slo.attributes.id:(${list.map((item) => item.id).join(' or ')})`, + namespaces: [ALL_SPACES_ID], + fields: ['id', 'revision'], + }); + + logger.debug( + `Found ${response.total} matching SLO definitions for ${list.length} SLO summary items to check` + ); + + if (response.total === 0) { + return new Set(); + } + + return new Set( + response.saved_objects.map(({ attributes }) => + getKey({ + id: attributes.id, + revision: attributes.revision, + }) + ) + ); +} + +function getKey(item: SLO): SLOKey { + return `${item.id}:::${item.revision}`; +} diff --git a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.test.ts b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.test.ts index edb6353682a16..957654e9aa667 100644 --- a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.test.ts +++ b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.test.ts @@ -11,7 +11,7 @@ import { loggerMock } from '@kbn/logging-mocks'; import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks'; import { times } from 'lodash'; import { SUMMARY_DESTINATION_INDEX_PATTERN } from '../../../../common/constants'; -import { getDeleteQueryFilter, SloOrphanSummaryCleanupTask } from './orphan_summary_cleanup_task'; +import { getDeleteQueryFilter, OrphanSummaryCleanupTask } from './orphan_summary_cleanup_task'; const taskManagerSetup = taskManagerMock.createSetup(); const taskManagerStart = taskManagerMock.createStart(); @@ -25,7 +25,7 @@ describe('SloSummaryCleanupTask', () => { }); it('should run for empty', async function () { - const task = new SloOrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); + const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); await task.start(taskManagerStart, soClient, esClient); task.fetchSloSummariesIds = jest.fn().mockReturnValue({ sloSummaryIds: [] }); @@ -35,7 +35,7 @@ describe('SloSummaryCleanupTask', () => { }); it('should run some slos', async function () { - const task = new SloOrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); + const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); task.findSloDefinitions = jest.fn().mockResolvedValue([ { id: '1', revision: 1 }, { id: '2', revision: 1 }, @@ -76,7 +76,7 @@ describe('SloSummaryCleanupTask', () => { }); it('should run lots of slos', async function () { - const task = new SloOrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); + const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); task.findSloDefinitions = jest.fn().mockResolvedValue( times(10000, (i) => ({ id: `${i}`, @@ -119,7 +119,7 @@ describe('SloSummaryCleanupTask', () => { }); it('should run when lots of slo defs are there', async function () { - const task = new SloOrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); + const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); task.findSloDefinitions = jest.fn().mockResolvedValue( times(10000, (i) => ({ id: `${i}`, @@ -197,7 +197,7 @@ describe('SloSummaryCleanupTask', () => { }); it('should run when summaries are way more then defs', async function () { - const task = new SloOrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); + const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); task.findSloDefinitions = jest.fn().mockResolvedValue( times(100, (i) => ({ id: `${i}`, @@ -273,7 +273,7 @@ describe('SloSummaryCleanupTask', () => { }); it('should run when there are no Slo defs', async function () { - const task = new SloOrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); + const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); task.findSloDefinitions = jest.fn().mockResolvedValue([]); task.fetchSloSummariesIds = jest.fn().mockImplementation(async (searchKey) => { if (!searchKey) { diff --git a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts index 45688381d0604..53945a26a004e 100644 --- a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts +++ b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts @@ -5,58 +5,43 @@ * 2.0. */ -import type { ElasticsearchClient, Logger, SavedObjectsClientContract } from '@kbn/core/server'; +import { errors } from '@elastic/elasticsearch'; +import { + SavedObjectsClient, + type CoreSetup, + type Logger, + type LoggerFactory, +} from '@kbn/core/server'; import type { ConcreteTaskInstance, TaskManagerSetupContract, - TaskManagerStartContract, } from '@kbn/task-manager-plugin/server'; -import type { AggregationsCompositeAggregateKey } from '@elastic/elasticsearch/lib/api/types'; -import { ALL_SPACES_ID } from '@kbn/spaces-plugin/common/constants'; -import type { StoredSLODefinition } from '../../../domain/models'; -import { SO_SLO_TYPE } from '../../../saved_objects'; -import { SUMMARY_DESTINATION_INDEX_PATTERN } from '../../../../common/constants'; -import type { SLOConfig } from '../../../types'; +import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task'; +import type { SLOConfig, SLOPluginStartDependencies } from '../../../types'; +import { cleanupOrphanSummaries } from './cleanup_orphan_summary'; -export const TASK_TYPE = 'SLO:ORPHAN_SUMMARIES-CLEANUP-TASK'; +export const TYPE = 'slo:orphan-summary-cleanup-task'; -export const getDeleteQueryFilter = ( - sloSummaryIdsToDelete: Array<{ id: string; revision: number }> -) => { - return sloSummaryIdsToDelete.map(({ id, revision }) => { - return { - bool: { - must: [ - { - term: { - 'slo.id': id, - }, - }, - { - term: { - 'slo.revision': Number(revision), - }, - }, - ], - }, - }; - }); -}; +interface TaskSetupContract { + taskManager: TaskManagerSetupContract; + core: CoreSetup; + logFactory: LoggerFactory; + config: SLOConfig; +} -export class SloOrphanSummaryCleanupTask { +export class OrphanSummaryCleanupTask { private logger: Logger; - private taskManager?: TaskManagerStartContract; - private soClient?: SavedObjectsClientContract; - private esClient?: ElasticsearchClient; private config: SLOConfig; + private wasStarted: boolean = false; - constructor(taskManager: TaskManagerSetupContract, logger: Logger, config: SLOConfig) { - this.logger = logger; + constructor(setupContract: TaskSetupContract) { + const { core, config, taskManager, logFactory } = setupContract; + this.logger = logFactory.get(this.taskId); this.config = config; taskManager.registerTaskDefinitions({ - [TASK_TYPE]: { - title: 'SLO Definitions Cleanup Task', + [TYPE]: { + title: 'SLO orphan summary cleanup task', timeout: '3m', maxAttempts: 1, createTaskRunner: ({ @@ -68,7 +53,7 @@ export class SloOrphanSummaryCleanupTask { }) => { return { run: async () => { - return this.runTask(abortController); + return this.runTask(taskInstance, core, abortController); }, cancel: async () => {}, }; @@ -77,181 +62,85 @@ export class SloOrphanSummaryCleanupTask { }); } - public async runTask(abortController: AbortController) { - if (!this.soClient || !this.esClient) { + private get taskId() { + return `${TYPE}:1.0.0`; + } + + public async start(plugins: SLOPluginStartDependencies) { + const hasCorrectLicense = (await plugins.licensing.getLicense()).hasAtLeast('platinum'); + if (!hasCorrectLicense) { + this.logger.debug('Platinum license is required'); return; } - let searchAfterKey: AggregationsCompositeAggregateKey | undefined; - - do { - const { sloSummaryIds, searchAfter } = await this.fetchSloSummariesIds(searchAfterKey); - - if (sloSummaryIds.length === 0) { - return; - } - - searchAfterKey = searchAfter; - - const ids = sloSummaryIds.map(({ id }) => id); - - const sloDefinitions = await this.findSloDefinitions(ids); - - const sloSummaryIdsToDelete = sloSummaryIds.filter( - ({ id, revision }) => - !sloDefinitions.find( - (attributes) => attributes.id === id && attributes.revision === revision - ) - ); - - if (sloSummaryIdsToDelete.length > 0) { - this.logger.debug( - `[SLO] Deleting ${sloSummaryIdsToDelete.length} SLO Summary documents from the summary index` - ); - - await this.esClient.deleteByQuery({ - index: SUMMARY_DESTINATION_INDEX_PATTERN, - wait_for_completion: false, - conflicts: 'proceed', - slices: 'auto', - query: { - bool: { - should: getDeleteQueryFilter(sloSummaryIdsToDelete.sort()), - }, - }, - }); - } - } while (searchAfterKey); - } + if (!plugins.taskManager) { + this.logger.debug('Missing required service during start'); + return; + } - fetchSloSummariesIds = async ( - searchAfter?: AggregationsCompositeAggregateKey - ): Promise<{ - searchAfter?: AggregationsCompositeAggregateKey; - sloSummaryIds: Array<{ id: string; revision: number }>; - }> => { - this.logger.debug(`[TASK] Fetching SLO Summary ids after ${searchAfter}`); - if (!this.esClient) { - return { - searchAfter: undefined, - sloSummaryIds: [], - }; + if (!this.config.sloOrphanSummaryCleanUpTaskEnabled) { + this.logger.debug('Unscheduling task'); + return await plugins.taskManager.removeIfExists(this.taskId); } - const size = 1000; + this.logger.debug('Scheduling task with [1h] interval'); + this.wasStarted = true; - const result = await this.esClient.search< - unknown, - { - slos: { - after_key: AggregationsCompositeAggregateKey; - buckets: Array<{ - key: { - id: string; - revision: number; - }; - }>; - }; - } - >({ - size: 0, - index: SUMMARY_DESTINATION_INDEX_PATTERN, - aggs: { - slos: { - composite: { - size, - sources: [ - { - id: { - terms: { - field: 'slo.id', - }, - }, - }, - { - revision: { - terms: { - field: 'slo.revision', - }, - }, - }, - ], - after: searchAfter, - }, + try { + await plugins.taskManager.ensureScheduled({ + id: this.taskId, + taskType: TYPE, + scope: ['observability', 'slo'], + schedule: { + interval: '1m', }, - }, - }); - - const aggBuckets = result.aggregations?.slos.buckets ?? []; - if (aggBuckets.length === 0) { - return { - searchAfter: undefined, - sloSummaryIds: [], - }; + state: {}, + params: {}, + }); + } catch (e) { + this.logger.error(`Error scheduling task, error: ${e}`); } - - const newSearchAfter = - aggBuckets.length < size ? undefined : result.aggregations?.slos.after_key; - - const sloSummaryIds = aggBuckets.map(({ key }) => { - return { - id: String(key.id), - revision: Number(key.revision), - }; - }); - - return { - searchAfter: newSearchAfter, - sloSummaryIds, - }; - }; - - findSloDefinitions = async (ids: string[]) => { - const sloDefinitions = await this.soClient?.find>({ - type: SO_SLO_TYPE, - page: 1, - perPage: ids.length, - filter: `slo.attributes.id:(${ids.join(' or ')})`, - namespaces: [ALL_SPACES_ID], - fields: ['id', 'revision'], - }); - - return sloDefinitions?.saved_objects.map(({ attributes }) => attributes) ?? []; - }; - - private get taskId() { - return `${TASK_TYPE}:1.0.0`; } - public async start( - taskManager: TaskManagerStartContract, - soClient: SavedObjectsClientContract, - esClient: ElasticsearchClient + public async runTask( + taskInstance: ConcreteTaskInstance, + core: CoreSetup, + abortController: AbortController ) { - this.taskManager = taskManager; - this.soClient = soClient; - this.esClient = esClient; + if (!this.wasStarted) { + this.logger.debug('runTask Aborted. Task not started yet'); + return; + } - if (!taskManager) { + if (taskInstance.id !== this.taskId) { this.logger.debug( - 'Missing required service during startup, skipping orphan-slo-summary-cleanup task.' + `Outdated task version: Got [${taskInstance.id}], current version is [${this.taskId}]` ); - return; + return getDeleteTaskRunResult(); } - if (this.config.sloOrphanSummaryCleanUpTaskEnabled) { - await this.taskManager.ensureScheduled({ - id: this.taskId, - taskType: TASK_TYPE, - schedule: { - interval: '1h', - }, - scope: ['observability', 'slo'], - state: {}, - params: {}, + this.logger.debug(`runTask started`); + + const [coreStart] = await core.getStartServices(); + const esClient = coreStart.elasticsearch.client.asInternalUser; + const internalSoClient = new SavedObjectsClient( + coreStart.savedObjects.createInternalRepository() + ); + + try { + await cleanupOrphanSummaries({ + esClient, + soClient: internalSoClient, + logger: this.logger, + abortController, }); - } else { - await this.taskManager.removeIfExists(this.taskId); + } catch (err) { + if (err instanceof errors.RequestAbortedError) { + this.logger.warn(`Request aborted due to timeout: ${err}`); + + return; + } + this.logger.debug(`Error: ${err}`); } } } From b8ec7ab47ca9c9e13db20c6e847b2ad03cea5b2c Mon Sep 17 00:00:00 2001 From: Kevin Delemme Date: Wed, 3 Dec 2025 15:51:14 -0500 Subject: [PATCH 5/6] refactor test (cursor) --- .../orphan_summary_cleanup_task.test.ts | 486 +++++++++--------- 1 file changed, 230 insertions(+), 256 deletions(-) diff --git a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.test.ts b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.test.ts index 957654e9aa667..e50a973dee633 100644 --- a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.test.ts +++ b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.test.ts @@ -8,327 +8,301 @@ import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks'; import { savedObjectsClientMock } from '@kbn/core-saved-objects-api-server-mocks'; import { loggerMock } from '@kbn/logging-mocks'; -import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks'; -import { times } from 'lodash'; import { SUMMARY_DESTINATION_INDEX_PATTERN } from '../../../../common/constants'; -import { getDeleteQueryFilter, OrphanSummaryCleanupTask } from './orphan_summary_cleanup_task'; +import { cleanupOrphanSummaries } from './cleanup_orphan_summary'; -const taskManagerSetup = taskManagerMock.createSetup(); -const taskManagerStart = taskManagerMock.createStart(); -const logger = loggerMock.create(); -const esClient = elasticsearchClientMock.createClusterClient().asInternalUser; -const soClient = savedObjectsClientMock.create(); +const createMockAggregationResponse = ( + buckets: Array<{ key: { id: string; revision: number } }>, + afterKey?: { id: string; revision: number } +) => + ({ + took: 1, + timed_out: false, + _shards: { total: 1, successful: 1, skipped: 0, failed: 0 }, + hits: { total: { value: 0, relation: 'eq' }, hits: [] }, + aggregations: { + id_revision: { + buckets, + ...(afterKey ? { after_key: afterKey } : {}), + }, + }, + } as any); + +describe('cleanupOrphanSummaries', () => { + let esClient: ReturnType['asInternalUser']; + let soClient: ReturnType; + let logger: ReturnType; + let abortController: AbortController; -describe('SloSummaryCleanupTask', () => { beforeEach(() => { + esClient = elasticsearchClientMock.createClusterClient().asInternalUser; + soClient = savedObjectsClientMock.create(); + logger = loggerMock.create(); + abortController = new AbortController(); jest.clearAllMocks(); }); - it('should run for empty', async function () { - const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); - await task.start(taskManagerStart, soClient, esClient); + it('should do nothing when summary index is empty', async () => { + esClient.search.mockResolvedValueOnce(createMockAggregationResponse([])); - task.fetchSloSummariesIds = jest.fn().mockReturnValue({ sloSummaryIds: [] }); + await cleanupOrphanSummaries({ esClient, soClient: soClient as any, logger, abortController }); - task.findSloDefinitions = jest.fn(); - await task.runTask(new AbortController()); + expect(esClient.search).toHaveBeenCalledTimes(1); + expect(soClient.find).not.toHaveBeenCalled(); + expect(esClient.deleteByQuery).not.toHaveBeenCalled(); }); - it('should run some slos', async function () { - const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); - task.findSloDefinitions = jest.fn().mockResolvedValue([ - { id: '1', revision: 1 }, - { id: '2', revision: 1 }, - { id: '3', revision: 1 }, - ]); - - await task.start(taskManagerStart, soClient, esClient); - - task.fetchSloSummariesIds = jest.fn().mockReturnValue({ - sloSummaryIds: [ - { id: '1', revision: 1 }, - { id: '2', revision: 1 }, - { id: '3', revision: 2 }, - { id: '4', revision: NaN }, - { id: '3', revision: 1 }, - { id: '3', revision: 0 }, + it('should not delete when all summaries have matching definitions', async () => { + esClient.search.mockResolvedValueOnce( + createMockAggregationResponse([ + { key: { id: 'slo-1', revision: 1 } }, + { key: { id: 'slo-2', revision: 1 } }, + { key: { id: 'slo-3', revision: 1 } }, + ]) + ); + + soClient.find.mockResolvedValueOnce({ + total: 3, + saved_objects: [ + { id: 'so-1', attributes: { id: 'slo-1', revision: 1 } }, + { id: 'so-2', attributes: { id: 'slo-2', revision: 1 } }, + { id: 'so-3', attributes: { id: 'slo-3', revision: 1 } }, ], - }); + page: 1, + per_page: 3, + } as any); + + await cleanupOrphanSummaries({ esClient, soClient: soClient as any, logger, abortController }); + + expect(esClient.search).toHaveBeenCalledTimes(1); + expect(soClient.find).toHaveBeenCalledTimes(1); + expect(esClient.deleteByQuery).not.toHaveBeenCalled(); + }); + + it('should delete orphan summaries with mismatched revisions', async () => { + esClient.search.mockResolvedValueOnce( + createMockAggregationResponse([ + { key: { id: 'slo-1', revision: 1 } }, + { key: { id: 'slo-2', revision: 1 } }, + { key: { id: 'slo-3', revision: 1 } }, // orphan: revision mismatch + { key: { id: 'slo-3', revision: 2 } }, + { key: { id: 'slo-4', revision: 1 } }, // orphan: no definition + ]) + ); + + soClient.find.mockResolvedValueOnce({ + total: 3, + saved_objects: [ + { id: 'so-1', attributes: { id: 'slo-1', revision: 1 } }, + { id: 'so-2', attributes: { id: 'slo-2', revision: 1 } }, + { id: 'so-3', attributes: { id: 'slo-3', revision: 2 } }, + ], + page: 1, + per_page: 5, + } as any); - await task.runTask(new AbortController()); + await cleanupOrphanSummaries({ esClient, soClient: soClient as any, logger, abortController }); - expect(task.fetchSloSummariesIds).toHaveBeenCalled(); expect(esClient.deleteByQuery).toHaveBeenCalledWith({ index: SUMMARY_DESTINATION_INDEX_PATTERN, + wait_for_completion: false, + conflicts: 'proceed', + slices: 'auto', query: { bool: { should: [ - { bool: { must: [{ term: { 'slo.id': '3' } }, { term: { 'slo.revision': 2 } }] } }, - { bool: { must: [{ term: { 'slo.id': '4' } }, { term: { 'slo.revision': NaN } }] } }, - { bool: { must: [{ term: { 'slo.id': '3' } }, { term: { 'slo.revision': 0 } }] } }, + { + bool: { + must: [{ term: { 'slo.id': 'slo-3' } }, { term: { 'slo.revision': 1 } }], + }, + }, + { + bool: { + must: [{ term: { 'slo.id': 'slo-4' } }, { term: { 'slo.revision': 1 } }], + }, + }, ], }, }, - conflicts: 'proceed', - slices: 'auto', - wait_for_completion: false, }); }); - it('should run lots of slos', async function () { - const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); - task.findSloDefinitions = jest.fn().mockResolvedValue( - times(10000, (i) => ({ - id: `${i}`, - revision: 1, - })) + it('should delete all summaries when no definitions exist', async () => { + esClient.search.mockResolvedValueOnce( + createMockAggregationResponse([ + { key: { id: 'slo-1', revision: 1 } }, + { key: { id: 'slo-2', revision: 1 } }, + ]) ); - task.fetchSloSummariesIds = jest.fn().mockReturnValue({ - sloSummaryIds: [ - { id: '01', revision: 1 }, - { id: '02', revision: 1 }, - { id: '03', revision: 1 }, - { id: '04', revision: 1 }, - { id: '1', revision: 1 }, - { id: '2', revision: 1 }, - { id: '3', revision: 1 }, - { id: '4', revision: 1 }, - ], - }); - await task.start(taskManagerStart, soClient, esClient); - await task.runTask(new AbortController()); - expect(task.fetchSloSummariesIds).toHaveBeenCalledTimes(1); - expect(esClient.deleteByQuery).toHaveBeenCalledTimes(1); - expect(esClient.deleteByQuery).toHaveBeenNthCalledWith(1, { + soClient.find.mockResolvedValueOnce({ + total: 0, + saved_objects: [], + page: 1, + per_page: 2, + } as any); + + await cleanupOrphanSummaries({ esClient, soClient: soClient as any, logger, abortController }); + + expect(esClient.deleteByQuery).toHaveBeenCalledWith({ + index: SUMMARY_DESTINATION_INDEX_PATTERN, + wait_for_completion: false, conflicts: 'proceed', slices: 'auto', - wait_for_completion: false, - index: SUMMARY_DESTINATION_INDEX_PATTERN, query: { bool: { - should: getDeleteQueryFilter([ - { id: '01', revision: 1 }, - { id: '02', revision: 1 }, - { id: '03', revision: 1 }, - { id: '04', revision: 1 }, - ]), + should: [ + { + bool: { + must: [{ term: { 'slo.id': 'slo-1' } }, { term: { 'slo.revision': 1 } }], + }, + }, + { + bool: { + must: [{ term: { 'slo.id': 'slo-2' } }, { term: { 'slo.revision': 1 } }], + }, + }, + ], }, }, }); }); - it('should run when lots of slo defs are there', async function () { - const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); - task.findSloDefinitions = jest.fn().mockResolvedValue( - times(10000, (i) => ({ - id: `${i}`, - revision: 2, - })) + it('should paginate through summaries using after_key', async () => { + // Generate 1000 buckets for the first page (CHUNK_SIZE = 1000) + // Include slo-orphan-1 as an orphan (no matching definition) + const firstPageBuckets = Array.from({ length: 1000 }, (_, i) => ({ + key: { id: i === 0 ? 'slo-orphan-1' : `slo-${i}`, revision: 1 }, + })); + + // First batch with after_key indicating more results (full page triggers pagination) + esClient.search.mockResolvedValueOnce( + createMockAggregationResponse(firstPageBuckets, { id: 'slo-999', revision: 1 }) ); - task.fetchSloSummariesIds = jest.fn().mockImplementation(async (searchKey) => { - if (!searchKey) { - return { - sloSummaryIds: [ - { id: '1', revision: 2 }, - { id: '2', revision: 2 }, - { id: '3', revision: 2 }, - { id: '4', revision: 2 }, - { id: '01', revision: 1 }, - { id: '02', revision: 1 }, - { id: '03', revision: 1 }, - { id: '04', revision: 1 }, - ], - searchAfter: '1', - }; - } - return { - sloSummaryIds: [ - { id: '5', revision: 2 }, - { id: '6', revision: 2 }, - { id: '7', revision: 2 }, - { id: '8', revision: 2 }, - { id: '05', revision: 1 }, - { id: '06', revision: 1 }, - { id: '07', revision: 1 }, - { id: '08', revision: 1 }, - ], - }; - }); - await task.start(taskManagerStart, soClient, esClient); - await task.runTask(new AbortController()); - expect(task.fetchSloSummariesIds).toHaveBeenCalledTimes(2); + // Second batch without after_key (last page, fewer than CHUNK_SIZE) + esClient.search.mockResolvedValueOnce( + createMockAggregationResponse([ + { key: { id: 'slo-1000', revision: 1 } }, + { key: { id: 'slo-orphan-2', revision: 1 } }, // orphan + ]) + ); + + // First batch: return all except slo-orphan-1 + soClient.find.mockResolvedValueOnce({ + total: 999, + saved_objects: firstPageBuckets + .filter((b) => b.key.id !== 'slo-orphan-1') + .map((b, i) => ({ + id: `so-${i}`, + attributes: { id: b.key.id, revision: b.key.revision }, + })), + page: 1, + per_page: 1000, + } as any); + + // Second batch: slo-1000 exists, slo-orphan-2 is orphan + soClient.find.mockResolvedValueOnce({ + total: 1, + saved_objects: [{ id: 'so-1000', attributes: { id: 'slo-1000', revision: 1 } }], + page: 1, + per_page: 2, + } as any); + + await cleanupOrphanSummaries({ esClient, soClient: soClient as any, logger, abortController }); + + expect(esClient.search).toHaveBeenCalledTimes(2); + expect(soClient.find).toHaveBeenCalledTimes(2); expect(esClient.deleteByQuery).toHaveBeenCalledTimes(2); + // First delete call for slo-orphan-1 expect(esClient.deleteByQuery).toHaveBeenNthCalledWith(1, { index: SUMMARY_DESTINATION_INDEX_PATTERN, + wait_for_completion: false, conflicts: 'proceed', slices: 'auto', - wait_for_completion: false, query: { bool: { - should: getDeleteQueryFilter([ - { id: '01', revision: 1 }, - { id: '02', revision: 1 }, - { id: '03', revision: 1 }, - { id: '04', revision: 1 }, - ]), + should: [ + { + bool: { + must: [{ term: { 'slo.id': 'slo-orphan-1' } }, { term: { 'slo.revision': 1 } }], + }, + }, + ], }, }, }); - expect(esClient.deleteByQuery).toHaveBeenLastCalledWith({ + // Second delete call for slo-orphan-2 + expect(esClient.deleteByQuery).toHaveBeenNthCalledWith(2, { index: SUMMARY_DESTINATION_INDEX_PATTERN, wait_for_completion: false, conflicts: 'proceed', slices: 'auto', query: { bool: { - should: getDeleteQueryFilter([ - { id: '05', revision: 1 }, - { id: '06', revision: 1 }, - { id: '07', revision: 1 }, - { id: '08', revision: 1 }, - ]), + should: [ + { + bool: { + must: [{ term: { 'slo.id': 'slo-orphan-2' } }, { term: { 'slo.revision': 1 } }], + }, + }, + ], }, }, }); }); - it('should run when summaries are way more then defs', async function () { - const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); - task.findSloDefinitions = jest.fn().mockResolvedValue( - times(100, (i) => ({ - id: `${i}`, - revision: 2, - })) - ); - task.fetchSloSummariesIds = jest.fn().mockImplementation(async (searchKey) => { - if (!searchKey) { - return { - sloSummaryIds: [ - { id: '1', revision: 2 }, - { id: '2', revision: 2 }, - { id: '3', revision: 2 }, - { id: '4', revision: 2 }, - { id: '01', revision: 1 }, - { id: '02', revision: 1 }, - { id: '03', revision: 1 }, - { id: '04', revision: 1 }, - ], - searchAfter: '1', - }; - } - return { - sloSummaryIds: [ - { id: '5', revision: 2 }, - { id: '6', revision: 2 }, - { id: '7', revision: 2 }, - { id: '8', revision: 2 }, - { id: '05', revision: 1 }, - { id: '06', revision: 1 }, - { id: '07', revision: 1 }, - { id: '08', revision: 1 }, - ], - }; - }); - await task.start(taskManagerStart, soClient, esClient); - await task.runTask(new AbortController()); + it('should pass the after_key to subsequent search calls', async () => { + const afterKey = { id: 'slo-1000', revision: 1 }; - expect(task.fetchSloSummariesIds).toHaveBeenCalledTimes(2); - expect(esClient.deleteByQuery).toHaveBeenCalledTimes(2); - expect(esClient.deleteByQuery).toHaveBeenNthCalledWith(1, { - conflicts: 'proceed', - slices: 'auto', - wait_for_completion: false, - index: SUMMARY_DESTINATION_INDEX_PATTERN, - query: { - bool: { - should: getDeleteQueryFilter([ - { id: '01', revision: 1 }, - { id: '02', revision: 1 }, - { id: '03', revision: 1 }, - { id: '04', revision: 1 }, - ]), - }, - }, - }); - expect(esClient.deleteByQuery).toHaveBeenLastCalledWith({ - conflicts: 'proceed', - slices: 'auto', - wait_for_completion: false, - index: SUMMARY_DESTINATION_INDEX_PATTERN, - query: { - bool: { - should: getDeleteQueryFilter([ - { id: '05', revision: 1 }, - { id: '06', revision: 1 }, - { id: '07', revision: 1 }, - { id: '08', revision: 1 }, - ]), - }, - }, - }); + // Generate 1000 buckets to trigger pagination (CHUNK_SIZE = 1000) + const fullPageBuckets = Array.from({ length: 1000 }, (_, i) => ({ + key: { id: `slo-${i + 1}`, revision: 1 }, + })); + + esClient.search + .mockResolvedValueOnce(createMockAggregationResponse(fullPageBuckets, afterKey)) + .mockResolvedValueOnce(createMockAggregationResponse([])); + + // Return all definitions as existing (no orphans to delete) + soClient.find.mockResolvedValueOnce({ + total: 1000, + saved_objects: fullPageBuckets.map((b, i) => ({ + id: `so-${i + 1}`, + attributes: { id: b.key.id, revision: b.key.revision }, + })), + page: 1, + per_page: 1000, + } as any); + + await cleanupOrphanSummaries({ esClient, soClient: soClient as any, logger, abortController }); + + // Second call should include the after_key + expect(esClient.search).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + aggs: expect.objectContaining({ + id_revision: expect.objectContaining({ + composite: expect.objectContaining({ + after: afterKey, + }), + }), + }), + }), + expect.any(Object) + ); }); - it('should run when there are no Slo defs', async function () { - const task = new OrphanSummaryCleanupTask(taskManagerSetup, logger, {} as any); - task.findSloDefinitions = jest.fn().mockResolvedValue([]); - task.fetchSloSummariesIds = jest.fn().mockImplementation(async (searchKey) => { - if (!searchKey) { - return { - sloSummaryIds: [ - { id: '1', revision: 2 }, - { id: '2', revision: 2 }, - { id: '3', revision: 2 }, - { id: '4', revision: 2 }, - { id: '01', revision: 1 }, - { id: '02', revision: 1 }, - { id: '03', revision: 1 }, - { id: '04', revision: 1 }, - ], - searchAfter: '1', - }; - } - return { - sloSummaryIds: [ - { id: '5', revision: 2 }, - { id: '6', revision: 2 }, - { id: '7', revision: 2 }, - { id: '8', revision: 2 }, - { id: '05', revision: 1 }, - { id: '06', revision: 1 }, - { id: '07', revision: 1 }, - { id: '08', revision: 1 }, - ], - }; - }); - await task.start(taskManagerStart, soClient, esClient); - await task.runTask(new AbortController()); + it('should use abort controller signal in search calls', async () => { + esClient.search.mockResolvedValueOnce(createMockAggregationResponse([])); - expect(task.fetchSloSummariesIds).toHaveBeenCalledTimes(2); - expect(esClient.deleteByQuery).toHaveBeenCalledTimes(2); + await cleanupOrphanSummaries({ esClient, soClient: soClient as any, logger, abortController }); - expect(esClient.deleteByQuery).toHaveBeenNthCalledWith(1, { - conflicts: 'proceed', - slices: 'auto', - wait_for_completion: false, - index: SUMMARY_DESTINATION_INDEX_PATTERN, - query: { - bool: { - should: getDeleteQueryFilter([ - { id: '1', revision: 2 }, - { id: '2', revision: 2 }, - { id: '3', revision: 2 }, - { id: '4', revision: 2 }, - { id: '01', revision: 1 }, - { id: '02', revision: 1 }, - { id: '03', revision: 1 }, - { id: '04', revision: 1 }, - ]), - }, - }, - }); + expect(esClient.search).toHaveBeenCalledWith( + expect.any(Object), + expect.objectContaining({ signal: abortController.signal }) + ); }); }); From 61992c4103de9121e3502858190746cc3ce323d8 Mon Sep 17 00:00:00 2001 From: Kevin Delemme Date: Wed, 3 Dec 2025 16:01:33 -0500 Subject: [PATCH 6/6] Bring back previous interval --- .../orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts index 53945a26a004e..f0b662690fc40 100644 --- a/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts +++ b/x-pack/solutions/observability/plugins/slo/server/services/tasks/orphan_summary_cleanup_task/orphan_summary_cleanup_task.ts @@ -92,7 +92,7 @@ export class OrphanSummaryCleanupTask { taskType: TYPE, scope: ['observability', 'slo'], schedule: { - interval: '1m', + interval: '1h', }, state: {}, params: {},