Skip to content

Commit 62d77f7

Browse files
committed
Add $transactions system table to Delta Lake
1 parent 90b8169 commit 62d77f7

File tree

4 files changed

+109
-0
lines changed

4 files changed

+109
-0
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4100,6 +4100,12 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
41004100
fileSystemFactory,
41014101
transactionLogAccess,
41024102
typeManager));
4103+
case TRANSACTIONS -> Optional.of(new DeltaLakeTransactionsTable(
4104+
systemTableName,
4105+
tableLocation,
4106+
fileSystemFactory,
4107+
transactionLogAccess,
4108+
typeManager));
41034109
case PROPERTIES -> Optional.of(new DeltaLakePropertiesTable(systemTableName, tableLocation, transactionLogAccess));
41044110
case PARTITIONS -> Optional.of(new DeltaLakePartitionsTable(session, systemTableName, tableLocation, transactionLogAccess, typeManager));
41054111
};

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public enum DeltaLakeTableType
1717
{
1818
DATA,
1919
HISTORY,
20+
TRANSACTIONS,
2021
PROPERTIES,
2122
PARTITIONS,
2223
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake;
15+
16+
import com.google.common.collect.ImmutableList;
17+
import io.airlift.json.JsonCodec;
18+
import io.trino.filesystem.TrinoFileSystemFactory;
19+
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
20+
import io.trino.plugin.deltalake.transactionlog.Transaction;
21+
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
22+
import io.trino.plugin.deltalake.util.PageListBuilder;
23+
import io.trino.spi.Page;
24+
import io.trino.spi.connector.ColumnMetadata;
25+
import io.trino.spi.connector.ConnectorSession;
26+
import io.trino.spi.connector.ConnectorTableMetadata;
27+
import io.trino.spi.connector.SchemaTableName;
28+
import io.trino.spi.type.TypeManager;
29+
import io.trino.spi.type.TypeSignature;
30+
31+
import java.util.List;
32+
33+
import static io.airlift.json.JsonCodec.listJsonCodec;
34+
import static io.trino.spi.type.BigintType.BIGINT;
35+
import static io.trino.spi.type.StandardTypes.JSON;
36+
import static java.util.Objects.requireNonNull;
37+
38+
public class DeltaLakeTransactionsTable
39+
extends BaseTransactionsTable
40+
{
41+
private static final JsonCodec<List<DeltaLakeTransactionLogEntry>> TRANSACTION_LOG_ENTRIES_CODEC = listJsonCodec(DeltaLakeTransactionLogEntry.class);
42+
43+
public DeltaLakeTransactionsTable(
44+
SchemaTableName tableName,
45+
String tableLocation,
46+
TrinoFileSystemFactory fileSystemFactory,
47+
TransactionLogAccess transactionLogAccess,
48+
TypeManager typeManager)
49+
{
50+
super(
51+
tableName,
52+
tableLocation,
53+
fileSystemFactory,
54+
transactionLogAccess,
55+
typeManager,
56+
new ConnectorTableMetadata(
57+
requireNonNull(tableName, "tableName is null"),
58+
ImmutableList.<ColumnMetadata>builder()
59+
.add(new ColumnMetadata("version", BIGINT))
60+
.add(new ColumnMetadata("transaction", typeManager.getType(new TypeSignature(JSON))))
61+
.build()));
62+
}
63+
64+
@Override
65+
protected List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions)
66+
{
67+
for (Transaction transaction : transactions) {
68+
pagesBuilder.beginRow();
69+
pagesBuilder.appendBigint(transaction.transactionId());
70+
pagesBuilder.appendVarchar(TRANSACTION_LOG_ENTRIES_CODEC.toJson(transaction.transactionEntries()));
71+
pagesBuilder.endRow();
72+
}
73+
return pagesBuilder.build();
74+
}
75+
}

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.google.common.io.Resources;
1818
import io.trino.testing.AbstractTestQueryFramework;
1919
import io.trino.testing.QueryRunner;
20+
import io.trino.testing.sql.TestTable;
2021
import org.intellij.lang.annotations.Language;
2122
import org.junit.jupiter.api.Test;
2223

@@ -25,6 +26,8 @@
2526
import java.nio.file.Path;
2627

2728
import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents;
29+
import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.SELECT_COLUMN;
30+
import static io.trino.testing.TestingAccessControlManager.privilege;
2831
import static io.trino.testing.TestingNames.randomNameSuffix;
2932
import static org.assertj.core.api.Assertions.assertThat;
3033

@@ -100,6 +103,30 @@ public void testHistoryTable()
100103
}
101104
}
102105

106+
@Test
107+
void testTransactionsTable()
108+
{
109+
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_transactions", "(col int)")) {
110+
assertThat((String) computeScalar("SELECT transaction FROM \"" + table.getName() + "$transactions\""))
111+
.contains("commitInfo", "protocol", "metaData");
112+
}
113+
}
114+
115+
@Test
116+
void testTransactionsTableAccessControl()
117+
{
118+
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_transactions", "(col int)")) {
119+
// TODO Disallow access to transactions table when the user can't access the base table
120+
assertAccessAllowed(
121+
"SELECT * FROM \"" + table.getName() + "$transactions\"",
122+
privilege(table.getName(), SELECT_COLUMN));
123+
assertAccessDenied(
124+
"SELECT * FROM \"" + table.getName() + "$transactions\"",
125+
"Cannot select from columns .*",
126+
privilege(table.getName() + "$transactions", SELECT_COLUMN));
127+
}
128+
}
129+
103130
@Test
104131
public void testPropertiesTable()
105132
{

0 commit comments

Comments
 (0)