Description
Is your feature request related to a problem?
#4112 fixed a correctness bug when DF coalesces batches across file boundaries, assuming stream order is preserved. However, @roeap pointed out we're still relying on ordering invariants for DV row position semantics and log replay that DF doesn't actually guarantee.
Problem
DVs:
consume_dv_mask assumes rows show up in the order they exist in the file: it drains a per-file selection vector from the front. This works fine if:
- Rows for a file come through in order (doesn't have to be contiguous, just monotonic)
- A file doesn't get split across multiple streams/partitions
The DF optimizer can introduce plan shapes that reorder rows or split files across partitions. If it does, we'll silently apply DV bits to the wrong rows. Not great.
#4112 handles mixed file_id batches under an order-preserving coalesce, but it does not protect against upstream operators that break monotonicity or split files across partitions.
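To make the failure mode concrete, here is a minimal Python sketch of the "drain from the front" behaviour. It is not the actual consume_dv_mask code; `consume_dv_mask_sketch`, `apply_dv`, and the row labels are hypothetical names used only for illustration.

```python
from collections import deque


def consume_dv_mask_sketch(selection: deque, num_rows: int) -> list:
    """Pop the next num_rows keep/drop flags from the front of a per-file selection vector."""
    if num_rows > len(selection):
        raise ValueError("batch has more rows than the remaining selection vector")
    return [selection.popleft() for _ in range(num_rows)]


def apply_dv(rows: list, selection: deque) -> list:
    """Keep only the rows whose positional flag is True."""
    mask = consume_dv_mask_sketch(selection, len(rows))
    return [row for row, keep in zip(rows, mask) if keep]


# A file with rows at positions 0..3; the DV deletes position 1 (row "r1").
in_order = ["r0", "r1", "r2", "r3"]
kept_ok = apply_dv(in_order, deque([True, False, True, True]))

# Same rows, but an upstream operator swapped the first two.
reordered = ["r1", "r0", "r2", "r3"]
kept_bad = apply_dv(reordered, deque([True, False, True, True]))

print(kept_ok)   # ['r0', 'r2', 'r3']
print(kept_bad)  # ['r1', 'r2', 'r3']: deleted row r1 survives, live row r0 is dropped
```

Nothing raises in the reordered case, which is why the mask ends up on the wrong rows silently.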
Log replay:
This one's more theoretical right now, but if we ever push log replay into a DF plan, we need global ordering by commit version for "last write wins" to work correctly. required_input_ordering only gives us per-partition ordering, which isn't sufficient. We'd need either single-partition execution or a sort-preserving merge to get one globally ordered stream.
If multiple actions share a commit version, a deterministic tie breaker (e.g. action index) may also be needed.
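A small Python sketch of why per-partition ordering is not enough. The action tuples, partition contents, and the `replay` helper are made up for illustration; the merge uses `(version, action_index)` as the key, with action_index as the assumed tie breaker.

```python
import heapq

# Each action is (version, action_index, kind, path).
# Both hypothetical partitions are sorted locally by (version, action_index).
partition_a = [(1, 0, "add", "f1"), (4, 0, "remove", "f2")]
partition_b = [(2, 0, "add", "f2"), (3, 0, "remove", "f1")]


def replay(actions) -> list:
    """Last write wins: a later action on a path overrides earlier ones."""
    live = {}
    for _version, _index, kind, path in actions:
        live[path] = kind == "add"
    return sorted(path for path, is_live in live.items() if is_live)


# Per-partition order only: reading partition_a, then partition_b.
naive = replay(partition_a + partition_b)

# One globally ordered stream, analogous to a sort-preserving merge.
merged = replay(heapq.merge(partition_a, partition_b, key=lambda a: (a[0], a[1])))

print(naive)   # ['f2'], wrong: the version 4 remove was applied before the version 2 add
print(merged)  # [], both files were removed by their latest action
```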
Describe the solution you'd like
What I think the contract should be
For DVs: we need monotonic per-file row order, and a file can't be split across streams. Interleaving is fine as long as monotonicity is preserved, because contiguity isn't required. We should reject plan shapes that imply reordering.
For log replay: we need a global total order by version. Local ordering must be paired with a global merge or single-partition execution.
Fail-fast
I think we should default to fail-fast on detected violations. Only recover when we can prove order is preserved.
Things that should be fine:
- Batch coalescing (rows stay in order)
- Mixed-file batches that we split by file_id runs
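As a rough illustration of splitting a mixed-file batch into file_id runs, here is a Python sketch. The batch layout (a list of `(file_id, row)` pairs) and the helper name are assumptions, not the real batch representation.

```python
from itertools import groupby


def split_by_file_id_runs(batch: list) -> list:
    """Split a batch of (file_id, row) pairs into consecutive runs that share a file_id."""
    return [
        (file_id, [row for _, row in run])
        for file_id, run in groupby(batch, key=lambda pair: pair[0])
    ]


# File "a" is interleaved with file "b", but each file's rows stay monotonic.
batch = [("a", 0), ("a", 1), ("b", 0), ("a", 2)]
runs = split_by_file_id_runs(batch)
print(runs)  # [('a', [0, 1]), ('b', [0]), ('a', [2])]
```

Each run can then drain its own file's selection vector. The second "a" run simply continues where the first one stopped, which is why interleaving is safe as long as per-file order is monotonic.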
Things that should fail-fast:
- Upstream operator that doesn't preserve order before DV application
- Same file_id showing up in multiple partitions
- repartition_file_scans with DVs active
Detecting "rows arrived out of order" at runtime is not reliable without row position metadata. We need to catch bad plan shapes before execution instead.
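A plan-time check could look roughly like the following Python sketch. `PlanNode` is a hypothetical, heavily simplified stand-in for a physical plan node; the `preserves_order` and `splits_files` flags are assumptions about what such a check would need to know, not existing DF properties.

```python
from dataclasses import dataclass, field


@dataclass
class PlanNode:
    """Hypothetical simplified plan node; not a DataFusion type."""
    name: str
    preserves_order: bool = True
    splits_files: bool = False
    children: list = field(default_factory=list)


class UnsafePlanError(Exception):
    """Raised when a plan shape could break DV row-position semantics."""


def check_dv_safe(node: PlanNode) -> None:
    """Walk the subtree feeding DV application and fail fast on any unsafe operator."""
    if not node.preserves_order:
        raise UnsafePlanError(f"{node.name} does not preserve input order")
    if node.splits_files:
        raise UnsafePlanError(f"{node.name} may split a file across partitions")
    for child in node.children:
        check_dv_safe(child)


scan = PlanNode("scan")
safe_plan = PlanNode("coalesce_batches", children=[scan])
unsafe_plan = PlanNode("round_robin_repartition", preserves_order=False, children=[scan])

check_dv_safe(safe_plan)  # passes silently
try:
    check_dv_safe(unsafe_plan)
except UnsafePlanError as err:
    print(f"rejected before execution: {err}")
```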
Maybe also worth adding a strict/debug mode that errors on file_id reappearance as extra defense? Not sure if that's overkill.
Potential Solutions:
Short term
- Document these ordering contracts
- Add checks that fail on unsafe plan shapes when DVs are active
- Runtime check that file_id only shows up in one stream
- Test that feeds DV path with shuffled input and verifies we fail fast
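The runtime check that a file_id only shows up in one stream could be sketched like this in Python. `FileStreamRegistry` and its method names are hypothetical. A real implementation would be shared across concurrently running streams and would need synchronization, which this sketch leaves out.

```python
class FileStreamRegistry:
    """Hypothetical guard that records which stream first produced each file_id."""

    def __init__(self) -> None:
        self._owner = {}

    def observe(self, stream_id: int, file_id: str) -> None:
        """Record the owner on first sight; fail fast if another stream sees the same file."""
        owner = self._owner.setdefault(file_id, stream_id)
        if owner != stream_id:
            raise RuntimeError(
                f"file_id {file_id!r} seen in stream {stream_id} "
                f"but already owned by stream {owner}"
            )


registry = FileStreamRegistry()
registry.observe(0, "part-0001.parquet")
registry.observe(0, "part-0001.parquet")  # same stream again: fine
registry.observe(1, "part-0002.parquet")  # different file on another stream: fine

try:
    registry.observe(1, "part-0001.parquet")  # same file on a second stream
except RuntimeError as err:
    print(f"fail fast: {err}")
```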
Long term
The real fix is making DV semantics order-insensitive by carrying explicit (file_path, row_position) through scans. This matches position deletes in other table formats and enables DV filtering under arbitrary repartitioning. It requires upstream support (DF/Parquet exposing row positions).
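A minimal Python sketch of what order-insensitive filtering means, assuming each row carries its (file_path, row_position). The row layout and the `filter_deleted` helper are illustrative only.

```python
import random


def filter_deleted(rows: list, deleted: set) -> list:
    """Drop rows whose (file_path, row_position) is in the deleted set."""
    return [value for path, pos, value in rows if (path, pos) not in deleted]


# Rows as (file_path, row_position, value); the DV deletes position 1 of f1 and 0 of f2.
rows = [("f1", 0, "a"), ("f1", 1, "b"), ("f1", 2, "c"), ("f2", 0, "d"), ("f2", 1, "e")]
deleted = {("f1", 1), ("f2", 0)}

shuffled = rows[:]
random.Random(7).shuffle(shuffled)  # simulate arbitrary reordering or repartitioning

# The surviving set is the same regardless of arrival order.
print(sorted(filter_deleted(rows, deleted)))      # ['a', 'c', 'e']
print(sorted(filter_deleted(shuffled, deleted)))  # ['a', 'c', 'e']
```

Because the lookup is keyed on the row's own position rather than its arrival order, the same filter stays correct when a file is split across streams.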
Priority
None
Contribution
- I'm willing to submit a pull request for this feature
- I can help with testing this feature
- I can help with documentation for this feature