Skip to content

Commit fd05503

Browse files
committed
Cleanup manifest and metadata files when any createTable fails
Previously only metadata file was cleaned up leaving Avro manifest list files present. Now both type of files are removed. Previously only some exceptions caused transaction files to be cleaned. Now all exceptions cause cleanup.
1 parent 33cb3ad commit fd05503

File tree

4 files changed

+162
-6
lines changed

4 files changed

+162
-6
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.iceberg;
15+
16+
import io.trino.spi.TrinoException;
17+
import io.trino.spi.connector.SchemaTableName;
18+
import org.apache.iceberg.exceptions.CleanableFailure;
19+
20+
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
21+
import static java.lang.String.format;
22+
23+
public class CreateTableException
24+
extends TrinoException
25+
implements CleanableFailure
26+
{
27+
public CreateTableException(Throwable throwable, SchemaTableName tableName)
28+
{
29+
super(ICEBERG_COMMIT_ERROR, format("Failed to create table %s: %s", tableName, throwable.getMessage()), throwable);
30+
}
31+
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616
import io.trino.annotation.NotThreadSafe;
1717
import io.trino.metastore.PrincipalPrivileges;
1818
import io.trino.metastore.Table;
19-
import io.trino.plugin.hive.TableAlreadyExistsException;
2019
import io.trino.plugin.hive.metastore.MetastoreUtil;
2120
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
21+
import io.trino.plugin.iceberg.CreateTableException;
2222
import io.trino.plugin.iceberg.UnknownTableTypeException;
2323
import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations;
2424
import io.trino.spi.TrinoException;
2525
import io.trino.spi.connector.ConnectorSession;
26-
import io.trino.spi.connector.SchemaNotFoundException;
2726
import io.trino.spi.connector.TableNotFoundException;
2827
import org.apache.iceberg.TableMetadata;
2928
import org.apache.iceberg.io.FileIO;
@@ -124,10 +123,11 @@ protected final void commitNewTable(TableMetadata metadata)
124123
try {
125124
metastore.createTable(table, privileges);
126125
}
127-
catch (SchemaNotFoundException | TableAlreadyExistsException e) {
128-
// clean up metadata files corresponding to the current transaction
126+
catch (Exception e) {
127+
// clean up metadata file corresponding to the current transaction
129128
fileIo.deleteFile(newMetadataLocation);
130-
throw e;
129+
// wrap exception in CleanableFailure to ensure that manifest list Avro files are also cleaned up
130+
throw new CreateTableException(e, getSchemaTableName());
131131
}
132132
}
133133

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8162,7 +8162,7 @@ public void testDynamicFilterWithExplicitPartitionFilter()
81628162
@Override
81638163
protected void verifyTableNameLengthFailurePermissible(Throwable e)
81648164
{
8165-
assertThat(e).hasMessageMatching("Table name must be shorter than or equal to '128' characters but got .*");
8165+
assertThat(e).hasMessageMatching(".*Table name must be shorter than or equal to '128' characters but got .*");
81668166
}
81678167

81688168
@Test
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.iceberg.catalog.file;
15+
16+
import io.trino.Session;
17+
import io.trino.filesystem.local.LocalFileSystemFactory;
18+
import io.trino.metastore.HiveMetastore;
19+
import io.trino.metastore.PrincipalPrivileges;
20+
import io.trino.metastore.Table;
21+
import io.trino.plugin.hive.NodeVersion;
22+
import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
23+
import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
24+
import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
25+
import io.trino.plugin.iceberg.TestingIcebergPlugin;
26+
import io.trino.spi.connector.SchemaNotFoundException;
27+
import io.trino.testing.AbstractTestQueryFramework;
28+
import io.trino.testing.DistributedQueryRunner;
29+
import org.junit.jupiter.api.AfterAll;
30+
import org.junit.jupiter.api.Test;
31+
import org.junit.jupiter.api.TestInstance;
32+
import org.junit.jupiter.api.parallel.Execution;
33+
34+
import java.nio.file.Files;
35+
import java.nio.file.Path;
36+
import java.util.Optional;
37+
import java.util.concurrent.atomic.AtomicReference;
38+
39+
import static com.google.common.io.MoreFiles.deleteRecursively;
40+
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
41+
import static io.trino.testing.TestingNames.randomNameSuffix;
42+
import static io.trino.testing.TestingSession.testSessionBuilder;
43+
import static org.assertj.core.api.Assertions.assertThat;
44+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
45+
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
46+
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
47+
48+
@TestInstance(PER_CLASS)
49+
@Execution(SAME_THREAD)
50+
public class TestIcebergFileMetastoreCreateTableFailure
51+
extends AbstractTestQueryFramework
52+
{
53+
private static final String ICEBERG_CATALOG = "iceberg";
54+
private static final String SCHEMA_NAME = "test_schema";
55+
56+
private Path dataDirectory;
57+
private HiveMetastore metastore;
58+
private final AtomicReference<RuntimeException> testException = new AtomicReference<>();
59+
60+
@Override
61+
protected DistributedQueryRunner createQueryRunner()
62+
throws Exception
63+
{
64+
this.dataDirectory = Files.createTempDirectory("test_iceberg_create_table_failure");
65+
// Using FileHiveMetastore as approximation of HMS
66+
this.metastore = new FileHiveMetastore(
67+
new NodeVersion("testversion"),
68+
new LocalFileSystemFactory(Path.of(dataDirectory.toString())),
69+
new HiveMetastoreConfig().isHideDeltaLakeTables(),
70+
new FileHiveMetastoreConfig()
71+
.setCatalogDirectory("local://"))
72+
{
73+
@Override
74+
public synchronized void createTable(Table table, PrincipalPrivileges principalPrivileges)
75+
{
76+
if (testException.get() != null) {
77+
throw testException.get();
78+
}
79+
}
80+
};
81+
82+
Session session = testSessionBuilder()
83+
.setCatalog(ICEBERG_CATALOG)
84+
.setSchema(SCHEMA_NAME)
85+
.build();
86+
87+
DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build();
88+
queryRunner.installPlugin(new TestingIcebergPlugin(Path.of(dataDirectory.toString()), Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore))));
89+
queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg");
90+
queryRunner.execute("CREATE SCHEMA " + SCHEMA_NAME);
91+
92+
return queryRunner;
93+
}
94+
95+
@AfterAll
96+
public void cleanup()
97+
throws Exception
98+
{
99+
if (metastore != null) {
100+
metastore.dropDatabase(SCHEMA_NAME, true);
101+
}
102+
if (dataDirectory != null) {
103+
deleteRecursively(dataDirectory, ALLOW_INSECURE);
104+
}
105+
}
106+
107+
@Test
108+
public void testCreateTableFailureMetadataCleanedUp()
109+
{
110+
testException.set(new SchemaNotFoundException("simulated_test_schema"));
111+
String tableName = "test_create_failure_" + randomNameSuffix();
112+
String tableLocation = "local:///" + tableName;
113+
String createTableSql = "CREATE TABLE " + tableName + " (a varchar) WITH (location = '" + tableLocation + "')";
114+
assertThatThrownBy(() -> getQueryRunner().execute(createTableSql))
115+
.hasMessageContaining("Schema simulated_test_schema not found");
116+
117+
Path metadataDirectory = dataDirectory.resolve(tableName, "metadata");
118+
assertThat(metadataDirectory).as("Metadata file should not exist").isEmptyDirectory();
119+
120+
// it should be possible to create a table with the same name after the failure
121+
testException.set(null);
122+
getQueryRunner().execute(createTableSql);
123+
assertThat(metadataDirectory).as("Metadata file should not exist").isNotEmptyDirectory();
124+
}
125+
}

0 commit comments

Comments
 (0)