
Commit ebb28b6

Add test

1 parent 5e1f94a

1 file changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
package com.linkedin.openhouse.catalog.e2e;

import static org.assertj.core.api.Assertions.*;

import com.linkedin.openhouse.jobs.spark.Operations;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.util.Collections;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class SparkRewriteBucketsTest extends OpenHouseSparkITest {
  static final String tableName = "db.test_data_compaction_filter";
  private Operations ops;

  @BeforeEach
  public void setUp() throws Exception {
    ops = Operations.withCatalog(getSparkSession(), null);
  }

  @AfterEach
  public void cleanUp() throws Exception {
    sql("DROP TABLE IF EXISTS %s", tableName);
  }

  @Test
  public void testBucketPartitionsCanBeFilteredInCompaction() throws NoSuchTableException {
    SparkSession spark = ops.spark();
    sql(
        "CREATE TABLE openhouse.%s (id int, key string) PARTITIONED BY (bucket(2, key))",
        tableName);
    sql(
        "INSERT INTO openhouse.%s VALUES (0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')",
        tableName);
    sql(
        "INSERT INTO openhouse.%s VALUES (5, 'a'), (6, 'b'), (7, 'c'), (8, 'd'), (9, 'e')",
        tableName);

    Table table = ops.getTable(tableName);

    // Rewrite bucket 0: each INSERT wrote one file into this bucket, so two
    // input files should compact into one.
    RewriteDataFiles.Result result =
        SparkActions.get(spark)
            .rewriteDataFiles(table)
            .filter(Expressions.in(Expressions.bucket("key", 2), 0))
            .binPack()
            .option("min-input-files", "2")
            .execute();

    assertThat(result.rewrittenDataFilesCount()).isEqualTo(2);
    assertThat(result.addedDataFilesCount()).isEqualTo(1);

    table.refresh();

    // Rewrite bucket 1: same expectation as bucket 0.
    result =
        SparkActions.get(spark)
            .rewriteDataFiles(table)
            .filter(Expressions.in(Expressions.bucket("key", 2), 1))
            .binPack()
            .option("min-input-files", "2")
            .execute();

    assertThat(result.rewrittenDataFilesCount()).isEqualTo(2);
    assertThat(result.addedDataFilesCount()).isEqualTo(1);

    table.refresh();

    // Rewrite bucket 1 again and check it is a no-op: the bucket now holds a
    // single compacted file, below the min-input-files threshold of 2.
    result =
        SparkActions.get(spark)
            .rewriteDataFiles(table)
            .filter(Expressions.in(Expressions.bucket("key", 2), 1))
            .binPack()
            .option("min-input-files", "2")
            .execute();

    assertThat(result.rewrittenDataFilesCount()).isEqualTo(0);
    assertThat(result.addedDataFilesCount()).isEqualTo(0);
  }

  protected List<Row> sql(String query, Object... args) {
    List<Row> rows = ops.spark().sql(String.format(query, args)).collectAsList();
    if (rows.isEmpty()) {
      return Collections.emptyList();
    }
    return rows;
  }
}
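
A note on the filter used above: Expressions.bucket("key", 2) references the table's bucket transform, and Expressions.in restricts the rewrite to the listed bucket ids, which is what lets the test compact one partition at a time. Since Expressions.in accepts varargs values, a single rewrite pass could also cover both buckets at once. A minimal sketch, not part of this commit, reusing the test's spark and table handles:

    // Hypothetical variant, not in this commit: Expressions.in takes varargs,
    // so both bucket ids of the 2-bucket spec can be compacted in one pass.
    RewriteDataFiles.Result allBuckets =
        SparkActions.get(spark)
            .rewriteDataFiles(table)
            .filter(Expressions.in(Expressions.bucket("key", 2), 0, 1))
            .binPack()
            .option("min-input-files", "2")
            .execute();

Filtering per bucket, as the test does, keeps each rewrite job small and makes the per-partition assertions possible; the varargs form trades that isolation for fewer passes.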
