Skip to content

Commit 90b8169

Browse files
committed
Extract base class from DeltaLakeHistoryTable
1 parent 8812e0d commit 90b8169

File tree

2 files changed

+220
-164
lines changed

2 files changed

+220
-164
lines changed
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake;
15+
16+
import com.google.common.collect.ImmutableList;
17+
import io.trino.filesystem.TrinoFileSystem;
18+
import io.trino.filesystem.TrinoFileSystemFactory;
19+
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
20+
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
21+
import io.trino.plugin.deltalake.transactionlog.Transaction;
22+
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
23+
import io.trino.plugin.deltalake.util.PageListBuilder;
24+
import io.trino.spi.Page;
25+
import io.trino.spi.TrinoException;
26+
import io.trino.spi.connector.ConnectorPageSource;
27+
import io.trino.spi.connector.ConnectorSession;
28+
import io.trino.spi.connector.ConnectorTableMetadata;
29+
import io.trino.spi.connector.ConnectorTransactionHandle;
30+
import io.trino.spi.connector.EmptyPageSource;
31+
import io.trino.spi.connector.FixedPageSource;
32+
import io.trino.spi.connector.SchemaTableName;
33+
import io.trino.spi.connector.SystemTable;
34+
import io.trino.spi.predicate.Domain;
35+
import io.trino.spi.predicate.Range;
36+
import io.trino.spi.predicate.TupleDomain;
37+
import io.trino.spi.type.TypeManager;
38+
39+
import java.io.IOException;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.Optional;
43+
import java.util.stream.IntStream;
44+
45+
import static com.google.common.collect.MoreCollectors.onlyElement;
46+
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
47+
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
48+
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
49+
import static java.util.Objects.requireNonNull;
50+
51+
public abstract class BaseTransactionsTable
52+
implements SystemTable
53+
{
54+
private final SchemaTableName tableName;
55+
private final String tableLocation;
56+
private final TrinoFileSystemFactory fileSystemFactory;
57+
private final TransactionLogAccess transactionLogAccess;
58+
private final ConnectorTableMetadata tableMetadata;
59+
60+
public BaseTransactionsTable(
61+
SchemaTableName tableName,
62+
String tableLocation,
63+
TrinoFileSystemFactory fileSystemFactory,
64+
TransactionLogAccess transactionLogAccess,
65+
TypeManager typeManager,
66+
ConnectorTableMetadata tableMetadata)
67+
{
68+
requireNonNull(typeManager, "typeManager is null");
69+
this.tableName = requireNonNull(tableName, "tableName is null");
70+
this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
71+
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
72+
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
73+
this.tableMetadata = requireNonNull(tableMetadata, "tableMetadata is null");
74+
}
75+
76+
@Override
77+
public Distribution getDistribution()
78+
{
79+
return Distribution.SINGLE_COORDINATOR;
80+
}
81+
82+
@Override
83+
public ConnectorTableMetadata getTableMetadata()
84+
{
85+
return tableMetadata;
86+
}
87+
88+
@Override
89+
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
90+
{
91+
long snapshotVersion;
92+
try {
93+
// Verify the transaction log is readable
94+
SchemaTableName baseTableName = new SchemaTableName(tableName.getSchemaName(), DeltaLakeTableName.tableNameFrom(tableName.getTableName()));
95+
TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(session, baseTableName, tableLocation, Optional.empty());
96+
snapshotVersion = tableSnapshot.getVersion();
97+
transactionLogAccess.getMetadataEntry(session, tableSnapshot);
98+
}
99+
catch (IOException e) {
100+
throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + tableLocation, e);
101+
}
102+
103+
int versionColumnIndex = IntStream.range(0, tableMetadata.getColumns().size())
104+
.filter(i -> tableMetadata.getColumns().get(i).getName().equals("version"))
105+
.boxed()
106+
.collect(onlyElement());
107+
108+
Optional<Long> startVersionExclusive = Optional.empty();
109+
Optional<Long> endVersionInclusive = Optional.empty();
110+
111+
if (constraint.getDomains().isPresent()) {
112+
Map<Integer, Domain> domains = constraint.getDomains().get();
113+
if (domains.containsKey(versionColumnIndex)) {
114+
Domain versionDomain = domains.get(versionColumnIndex); // The zero value here relies on the column ordering defined in the constructor
115+
Range range = versionDomain.getValues().getRanges().getSpan();
116+
if (range.isSingleValue()) {
117+
long value = (long) range.getSingleValue();
118+
startVersionExclusive = Optional.of(value - 1);
119+
endVersionInclusive = Optional.of(value);
120+
}
121+
else {
122+
Optional<Long> lowValue = range.getLowValue().map(Long.class::cast);
123+
if (lowValue.isPresent()) {
124+
startVersionExclusive = Optional.of(lowValue.get() - (range.isLowInclusive() ? 1 : 0));
125+
}
126+
127+
Optional<Long> highValue = range.getHighValue().map(Long.class::cast);
128+
if (highValue.isPresent()) {
129+
endVersionInclusive = Optional.of(highValue.get() - (range.isHighInclusive() ? 0 : 1));
130+
}
131+
}
132+
}
133+
}
134+
135+
if (startVersionExclusive.isPresent() && endVersionInclusive.isPresent() && startVersionExclusive.get() >= endVersionInclusive.get()) {
136+
return new EmptyPageSource();
137+
}
138+
139+
if (endVersionInclusive.isEmpty()) {
140+
endVersionInclusive = Optional.of(snapshotVersion);
141+
}
142+
143+
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
144+
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
145+
try {
146+
List<Transaction> transactions = loadNewTailBackward(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive.get()).reversed();
147+
return new FixedPageSource(buildPages(session, pagesBuilder, transactions));
148+
}
149+
catch (TrinoException e) {
150+
throw e;
151+
}
152+
catch (IOException | RuntimeException e) {
153+
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error getting commit info entries from " + tableLocation, e);
154+
}
155+
}
156+
157+
// Load a section of the Transaction Log JSON entries. Optionally from a given end version (inclusive) through an start version (exclusive)
158+
private static List<Transaction> loadNewTailBackward(
159+
TrinoFileSystem fileSystem,
160+
String tableLocation,
161+
Optional<Long> startVersion,
162+
long endVersion)
163+
throws IOException
164+
{
165+
ImmutableList.Builder<Transaction> transactionsBuilder = ImmutableList.builder();
166+
String transactionLogDir = getTransactionLogDir(tableLocation);
167+
168+
long version = endVersion;
169+
long entryNumber = version;
170+
boolean endOfHead = false;
171+
172+
while (!endOfHead) {
173+
Optional<List<DeltaLakeTransactionLogEntry>> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem);
174+
if (results.isPresent()) {
175+
transactionsBuilder.add(new Transaction(version, results.get()));
176+
version = entryNumber;
177+
entryNumber--;
178+
}
179+
else {
180+
// When there is a gap in the transaction log version, indicate the end of the current head
181+
endOfHead = true;
182+
}
183+
if ((startVersion.isPresent() && version == startVersion.get() + 1) || entryNumber < 0) {
184+
endOfHead = true;
185+
}
186+
}
187+
return transactionsBuilder.build();
188+
}
189+
190+
protected abstract List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions);
191+
}

0 commit comments

Comments
 (0)