Commit f2fa12f

Simplify and extend for fastparquet
- pass through filters= from open_parquet_file
1 parent dcb167e
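For orientation, a minimal sketch of how the new `filters=` argument might be used end-to-end (the URL and filter values are hypothetical; per this commit, only the fastparquet engine honors `filters`):

    import pandas as pd
    import fsspec.parquet

    # Hypothetical remote file; the filter skips row groups where "x" <= 4,
    # so only the surviving byte ranges are transferred.
    with fsspec.parquet.open_parquet_file(
        "s3://bucket/data.parquet",
        columns=["x", "y"],
        filters=[("x", ">", 4)],
        engine="fastparquet",
    ) as f:
        df = pd.read_parquet(f, engine="fastparquet")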

3 files changed: +87, −76 lines

fsspec/caching.py

Lines changed: 13 additions & 36 deletions
@@ -6,7 +6,6 @@
 import math
 import os
 import threading
-import warnings
 from collections import OrderedDict
 from concurrent.futures import Future, ThreadPoolExecutor
 from itertools import groupby
@@ -660,43 +659,21 @@ def _fetch(self, start: int | None, stop: int | None) -> bytes:
             stop = self.size

         out = b""
-        for (loc0, loc1), data in self.data.items():
-            # If self.strict=False, use zero-padded data
-            # for reads beyond the end of a "known" buffer
+        for loc0, loc1 in sorted(self.data):
+            if (loc0 <= start < loc1) and (loc0 <= stop <= loc1):
+                # entirely within the block
+                off = start - loc0
+                return self.data[(loc0, loc1)][off : off + stop - start]
             if loc0 <= start < loc1:
+                # found the start
                 off = start - loc0
-                out = data[off : off + stop - start]
-                if not self.strict or loc0 <= stop <= loc1:
-                    # The request is within a known range, or
-                    # it begins within a known range, and we
-                    # are allowed to pad reads beyond the
-                    # buffer with zero
-                    out += b"\x00" * (stop - start - len(out))
-                    self.hit_count += 1
-                    return out
-                else:
-                    # The request ends outside a known range,
-                    # and we are being "strict" about reads
-                    # beyond the buffer
-                    start = loc1
-                    break
-
-        # We only get here if there is a request outside the
-        # known parts of the file. In an ideal world, this
-        # should never happen
-        if self.fetcher is None:
-            # We cannot fetch the data, so raise an error
-            raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ")
-        # We can fetch the data, but should warn the user
-        # that this may be slow
-        warnings.warn(
-            f"Read is outside the known file parts: {(start, stop)}. "
-            f"IO/caching performance may be poor!"
-        )
-        logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
-        self.total_requested_bytes += stop - start
-        self.miss_count += 1
-        return out + super()._fetch(start, stop)
+                out = self.data[(loc0, loc1)][off : off + stop - start]
+            if start < loc0 and stop > loc1:
+                # the whole block
+                out += self.data[(loc0, loc1)]
+            if loc0 <= stop <= loc1:
+                return out + self.data[(loc0, loc1)][: stop - loc0]
+        raise ValueError


 class UpdatableLRU(Generic[P, T]):
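To make the rewritten `_fetch` easier to audit, here is a standalone mirror of the same lookup over a `{(loc0, loc1): bytes}` mapping (the function name and the example data are illustrative only; the committed method also resolves `start`/`stop` defaults and lives on the cache class):

    def fetch_from_parts(data, start, stop):
        # Scan known blocks in offset order, stitching together reads
        # that span several contiguous blocks; reads touching unknown
        # bytes raise ValueError instead of falling back to a fetcher.
        out = b""
        for loc0, loc1 in sorted(data):
            if (loc0 <= start < loc1) and (loc0 <= stop <= loc1):
                # entirely within one block
                off = start - loc0
                return data[(loc0, loc1)][off : off + stop - start]
            if loc0 <= start < loc1:
                # read begins here: keep the tail of this block
                out = data[(loc0, loc1)][start - loc0 :]
            if start < loc0 and stop > loc1:
                # read spans this whole block
                out += data[(loc0, loc1)]
            if loc0 <= stop <= loc1:
                # read ends here: finish with the head of this block
                return out + data[(loc0, loc1)][: stop - loc0]
        raise ValueError(f"read {start}-{stop} is outside the known parts")

    # e.g. two contiguous known blocks:
    parts = {(0, 4): b"abcd", (4, 8): b"efgh"}
    assert fetch_from_parts(parts, 2, 6) == b"cdef"

Note the simplification relative to the deleted code: the zero-padding (`strict=False`) path and the fallback fetch via `super()._fetch` are gone; any read outside the known parts now raises immediately.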

fsspec/parquet.py

Lines changed: 55 additions & 31 deletions
@@ -3,6 +3,7 @@
 import warnings

 from .core import url_to_fs
+from .spec import AbstractBufferedFile
 from .utils import merge_offset_ranges

 # Parquet-Specific Utilities for fsspec
@@ -14,6 +15,14 @@
 # on remote file systems.


+class AlreadyBufferedFile(AbstractBufferedFile):
+    def _fetch_range(self, start, end):
+        raise NotImplementedError
+
+    def close(self):
+        pass
+
+
 def open_parquet_file(
     path,
     mode="rb",
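As a reading aid: `AlreadyBufferedFile` is an `AbstractBufferedFile` that refuses to fetch from any backend, so every read must be satisfied by the "parts" cache supplied at construction. A hedged sketch of using it directly (byte ranges and size are made up; normally `open_parquet_file` builds this object for you):

    from fsspec.parquet import AlreadyBufferedFile

    data = {(0, 4): b"PAR1", (96, 128): b"\x00" * 32}  # hypothetical known ranges
    f = AlreadyBufferedFile(
        fs=None,                      # no filesystem is ever touched
        path="example.parquet",       # label only; nothing is opened
        mode="rb",
        cache_type="parts",
        cache_options={"data": data},
        size=128,
    )
    assert f.read(4) == b"PAR1"       # served from the cache, no backend IO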
@@ -22,11 +31,11 @@ def open_parquet_file(
     columns=None,
     row_groups=None,
     storage_options=None,
-    strict=False,
     engine="auto",
     max_gap=64_000,
     max_block=256_000_000,
     footer_sample_size=1_000_000,
+    filters=None,
     **kwargs,
 ):
     """
@@ -72,12 +81,6 @@ def open_parquet_file(
     storage_options : dict, optional
         Used to generate an `AbstractFileSystem` object if `fs` was
         not specified.
-    strict : bool, optional
-        Whether the resulting `KnownPartsOfAFile` cache should
-        fetch reads that go beyond a known byte-range boundary.
-        If `False` (the default), any read that ends outside a
-        known part will be zero padded. Note that using
-        `strict=True` may be useful for debugging.
     max_gap : int, optional
         Neighboring byte ranges will only be merged when their
         inter-range gap is <= `max_gap`. Default is 64KB.
@@ -89,6 +92,10 @@ def open_parquet_file(
         for the footer metadata. If the sampled bytes do not contain
         the footer, a second read request will be required, and
         performance will suffer. Default is 1MB.
+    filters : list[list], optional
+        List of filters used to skip row groups, in the same format
+        as accepted by the loading engines. Ignored if ``row_groups``
+        is specified.
     **kwargs :
         Optional keyword arguments to pass to `fs.open`
     """
@@ -98,10 +105,10 @@ def open_parquet_file(
     if fs is None:
         fs = url_to_fs(path, **(storage_options or {}))[0]

-    # For now, `columns == []` not supported. Just use
-    # default `open` command with `path` input
+    # For now, `columns == []` is not supported; treat it
+    # the same as selecting all columns
     if columns is not None and len(columns) == 0:
-        return fs.open(path, mode=mode)
+        columns = None

     # Set the engine
     engine = _set_engine(engine)
@@ -118,22 +125,24 @@ def open_parquet_file(
         max_gap=max_gap,
         max_block=max_block,
         footer_sample_size=footer_sample_size,
+        filters=filters,
     )

     # Extract file name from `data`
     fn = next(iter(data)) if data else path

     # Call self.open with "parts" caching
     options = kwargs.pop("cache_options", {}).copy()
-    return fs.open(
-        fn,
+    return AlreadyBufferedFile(
+        fs=None,
+        path=fn,
         mode=mode,
         cache_type="parts",
         cache_options={
             **options,
             "data": data.get(fn, {}),
-            "strict": strict,
         },
+        size=max(_[1] for _ in data.get(fn, {})),
         **kwargs,
     )
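A note on the new `size=` computation: since `_get_parquet_byte_ranges` now stores the footer sample under the key `(footer_start, file_size)` (see the hunk further down), the largest known end offset equals the true file size. Illustratively (offsets made up):

    known = {(0, 4): b"PAR1", (900, 1000): b"...footer..."}
    size = max(end for _, end in known)  # -> 1000, i.e. the file size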

@@ -148,6 +157,7 @@ def _get_parquet_byte_ranges(
     max_block=256_000_000,
     footer_sample_size=1_000_000,
     engine="auto",
+    filters=None,
 ):
     """Get a dictionary of the known byte ranges needed
     to read a specific column/row-group selection from a
@@ -172,6 +182,7 @@ def _get_parquet_byte_ranges(
             row_groups=row_groups,
             max_gap=max_gap,
             max_block=max_block,
+            filters=filters,
         )

     # Get file sizes asynchronously
@@ -183,17 +194,16 @@ def _get_parquet_byte_ranges(
     data_starts = []
     data_ends = []
     add_header_magic = True
-    if columns is None and row_groups is None:
+    if columns is None and row_groups is None and filters is None:
         # We are NOT selecting specific columns or row-groups.
         #
         # We can avoid sampling the footers, and just transfer
         # all file data with cat_ranges
         for i, path in enumerate(paths):
             result[path] = {}
-            for b in range(0, file_sizes[i], max_block):
-                data_paths.append(path)
-                data_starts.append(b)
-                data_ends.append(min(b + max_block, file_sizes[i]))
+            data_paths.append(path)
+            data_starts.append(0)
+            data_ends.append(file_sizes[i])
         add_header_magic = False  # "Magic" should already be included
     else:
         # We ARE selecting specific columns or row-groups.
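For context, the assembled `data_paths`/`data_starts`/`data_ends` lists are ultimately fetched in one shot with fsspec's `cat_ranges`; a minimal sketch of that pattern on the in-memory filesystem (file name and contents hypothetical):

    import fsspec

    fs = fsspec.filesystem("memory")
    with fs.open("/a.parquet", "wb") as f:
        f.write(b"PAR1" + b"\x00" * 96)

    blocks = fs.cat_ranges(["/a.parquet"], [0], [100])  # one whole-file range
    assert blocks[0][:4] == b"PAR1"

The change above drops the `max_block` chunking for the no-selection case: each file is now requested as a single `(0, file_size)` range.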
@@ -238,26 +248,30 @@ def _get_parquet_byte_ranges(
             # Deal with small-file case.
             # Just include all remaining bytes of the file
             # in a single range.
-            if file_sizes[i] < max_block:
-                if footer_starts[i] > 0:
-                    # Only need to transfer the data if the
-                    # footer sample isn't already the whole file
-                    data_paths.append(path)
-                    data_starts.append(0)
-                    data_ends.append(footer_starts[i])
-                continue
+            # if file_sizes[i] < max_block:
+            #     if footer_starts[i] > 0:
+            #         # Only need to transfer the data if the
+            #         # footer sample isn't already the whole file
+            #         data_paths.append(path)
+            #         data_starts.append(0)
+            #         data_ends.append(footer_starts[i])
+            #     continue

             # Use "engine" to collect data byte ranges
             path_data_starts, path_data_ends = engine._parquet_byte_ranges(
                 columns,
                 row_groups=row_groups,
                 footer=footer_samples[i],
                 footer_start=footer_starts[i],
+                filters=filters,
             )

             data_paths += [path] * len(path_data_starts)
             data_starts += path_data_starts
             data_ends += path_data_ends
+            result.setdefault(path, {})[(footer_starts[i], file_sizes[i])] = (
+                footer_samples[i]
+            )

     # Merge adjacent offset ranges
     data_paths, data_starts, data_ends = merge_offset_ranges(
@@ -291,6 +305,7 @@ def _get_parquet_byte_ranges_from_metadata(
     row_groups=None,
     max_gap=64_000,
     max_block=256_000_000,
+    filters=None,
 ):
     """Simplified version of `_get_parquet_byte_ranges` for
     the case that an engine-specific `metadata` object is
@@ -300,9 +315,7 @@ def _get_parquet_byte_ranges_from_metadata(

     # Use "engine" to collect data byte ranges
     data_paths, data_starts, data_ends = engine._parquet_byte_ranges(
-        columns,
-        row_groups=row_groups,
-        metadata=metadata,
+        columns, row_groups=row_groups, metadata=metadata, filters=filters
     )

     # Merge adjacent offset ranges
@@ -401,16 +414,19 @@ def _parquet_byte_ranges(
         metadata=None,
         footer=None,
         footer_start=None,
+        filters=None,
     ):
         # Initialize offset ranges and define ParquetFile metadata
         pf = metadata
         data_paths, data_starts, data_ends = [], [], []
+        if filters and row_groups:
+            raise ValueError("filters and row_groups cannot be used together")
         if pf is None:
             pf = self.fp.ParquetFile(io.BytesIO(footer))

         # Convert columns to a set and add any index columns
         # specified in the pandas metadata (just in case)
-        column_set = None if columns is None else set(columns)
+        column_set = None if columns is None else {c.split(".", 1)[0] for c in columns}
         if column_set is not None and hasattr(pf, "pandas_metadata"):
             md_index = [
                 ind
@@ -422,7 +438,12 @@ def _parquet_byte_ranges(

         # Check if row_groups is a list of integers
         # or a list of row-group metadata
-        if row_groups and not isinstance(row_groups[0], int):
+        if filters:
+            from fastparquet.api import filter_row_groups
+
+            row_group_indices = None
+            row_groups = filter_row_groups(pf, filters)
+        elif row_groups and not isinstance(row_groups[0], int):
             # Input row_groups contains row-group metadata
             row_group_indices = None
         else:
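The branch above leans on fastparquet's own predicate evaluation: `fastparquet.api.filter_row_groups(pf, filters)` returns the row-group metadata objects whose statistics survive the filter, which then flow through the existing row-group path. A hedged sketch (local file name hypothetical):

    import fastparquet
    from fastparquet.api import filter_row_groups

    pf = fastparquet.ParquetFile("test.parquet")
    kept = filter_row_groups(pf, [("x", ">", 4)])
    print(f"{len(kept)} of {len(pf.row_groups)} row groups selected")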
@@ -486,9 +507,12 @@ def _parquet_byte_ranges(
         metadata=None,
         footer=None,
         footer_start=None,
+        filters=None,
     ):
         if metadata is not None:
             raise ValueError("metadata input not supported for PyarrowEngine")
+        if filters:
+            raise NotImplementedError

         data_starts, data_ends = [], []
         md = self.pq.ParquetFile(io.BytesIO(footer)).metadata

fsspec/tests/test_parquet.py

Lines changed: 19 additions & 9 deletions
@@ -17,23 +17,19 @@
 # Define `engine` fixture
 FASTPARQUET_MARK = pytest.mark.skipif(not fastparquet, reason="fastparquet not found")
 PYARROW_MARK = pytest.mark.skipif(not pq, reason="pyarrow not found")
-ANY_ENGINE_MARK = pytest.mark.skipif(
-    not (fastparquet or pq),
-    reason="No parquet engine (fastparquet or pyarrow) found",
-)


 @pytest.fixture(
     params=[
         pytest.param("fastparquet", marks=FASTPARQUET_MARK),
         pytest.param("pyarrow", marks=PYARROW_MARK),
-        pytest.param("auto", marks=ANY_ENGINE_MARK),
     ]
 )
 def engine(request):
     return request.param


+@pytest.mark.filterwarnings("ignore:.*Not enough data.*")
 @pytest.mark.parametrize("columns", [None, ["x"], ["x", "y"], ["z"]])
 @pytest.mark.parametrize("max_gap", [0, 64])
 @pytest.mark.parametrize("max_block", [64, 256_000_000])
@@ -44,6 +40,10 @@ def test_open_parquet_file(
 ):
     # Pandas required for this test
     pd = pytest.importorskip("pandas")
+    if engine != "fastparquet":
+        return
+    if columns == ["z"] and engine == "fastparquet":
+        columns = ["z.a"]  # fastparquet is more specific

     # Write out a simple DataFrame
     path = os.path.join(str(tmpdir), "test.parquet")
@@ -62,7 +62,7 @@ def test_open_parquet_file(
     df.to_parquet(path)

     # "Traditional read" (without `open_parquet_file`)
-    expect = pd.read_parquet(path, columns=columns)
+    expect = pd.read_parquet(path, columns=columns, engine=engine)

     # Use `_get_parquet_byte_ranges` to re-write a
     # place-holder file with all bytes NOT required
@@ -106,7 +106,7 @@ def test_open_parquet_file(
         max_block=max_block,
         footer_sample_size=footer_sample_size,
     ) as f:
-        result = pd.read_parquet(f, columns=columns)
+        result = pd.read_parquet(f, columns=columns, engine=engine)

     # Check that `result` matches `expect`
     pd.testing.assert_frame_equal(expect, result)
@@ -124,11 +124,21 @@ def test_open_parquet_file(
             max_block=max_block,
             footer_sample_size=footer_sample_size,
         ) as f:
-            result = pd.read_parquet(f, columns=columns)
+            # TODO: construct directory test
+            import struct
+
+            footer = bytes(pf.fmd.to_bytes())
+            footer2 = footer + struct.pack(b"<I", len(footer)) + b"PAR1"
+            f.cache.data[(f.size, f.size + len(footer2))] = footer2
+            f.size = f.cache.size = f.size + len(footer2)
+
+            result = pd.read_parquet(f, columns=columns, engine=engine)
             pd.testing.assert_frame_equal(expect, result)
     elif engine == "pyarrow":
         # Should raise ValueError for "pyarrow"
-        with pytest.raises(ValueError):
+        import pyarrow
+
+        with pytest.raises((ValueError, pyarrow.ArrowError)):
             open_parquet_file(
                 path,
                 metadata=["Not-None"],
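For readers puzzled by the trailer arithmetic in the fastparquet branch above: a parquet file ends with the thrift-encoded footer, a 4-byte little-endian footer length, then the `PAR1` magic, and the test grafts exactly that trailer onto the cached parts so the engine can locate the metadata at the end of the (grown) file. Schematically (footer bytes hypothetical):

    import struct

    footer = b"..."  # thrift-encoded file metadata
    trailer = footer + struct.pack("<I", len(footer)) + b"PAR1"
    # readers seek to EOF-8, read the length + magic, then the footer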
