Stream large transaction log jsons instead of storing in-memory #24491
Conversation
@raunaqmorarka very interesting! I've skimmed the code; would you consider this an alternative approach to the metadata/protocol caching we were trying to push in #20437, or more of a separate improvement? I assume they'll impact the same sort of time-consuming operations. (Asking as I'm keen on getting our fork closer to upstream; it doesn't really matter which approach ends up being used as long as performance improves 👍)
Force-pushed fd27902 to 05a9faa
Thanks for pointing out that PR, I hadn't looked at it before. For me the priority was to deal gracefully with transaction log JSON files that are GBs in size. I've tweaked the PR a bit to be better about caching metadata/protocol entries. Feel free to try this out on your workloads or add review comments.
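The streaming idea at the heart of the PR can be illustrated with a minimal sketch (hypothetical names; the real Trino code parses entries with Jackson into DeltaLakeTransactionLogEntry objects): each line of a Delta transaction log JSON file is one log entry, so the file can be consumed lazily instead of materializing a GB-sized list in memory.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.stream.Stream;

public class LazyLogReader
{
    // Each line of a _delta_log/<version>.json file is a single entry.
    // Returning a lazy Stream keeps memory usage flat regardless of file
    // size; the caller must close the stream to release the reader.
    static Stream<String> entries(BufferedReader reader)
    {
        return reader.lines().onClose(() -> {
            try {
                reader.close();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    public static void main(String[] args)
    {
        String log = "{\"metaData\":{}}\n{\"protocol\":{}}\n{\"add\":{}}";
        try (Stream<String> entries = entries(new BufferedReader(new StringReader(log)))) {
            System.out.println(entries.count()); // prints 3
        }
    }
}
```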
Looks promising to me.
Resolved review threads:
...-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java
...src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogEntries.java (outdated)
```java
/**
 * Returns a stream of DeltaLakeTransactionLogEntry
 * Caller has the responsibility to close this stream as it potentially holds an open file
 */
public Stream<DeltaLakeTransactionLogEntry> getEntries(TrinoFileSystem fileSystem)
```
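Since the returned stream may hold an open file, the caller is responsible for closing it. A hedged sketch of the expected calling pattern (names are hypothetical stand-ins, not the actual Trino API):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class CallerClosesStream
{
    static final AtomicBoolean closed = new AtomicBoolean(false);

    // Stand-in for TransactionLogEntries.getEntries(fileSystem): the
    // onClose hook models releasing the open file handle behind the stream.
    static Stream<String> getEntries()
    {
        return Stream.of("metaData", "protocol").onClose(() -> closed.set(true));
    }

    public static void main(String[] args)
    {
        // try-with-resources guarantees the file handle is released even if
        // the terminal operation throws
        try (Stream<String> entries = getEntries()) {
            entries.forEach(System.out::println);
        }
        System.out.println("closed=" + closed.get()); // closed=true
    }
}
```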
Consider passing inputFile from the constructor for eventually creating the stream, instead of specifying the TrinoFileSystem again.
This would likely make the interface of DeltaLakeCommitSummary cleaner.
TransactionLogEntries ends up being part of the TableSnapshot cache, which is used across queries. So I don't think it would be a good idea to hold on to an inputFile created in one query's context for use by other queries.
```diff
@@ -170,7 +171,7 @@ private static List<Transaction> loadNewTailBackward(
     boolean endOfHead = false;

     while (!endOfHead) {
-        Optional<List<DeltaLakeTransactionLogEntry>> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem);
+        Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE);
```
You could maybe retrieve transactionLogMaxCachedSize through TransactionLogAccess instead of using hardcoded values. I'm referring to DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE. Same for DeltaLakeMetadata.
I don't think it's useful to cache the transaction log in those contexts, so I've just set it to 0 bytes to be clearer.
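Setting the limit to 0 bytes effectively disables caching. A minimal sketch of the size-threshold idea under discussion (hypothetical names, not the actual Trino classes): entries are retained in memory only when the source file is at most `max-cached-file-size`, otherwise callers fall back to re-reading the file lazily.

```java
import java.util.List;
import java.util.Optional;

public class SizeBoundedCache
{
    // Cache the parsed entries only when the source file is small enough;
    // an empty result means "stream from disk on each access" instead.
    static Optional<List<String>> maybeCache(List<String> entries, long fileSizeBytes, long maxCachedSizeBytes)
    {
        if (fileSizeBytes <= maxCachedSizeBytes) {
            return Optional.of(List.copyOf(entries));
        }
        return Optional.empty();
    }

    public static void main(String[] args)
    {
        List<String> entries = List.of("metaData", "protocol");
        System.out.println(maybeCache(entries, 100, 1024).isPresent()); // true
        System.out.println(maybeCache(entries, 100, 0).isPresent());    // false: max-cached-file-size=0B
    }
}
```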
```diff
@@ -84,6 +85,7 @@ public void testExplicitPropertyMappings()
     Map<String, String> properties = ImmutableMap.<String, String>builder()
             .put("delta.metadata.cache-ttl", "10m")
             .put("delta.metadata.cache-max-retained-size", "1GB")
+            .put("delta.transaction-log.max-cached-file-size", "1MB")
```
Do we have a test where this value is small enough to actually cause the dynamic retrieval of the transaction log entries? I'm looking to have the TransactionLogEntries code path tested when cachedEntries is not present.
I've used delta.transaction-log.max-cached-file-size=0B in TestDeltaLakeBasic.
```java
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.slice.SizeOf.sizeOf;

public record MetadataAndProtocolEntries(Optional<MetadataEntry> metadata, Optional<ProtocolEntry> protocol)
```
MetadataAndProtocolEntries -> TableDescriptorEntries
I'd rather make it clear that it's just the metadata and protocol entries; a more generic name would imply it could be something more, which is not clear to me.
```java
    this(Optional.ofNullable(metadata), Optional.ofNullable(protocol));
}

public Stream<Object> stream()
```
Not quite sure whether we need this method; it is used in only a single place. Probably simple getters for metadata and protocol would be more straightforward.
This method makes the calling code simpler.
```diff
@@ -427,7 +442,7 @@ public static ImmutableList<DeltaLakeColumnMetadata> columnsWithStats(List<Delta
         .collect(toImmutableList());
 }

-private Stream<AddFileEntry> activeAddEntries(Stream<DeltaLakeTransactionLogEntry> checkpointEntries, List<Transaction> transactions)
+private Stream<AddFileEntry> activeAddEntries(Stream<DeltaLakeTransactionLogEntry> checkpointEntries, List<Transaction> transactions, TrinoFileSystem fileSystem)
```
To investigate whether we can do without passing fileSystem, given that TransactionLogEntries already holds a pointer to TrinoInputFile.
See #24491 (comment)
```java
        fileSystemFactory.create(session),
        fileFormatDataSourceStats)) {
    return entries.collect(toImmutableMap(Object::getClass, Function.identity(), (first, second) -> second));

if (tableSnapshot.getCachedMetadata().isEmpty() || tableSnapshot.getCachedProtocol().isEmpty()) {
```
There's room for improvement here. If we're looking only for metadata & protocol, we should be reading backwards from the highest transaction log towards the checkpoint. In case you apply this request, please also test your algorithm via TestDeltaLakeFileOperations, to ensure that, e.g., the checkpoint is not read when doing some table changes in the subsequent transaction logs for a SHOW CREATE TABLE statement.
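The backward scan being suggested could look roughly like this sketch (hypothetical names and reader function; the real code would read _delta_log JSON files via TrinoFileSystem). Later versions override earlier ones, so the first metadata/protocol entries seen while walking backwards are the effective ones, and the checkpoint only needs to be consulted for whatever was not found in the tail.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;

public class BackwardMetadataScan
{
    record Snapshot(Optional<String> metadata, Optional<String> protocol) {}

    // Walk versions from newest down towards the checkpoint, stopping as
    // soon as both the metadata and the protocol entry have been seen.
    static Snapshot find(long newestVersion, long checkpointVersion, LongFunction<List<String>> readVersion)
    {
        Optional<String> metadata = Optional.empty();
        Optional<String> protocol = Optional.empty();
        for (long v = newestVersion; v > checkpointVersion; v--) {
            for (String entry : readVersion.apply(v)) {
                if (metadata.isEmpty() && entry.startsWith("metaData")) {
                    metadata = Optional.of(entry);
                }
                if (protocol.isEmpty() && entry.startsWith("protocol")) {
                    protocol = Optional.of(entry);
                }
            }
            if (metadata.isPresent() && protocol.isPresent()) {
                return new Snapshot(metadata, protocol); // checkpoint never read
            }
        }
        return new Snapshot(metadata, protocol); // fall back to checkpoint for the gaps
    }

    public static void main(String[] args)
    {
        Map<Long, List<String>> log = Map.of(
                10L, List.of("add"),
                11L, List.of("metaData@11", "protocol@11"));
        Snapshot s = find(11, 9, v -> log.getOrDefault(v, List.of()));
        System.out.println(s.metadata().orElseThrow()); // metaData@11
    }
}
```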
Yes, that would be better, but I'd like to limit the scope of this PR. The same improvement could apply to getMetadataEntry and getProtocolEntry as well. You can file a GH issue about it for tracking.
Force-pushed cc4cc3a to d714365
Resolved review threads (outdated):
...n/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java
...ain/java/io/trino/plugin/deltalake/transactionlog/checkpoint/MetadataAndProtocolEntries.java
Operations fetching metadata and protocol entries can skip reading the rest of the json file after those entries are found
Force-pushed d714365 to 0301c34
Description
Operations fetching metadata and protocol entries can skip reading the rest of the json file after those entries are found
Additional context and related issues
On an example transaction log JSON of 1.5GB, the time taken for simple operations like register table, DESCRIBE, and SELECTs which don't use table statistics (or any read with set session delta.statistics_enabled=false) reduces from 18s to under 1s on a local machine. Such large transaction log JSONs were observed to have been produced by the CLONE operation from Apache Spark.
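The described optimization of skipping the rest of the JSON once the metadata and protocol entries are found can be sketched as follows (hypothetical names; the real code operates on parsed DeltaLakeTransactionLogEntry objects rather than raw strings):

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.stream.Stream;

public class ShortCircuitScan
{
    record Result(Optional<String> metadata, Optional<String> protocol) {}

    // Scan entries in order and bail out once both metadata and protocol
    // are present, so the (possibly GB-sized) tail is never parsed.
    static Result scan(Stream<String> entries)
    {
        Optional<String> metadata = Optional.empty();
        Optional<String> protocol = Optional.empty();
        Iterator<String> it = entries.iterator();
        while (it.hasNext() && (metadata.isEmpty() || protocol.isEmpty())) {
            String entry = it.next();
            if (entry.startsWith("metaData")) {
                metadata = Optional.of(entry);
            }
            else if (entry.startsWith("protocol")) {
                protocol = Optional.of(entry);
            }
        }
        return new Result(metadata, protocol);
    }

    public static void main(String[] args)
    {
        // the trailing "add" entries are never consumed
        Result r = scan(Stream.of("protocol", "metaData", "add", "add"));
        System.out.println(r.metadata().isPresent() && r.protocol().isPresent()); // true
    }
}
```

Because most transactions write metadata and protocol entries near the top of the file, this short-circuit is what makes operations like DESCRIBE cheap even against very large commit files.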
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: