
Commit 6264fb5

attempts

1 parent fff1950 commit 6264fb5

2 files changed (+32, -78 lines)

fastparquet/core.py  (30 additions, 76 deletions)

@@ -224,13 +224,6 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
     b is delta encoded; c is dict encoded
 
     """
-    if data_header2.encoding not in [parquet_thrift.Encoding.PLAIN_DICTIONARY,
-                                     parquet_thrift.Encoding.RLE_DICTIONARY,
-                                     parquet_thrift.Encoding.RLE,
-                                     parquet_thrift.Encoding.PLAIN,
-                                     parquet_thrift.Encoding.DELTA_BINARY_PACKED
-                                     ]:
-        raise NotImplementedError
     size = (ph.compressed_page_size - data_header2.repetition_levels_byte_length -
             data_header2.definition_levels_byte_length)
     data = infile.tell() + data_header2.definition_levels_byte_length + data_header2.repetition_levels_byte_length
@@ -252,11 +245,8 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
         bit_width = encoding.width_from_max_int(max_def)
         # not the same as read_data(), because we know the length
         io_obj = encoding.NumpyIO(infile.read(data_header2.definition_levels_byte_length))
-        if nullable:
-            defi = assign._mask
-        else:
-            # TODO: in tabular data, nulls arrays could be reused for each column
-            defi = np.empty(data_header2.num_values, dtype=np.uint8)
+        defi = np.empty(data_header2.num_values, dtype=np.uint8)
+
         encoding.read_rle_bit_packed_hybrid(io_obj, bit_width, data_header2.num_values,
                                             encoding.NumpyIO(defi), itemsize=1)
         if max_rep:
@@ -265,41 +255,21 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
         else:
             np.not_equal(defi.view("uint8"), max_def, out=defi)
         nulls = defi.view(np.bool_)
+        if nullable:
+            assign._mask[:] = nulls[row_filter]
     infile.seek(data)
 
-    # input and output element sizes match
-    see = se.type_length == assign.dtype.itemsize * 8 or simple.get(se.type).itemsize == assign.dtype.itemsize
-    # can read-into
-    into0 = ((use_cat or converts_inplace(se) and see)
-             and data_header2.num_nulls == 0
-             and max_rep == 0 and assign.dtype.kind != "O" and row_filter is None
-             and assign.dtype.kind not in "Mm")  # TODO: this can be done in place but is complex
     if row_filter is None:
         row_filter = Ellipsis
     # can decompress-into
     if data_header2.is_compressed is None:
         data_header2.is_compressed = True
-    into = (data_header2.is_compressed and rev_map[cmd.codec] in decom_into
-            and into0)
     if nullable:
         assign = assign._data
 
     uncompressed_page_size = (ph.uncompressed_page_size - data_header2.definition_levels_byte_length -
                               data_header2.repetition_levels_byte_length)
-    if into0 and data_header2.encoding == parquet_thrift.Encoding.PLAIN and (
-            not data_header2.is_compressed or cmd.codec == parquet_thrift.CompressionCodec.UNCOMPRESSED
-    ):
-        # PLAIN read directly into output (a copy for remote files)
-        assign[num:num+n_values].view('uint8')[:] = infile.read(size)
-        convert(assign[num:num+n_values], se)
-    elif into and data_header2.encoding == parquet_thrift.Encoding.PLAIN:
-        # PLAIN decompress directly into output
-        decomp = decom_into[rev_map[cmd.codec]]
-        decomp(np.frombuffer(infile.read(size), dtype="uint8"),
-               assign[num:num+data_header2.num_values].view('uint8'))
-        convert(assign[num:num+n_values], se)
-    elif data_header2.encoding == parquet_thrift.Encoding.PLAIN:
-        # PLAIN, but with nulls or not in-place conversion
+    if data_header2.encoding == parquet_thrift.Encoding.PLAIN:
        codec = cmd.codec if data_header2.is_compressed else "UNCOMPRESSED"
         raw_bytes = decompress_data(np.frombuffer(infile.read(size), "uint8"),
                                     uncompressed_page_size, codec)
@@ -310,7 +280,7 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
                              utf=se.converted_type == 0)
         if data_header2.num_nulls:
             if nullable:
-                assign[num:num+data_header2.num_values][~nulls[row_filter]] = convert(values, se)[row_filter]
+                assign[num:num+data_header2.num_values][~nulls[row_filter]] = convert(values, se)[row_filter[~nulls]]
             else:
                 assign[num:num+data_header2.num_values][nulls[row_filter]] = None  # or nan or nat
         if row_filter is Ellipsis:
@@ -323,53 +293,40 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
             parquet_thrift.Encoding.PLAIN_DICTIONARY,
             parquet_thrift.Encoding.RLE_DICTIONARY,
     ]) or (data_header2.encoding == parquet_thrift.Encoding.RLE):
-        # DICTIONARY or BOOL direct decode RLE into output (no nulls)
         codec = cmd.codec if data_header2.is_compressed else "UNCOMPRESSED"
         raw_bytes = np.frombuffer(infile.read(size), dtype='uint8')
         raw_bytes = decompress_data(raw_bytes, uncompressed_page_size, codec)
         pagefile = encoding.NumpyIO(raw_bytes)
         if data_header2.encoding != parquet_thrift.Encoding.RLE:
             # TODO: check this bit; is the varint read only row byte-exact fastpath?
             bit_width = pagefile.read_byte()
-            encoding.read_unsigned_var_int(pagefile)
+            #encoding.read_unsigned_var_int(pagefile)
         else:
             bit_width = 1
             pagefile.seek(4, 1)
-        if bit_width in [8, 16, 32] and selfmade:
-            # special fastpath for cats
-            outbytes = raw_bytes[pagefile.tell():]
-            if len(outbytes) == assign[num:num+data_header2.num_values].nbytes:
-                assign[num:num+data_header2.num_values].view('uint8')[row_filter] = outbytes[row_filter]
-            else:
-                if data_header2.num_nulls == 0:
-                    assign[num:num+data_header2.num_values][row_filter] = outbytes[row_filter]
-                else:
-                    if row_filter is Ellipsis:
-                        assign[num:num+data_header2.num_values][~nulls] = outbytes
-                    else:
-                        assign[num:num + data_header2.num_values][~nulls[row_filter]] = outbytes[~nulls * row_filter]
-                assign[num:num+data_header2.num_values][nulls[row_filter]] = -1
+        if data_header2.num_nulls == 0:
+            encoding.read_rle_bit_packed_hybrid(
+                pagefile,
+                bit_width,
+                uncompressed_page_size,
+                encoding.NumpyIO(assign[num:num+data_header2.num_values].view('uint8')),
+                itemsize=bit_width // 8
+            )
         else:
-            if data_header2.num_nulls == 0:
-                encoding.read_rle_bit_packed_hybrid(
-                    pagefile,
-                    bit_width,
-                    uncompressed_page_size,
-                    encoding.NumpyIO(assign[num:num+data_header2.num_values].view('uint8')),
-                    itemsize=bit_width
-                )
-            else:
-                temp = np.empty(data_header2.num_values, assign.dtype)
-                encoding.read_rle_bit_packed_hybrid(
-                    pagefile,
-                    bit_width,
-                    uncompressed_page_size,
-                    encoding.NumpyIO(temp.view('uint8')),
-                    itemsize=bit_width
-                )
-                if not nullable:
-                    assign[num:num+data_header2.num_values][nulls[row_filter]] = None
-                assign[num:num+data_header2.num_values][~nulls[row_filter]] = temp[row_filter]
+            temp = np.empty(data_header2.num_values - data_header2.num_nulls, assign.dtype)
+            encoding.read_rle_bit_packed_hybrid(
+                pagefile,
+                bit_width,
+                uncompressed_page_size,
+                encoding.NumpyIO(temp.view('uint8')),
+                itemsize=bit_width // 8
+            )
+            if not nullable:
+                part = assign[num:num+data_header2.num_values][nulls[row_filter]]
+                if len(part) and not use_cat:
+                    # categories already have -1 everywhere
+                    part[:] = None
+            assign[num:num+data_header2.num_values][~nulls[row_filter]] = temp[row_filter[~nulls]]
 
     elif data_header2.encoding in [
             parquet_thrift.Encoding.PLAIN_DICTIONARY,
@@ -403,7 +360,7 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
         else:
             assign[num:num+data_header2.num_values][row_filter] = dic[out][row_filter]
     elif data_header2.encoding == parquet_thrift.Encoding.DELTA_BINARY_PACKED:
-        assert data_header2.num_nulls == 0, "null delta-int not implemented"
+        assert data_header2.num_nulls == 0, "nullable delta-int not implemented"
         codec = cmd.codec if data_header2.is_compressed else "UNCOMPRESSED"
         raw_bytes = decompress_data(np.frombuffer(infile.read(size), "uint8"),
                                     uncompressed_page_size, codec)
@@ -420,9 +377,6 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
         )
         assign[num:num+data_header2.num_values][row_filter] = convert(out, se)[row_filter]
     else:
-        # codec = cmd.codec if data_header2.is_compressed else "UNCOMPRESSED"
-        # raw_bytes = decompress_data(infile.read(size),
-        #                             ph.uncompressed_page_size, codec)
         raise NotImplementedError
     return data_header2.num_values
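For orientation: the reworked null branch above sizes temp to just the non-null count (data_header2.num_values - data_header2.num_nulls), decodes with itemsize=bit_width // 8 (bytes rather than bits), and scatters the decoded values into the output through the inverted null mask; when a row filter is active, the decoded values are indexed by the filter restricted to the non-null slots, which is the row_filter[~nulls] pattern in the diff. A minimal NumPy sketch of that masking logic, with toy arrays standing in for one data page (illustrative only, not fastparquet's API):

import numpy as np

nulls = np.array([False, True, False, False, True, False])     # from definition levels
decoded = np.array([10.0, 20.0, 30.0, 40.0])                   # only non-null values are decoded
row_filter = np.array([True, True, False, True, True, True])   # per-row keep mask

# No filter: scatter decoded values through the inverse null mask
out = np.full(len(nulls), np.nan)
out[~nulls] = decoded
print(out)    # [10. nan 20. 30. nan 40.]

# With a filter: decoded values are indexed by the filter restricted
# to the non-null slots -- the row_filter[~nulls] pattern from the diff
kept = np.full(row_filter.sum(), np.nan)
kept[~nulls[row_filter]] = decoded[row_filter[~nulls]]
print(kept)   # [10. nan 30. nan 40.]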

fastparquet/parquet.thrift  (2 additions, 2 deletions)

@@ -1,4 +1,4 @@
-/**
+df_filtered = pf.to_pandas(filters=[('filter_col', '>', 6)], row_filter=True)v2/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -11,7 +11,7 @@
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANYdf_filtered = pf.to_pandas(filters=[('filter_col', '>', 6)], row_filter=True)
  * KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations
  * under the License.
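The stray df_filtered fragments in this diff reference the row-level filtering this commit is iterating on. As a usage sketch only (the file path and column name here are hypothetical), combining filters with row_filter=True looks like:

from fastparquet import ParquetFile

pf = ParquetFile("example.parquet")   # hypothetical file

# filters alone prunes whole row groups using their statistics;
# row_filter=True additionally drops non-matching rows within the
# surviving row groups, returning only rows where filter_col > 6
df_filtered = pf.to_pandas(filters=[('filter_col', '>', 6)], row_filter=True)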
