Skip to content

Commit 6ca2248

Browse files
committed
move stream length validity check to wired stream class
1 parent cef0d74 commit 6ca2248

File tree

2 files changed

+22
-26
lines changed

2 files changed

+22
-26
lines changed

x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,13 @@ import type {
1111
IndicesDataStream,
1212
IndicesGetDataStreamResponse,
1313
QueryDslQueryContainer,
14-
Result,
1514
} from '@elastic/elasticsearch/lib/api/types';
16-
import type { IScopedClusterClient, Logger, KibanaRequest } from '@kbn/core/server';
15+
import type { IScopedClusterClient, KibanaRequest, Logger } from '@kbn/core/server';
1716
import { isNotFoundError } from '@kbn/es-errors';
18-
import type { RoutingStatus } from '@kbn/streams-schema';
19-
import { Streams, getAncestors, getParentId } from '@kbn/streams-schema';
2017
import type { LockManagerService } from '@kbn/lock-manager';
2118
import type { Condition } from '@kbn/streamlang';
22-
import { MAX_STREAM_NAME_LENGTH } from '../../../common/constants';
19+
import type { RoutingStatus } from '@kbn/streams-schema';
20+
import { Streams, getAncestors, getParentId } from '@kbn/streams-schema';
2321
import type { AssetClient } from './assets/asset_client';
2422
import { ASSET_ID, ASSET_TYPE } from './assets/fields';
2523
import type { QueryClient } from './assets/query/query_client';
@@ -29,18 +27,12 @@ import {
2927
} from './errors/definition_not_found_error';
3028
import { SecurityError } from './errors/security_error';
3129
import { StatusError } from './errors/status_error';
30+
import { StreamsStatusConflictError } from './errors/streams_status_conflict_error';
31+
import type { FeatureClient } from './feature/feature_client';
3232
import { LOGS_ROOT_STREAM_NAME, rootStreamDefinition } from './root_stream_definition';
33-
import type { StreamsStorageClient } from './storage/streams_storage_client';
3433
import { State } from './state_management/state';
34+
import type { StreamsStorageClient } from './storage/streams_storage_client';
3535
import { checkAccess, checkAccessBulk } from './stream_crud';
36-
import { StreamsStatusConflictError } from './errors/streams_status_conflict_error';
37-
import type { FeatureClient } from './feature/feature_client';
38-
interface AcknowledgeResponse<TResult extends Result> {
39-
acknowledged: true;
40-
result: TResult;
41-
}
42-
43-
export type EnableStreamsResponse = AcknowledgeResponse<'noop' | 'created'>;
4436
export type DisableStreamsResponse = AcknowledgeResponse<'noop' | 'deleted'>;
4537
export type DeleteStreamResponse = AcknowledgeResponse<'noop' | 'deleted'>;
4638
export type SyncStreamResponse = AcknowledgeResponse<'updated' | 'created'>;
@@ -324,18 +316,6 @@ export class StreamsClient {
324316
throw new StatusError(`Child stream ${name} already exists`, 409);
325317
}
326318

327-
const prefix = parent + '.';
328-
if (name.length <= prefix.length) {
329-
throw new StatusError('Stream name must not be empty.', 400);
330-
}
331-
332-
if (name.length > MAX_STREAM_NAME_LENGTH) {
333-
throw new StatusError(
334-
`Stream name cannot be longer than ${MAX_STREAM_NAME_LENGTH} characters.`,
335-
400
336-
);
337-
}
338-
339319
await State.attemptChanges(
340320
[
341321
{

x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
isIlmLifecycle,
2626
} from '@kbn/streams-schema';
2727
import _, { cloneDeep } from 'lodash';
28+
import { MAX_STREAM_NAME_LENGTH } from '../../../../../common/constants';
2829
import { generateLayer } from '../../component_templates/generate_layer';
2930
import { getComponentTemplateName } from '../../component_templates/name';
3031
import { isDefinitionNotFoundError } from '../../errors/definition_not_found_error';
@@ -355,7 +356,22 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
355356

356357
// validate routing
357358
const children: Set<string> = new Set();
359+
const prefix = this.definition.name + '.';
358360
for (const routing of this._definition.ingest.wired.routing) {
361+
if (routing.destination.length <= prefix.length) {
362+
return {
363+
isValid: false,
364+
errors: [new Error(`Stream name must not be empty.`)],
365+
};
366+
}
367+
if (routing.destination.length > MAX_STREAM_NAME_LENGTH) {
368+
return {
369+
isValid: false,
370+
errors: [
371+
new Error(`Stream name cannot be longer than ${MAX_STREAM_NAME_LENGTH} characters.`),
372+
],
373+
};
374+
}
359375
if (children.has(routing.destination)) {
360376
return {
361377
isValid: false,

0 commit comments

Comments
 (0)