Skip to content

Commit ef37739

Browse files
authored
IGNITE-24006 Sql. AssertionError when running ddlInsideExplicitTransactionFails test (#6889)
1 parent f20782f commit ef37739

File tree

16 files changed

+271
-77
lines changed

16 files changed

+271
-77
lines changed

modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@
3737
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
3838
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
3939
import org.apache.ignite.internal.sql.engine.SqlQueryType;
40-
import org.apache.ignite.internal.sql.engine.TxControlInsideExternalTxNotSupportedException;
4140
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
4241
import org.apache.ignite.internal.util.ExceptionUtils;
42+
import org.apache.ignite.lang.ErrorGroups.Sql;
43+
import org.apache.ignite.lang.TraceableException;
4344
import org.apache.ignite.sql.ColumnMetadata;
4445
import org.apache.ignite.sql.ColumnType;
4546
import org.apache.ignite.sql.ResultSetMetadata;
@@ -155,7 +156,7 @@ JdbcQuerySingleResult createErrorResult(String logMessage, Throwable origin, @Nu
155156

156157
String errorMessage;
157158

158-
if (ex instanceof TxControlInsideExternalTxNotSupportedException) {
159+
if (ex instanceof TraceableException && ((TraceableException) ex).code() == Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR) {
159160
errorMessage = "Transaction control statements are not supported when autocommit mode is disabled";
160161
} else {
161162
errorMessage = getErrorMessage(ex);

modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelQueryTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.ignite.internal.sql.engine;
1919

2020
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelled;
21-
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelledInternalException;
2221
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
2322
import static org.hamcrest.Matchers.is;
2423
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -170,7 +169,7 @@ public void testQueryWontStartWhenHandleIsCancelled() {
170169
"SELECT 1"
171170
));
172171

173-
expectQueryCancelledInternalException(run);
172+
expectQueryCancelled(run);
174173

175174
waitUntilRunningQueriesCount(is(0));
176175
}
@@ -209,7 +208,7 @@ public void testExecutablePlans(String query) {
209208
CompletableFuture<Void> f = cancelHandle.cancelAsync();
210209

211210
// Obverse cancellation error
212-
expectQueryCancelledInternalException(run);
211+
expectQueryCancelled(run);
213212

214213
await(f);
215214

modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelScriptTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public void cancelScriptBeforeExecution() {
124124

125125
cancelHandle.cancel();
126126

127-
expectQueryCancelledInternalException(
127+
expectQueryCancelled(
128128
() -> runScript(token, "SELECT 1; SELECT 2;")
129129
);
130130
}

modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import static org.junit.jupiter.api.Assertions.assertEquals;
3131
import static org.junit.jupiter.api.Assertions.assertFalse;
3232
import static org.junit.jupiter.api.Assertions.assertNotNull;
33-
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
3433
import static org.junit.jupiter.api.Assertions.assertTrue;
3534
import static org.junit.jupiter.api.Assertions.fail;
3635

@@ -39,6 +38,7 @@
3938
import org.apache.ignite.internal.tx.InternalTransaction;
4039
import org.apache.ignite.internal.tx.TxState;
4140
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
41+
import org.apache.ignite.lang.ErrorGroups.Sql;
4242
import org.junit.jupiter.api.AfterEach;
4343
import org.junit.jupiter.api.BeforeAll;
4444
import org.junit.jupiter.api.BeforeEach;
@@ -351,12 +351,20 @@ void concurrentExecutionDoesntAffectSelectWithExplicitTx() {
351351
@Test
352352
void transactionControlStatementFailsWithExternalTransaction() {
353353
InternalTransaction tx1 = (InternalTransaction) igniteTx().begin();
354-
assertThrowsExactly(TxControlInsideExternalTxNotSupportedException.class, () -> runScript(tx1, null, "COMMIT"));
354+
assertThrowsSqlException(
355+
Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR,
356+
"Transaction control statement cannot be executed within an external transaction.",
357+
() -> runScript(tx1, null, "COMMIT")
358+
);
355359
assertEquals(0, txManager().pending());
356360
assertEquals(TxState.ABORTED, tx1.state());
357361

358362
InternalTransaction tx2 = (InternalTransaction) igniteTx().begin();
359-
assertThrowsExactly(TxControlInsideExternalTxNotSupportedException.class, () -> runScript(tx2, null, "START TRANSACTION; COMMIT;"));
363+
assertThrowsSqlException(
364+
Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR,
365+
"Transaction control statement cannot be executed within an external transaction.",
366+
() -> runScript(tx2, null, "START TRANSACTION; COMMIT;")
367+
);
360368
assertEquals(0, txManager().pending());
361369
assertEquals(TxState.ABORTED, tx2.state());
362370

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ExecutionPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
/** Enumerates possible phases of query execution. */
2121
public enum ExecutionPhase {
2222
/** Query is registered on server. */
23-
REGISTERED(RegisteredPhaseHandler.INSTANCE),
23+
REGISTERED(NoOpHandler.INSTANCE),
2424
/** Query string is parsed at the moment. Parsed AST may or may not be available yet. */
2525
PARSING(ParsingPhaseHandler.INSTANCE),
2626
/** AST is available now, optimization task is submitted. */

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,7 @@ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> processNext() {
214214

215215
if (lastStatement) {
216216
// Main program is completed, therefore it's safe to schedule termination of a query
217-
query.resultHolder
218-
.thenRun(this::scheduleTermination);
217+
scheduleTermination();
219218
} else {
220219
CompletableFuture<Void> triggerFuture;
221220
ScriptStatement nextStatement = statements.peek();
@@ -300,7 +299,7 @@ private void cancelAll(Throwable cause) {
300299

301300
private void scheduleTermination() {
302301
CompletableFuture.allOf(dependentQueries.toArray(CompletableFuture[]::new))
303-
.whenComplete((ignored, ex) -> query.moveTo(ExecutionPhase.TERMINATED));
302+
.whenComplete((ignored, ex) -> query.terminate());
304303
}
305304

306305
private static class ScriptStatement {

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.ignite.internal.logger.IgniteLogger;
2929
import org.apache.ignite.internal.logger.Loggers;
3030
import org.apache.ignite.internal.sql.engine.exec.fsm.Result.Status;
31-
import org.apache.ignite.internal.sql.engine.util.Commons;
3231
import org.apache.ignite.internal.util.ExceptionUtils;
3332

3433
/**
@@ -65,7 +64,11 @@ class Program<ResultT> {
6564
this.errorHandler = errorHandler;
6665
}
6766

68-
CompletableFuture<ResultT> run(Query query) {
67+
ProgramExecutionState<ResultT> createState() {
68+
return new ProgramExecutionState<>(name);
69+
}
70+
71+
void run(Query query, ProgramExecutionState<ResultT> state) {
6972
Result result;
7073
do {
7174
ExecutionPhase phase = query.currentPhase();
@@ -75,17 +78,14 @@ CompletableFuture<ResultT> run(Query query) {
7578
} catch (Throwable th) {
7679
// handles exception from synchronous part of phase evaluation
7780

78-
try {
79-
if (errorHandler.test(query, th)) {
80-
continue;
81-
}
82-
} catch (AssertionError | Exception ex) {
83-
LOG.warn("Exception in error handler [queryId={}]", ex, query.id);
84-
85-
query.onError(th);
81+
if (shouldRetry(query, th)) {
82+
continue;
8683
}
8784

88-
return Commons.cast(query.resultHolder);
85+
query.setError(th);
86+
finalizeActiveProgram(query, state);
87+
88+
return;
8989
}
9090

9191
if (result.status() == Status.WAITING_FOR_COMPLETION) {
@@ -101,31 +101,55 @@ CompletableFuture<ResultT> run(Query query) {
101101
ex = ExceptionUtils.unwrapCause(ex);
102102

103103
// handles exception from asynchronous part of phase evaluation
104-
try {
105-
if (errorHandler.test(query, ex)) {
106-
query.executor.execute(() -> run(query));
107-
}
108-
} catch (AssertionError | Exception ex0) {
109-
LOG.warn("Exception in error handler [queryId={}]", ex0, query.id);
110-
111-
query.onError(ex);
104+
if (shouldRetry(query, ex)) {
105+
query.executor.execute(() -> run(query, state));
106+
} else {
107+
query.setError(ex);
108+
finalizeActiveProgram(query, state);
112109
}
113110

114111
return;
115112
}
116113

117114
query.executor.execute(() -> {
118-
if (advanceQuery(query)) {
119-
run(query);
115+
if (advanceQuery(query, state)) {
116+
run(query, state);
120117
}
121118
});
122119
});
123120
break;
124121
}
125122
}
126-
} while (advanceQuery(query));
123+
} while (advanceQuery(query, state));
124+
}
125+
126+
private boolean shouldRetry(Query query, Throwable th) {
127+
try {
128+
if (errorHandler.test(query, th)) {
129+
return true;
130+
}
131+
} catch (Throwable throwableFromErrorHandler) {
132+
LOG.warn("Exception in error handler [queryId={}]", throwableFromErrorHandler, query.id);
133+
134+
query.terminateExceptionally(th);
135+
}
127136

128-
return Commons.cast(query.resultHolder);
137+
return false;
138+
}
139+
140+
private static void finalizeActiveProgram(Query query, ProgramExecutionState<?> executionState) {
141+
ProgramExecutionHandle activeHandle = query.activeProgram.getAndSet(null);
142+
143+
Throwable throwable = query.error.get();
144+
if (throwable != null) {
145+
// Set error as result of execution.
146+
executionState.notifyError(throwable);
147+
148+
query.terminate();
149+
}
150+
151+
executionState.programFinished.complete(null);
152+
assert activeHandle == executionState;
129153
}
130154

131155
/**
@@ -134,7 +158,7 @@ CompletableFuture<ResultT> run(Query query) {
134158
* @param query Query to advance.
135159
* @return {@code true} if new state is not terminal (e.g. it does make sense to continue execution).
136160
*/
137-
private boolean advanceQuery(Query query) {
161+
private boolean advanceQuery(Query query, ProgramExecutionState<ResultT> state) {
138162
ExecutionPhase phase = query.currentPhase();
139163

140164
Transition transition = transitions.get(phase);
@@ -146,7 +170,13 @@ private boolean advanceQuery(Query query) {
146170
if (terminalPhase.test(query.currentPhase())) {
147171
ResultT result = this.result.apply(query);
148172

149-
query.resultHolder.complete(result);
173+
finalizeActiveProgram(query, state);
174+
175+
if (!state.resultHolder.complete(result)) {
176+
assert state.resultHolder.isCompletedExceptionally();
177+
178+
query.moveTo(ExecutionPhase.TERMINATED);
179+
}
150180

151181
return false;
152182
}
Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,19 @@
1717

1818
package org.apache.ignite.internal.sql.engine.exec.fsm;
1919

20-
/** Handler that kick-starts query processing. */
21-
class RegisteredPhaseHandler implements ExecutionPhaseHandler {
22-
static final ExecutionPhaseHandler INSTANCE = new RegisteredPhaseHandler();
20+
import java.util.concurrent.CompletableFuture;
2321

24-
private RegisteredPhaseHandler() { }
22+
/**
23+
* Provides minimal API to communicate with a running {@link Program program}.
24+
*/
25+
interface ProgramExecutionHandle {
26+
/**
27+
* Notifies program execution about exception related to query this program is running for.
28+
*
29+
* @param error An error to notify about.
30+
*/
31+
void notifyError(Throwable error);
2532

26-
@Override
27-
public Result handle(Query query) {
28-
return Result.completed();
29-
}
33+
/** Returns a future which will be completed successfully. */
34+
CompletableFuture<Void> completionFuture();
3035
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.sql.engine.exec.fsm;
19+
20+
import static org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
21+
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
22+
23+
import java.util.concurrent.CompletableFuture;
24+
import org.apache.ignite.internal.tostring.S;
25+
26+
/**
27+
* Represents a state of program execution.
28+
*
29+
* <p>That is, strictly typed holder of an execution result.
30+
*
31+
* @param <ResultT> A type of the execution result.
32+
*/
33+
class ProgramExecutionState<ResultT> implements ProgramExecutionHandle {
34+
final CompletableFuture<Void> programFinished = new CompletableFuture<>();
35+
final CompletableFuture<ResultT> resultHolder = new CompletableFuture<>();
36+
37+
@SuppressWarnings("FieldCanBeLocal") // Used in toString()
38+
private final String programName;
39+
40+
ProgramExecutionState(String programName) {
41+
this.programName = programName;
42+
}
43+
44+
@Override
45+
public void notifyError(Throwable th) {
46+
resultHolder.completeExceptionally(mapToPublicSqlException(unwrapCause(th)));
47+
}
48+
49+
@Override
50+
public CompletableFuture<Void> completionFuture() {
51+
return programFinished;
52+
}
53+
54+
@Override
55+
public String toString() {
56+
return S.toString(this);
57+
}
58+
}

0 commit comments

Comments
 (0)