Skip to content

Commit d754960

Browse files
committed
KSQL-12194, KSQL-12195 | Classify missing schema exception and record too large exceptions as user errors.
1 parent cad0064 commit d754960

File tree

7 files changed

+215
-3
lines changed

7 files changed

+215
-3
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2022 Confluent Inc.
3+
*
4+
* Licensed under the Confluent Community License (the "License"; you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* http://www.confluent.io/confluent-community-license
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
16+
package io.confluent.ksql.query;
17+
18+
import io.confluent.ksql.query.QueryError.Type;
19+
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
20+
import java.util.Objects;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
/**
25+
* {@code MissingSchemaClassifier} classifies missing SR schema exceptions as user error
26+
*/
27+
public class MissingSchemaClassifier implements QueryErrorClassifier {
28+
private static final Logger LOG = LoggerFactory.getLogger(MissingSchemaClassifier.class);
29+
30+
private final String queryId;
31+
32+
public MissingSchemaClassifier(final String queryId) {
33+
this.queryId = Objects.requireNonNull(queryId, "queryId");
34+
}
35+
36+
@Override
37+
public Type classify(final Throwable e) {
38+
final Type type = SchemaRegistryUtil.isSchemaNotFoundErrorCode(e) ? Type.USER : Type.UNKNOWN;
39+
40+
if (type == Type.USER) {
41+
LOG.info(
42+
"Classified error as USER error based on missing SR schema. Query ID: {} Exception: {}",
43+
queryId,
44+
e.getMessage());
45+
}
46+
47+
return type;
48+
}
49+
}

ksqldb-engine/src/main/java/io/confluent/ksql/query/QueryBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,9 @@ private QueryErrorClassifier getConfiguredQueryErrorClassifier(
656656
final QueryErrorClassifier userErrorClassifiers = new MissingTopicClassifier(applicationId)
657657
.and(new AuthorizationClassifier(applicationId))
658658
.and(new KsqlFunctionClassifier(applicationId))
659+
.and(new RecordTooLargeClassifier(applicationId))
659660
.and(new MissingSubjectClassifier(applicationId))
661+
.and(new MissingSchemaClassifier(applicationId))
660662
.and(new SchemaAuthorizationClassifier(applicationId))
661663
.and(new KsqlSerializationClassifier(applicationId));
662664
return buildConfiguredClassifiers(ksqlConfig, applicationId)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2022 Confluent Inc.
3+
*
4+
* Licensed under the Confluent Community License (the "License"; you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* http://www.confluent.io/confluent-community-license
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
16+
package io.confluent.ksql.query;
17+
18+
import io.confluent.ksql.query.QueryError.Type;
19+
import java.util.Objects;
20+
import org.apache.commons.lang3.exception.ExceptionUtils;
21+
import org.apache.kafka.common.errors.RecordTooLargeException;
22+
import org.apache.kafka.streams.errors.StreamsException;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
/**
27+
* {@code RecordTooLargeClassifier} classifies records too large to be produced as user exception.
28+
*/
29+
public class RecordTooLargeClassifier implements QueryErrorClassifier {
30+
private static final Logger LOG = LoggerFactory.getLogger(RecordTooLargeClassifier.class);
31+
32+
private final String queryId;
33+
34+
public RecordTooLargeClassifier(final String queryId) {
35+
this.queryId = Objects.requireNonNull(queryId, "queryId");
36+
}
37+
38+
@Override
39+
public Type classify(final Throwable e) {
40+
final Type type = e instanceof StreamsException
41+
&& ExceptionUtils.getRootCause(e) instanceof RecordTooLargeException
42+
? Type.USER
43+
: Type.UNKNOWN;
44+
45+
if (type == Type.USER) {
46+
LOG.info(
47+
"Classified RecordTooLargeException error as USER error. Query ID: {} Exception: {}. "
48+
+ "Consider setting ksql.streams.max.request.size property to a higher value.",
49+
queryId,
50+
e.getMessage()
51+
);
52+
}
53+
54+
return type;
55+
}
56+
}

ksqldb-engine/src/main/java/io/confluent/ksql/schema/registry/SchemaRegistryUtil.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@ public final class SchemaRegistryUtil {
4646

4747
@VisibleForTesting
4848
public static final int SUBJECT_NOT_FOUND_ERROR_CODE = 40401;
49+
public static final int SCHEMA_NOT_FOUND_ERROR_CODE = 40403;
4950

5051
private SchemaRegistryUtil() {
52+
super();
5153
}
5254

5355
public static void cleanupInternalTopicSchemas(
@@ -185,14 +187,21 @@ public static boolean isSubjectNotFoundErrorCode(final Throwable error) {
185187
&& ((RestClientException) error).getErrorCode() == SUBJECT_NOT_FOUND_ERROR_CODE);
186188
}
187189

190+
public static boolean isSchemaNotFoundErrorCode(final Throwable error) {
191+
return (error instanceof RestClientException
192+
&& ((RestClientException) error).getErrorCode() == SCHEMA_NOT_FOUND_ERROR_CODE);
193+
}
194+
188195
public static boolean isAuthErrorCode(final Throwable error) {
189196
return (error instanceof RestClientException
190197
&& ((((RestClientException) error).getStatus() == HttpStatus.SC_UNAUTHORIZED)
191198
|| ((RestClientException) error).getStatus() == HttpStatus.SC_FORBIDDEN));
192199
}
193200

194201
private static boolean isRetriableError(final Throwable error) {
195-
return !isSubjectNotFoundErrorCode(error) && !isAuthErrorCode(error);
202+
return !isSubjectNotFoundErrorCode(error)
203+
&& !isSchemaNotFoundErrorCode(error)
204+
&& !isAuthErrorCode(error);
196205
}
197206

198207
private static void hardDeleteSubjectWithRetries(
@@ -201,7 +210,7 @@ private static void hardDeleteSubjectWithRetries(
201210
try {
202211
ExecutorUtil.executeWithRetries(
203212
() -> schemaRegistryClient.deleteSubject(subject, true),
204-
error -> isRetriableError(error)
213+
SchemaRegistryUtil::isRetriableError
205214
);
206215
} catch (final RestClientException e) {
207216
if (isAuthErrorCode(e)) {
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2022 Confluent Inc.
3+
*
4+
* Licensed under the Confluent Community License (the "License"; you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* http://www.confluent.io/confluent-community-license
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
16+
package io.confluent.ksql.query;
17+
18+
import org.junit.Test;
19+
20+
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
21+
22+
import static org.hamcrest.MatcherAssert.assertThat;
23+
import static org.hamcrest.Matchers.is;
24+
25+
public class MissingSchemaClassifierTest {
26+
@Test
27+
public void shouldClassifyMissingSchemaAsUserError() {
28+
// Given:
29+
final Exception e = new RestClientException("foo", 404, 40403);
30+
31+
// When:
32+
final QueryError.Type type = new MissingSchemaClassifier("").classify(e);
33+
34+
// Then:
35+
assertThat(type, is(QueryError.Type.USER));
36+
}
37+
38+
@Test
39+
public void shouldClassifyOtherExceptionAsUnknownException() {
40+
// Given:
41+
final Exception e = new Exception("foo");
42+
43+
// When:
44+
final QueryError.Type type = new MissingSchemaClassifier("").classify(e);
45+
46+
// Then:
47+
assertThat(type, is(QueryError.Type.UNKNOWN));
48+
}
49+
}

ksqldb-engine/src/test/java/io/confluent/ksql/query/MissingSubjectClassifierTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package io.confluent.ksql.query;
1717

1818
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
19-
import org.apache.kafka.streams.errors.MissingSourceTopicException;
2019
import org.junit.Test;
2120

2221
import static org.hamcrest.MatcherAssert.assertThat;
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2022 Confluent Inc.
3+
*
4+
* Licensed under the Confluent Community License (the "License"; you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* http://www.confluent.io/confluent-community-license
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
16+
package io.confluent.ksql.query;
17+
18+
import org.apache.kafka.common.KafkaException;
19+
import org.apache.kafka.common.errors.RecordTooLargeException;
20+
import org.apache.kafka.streams.errors.StreamsException;
21+
import org.junit.Test;
22+
23+
import static org.hamcrest.MatcherAssert.assertThat;
24+
import static org.hamcrest.Matchers.is;
25+
26+
public class RecordTooLargeClassifierTest {
27+
28+
@Test
29+
public void shouldClassifyRecordTooLargeExceptionAsUserError() {
30+
// Given:
31+
final Exception e = new StreamsException(
32+
"Error encountered trying to send record to topic foo",
33+
new KafkaException(
34+
"Cannot execute transactional method because we are in an error state",
35+
new RecordTooLargeException(
36+
"The message is 1084728 bytes when serialized which is larger than 1048576, which"
37+
+ " is the value of the max.request.size configuration.")
38+
)
39+
);
40+
41+
// When:
42+
final QueryError.Type type = new RecordTooLargeClassifier("").classify(e);
43+
44+
// Then:
45+
assertThat(type, is(QueryError.Type.USER));
46+
}
47+
48+
}

0 commit comments

Comments
 (0)