Skip to content

Commit 34a850b

Browse files
IGNITE-16452 Fix IndexQuery can query rebuilding index. (#12378)
Co-authored-by: Maksim Timonin <[email protected]>
1 parent f345d77 commit 34a850b

File tree

6 files changed

+230
-16
lines changed

6 files changed

+230
-16
lines changed

modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/AbstractIndex.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,13 @@ public abstract class AbstractIndex implements Index {
2626
/** Whether index is rebuilding now. */
2727
private final AtomicBoolean rebuildInProgress = new AtomicBoolean(false);
2828

29-
/**
30-
* @param val Mark or unmark index to rebuild.
31-
*/
32-
public void markIndexRebuild(boolean val) {
29+
/** {@inheritDoc} */
30+
@Override public void markIndexRebuild(boolean val) {
3331
rebuildInProgress.compareAndSet(!val, val);
3432
}
3533

36-
/**
37-
* @return Whether index is rebuilding now.
38-
*/
39-
public boolean rebuildInProgress() {
34+
/** {@inheritDoc} */
35+
@Override public boolean rebuildInProgress() {
4036
return rebuildInProgress.get();
4137
}
4238

modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/Index.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,14 @@ public void onUpdate(@Nullable CacheDataRow oldRow, @Nullable CacheDataRow newRo
7171
* @return Index definition.
7272
*/
7373
public IndexDefinition indexDefinition();
74+
75+
/**
76+
* @param val Mark or unmark index to (re)build.
77+
*/
78+
public void markIndexRebuild(boolean val);
79+
80+
/**
81+
* @return Whether index is (re)building now.
82+
*/
83+
public boolean rebuildInProgress();
7484
}

modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,12 @@ public void remove(String cacheName, @Nullable CacheDataRow prevRow) throws Igni
212212
* @param definition Description of an index to create.
213213
* @param cacheVisitor Enable to cancel dynamic index populating.
214214
*/
215-
public Index createIndexDynamically(GridCacheContext cctx, IndexFactory factory, IndexDefinition definition,
216-
SchemaIndexCacheVisitor cacheVisitor) {
217-
215+
public Index createIndexDynamically(
216+
GridCacheContext<?, ?> cctx,
217+
IndexFactory factory,
218+
IndexDefinition definition,
219+
SchemaIndexCacheVisitor cacheVisitor
220+
) {
218221
Index idx = createIndex(cctx, factory, definition);
219222

220223
// Populate index with cache rows.
@@ -345,11 +348,8 @@ public void markRebuildIndexesForCache(GridCacheContext<?, ?> cctx, boolean val)
345348

346349
Collection<Index> idxs = cacheToIdx.get(cctx.name()).values();
347350

348-
for (Index idx: idxs) {
349-
if (idx instanceof AbstractIndex)
350-
((AbstractIndex)idx).markIndexRebuild(val);
351-
}
352-
351+
for (Index idx: idxs)
352+
idx.markIndexRebuild(val);
353353
}
354354
finally {
355355
ddlLock.readLock().unlock();

modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ public <K, V> IndexQueryResult<K, V> queryLocal(
8484
) throws IgniteCheckedException {
8585
InlineIndexImpl idx = (InlineIndexImpl)findSortedIndex(cctx, idxQryDesc);
8686

87+
if (idx.rebuildInProgress()) {
88+
throw new IgniteCheckedException(String.format("Failed to run IndexQuery due to index rebuild is in progress"
89+
+ " [index=%s, query=%s]", idx.indexDefinition().idxName(), idxQryDesc));
90+
}
91+
8792
IndexMultipleRangeQuery qry = prepareQuery(idx, idxQryDesc);
8893

8994
GridCursor<IndexRow> cursor = queryMultipleRanges(idx, cacheFilter, qry);
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.cache.query;
19+
20+
import java.util.LinkedHashMap;
21+
import java.util.concurrent.CountDownLatch;
22+
import org.apache.ignite.Ignite;
23+
import org.apache.ignite.IgniteCache;
24+
import org.apache.ignite.IgniteDataStreamer;
25+
import org.apache.ignite.IgniteException;
26+
import org.apache.ignite.cache.CacheAtomicityMode;
27+
import org.apache.ignite.cache.QueryEntity;
28+
import org.apache.ignite.cluster.ClusterState;
29+
import org.apache.ignite.configuration.CacheConfiguration;
30+
import org.apache.ignite.configuration.DataRegionConfiguration;
31+
import org.apache.ignite.configuration.DataStorageConfiguration;
32+
import org.apache.ignite.configuration.IgniteConfiguration;
33+
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
34+
import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
35+
import org.apache.ignite.internal.processors.cache.GridCacheContext;
36+
import org.apache.ignite.internal.processors.query.schema.IndexRebuildCancelToken;
37+
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
38+
import org.apache.ignite.internal.util.future.GridFutureAdapter;
39+
import org.apache.ignite.internal.util.typedef.F;
40+
import org.apache.ignite.testframework.GridTestUtils;
41+
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
42+
import org.junit.Test;
43+
import org.junit.runner.RunWith;
44+
import org.junit.runners.Parameterized;
45+
46+
import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.between;
47+
48+
/** */
49+
@RunWith(Parameterized.class)
50+
public class IndexQueryRebuildIndexTest extends GridCommonAbstractTest {
51+
/** */
52+
private static final String CACHE = "TEST_CACHE";
53+
54+
/** */
55+
private static final String IDX = "TEST_IDX";
56+
57+
/** */
58+
private static final int CNT = 10_000;
59+
60+
/** */
61+
private boolean persistenceEnabled;
62+
63+
/** */
64+
@Parameterized.Parameter
65+
public String qryNode;
66+
67+
/** */
68+
@Parameterized.Parameters(name = "qryNode={0}")
69+
public static Object[] parameters() {
70+
return new Object[] { "CRD", "CLN" };
71+
}
72+
73+
/** {@inheritDoc} */
74+
@Override protected void beforeTest() throws Exception {
75+
cleanPersistenceDir();
76+
}
77+
78+
/** {@inheritDoc} */
79+
@Override protected void afterTest() throws Exception {
80+
stopAllGrids();
81+
82+
cleanPersistenceDir();
83+
}
84+
85+
/** {@inheritDoc} */
86+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
87+
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
88+
89+
QueryEntity qe = new QueryEntity(Long.class.getName(), Integer.class.getName())
90+
.setTableName("Person")
91+
.setKeyFieldName("id")
92+
.setValueFieldName("fld")
93+
.setFields(new LinkedHashMap<>(
94+
F.asMap("id", Long.class.getName(), "fld", Integer.class.getName()))
95+
);
96+
97+
CacheConfiguration<Long, Integer> ccfg = new CacheConfiguration<Long, Integer>()
98+
.setName(CACHE)
99+
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
100+
.setQueryEntities(F.asList(qe))
101+
.setBackups(1);
102+
103+
cfg.setDataStorageConfiguration(
104+
new DataStorageConfiguration().setDefaultDataRegionConfiguration(
105+
new DataRegionConfiguration().setPersistenceEnabled(persistenceEnabled)));
106+
107+
cfg.setCacheConfiguration(ccfg);
108+
109+
return cfg;
110+
}
111+
112+
/** */
113+
@Test
114+
public void testConcurrentRebuildIndex() throws Exception {
115+
persistenceEnabled = true;
116+
117+
IndexProcessor.idxRebuildCls = BlockingRebuildIndexes.class;
118+
119+
Ignite crd = startGrids(3);
120+
121+
crd.cluster().state(ClusterState.ACTIVE);
122+
123+
IgniteCache<Long, Integer> cache = cache();
124+
125+
cache.query(new SqlFieldsQuery("create index " + IDX + " on Person(fld)")).getAll();
126+
127+
insertData();
128+
129+
BlockingRebuildIndexes rebuild = (BlockingRebuildIndexes)grid(0).context().indexProcessor().idxRebuild();
130+
131+
rebuild.setUp();
132+
133+
multithreadedAsync(() -> {
134+
forceRebuildIndexes(grid(0), grid(0).cachex(CACHE).context());
135+
}, 1);
136+
137+
rebuild.idxRebuildStartLatch.await();
138+
139+
IndexQuery<Long, Integer> qry = new IndexQuery<Long, Integer>(Integer.class)
140+
.setCriteria(between("fld", 0, CNT));
141+
142+
GridTestUtils.assertThrows(null,
143+
() -> {
144+
cache.query(qry).getAll();
145+
}, IgniteException.class, "Failed to run IndexQuery due to index rebuild is in progress");
146+
147+
rebuild.blockIdxRebuildLatch.countDown();
148+
149+
crd.cache(CACHE).indexReadyFuture().get();
150+
151+
assertEquals(CNT, cache.query(qry).getAll().size());
152+
}
153+
154+
/** */
155+
private void insertData() {
156+
try (IgniteDataStreamer<Long, Integer> streamer = grid(0).dataStreamer(CACHE)) {
157+
for (int i = 0; i < CNT; i++)
158+
streamer.addData((long)i, i);
159+
}
160+
}
161+
162+
/** */
163+
private IgniteCache<Long, Integer> cache() throws Exception {
164+
Ignite n = "CRD".equals(qryNode) ? grid(0) : startClientGrid();
165+
166+
return n.cache(CACHE);
167+
}
168+
169+
/** Blocks filling dynamically created index with cache data. */
170+
public static class BlockingRebuildIndexes extends IndexesRebuildTask {
171+
/** */
172+
public volatile CountDownLatch blockIdxRebuildLatch = new CountDownLatch(0);
173+
174+
/** */
175+
public volatile CountDownLatch idxRebuildStartLatch = new CountDownLatch(0);
176+
177+
/** {@inheritDoc} */
178+
@Override protected void startRebuild(
179+
GridCacheContext cctx,
180+
GridFutureAdapter<Void> fut,
181+
SchemaIndexCacheVisitorClosure clo,
182+
IndexRebuildCancelToken cancelTok
183+
) {
184+
try {
185+
idxRebuildStartLatch.countDown();
186+
187+
blockIdxRebuildLatch.await();
188+
189+
super.startRebuild(cctx, fut, clo, cancelTok);
190+
}
191+
catch (InterruptedException e) {
192+
throw new IgniteException(e);
193+
}
194+
}
195+
196+
/** */
197+
public void setUp() {
198+
blockIdxRebuildLatch = new CountDownLatch(1);
199+
idxRebuildStartLatch = new CountDownLatch(1);
200+
}
201+
}
202+
}

modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
IndexQuerySqlIndexTest.class,
3737
IndexQueryRangeTest.class,
3838
IndexQueryPartitionTest.class,
39+
IndexQueryRebuildIndexTest.class,
3940
IndexQueryCacheKeyValueFieldsTest.class,
4041
IndexQueryCacheKeyValueEscapedFieldsTest.class,
4142
IndexQueryCacheKeyValueTransformedFieldsTest.class,

0 commit comments

Comments
 (0)