Skip to content

Commit f8ad15f

Browse files
lmatzxxhZswenym1
authored
cherry-pick fix: a few fixes to Cassandra e2e test (#18657)
Co-authored-by: Xinhao Xu <[email protected]> Co-authored-by: William Wen <[email protected]>
1 parent 4dce1b1 commit f8ad15f

File tree

6 files changed

+93
-35
lines changed

6 files changed

+93
-35
lines changed

ci/scripts/common.sh

+13
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,16 @@ get_latest_kafka_download_url() {
114114
local download_url="https://downloads.apache.org/kafka/${latest_version}/kafka_2.13-${latest_version}.tgz"
115115
echo "$download_url"
116116
}
117+
118+
get_latest_cassandra_version() {
119+
local versions=$(curl -s https://downloads.apache.org/cassandra/ | grep -Eo 'href="[0-9]+\.[0-9]+\.[0-9]+/"' | grep -Eo "[0-9]+\.[0-9]+\.[0-9]+")
120+
# Sort the version numbers and get the latest one
121+
local latest_version=$(echo "$versions" | sort -V | tail -n1)
122+
echo "$latest_version"
123+
}
124+
125+
get_latest_cassandra_download_url() {
126+
local latest_version=$(get_latest_cassandra_version)
127+
local download_url="https://downloads.apache.org/cassandra/${latest_version}/apache-cassandra-${latest_version}-bin.tar.gz"
128+
echo "$download_url"
129+
}

ci/scripts/e2e-cassandra-sink-test.sh

+18-26
Original file line numberDiff line numberDiff line change
@@ -33,40 +33,32 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node
3333

3434
echo "--- starting risingwave cluster"
3535
risedev ci-start ci-sink-test
36-
sleep 1
36+
# Wait cassandra server to start
37+
sleep 40
3738

38-
echo "--- create cassandra table"
39-
curl https://downloads.apache.org/cassandra/4.1.3/apache-cassandra-4.1.3-bin.tar.gz --output apache-cassandra-4.1.3-bin.tar.gz
40-
tar xfvz apache-cassandra-4.1.3-bin.tar.gz
41-
# remove bundled packages, and use installed packages, because Python 3.12 has removed asyncore, but I failed to install libev support for bundled Python driver.
42-
rm apache-cassandra-4.1.3/lib/six-1.12.0-py2.py3-none-any.zip
43-
rm apache-cassandra-4.1.3/lib/cassandra-driver-internal-only-3.25.0.zip
44-
apt-get install -y libev4 libev-dev
45-
pip3 install --break-system-packages cassandra-driver
39+
echo "--- install cassandra"
40+
wget $(get_latest_cassandra_download_url) -O cassandra_latest.tar.gz
41+
tar xfvz cassandra_latest.tar.gz
42+
export LATEST_CASSANDRA_VERSION=$(get_latest_cassandra_version)
43+
export CASSANDRA_DIR="./apache-cassandra-${LATEST_CASSANDRA_VERSION}"
44+
45+
# Cassandra only support python 3.11
46+
apt-get install -y software-properties-common
47+
add-apt-repository ppa:deadsnakes/ppa
48+
apt-get update
49+
apt-get install -y python3.11
50+
apt-get install -y python3.11-venv
51+
python3.11 -m venv cqlsh_env
52+
source cqlsh_env/bin/activate
4653

47-
cd apache-cassandra-4.1.3/bin
4854
export CQLSH_HOST=cassandra-server
4955
export CQLSH_PORT=9042
50-
./cqlsh -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;
51-
CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"
5256

5357
echo "--- testing sinks"
54-
cd ../../
5558
sqllogictest -p 4566 -d dev './e2e_test/sink/cassandra_sink.slt'
56-
sleep 1
57-
cd apache-cassandra-4.1.3/bin
58-
./cqlsh -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
5959

60-
if cat ./query_result.csv | awk -F "," '{
61-
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01.000+0000" && $9 == "False\r"); }'; then
62-
echo "Cassandra sink check passed"
63-
else
64-
echo "The output is not as expected."
65-
echo "output:"
66-
cat ./query_result.csv
67-
exit 1
68-
fi
60+
deactivate
6961

7062
echo "--- Kill cluster"
7163
cd ../../
72-
risedev ci-kill
64+
risedev ci-kill

e2e_test/sink/cassandra_sink.slt

+46-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
1+
system ok
2+
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"
3+
4+
system ok
5+
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "use demo;CREATE table \"Test_uppercase\"(\"TEST_V1\" int primary key, \"TEST_V2\" int,\"TEST_V3\" int);"
6+
17
statement ok
28
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);
39

410
statement ok
5-
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
11+
CREATE TABLE t7 ("TEST_V1" int primary key, "TEST_V2" int, "TEST_V3" int);
612

713
statement ok
814
CREATE SINK s6
915
FROM
10-
mv6 WITH (
16+
t6 WITH (
1117
connector = 'cassandra',
1218
type = 'append-only',
1319
force_append_only='true',
@@ -17,17 +23,53 @@ FROM
1723
cassandra.datacenter = 'datacenter1',
1824
);
1925

26+
statement ok
27+
CREATE SINK s7
28+
FROM
29+
t7 WITH (
30+
connector = 'cassandra',
31+
type = 'append-only',
32+
force_append_only='true',
33+
cassandra.url = 'cassandra-server:9042',
34+
cassandra.keyspace = 'demo',
35+
cassandra.table = 'Test_uppercase',
36+
cassandra.datacenter = 'datacenter1',
37+
);
38+
2039
statement ok
2140
INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);
2241

42+
statement ok
43+
INSERT INTO t7 VALUES (1, 1, 1);
44+
2345
statement ok
2446
FLUSH;
2547

2648
statement ok
2749
DROP SINK s6;
2850

2951
statement ok
30-
DROP MATERIALIZED VIEW mv6;
52+
DROP TABLE t6;
53+
54+
statement ok
55+
DROP SINK s7;
3156

3257
statement ok
33-
DROP TABLE t6;
58+
DROP TABLE t7;
59+
60+
system ok
61+
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
62+
63+
system ok
64+
${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.\"Test_uppercase\" TO './query_result2.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
65+
66+
system ok
67+
cat ./query_result.csv
68+
----
69+
1,1,1,1.1,1.2,test,2013-01-01,2013-01-01 01:01:01.000+0000,False
70+
71+
72+
system ok
73+
cat ./query_result2.csv
74+
----
75+
1,1,1

java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public CassandraConfig(
5757
@JsonProperty(value = "type") String type) {
5858
this.url = url;
5959
this.keyspace = keyspace;
60-
this.table = table;
60+
this.table = CassandraUtil.convertCQLIdentifiers(table);
6161
this.datacenter = datacenter;
6262
this.type = type;
6363
}

java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,15 @@ public void drop() {
193193

194194
private String createInsertStatement(String tableName, TableSchema tableSchema) {
195195
String[] columnNames = tableSchema.getColumnNames();
196-
String columnNamesString = String.join(", ", columnNames);
196+
String columnNamesString =
197+
Arrays.stream(columnNames)
198+
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName))
199+
.collect(Collectors.joining(", "));
197200
String placeholdersString = String.join(", ", Collections.nCopies(columnNames.length, "?"));
201+
System.out.println(
202+
String.format(
203+
"INSERT INTO %s (%s) VALUES (%s)",
204+
tableName, columnNamesString, placeholdersString));
198205
return String.format(
199206
"INSERT INTO %s (%s) VALUES (%s)",
200207
tableName, columnNamesString, placeholdersString);
@@ -204,11 +211,11 @@ private String createUpdateStatement(String tableName, TableSchema tableSchema)
204211
List<String> primaryKeys = tableSchema.getPrimaryKeys();
205212
String setClause = // cassandra does not allow SET on primary keys
206213
nonKeyColumns.stream()
207-
.map(columnName -> columnName + " = ?")
214+
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
208215
.collect(Collectors.joining(", "));
209216
String whereClause =
210217
primaryKeys.stream()
211-
.map(columnName -> columnName + " = ?")
218+
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
212219
.collect(Collectors.joining(" AND "));
213220
return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
214221
}
@@ -217,7 +224,7 @@ private static String createDeleteStatement(String tableName, TableSchema tableS
217224
List<String> primaryKeys = tableSchema.getPrimaryKeys();
218225
String whereClause =
219226
primaryKeys.stream()
220-
.map(columnName -> columnName + " = ?")
227+
.map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
221228
.collect(Collectors.joining(" AND "));
222229
return String.format("DELETE FROM %s WHERE %s", tableName, whereClause);
223230
}

java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java

+4
Original file line numberDiff line numberDiff line change
@@ -166,4 +166,8 @@ public static Object convertRow(Object value, TypeName typeName) {
166166
.asRuntimeException();
167167
}
168168
}
169+
170+
public static String convertCQLIdentifiers(String identifier) {
171+
return "\"" + identifier + "\"";
172+
}
169173
}

0 commit comments

Comments
 (0)