
Commit a290f6e

THROWAWAY: CNDB-16146: Reproduce missing rows failure
1 parent 8246cb4 commit a290f6e

File tree: 1 file changed

Lines changed: 379 additions & 0 deletions
@@ -0,0 +1,379 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.distributed.test;

import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.IntFunction;
import java.util.stream.Collectors;

import org.apache.commons.io.FileUtils;
import org.assertj.core.api.Assertions;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.PathUtils;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.tools.SSTableExport;
import org.apache.cassandra.tools.ToolRunner;
import org.apache.cassandra.utils.Collectors3;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.util.Arrays.asList;

import static org.apache.cassandra.config.DatabaseDescriptor.getCommitLogLocation;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Throwaway test that tries to reproduce CNDB-16146: rows going missing after a complex
 * (UDT/tuple) column is dropped, the data is flushed and compacted, and the resulting
 * SSTables are restored into a fresh cluster whose schema is recreated from DESCRIBE output.
 */
public class CNDB_16146_Test extends TestBaseImpl
{
    private final static Logger logger = LoggerFactory.getLogger(CNDB_16146_Test.class);

    private final static Path TEST_DATA_UDT_PATH = Paths.get("test/data/udt");
    private final static Path TMP_PRODUCT_PATH = TEST_DATA_UDT_PATH.resolve("tmp");
    private final static String KS = "ks";
    private final static String SCHEMA_TXT = "schema.txt";
    private final static String SCHEMA0_TXT = "schema0.txt";
    private final static String DATA_JSON = "data.json";

    private Cluster startCluster() throws IOException
    {
        Cluster cluster = Cluster.build(1).withConfig(config -> config.set("auto_snapshot", "false")
                                                                      .set("uuid_sstable_identifiers_enabled", "false")
                                                                      .with(NATIVE_PROTOCOL)).start();
        // Ignore uncaught FSWriteError instances caused by AccessDeniedException.
        cluster.setUncaughtExceptionsFilter(t -> {
            String cause = Optional.ofNullable(t.getCause()).map(c -> c.getClass().getName()).orElse("");
            return t.getClass().getName().equals(FSWriteError.class.getName()) && cause.equals(AccessDeniedException.class.getName());
        });
        return cluster;
    }

    private static List<Path> getDataDirectories(IInvokableInstance node)
    {
        return node.callOnInstance(() -> Keyspace.open(KS).getColumnFamilyStores().stream().map(cfs -> cfs.getDirectories().getDirectoryForNewSSTables().toPath()).collect(Collectors.toList()));
    }

    @Test
    public void testMissingRows() throws Throwable
    {
        Files.createDirectories(TMP_PRODUCT_PATH);
        try (Cluster cluster = startCluster())
        {
            IInvokableInstance node = cluster.get(1);
            node.executeInternal("DROP KEYSPACE IF EXISTS " + KS);
            node.executeInternal("CREATE KEYSPACE " + KS + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            createTables(node);
            cluster.disableAutoCompaction(KS);

            List<Path> dataDirs = getDataDirectories(node);

            Map<String, String> schema0 = getSchemaDesc(node);
            Files.writeString(TMP_PRODUCT_PATH.resolve(SCHEMA0_TXT),
                              String.join(";\n", schema0.values()).replaceAll(";;", ";"),
                              UTF_8,
                              CREATE, TRUNCATE_EXISTING);

            insertData(node, 0, true);
            insertData(node, 256, true);
            node.flush(KS);

            // Both rows written before the column drop and rows written after it can end up missing.
            dropComplexColumn(node);

            insertData(node, 128, false);
            insertData(node, 256 + 17, false);

            Map<String, String> schema1 = getSchemaDesc(node);
            Files.writeString(TMP_PRODUCT_PATH.resolve(SCHEMA_TXT),
                              String.join(";\n", schema1.values()).replaceAll(";;", ";"),
                              UTF_8,
                              CREATE, TRUNCATE_EXISTING);

            node.flush(KS);

            for (String table : schema1.keySet())
                if (table.startsWith("tab"))
                    node.forceCompact(KS, table);

            Map<String, List<List<Object>>> data = selectData(node);
            Files.writeString(TMP_PRODUCT_PATH.resolve(DATA_JSON), JSONObject.toJSONString(data), UTF_8, CREATE, TRUNCATE_EXISTING);

            node.shutdown(true).get(10, TimeUnit.SECONDS);

            Path ksTargetPath = TMP_PRODUCT_PATH.resolve(KS);
            Files.createDirectories(ksTargetPath);
            PathUtils.deleteContent(ksTargetPath);
            for (Path dir : dataDirs)
            {
                String name = dir.getFileName().toString();
                Path targetDir = ksTargetPath.resolve(name);
                Files.createDirectories(targetDir);
                FileUtils.copyDirectory(dir.toFile(), targetDir.toFile(), pathname -> !pathname.toString().endsWith(".log"));
            }
        }
        Thread.sleep(2000);

        // Start a fresh cluster, recreate the schema from the saved DESCRIBE output and restore the copied SSTables.
        try (Cluster cluster = startCluster())
        {
            IInvokableInstance node = cluster.get(1);
            node.executeInternal("DROP KEYSPACE IF EXISTS " + KS);
            node.executeInternal("CREATE KEYSPACE " + KS + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");

            for (String stmt : Files.readString(TMP_PRODUCT_PATH.resolve(SCHEMA_TXT), UTF_8).split(";"))
            {
                if (!stmt.isBlank())
                {
                    logger.info("Executing: {}", stmt);
                    node.executeInternal(stmt);
                }
            }

            cluster.disableAutoCompaction(KS);

            List<Path> dataDirs = getDataDirectories(node);

            node.shutdown(true).get(10, TimeUnit.SECONDS);

            Path ksSourcePath = TMP_PRODUCT_PATH.resolve(KS);
            for (Path dir : dataDirs)
            {
                String name = dir.getFileName().toString();
                Path sourceDir = ksSourcePath.resolve(name);
                FileUtils.copyDirectory(sourceDir.toFile(), dir.toFile());
            }

            logger.info("Restarting node");
            node.startup();

            // Verify that the new cluster, with the schema recreated, returns the same data as before the restart.
            Map<String, List<List<Object>>> data1 = selectData(node);
            String jsonData0 = Files.readString(TMP_PRODUCT_PATH.resolve(DATA_JSON), UTF_8);
            for (String table1 : data1.keySet())
            {
                List<List<Object>> table1Data = data1.get(table1);
                JSONArray table1Json = new JSONArray();
                table1Json.addAll(table1Data);
                String table0Json = JSONValue.toJSONString(((JSONObject) new JSONParser().parse(jsonData0)).get(table1));
                // Count rows by counting '[' characters, then strip each surviving row's JSON from the original
                // table JSON; whatever remains describes the missing rows (see the sketch after the listing).
                String missingRows = table0Json;
                int originalRowCount = (missingRows.length() - missingRows.replace("[", "").length() - 1);
                for (List<Object> row1 : table1Data)
                {
                    JSONArray row1Json = new JSONArray();
                    row1Json.addAll(row1);
                    missingRows = missingRows.replace(row1Json.toJSONString(), "");
                }
                String missingMsg = String.format("missing %s/%s rows in %s: %s",
                                                  (missingRows.length() - missingRows.replace("[", "").length() - 1),
                                                  originalRowCount, table1, missingRows.replaceAll(",+", ","));

                assertThat(table1Json.toJSONString()).as(missingMsg).isEqualTo(table0Json);
            }
            String jsonData1 = JSONObject.toJSONString(data1);
            assertThat(jsonData1).isEqualTo(jsonData0);
        }
    }

    // Returns a map from table/type name to its DESCRIBE statement, keeping the CREATE body and any
    // "DROPPED COLUMN RECORD" lines while dropping the remaining "AND ..." options.
    private Map<String, String> getSchemaDesc(IInvokableInstance node)
    {
        return Arrays.stream(node.executeInternal("DESCRIBE " + KS + " WITH INTERNALS"))
                     .filter(r -> r[1].equals("table") || r[1].equals("type"))
                     .collect(Collectors3.toImmutableMap(r -> r[2].toString(),
                                                         r -> Arrays.stream(r[3].toString().split("\\n"))
                                                                    .filter(s -> !s.strip().startsWith("AND") || s.contains("DROPPED COLUMN RECORD"))
                                                                    .collect(Collectors.joining("\n"))));
    }

    // The helpers below build INSERT fragments and UDT/tuple literals: each bit of 'i' decides whether a
    // particular column (or UDT/tuple field) gets a value, so different rows exercise different subsets
    // of the complex column. A worked example of the generated fragments follows the listing.
    private static String udtValue(int i, List<Integer> bits, BiFunction<Integer, Integer, String> vals)
    {
        List<String> cols = asList("foo", "bar", "baz");
        ArrayList<String> udtVals = new ArrayList<>();
        for (int j = 0; j < bits.size(); j++)
        {
            if ((i & bits.get(j)) != 0)
                udtVals.add(cols.get(j) + ": " + vals.apply(i, j));
        }
        return '{' + String.join(", ", udtVals) + '}';
    }

    private static String tupleValue(int i, List<Integer> bits, BiFunction<Integer, Integer, String> vals)
    {
        ArrayList<String> tupleVals = new ArrayList<>();
        for (int j = 0; j < bits.size(); j++)
        {
            if ((i & bits.get(j)) != 0)
                tupleVals.add(vals.apply(i, j));
            else
                tupleVals.add("null");
        }
        return '(' + String.join(", ", tupleVals) + ')';
    }

    private static String genInsert(int pk, int i, List<Integer> bits, BiFunction<Integer, Integer, String> vals)
    {
        List<String> cols = asList("a_int", "b_complex", "c_int");
        ArrayList<String> c = new ArrayList<>();
        ArrayList<String> v = new ArrayList<>();
        for (int j = 0; j < bits.size(); j++)
        {
            if ((i & bits.get(j)) != 0)
            {
                c.add(cols.get(j));
                v.add(vals.apply(i, j));
            }
        }
        if (c.isEmpty())
            return String.format("(pk) VALUES (%d)", pk);
        else
            return String.format("(pk, %s) VALUES (%d, %s)", String.join(", ", c), pk, String.join(", ", v));
    }

    private static BiFunction<Integer, Integer, String> valsFunction(IntFunction<String> nonIntFunction)
    {
        return (i, j) -> {
            if (j == 0)
                return Integer.toString(i);
            if (j == 1)
                return nonIntFunction.apply(i);
            if (j == 2)
                return Integer.toString(i * 2);
            assert false;
            return null;
        };
    }

    private static BiFunction<Integer, Integer, String> valsFunction()
    {
        return (i, j) -> {
            if (j == 0)
                return Integer.toString(i);
            if (j == 1)
                return String.format("'bar%d'", i);
            if (j == 2)
                return Integer.toString(i * 2);
            assert false;
            return null;
        };
    }

    private void insertData(IInstance node, int offset, boolean withComplex)
    {
        for (int pk = offset; pk < offset + (1 << 2); pk++)
        {
            int i = withComplex ? (pk - offset) : (pk - offset) & ~(2 + 4 + 8);
            node.executeInternal("INSERT INTO " + KS + ".tab1_udt1 " + genInsert(pk, i, asList(1, 2 + 4 + 8, 16), valsFunction(j -> udtValue(j, asList(2, 4, 8), valsFunction()))));
            node.executeInternal("INSERT INTO " + KS + ".tab2_frozen_udt1 " + genInsert(pk, i, asList(1, 2 + 4 + 8, 16), valsFunction(j -> udtValue(j, asList(2, 4, 8), valsFunction()))));
            node.executeInternal("INSERT INTO " + KS + ".tab5_tuple " + genInsert(pk, i, asList(1, 2 + 4 + 8, 16), valsFunction(j -> tupleValue(j, asList(2, 4, 8), valsFunction()))));
            node.executeInternal("INSERT INTO " + KS + ".tab6_frozen_tuple " + genInsert(pk, i, asList(1, 2 + 4 + 8, 16), valsFunction(j -> tupleValue(j, asList(2, 4, 8), valsFunction()))));
        }

        for (int pk = offset; pk < offset + (1 << 2); pk++)
        {
            int i = withComplex ? (pk - offset) : (pk - offset) & ~(2 + 4 + 8 + 16 + 32);
            node.executeInternal("INSERT INTO " + KS + ".tab4_frozen_udt2 " + genInsert(pk, i, asList(1, 2 + 4 + 8 + 16 + 32, 64),
                                 valsFunction(j -> udtValue(j, asList(2, 4 + 8 + 16, 32), valsFunction(k -> udtValue(k, asList(4, 8, 16), valsFunction()))))));
            node.executeInternal("INSERT INTO " + KS + ".tab7_tuple_with_udt " + genInsert(pk, i, asList(1, 2 + 4 + 8 + 16 + 32, 64),
                                 valsFunction(j -> tupleValue(j, asList(2, 4 + 8 + 16, 32), valsFunction(k -> udtValue(k, asList(4, 8, 16), valsFunction()))))));
            node.executeInternal("INSERT INTO " + KS + ".tab8_frozen_tuple_with_udt " + genInsert(pk, i, asList(1, 2 + 4 + 8 + 16 + 32, 64),
                                 valsFunction(j -> tupleValue(j, asList(2, 4 + 8 + 16, 32), valsFunction(k -> udtValue(k, asList(4, 8, 16), valsFunction()))))));
            node.executeInternal("INSERT INTO " + KS + ".tab9_udt_with_tuple " + genInsert(pk, i, asList(1, 2 + 4 + 8 + 16 + 32, 64),
                                 valsFunction(j -> udtValue(j, asList(2, 4 + 8 + 16, 32), valsFunction(k -> tupleValue(k, asList(4, 8, 16), valsFunction()))))));
            node.executeInternal("INSERT INTO " + KS + ".tab10_frozen_udt_with_tuple " + genInsert(pk, i, asList(1, 2 + 4 + 8 + 16 + 32, 64),
                                 valsFunction(j -> udtValue(j, asList(2, 4 + 8 + 16, 32), valsFunction(k -> tupleValue(k, asList(4, 8, 16), valsFunction()))))));
        }
    }

    private static void dropComplexColumn(IInvokableInstance node)
    {
        List<String> tables = node.callOnInstance(() -> Schema.instance.getKeyspaceMetadata(KS).tables.stream().map(t -> t.name).collect(Collectors.toList()));
        for (String table : tables)
            node.executeInternal("ALTER TABLE " + KS + "." + table + " DROP b_complex");
    }

    private Map<String, List<List<Object>>> selectData(IInvokableInstance node)
    {
        Map<String, List<List<Object>>> results = new HashMap<>();
        List<String> tables = node.callOnInstance(() -> Schema.instance.getKeyspaceMetadata(KS).tables.stream().map(t -> t.name).collect(Collectors.toList()));
        for (String table : tables)
        {
            Object[][] rows = node.executeInternal("SELECT * FROM " + KS + "." + table);
            Arrays.sort(rows, Comparator.comparing(a -> ((Integer) a[0])));
            results.put(table, Arrays.stream(rows).map(Arrays::asList).collect(Collectors.toList()));
        }
        return results;
    }

    private static void createTables(IInvokableInstance node)
    {
        node.executeInternal("CREATE TYPE " + KS + ".udt1(foo int, bar text, baz int)");
        node.executeInternal("CREATE TYPE " + KS + ".udt2(foo int, bar udt1, baz int)");
        node.executeInternal("CREATE TYPE " + KS + ".udt3(foo int, bar tuple<int, text, int>, baz int)");

        node.executeInternal("CREATE TABLE " + KS + ".tab1_udt1 (pk int PRIMARY KEY, a_int int, b_complex udt1, c_int int) WITH ID = 513f2627-9356-41c4-a379-7ad42be97432");
        node.executeInternal("CREATE TABLE " + KS + ".tab2_frozen_udt1 (pk int PRIMARY KEY, a_int int, b_complex frozen<udt1>, c_int int) WITH ID = 450f91fe-7c47-41c9-97bf-fdad854fa7e5");
        // Creating a table with a non-frozen column of the nested UDT is expected to fail.
        Assertions.assertThatExceptionOfType(RuntimeException.class).isThrownBy(
            () -> node.executeInternal("CREATE TABLE " + KS + ".tab3_udt2 (pk int PRIMARY KEY, a_int int, b_complex udt2, c_int int) WITH ID = b613aee8-645c-4384-90d2-fc9e82fb1a59"));
        node.executeInternal("CREATE TABLE " + KS + ".tab4_frozen_udt2 (pk int PRIMARY KEY, a_int int, b_complex frozen<udt2>, c_int int) WITH ID = 9c03c71c-6775-4357-9173-0f8808901afa");
        node.executeInternal("CREATE TABLE " + KS + ".tab5_tuple (pk int PRIMARY KEY, a_int int, b_complex tuple<int, text, int>, c_int int) WITH ID = 90826dd3-8437-4585-9de4-15908236687f");
        node.executeInternal("CREATE TABLE " + KS + ".tab6_frozen_tuple (pk int PRIMARY KEY, a_int int, b_complex frozen<tuple<int, text, int>>, c_int int) WITH ID = 54185f9a-a6fd-487c-abc3-c01bd5835e48");
        node.executeInternal("CREATE TABLE " + KS + ".tab7_tuple_with_udt (pk int PRIMARY KEY, a_int int, b_complex tuple<int, udt1, int>, c_int int) WITH ID = 4e78f403-7b63-4e0d-a231-42e42cba7cb5");
        node.executeInternal("CREATE TABLE " + KS + ".tab8_frozen_tuple_with_udt (pk int PRIMARY KEY, a_int int, b_complex frozen<tuple<int, udt1, int>>, c_int int) WITH ID = 8660f235-0816-4019-9cc9-1798fa7beb17");
        node.executeInternal("CREATE TABLE " + KS + ".tab9_udt_with_tuple (pk int PRIMARY KEY, a_int int, b_complex udt3, c_int int) WITH ID = f670fd5a-8145-4669-aceb-75667c000ea6");
        node.executeInternal("CREATE TABLE " + KS + ".tab10_frozen_udt_with_tuple (pk int PRIMARY KEY, a_int int, b_complex frozen<udt3>, c_int int) WITH ID = 6a5cff4e-2f94-4c8b-9aa2-0fbd65292caa");
    }
}
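
The genInsert/udtValue/tupleValue helpers above select columns and UDT/tuple fields by bit mask, which makes the shape of each generated INSERT hard to see at a glance. The following standalone sketch (hypothetical, not part of the commit; the class name BitMaskInsertDemo is made up) mirrors that logic for the tab1_udt1 layout and prints the fragments produced for a couple of values of i.

// Hypothetical standalone illustration of the bit-mask column selection used by genInsert/udtValue:
// bit 1 selects a_int, bits 2|4|8 select UDT fields of b_complex, bit 16 selects c_int.
import java.util.ArrayList;
import java.util.List;

public class BitMaskInsertDemo
{
    static String udt(int i)
    {
        List<String> fields = List.of("foo", "bar", "baz");
        List<Integer> bits = List.of(2, 4, 8);
        List<String> parts = new ArrayList<>();
        for (int j = 0; j < bits.size(); j++)
            if ((i & bits.get(j)) != 0)
                parts.add(fields.get(j) + ": " + (j == 1 ? "'bar" + i + "'" : Integer.toString(j == 0 ? i : i * 2)));
        return "{" + String.join(", ", parts) + "}";
    }

    static String genInsert(int pk, int i)
    {
        List<String> cols = List.of("a_int", "b_complex", "c_int");
        List<Integer> bits = List.of(1, 2 + 4 + 8, 16);
        List<String> c = new ArrayList<>();
        List<String> v = new ArrayList<>();
        for (int j = 0; j < bits.size(); j++)
        {
            if ((i & bits.get(j)) == 0)
                continue;
            c.add(cols.get(j));
            // Same value shape as valsFunction() in the test: a_int -> i, b_complex -> UDT literal, c_int -> i * 2.
            v.add(j == 1 ? udt(i) : Integer.toString(j == 0 ? i : i * 2));
        }
        return c.isEmpty()
               ? String.format("(pk) VALUES (%d)", pk)
               : String.format("(pk, %s) VALUES (%d, %s)", String.join(", ", c), pk, String.join(", ", v));
    }

    public static void main(String[] args)
    {
        System.out.println(genInsert(0, 0)); // prints: (pk) VALUES (0)
        System.out.println(genInsert(3, 3)); // prints: (pk, a_int, b_complex) VALUES (3, 3, {foo: 3})
    }
}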
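
The assertion message in testMissingRows reports missing rows with a string-level trick: every row serializes as a JSON array, so counting '[' characters (minus the enclosing array's own bracket) approximates the row count, and removing each surviving row's JSON from the saved table JSON leaves only the rows that disappeared. A minimal sketch of that idea, assuming the json-simple library used by the test (the class name MissingRowsDiffDemo is made up):

import java.util.List;
import org.json.simple.JSONArray;

public class MissingRowsDiffDemo
{
    static JSONArray rowJson(List<?> row)
    {
        JSONArray json = new JSONArray();
        json.addAll(row); // JSONArray is a raw ArrayList, so this is an unchecked call
        return json;
    }

    public static void main(String[] args)
    {
        JSONArray original = new JSONArray();
        original.add(rowJson(List.of(1, 10)));
        original.add(rowJson(List.of(2, 20)));
        String table0Json = original.toJSONString();                // e.g. [[1,10],[2,20]]

        // Row count = number of '[' characters minus the enclosing array's own bracket.
        int originalRowCount = table0Json.length() - table0Json.replace("[", "").length() - 1;

        // Pretend only the first row survived the restart: strip its JSON from the original.
        String missingRows = table0Json.replace(rowJson(List.of(1, 10)).toJSONString(), "");
        int missingCount = missingRows.length() - missingRows.replace("[", "").length() - 1;

        System.out.printf("missing %d/%d rows: %s%n", missingCount, originalRowCount, missingRows.replaceAll(",+", ","));
    }
}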
