diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java index 3a73e089888..61002d13a71 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory; import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup; import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription; +import org.apache.ignite.internal.sql.engine.exec.rel.Node; import org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningColumns; import org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruningMetadata; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; @@ -324,7 +325,7 @@ public SharedState sharedState() { } /** - * Executes a query task. + * Executes a query task. To execute a task from a {@link Node} use {@link Node#execute(RunnableX)} instead. * * @param task Query task. */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java index ce5cb06af69..3b84aef7d37 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java @@ -21,6 +21,7 @@ import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import java.util.List; +import org.apache.ignite.internal.lang.RunnableX; import org.apache.ignite.internal.sql.engine.QueryCancelledException; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.util.Commons; @@ -105,6 +106,22 @@ public void rewind() { } } + /** {@inheritDoc} */ + @Override + public void execute(RunnableX task) { + if (this.isClosed()) { + return; + } + + context().execute(() -> { + // If the node is closed, the task must be ignored. + if (this.isClosed()) { + return; + } + task.run(); + }, this::onError); + } + /** {@inheritDoc} */ @Override public void onRegister(Downstream downstream) { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java index 194391e2f2b..84b99930a68 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java @@ -51,7 +51,7 @@ public void request(int rowsCnt) throws Exception { requested = rowsCnt; if (!inLoop) { - context().execute(this::doJoin, this::onError); + this.execute(this::doJoin); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java index e8326e7f550..842df5467b8 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java @@ -71,7 +71,7 @@ public void request(int rowsCnt) throws Exception { if (waiting == 0) { sources().get(curSrcIdx).request(waiting = inBufSize); } else if (!inLoop) { - context().execute(this::flush, this::onError); + this.execute(this::flush); } } @@ -179,7 +179,7 @@ private void flush() throws Exception { if (processed >= inBufSize && requested > 0) { // Allow others to do their job. - context().execute(this::flush, this::onError); + this.execute(this::flush); return; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java index 6d014c3fd22..2fffe12c74d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java @@ -268,11 +268,11 @@ private void flush() throws Exception { */ private void scheduleTask() { if (!pendingRequests.isEmpty() && taskScheduled.compareAndSet(false, true)) { - source.context().execute(() -> { + source.execute(() -> { taskScheduled.set(false); flush(); - }, source::onError); + }); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java index 092ce38e93f..f02d35bf87d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java @@ -258,12 +258,12 @@ private void onRequest() throws Exception { assert nullOrEmpty(leftInBuf); assert nullOrEmpty(rightInBuf); - context().execute(() -> { + this.execute(() -> { checkState(); state = State.FILLING_LEFT; leftSource().request(waitingLeft = leftInBufferSize); - }, this::onError); + }); break; case IDLE: @@ -272,11 +272,11 @@ private void onRequest() throws Exception { assert waitingRight == -1 || waitingRight == 0 && rightInBuf.size() == rightInBufferSize; assert waitingLeft == -1 || waitingLeft == 0 && leftInBuf.size() == leftInBufferSize; - context().execute(() -> { + this.execute(() -> { checkState(); join(); - }, this::onError); + }); break; diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java index 47415b8cd25..12fb46c9150 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java @@ -63,7 +63,7 @@ public void request(int rowsCnt) throws Exception { requested = rowsCnt; if (!inLoop) { - context().execute(this::doFilter, this::onError); + this.execute(this::doFilter); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java index 119aa8b2b7d..fe8df767d40 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java @@ -107,7 +107,7 @@ public void request(int rowsCnt) throws Exception { if (waiting == 0) { source().request(waiting = inBufSize); } else if (!inLoop) { - context().execute(this::flush, this::onError); + this.execute(this::flush); } } @@ -191,7 +191,7 @@ private void flush() throws Exception { if (processed >= inBufSize && requested > 0) { // allow others to do their job - context().execute(this::flush, this::onError); + this.execute(this::flush); return; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java index 2c1d8548eba..3053eaf942e 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java @@ -117,7 +117,7 @@ public void request(int rowsCnt) throws Exception { requested = rowsCnt; if (!inLoop) { - context().execute(this::doPush, this::onError); + this.execute(this::doPush); } } @@ -356,7 +356,7 @@ private void requestBatches(String nodeName, int cnt, @Nullable SharedState stat ex ); - context().execute(() -> onError(wrapperEx), this::onError); + this.execute(() -> onError(wrapperEx)); } }); } @@ -367,9 +367,9 @@ private void requestBatches(String nodeName, int cnt, @Nullable SharedState stat */ public void onNodeLeft(String nodeName) { if (context().originatingNodeName().equals(nodeName) && srcNodeNames == null) { - context().execute(this::close, this::onError); + this.execute(this::close); } else if (srcNodeNames != null && srcNodeNames.contains(nodeName)) { - context().execute(() -> onNodeLeft0(nodeName), this::onError); + this.execute(() -> onNodeLeft0(nodeName)); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java index e3450824f78..8e521e89e9e 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java @@ -126,7 +126,7 @@ public void push(RowT row) throws Exception { waiting--; if (waiting == 0) { - context().execute(this::requestSource, this::onError); + this.execute(this::requestSource); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java index 9f1e757fa0f..692742c78b4 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java @@ -82,7 +82,7 @@ public void request(int rowsCnt) throws Exception { requested = rowsCnt; if (!inLoop) { - context().execute(this::doJoin, this::onError); + this.execute(this::doJoin); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java index 3e85e53431d..1cf184b8901 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java @@ -290,7 +290,7 @@ private void flushTuples() { throw new UnsupportedOperationException(modifyOp.name()); } - modifyResult.whenComplete((r, e) -> context().execute(() -> { + modifyResult.whenComplete((r, e) -> this.execute(() -> { if (e != null) { onError(e); @@ -308,7 +308,7 @@ private void flushTuples() { requestNextBatchIfNeeded(); tryEnd(); - }, this::onError)); + })); } private boolean needToFlush() { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java index 30e9797ae02..9f9cfa0961f 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.sql.engine.exec.rel; import java.util.List; +import org.apache.ignite.internal.lang.RunnableX; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; /** @@ -71,4 +72,11 @@ public interface Node extends AutoCloseable { * Rewinds upstream. */ void rewind(); + + /** + * Schedules the given action of this execution node. If this node was closed, the task is ignored. + * + * @param task Task. + */ + void execute(RunnableX task); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java index 34a51fb63d3..a420e2a9857 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java @@ -265,7 +265,7 @@ private void sendBatch(String nodeName, int batchId, boolean last, List ro ex ); - context().execute(() -> onError(wrapperEx), this::onError); + this.execute(() -> onError(wrapperEx)); }); } @@ -344,7 +344,7 @@ private void flush() throws Exception { */ public void onNodeLeft(String nodeName) { if (nodeName.equals(context().originatingNodeName())) { - context().execute(this::close, this::onError); + this.execute(this::close); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java index 024c188b931..7b4582adcf0 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java @@ -130,7 +130,7 @@ public boolean isClosed() { /** {@inheritDoc} */ @Override public void closeInternal() { - context().execute(() -> sources().forEach(Commons::closeQuiet), this::onError); + this.execute(() -> sources().forEach(Commons::closeQuiet)); } /** {@inheritDoc} */ @@ -255,7 +255,7 @@ private void exchangeBuffers() { close(); } else if (inBuff.isEmpty() && waiting == 0) { int req = waiting = inBufSize; - context().execute(() -> source().request(req), this::onError); + this.execute(() -> source().request(req)); } if (!outBuff.isEmpty() || waiting == -1) { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java index e5687f73364..f0515b36d64 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java @@ -70,7 +70,7 @@ public void request(int rowsCnt) throws Exception { requested = rowsCnt; if (!inLoop) { - context().execute(this::push, this::onError); + this.execute(this::push); } } @@ -126,7 +126,7 @@ private void push() throws Exception { if (++processed == inBufSize && requested > 0) { // allow others to do their job - context().execute(this::push, this::onError); + this.execute(this::push); return; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java index 2b00d20ed44..af391c302b3 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java @@ -154,7 +154,7 @@ public void push(RowT row) throws Exception { if (waiting == 0 && requested > 0) { waiting = inBufSize; - context().execute(() -> source().request(inBufSize), this::onError); + this.execute(() -> source().request(inBufSize)); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java index cf7ab16fe8a..4b2d9c2bc66 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java @@ -119,7 +119,7 @@ public void request(int rowsCnt) throws Exception { if (waiting == 0) { source().request(waiting = inBufSize); } else if (!inLoop) { - context().execute(this::flush, this::onError); + this.execute(this::flush); } } @@ -176,7 +176,7 @@ private void flush() throws Exception { if (++processed >= inBufSize) { // Allow the others to do their job. - context().execute(this::flush, this::onError); + this.execute(this::flush); return; } @@ -194,7 +194,7 @@ private void flush() throws Exception { if (++processed >= inBufSize && requested > 0) { // allow others to do their job - context().execute(this::flush, this::onError); + this.execute(this::flush); return; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java index f2a7019cc97..02a0c9e794b 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java @@ -83,7 +83,7 @@ public void request(int rowsCnt) throws Exception { requested = rowsCnt; if (!inLoop) { - context().execute(this::push, this::onError); + this.execute(this::push); } } @@ -166,7 +166,7 @@ private void push() throws Exception { requested = 0; downstream().end(); } else { - context().execute(this::push, this::onError); + this.execute(this::push); } } } @@ -228,30 +228,30 @@ public void onNext(RowT row) { inBuffInner.add(row); if (inBuffInner.size() == inBufSize) { - context().execute(() -> { + StorageScanNode.this.execute(() -> { waiting = 0; push(); - }, StorageScanNode.this::onError); + }); } } /** {@inheritDoc} */ @Override public void onError(Throwable throwable) { - context().execute(() -> { + StorageScanNode.this.execute(() -> { throw throwable; - }, StorageScanNode.this::onError); + }); } /** {@inheritDoc} */ @Override public void onComplete() { - context().execute(() -> { + StorageScanNode.this.execute(() -> { activeSubscription = null; waiting = 0; push(); - }, StorageScanNode.this::onError); + }); } } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java index 5d82047455c..8c75191ff88 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java @@ -100,7 +100,7 @@ public void request(int rowsCnt) throws Exception { requested += rowsCnt; if ((waiting == -1 || rowIdx < rows.size()) && !inLoop) { - context().execute(this::doPush, this::onError); + this.execute(this::doPush); } else if (waiting == 0) { source().request(waiting = inBufSize); } @@ -132,7 +132,7 @@ private void doPush() throws Exception { requested = 0; downstream().end(); } else if (requested > 0 && processed >= inBufSize) { - context().execute(this::doPush, this::onError); + this.execute(this::doPush); } } @@ -167,6 +167,6 @@ public void end() throws Exception { waiting = -1; - context().execute(this::doPush, this::onError); + this.execute(this::doPush); } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java index 5f533030614..18c9d6c280d 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractJoinExecutionTest.java @@ -458,7 +458,7 @@ private void verifyJoin(Object[][] left, Object[][] right, JoinRelType joinType, TestDownstream downstream = new TestDownstream<>(); project.onRegister(downstream); - ctx.execute(() -> project.request(1024), project::onError); + project.execute(() -> project.request(1024)); Object[][] res = await(downstream.result()).toArray(EMPTY); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNodeSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNodeSelfTest.java new file mode 100644 index 00000000000..934e6048b6a --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNodeSelfTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.rel; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.apache.ignite.internal.lang.RunnableX; +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.ExecutionId; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +/** + * Test for {@link Node} class. + */ +public class AbstractNodeSelfTest extends BaseIgniteAbstractTest { + + @Test + public void testClose() { + ExecutionContext ctx = Mockito.mock(ExecutionContext.class); + + when(ctx.executionId()).thenReturn(new ExecutionId(UUID.randomUUID(), 1)); + when(ctx.fragmentId()).thenReturn(1L); + + SimpleNode node = new SimpleNode<>(ctx); + + doAnswer(invocation -> { + RunnableX runnableX = invocation.getArgument(0); + runnableX.run(); + return null; + }).when(ctx).execute(any(RunnableX.class), any(Consumer.class)); + + // Schedule task 1 + node.execute(() -> node.request(1)); + assertEquals(1, node.callCount.get()); + + // Schedule close + node.execute(node::close); + assertEquals(1, node.callCount.get()); + + // Task is not called because the node was closed. + node.execute(() -> node.request(1)); + assertEquals(1, node.callCount.get()); + + // The last task was not scheduled + verify(ctx, times(2)).execute(any(RunnableX.class), any(Consumer.class)); + } + + @Test + public void testCloseDelayed() { + ExecutionContext ctx = Mockito.mock(ExecutionContext.class); + + when(ctx.executionId()).thenReturn(new ExecutionId(UUID.randomUUID(), 1)); + when(ctx.fragmentId()).thenReturn(1L); + + SimpleNode node = new SimpleNode<>(ctx); + + doAnswer(invocation -> { + RunnableX runnableX = invocation.getArgument(0); + if (node.callCount.get() == 2) { + // Add a call to close to check the following scenario: + // Node: + // void someTask(): + // this.execute(anotherTask) + // ... + // this.close() + // + // Because someTask first schedules anotherTask and then closes the node, + // we do not want to execute anotherTask. + node.close(); + } + runnableX.run(); + return null; + }).when(ctx).execute(any(RunnableX.class), any(Consumer.class)); + + // Schedule task 1 + node.execute(() -> node.request(1)); + assertEquals(1, node.callCount.get()); + + // Schedule task 2 + node.execute(() -> node.request(1)); + assertEquals(2, node.callCount.get()); + + // Task is not called because the node was closed. + node.execute(() -> node.request(1)); + assertEquals(2, node.callCount.get()); + + // All tasks were scheduled + verify(ctx, times(3)).execute(any(RunnableX.class), any(Consumer.class)); + } + + private static class SimpleNode extends AbstractNode implements Downstream { + + private final AtomicInteger callCount = new AtomicInteger(); + + /** + * Constructor. + * + * @param ctx Execution context. + */ + private SimpleNode(ExecutionContext ctx) { + super(ctx); + } + + @Override + protected void rewindInternal() { + throw new UnsupportedOperationException(); + } + + @Override + protected Downstream requestDownstream(int idx) { + return this; + } + + @Override + public void request(int rowsCnt) throws Exception { + checkState(); + callCount.incrementAndGet(); + } + + @Override + public void push(T row) { + throw new UnsupportedOperationException(); + } + + @Override + public void end() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java index 037742f1d92..903afbdb6ac 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java @@ -244,7 +244,7 @@ context, factory, fromRowSchema(ROW_SCHEMA), dataSource, predicate, projection, node.onRegister(downstream); - context.execute(() -> node.request(Integer.MAX_VALUE), node::onError); + node.execute(() -> node.request(Integer.MAX_VALUE)); return await(downstream.result()); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java index a7106cc0e92..9551415338c 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java @@ -39,6 +39,7 @@ import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.lang.RunnableX; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.row.RowSchema; @@ -446,6 +447,7 @@ protected Object[] row(Object... fields) { * Node that always throws {@link IllegalAccessError} except for {@link #close()} and {@link #onRegister(Downstream)} methods. */ static class CorruptedNode implements Node { + /** {@inheritDoc} */ @Override public ExecutionContext context() { @@ -488,9 +490,15 @@ public void rewind() { throw new IllegalAccessError(); } + @Override + public void execute(RunnableX task) { + throw new IllegalAccessError(); + } + /** {@inheritDoc} */ @Override public void close() { + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java index 19f05c4b2a3..b726beaa890 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java @@ -209,11 +209,11 @@ protected void rewindInternal() { @Override public void request(int rowsCnt) { int r = requested.getAndAdd(rowsCnt); - context().execute(() -> { + this.execute(() -> { for (int i = 0; i < rowsCnt; i++) { downstream().push(new Object[]{r + i}); } - }, this::onError); + }); } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java index e2bb80063d7..56344e5de16 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java @@ -126,7 +126,7 @@ void nodeReportsExpectedNumberOfUpdatedRowsOnInsert(int sourceSize) { .thenReturn(nullCompletedFuture()); } - context.execute(() -> modifyNode.request(1), modifyNode::onError); + modifyNode.execute(() -> modifyNode.request(1)); List result = await(downstream.result()); @@ -159,7 +159,7 @@ void nodeReportsExpectedNumberOfUpdatedRowsOnUpdate(int sourceSize) { .thenReturn(nullCompletedFuture()); } - context.execute(() -> modifyNode.request(1), modifyNode::onError); + modifyNode.execute(() -> modifyNode.request(1)); List result = await(downstream.result()); @@ -192,7 +192,7 @@ void nodeReportsExpectedNumberOfUpdatedRowsOnDelete(int sourceSize) { .thenReturn(nullCompletedFuture()); } - context.execute(() -> modifyNode.request(1), modifyNode::onError); + modifyNode.execute(() -> modifyNode.request(1)); List result = await(downstream.result()); @@ -224,7 +224,7 @@ void exceptionIsPassedThroughToErrorHandlerOnInsert(int sourceSize) { when(updatableTable.insertAll(any(), any(), any())) .thenReturn(CompletableFuture.failedFuture(expected)); - context.execute(() -> modifyNode.request(1), modifyNode::onError); + modifyNode.execute(() -> modifyNode.request(1)); assertThat(downstream.result(), willThrow(is(expected))); verify(updatableTable).insertAll(any(), any(), any()); @@ -252,7 +252,7 @@ void exceptionIsPassedThroughToErrorHandlerOnUpdate(int sourceSize) { when(updatableTable.upsertAll(any(), any(), any())) .thenReturn(CompletableFuture.failedFuture(expected)); - context.execute(() -> modifyNode.request(1), modifyNode::onError); + modifyNode.execute(() -> modifyNode.request(1)); assertThat(downstream.result(), willThrow(is(expected))); verify(updatableTable).upsertAll(any(), any(), any()); @@ -280,7 +280,7 @@ void exceptionIsPassedThroughToErrorHandlerOnDelete(int sourceSize) { when(updatableTable.deleteAll(any(), any(), any())) .thenReturn(CompletableFuture.failedFuture(expected)); - context.execute(() -> modifyNode.request(1), modifyNode::onError); + modifyNode.execute(() -> modifyNode.request(1)); assertThat(downstream.result(), willThrow(is(expected))); verify(updatableTable).deleteAll(any(), any(), any()); @@ -411,7 +411,7 @@ void mergePassesCorrectRowsToInsert(int colCount, boolean updateOnly, Object[] d modifyNode.register(List.of(sourceNode)); modifyNode.onRegister(downstream); - context.execute(() -> modifyNode.request(1), modifyNode::onError); + modifyNode.execute(() -> modifyNode.request(1)); await(downstream.result());