
Commit

update default blocksize, and add docstring
rjzamora committed Nov 6, 2024
1 parent b9af7b7 commit 2ad1867
Showing 1 changed file with 250 additions and 57 deletions.
307 changes: 250 additions & 57 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -1,9 +1,12 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from __future__ import annotations

import functools
import itertools
import math
import warnings
from typing import TYPE_CHECKING, Any

import numpy as np
import pandas as pd
@@ -18,6 +21,7 @@

from dask.dataframe.io.parquet.arrow import _filters_to_expression
from dask.dataframe.io.parquet.core import ParquetFunctionWrapper
from dask.tokenize import tokenize
from dask.utils import parse_bytes

import cudf
@@ -27,6 +31,32 @@
# Dask-expr imports CudfEngine from this module
from dask_cudf._legacy.io.parquet import CudfEngine # noqa: F401

if TYPE_CHECKING:
from collections.abc import MutableMapping


_DEVICE_SIZE_CACHE: int | None = None
_STATS_CACHE: MutableMapping[str, Any] = {}


def _normalize_blocksize(fraction: float = 0.03125):
global _DEVICE_SIZE_CACHE

try:
# Plan A: Use PyNVML to set the blocksize
# (Default is 1/32 the total memory of device 0)
import pynvml

if _DEVICE_SIZE_CACHE is None:
pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)
_DEVICE_SIZE_CACHE = pynvml.nvmlDeviceGetMemoryInfo(handle).total

return int(_DEVICE_SIZE_CACHE * fraction)
except ImportError:
# Fall back to a 256MiB default
return "256MiB"


class NoOp(Elemwise):
# Workaround - Always wrap read_parquet operations
@@ -45,59 +75,69 @@ def operation(x):
class CudfReadParquetFSSpec(ReadParquetFSSpec):
def approx_statistics(self):
# Use a few files to approximate column-size statistics

# Account for filters
ds_filters = None
if self.filters is not None:
ds_filters = _filters_to_expression(self.filters)

# Use average total_uncompressed_size of three files
n_sample = 3
column_sizes = {}
for i, frag in enumerate(
self._dataset_info["ds"].get_fragments(ds_filters)
):
md = frag.metadata
for rg in range(md.num_row_groups):
row_group = md.row_group(rg)
for col in range(row_group.num_columns):
column = row_group.column(col)
name = column.path_in_schema
if name not in column_sizes:
column_sizes[name] = np.zeros(n_sample, dtype="int64")
column_sizes[name][i] += column.total_uncompressed_size
if (i + 1) >= n_sample:
break

# Reorganize stats to look like arrow-fs version
return {
"columns": [
{
"path_in_schema": name,
"total_uncompressed_size": np.mean(sizes),
}
for name, sizes in column_sizes.items()
]
}
global _STATS_CACHE

key = tokenize(self._dataset_info["ds"].files[:10], self.filters)
try:
return _STATS_CACHE[key]

except KeyError:
# Account for filters
ds_filters = None
if self.filters is not None:
ds_filters = _filters_to_expression(self.filters)

# Use average total_uncompressed_size of three files
n_sample = 3
column_sizes = {}
for i, frag in enumerate(
self._dataset_info["ds"].get_fragments(ds_filters)
):
md = frag.metadata
for rg in range(md.num_row_groups):
row_group = md.row_group(rg)
for col in range(row_group.num_columns):
column = row_group.column(col)
name = column.path_in_schema
if name not in column_sizes:
column_sizes[name] = np.zeros(
n_sample, dtype="int64"
)
column_sizes[name][i] += column.total_uncompressed_size
if (i + 1) >= n_sample:
break

# Reorganize stats to look like arrow-fs version
_STATS_CACHE[key] = {
"columns": [
{
"path_in_schema": name,
"total_uncompressed_size": np.mean(sizes),
}
for name, sizes in column_sizes.items()
]
}
return _STATS_CACHE[key]
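# For reference, the cached value built above mimics the arrow-fs
# statistics layout, e.g. (illustrative column names and sizes only):
#
#   {"columns": [
#       {"path_in_schema": "a", "total_uncompressed_size": 1.2e6},
#       {"path_in_schema": "b", "total_uncompressed_size": 3.4e5},
#   ]}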

@functools.cached_property
def _fusion_compression_factor(self):
# Disable fusion when blocksize=None
if self.blocksize is None:
# Let blocksize=None disable fusion
return 1

# At this point, `blockwise` was already used to
# split/aggregate files. Therefore, we now
# need to figure out whether we should fuse
# the current partitions to handle column
# projection. We don't know if the current
# partitions are multiple files or file fragments.
# Therefore, we need to use a "multiplier" below
# to correct aggregate_files=False partitioning.

if self.operand("columns") is None and self.aggregate_files:
return 1

# At this point, we *may* have used `blockwise`
# already to split or aggregate files. We don't
# *know* if the current partitions correspond to
# individual/full files, multiple/aggregated files
# or partial/split files.
#
# Therefore, we need to use the statistics from
# a few files to estimate the current partition
# size. This size should be similar to `blocksize`
# *if* aggregate_files is True or if the files
# are *smaller* than `blocksize`.

# Step 1: Sample statistics
approx_stats = self.approx_statistics()
projected_size, original_size = 0, 0
col_op = self.operand("columns") or self.columns
@@ -108,20 +148,35 @@ def _fusion_compression_factor(self):
and split_name[0] in col_op
):
projected_size += col["total_uncompressed_size"]

if original_size < 1 or projected_size < 1:
return 1

# The multiplier corrects for the fact that the original
# files would have been fused for aggregate_files=True
multiplier = 1.0
if not self.aggregate_files:
multiplier = original_size / parse_bytes(self.blocksize)
# Step 2: Estimate the correction factor
# (Correct for possible pre-optimization fusion/splitting)
blocksize = parse_bytes(self.blocksize)
if original_size > blocksize:
# Input files are bigger than blocksize
# and we already split these large files.
# (correction_factor > 1)
correction_factor = original_size / blocksize
elif self.aggregate_files:
# Input files are smaller than blocksize
# and we have already aggregated the small files.
# (correction_factor == 1)
correction_factor = 1
else:
# Input files are smaller than blocksize
# but we haven't aggregated the small files yet.
# (correction_factor < 1)
correction_factor = original_size / blocksize

# Step 3. Estimate column-projection factor
if self.operand("columns") is None:
projection_factor = 1
else:
projection_factor = projected_size / original_size

return max(
(projected_size / original_size) * multiplier,
0.001,
)
return max(projection_factor * correction_factor, 0.001)
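# Worked example (hypothetical numbers, not taken from this commit): with
# blocksize="1GiB", an average sampled file size of 4 GiB that was already
# split (correction_factor = 4), and a column selection covering a quarter
# of the uncompressed bytes (projection_factor = 0.25), the factor above is
# exactly 1.0, so the partitions are already close to the target size.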

def _tune_up(self, parent):
if self._fusion_compression_factor >= 1:
@@ -473,6 +528,137 @@ def read_parquet_expr(
arrow_to_pandas=None,
**kwargs,
):
"""
Read a Parquet file into a Dask-cuDF DataFrame.
This reads a directory of Parquet data into a DataFrame collection.
Partitioning behavior mostly depends on the ``blocksize`` argument.
.. note::
Dask may automatically resize partitions at optimization time.
Please set ``blocksize=None`` to disable this behavior in Dask cuDF.
(NOTE: This will not disable fusion for the "pandas" backend)
.. note::
Specifying ``filesystem="arrow"`` leverages a complete reimplementation of
the Parquet reader that is solely based on PyArrow. It is faster than the
legacy implementation in some cases, but doesn't yet support all features.
Parameters
----------
path : str or list
Source directory for data, or path(s) to individual parquet files.
Prefix with a protocol like ``s3://`` to read from alternative
filesystems. To read from multiple files you can pass a globstring or a
list of paths, with the caveat that they must all have the same
protocol.
columns : str or list, default None
Field name(s) to read in as columns in the output. By default all
non-index fields will be read (as determined by the pandas parquet
metadata, if present). Provide a single field name instead of a list to
read in the data as a Series.
filters : Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None
List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``.
Using this argument will result in row-wise filtering of the final partitions.
Predicates can be expressed in disjunctive normal form (DNF). This means that
the inner-most tuple describes a single column predicate. These inner predicates
are combined with an AND conjunction into a larger predicate. The outer-most
list then combines all of the combined filters with an OR disjunction.
Predicates can also be expressed as a ``List[Tuple]``. These are evaluated
as an AND conjunction. To express OR in predicates, one must use the
(preferred for "pyarrow") ``List[List[Tuple]]`` notation.
index : str, list or False, default None
Field name(s) to use as the output frame index. By default will be
inferred from the pandas parquet file metadata, if present. Use ``False``
to read all fields as columns.
categories : list or dict, default None
For any fields listed here, if the parquet encoding is Dictionary,
the column will be created with dtype category. Use only if it is
guaranteed that the column is encoded as dictionary in all row-groups.
If a list, assumes up to 2**16-1 labels; if a dict, specify the number
of labels expected; if None, will load categories automatically for
data written by dask, not otherwise.
storage_options : dict, default None
Key/value pairs to be passed on to the file-system backend, if any.
Note that the default file-system backend can be configured with the
``filesystem`` argument, described below.
calculate_divisions : bool, default False
Whether to use min/max statistics from the footer metadata (or global
``_metadata`` file) to calculate divisions for the output DataFrame
collection. Divisions will not be calculated if statistics are missing.
This option will be ignored if ``index`` is not specified and there is
no physical index column specified in the custom "pandas" Parquet
metadata. Note that ``calculate_divisions=True`` may be extremely slow
when no global ``_metadata`` file is present, especially when reading
from remote storage. Set this to ``True`` only when known divisions
are needed for your workload (see :ref:`dataframe-design-partitions`).
ignore_metadata_file : bool, default False
Whether to ignore the global ``_metadata`` file (when one is present).
If ``True``, or if the global ``_metadata`` file is missing, the parquet
metadata may be gathered and processed in parallel. Parallel metadata
processing is currently supported for ``ArrowDatasetEngine`` only.
metadata_task_size : int, default configurable
If parquet metadata is processed in parallel (see ``ignore_metadata_file``
description above), this argument can be used to specify the number of
dataset files to be processed by each task in the Dask graph. If this
argument is set to ``0``, parallel metadata processing will be disabled.
The default values for local and remote filesystems can be specified
with the "metadata-task-size-local" and "metadata-task-size-remote"
config fields, respectively (see "dataframe.parquet").
split_row_groups : 'infer', 'adaptive', bool, or int, default 'infer'
WARNING: The ``split_row_groups`` argument is now deprecated, please use
``blocksize`` instead.
blocksize : int, float or str, default 'default'
The desired size of each output ``DataFrame`` partition in terms of total
(uncompressed) parquet storage space. This argument may be used to split
large files or aggregate small files into the same partition. Use ``None``
for a simple 1:1 mapping between files and partitions. Use a float value
less than 1.0 to specify the fractional size of the partitions with
respect to the total memory of the first NVIDIA GPU on your machine.
Default is 1/32 the total memory of a single GPU.
aggregate_files : bool or str, default None
WARNING: The behavior of ``aggregate_files=True`` is now obsolete
when query-planning is enabled (the default). Small files are now
aggregated automatically according to the ``blocksize`` setting.
Please expect this argument to be deprecated in a future release.
WARNING: Passing a string argument to ``aggregate_files`` will result
in experimental behavior that may be removed at any time.
parquet_file_extension: str, tuple[str], or None, default (".parq", ".parquet", ".pq")
A file extension or an iterable of extensions to use when discovering
parquet files in a directory. Files that don't match these extensions
will be ignored. This argument only applies when ``paths`` corresponds
to a directory and no ``_metadata`` file is present (or
``ignore_metadata_file=True``). Passing in ``parquet_file_extension=None``
will treat all files in the directory as parquet files.
The purpose of this argument is to ensure that the engine will ignore
unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files).
It may be necessary to change this argument if the data files in your
parquet dataset do not end in ".parq", ".parquet", or ".pq".
filesystem: "fsspec", "arrow", or fsspec.AbstractFileSystem backend to use.
dataset: dict, default None
Dictionary of options to use when creating a ``pyarrow.dataset.Dataset`` object.
These options may include a "filesystem" key to configure the desired
file-system backend. However, the top-level ``filesystem`` argument will always
take precedence.
**Note**: The ``dataset`` options may include a "partitioning" key.
However, since ``pyarrow.dataset.Partitioning``
objects cannot be serialized, the value can be a dict of key-word
arguments for the ``pyarrow.dataset.partitioning`` API
(e.g. ``dataset={"partitioning": {"flavor": "hive", "schema": ...}}``).
Note that partitioned columns will not be converted to categorical
dtypes when a custom partitioning schema is specified in this way.
read: dict, default None
Dictionary of options to pass through to ``CudfEngine.read_partitions``
using the ``read`` key-word argument.
"""

import dask_expr as dx
from fsspec.utils import stringify_path
from pyarrow import fs as pa_fs
@@ -513,6 +699,12 @@ def read_parquet_expr(
"Value of 'in' filter must be a list, set or tuple."
)

# Normalize blocksize input
if blocksize == "default":
blocksize = _normalize_blocksize()
elif isinstance(blocksize, float) and blocksize < 1:
blocksize = _normalize_blocksize(blocksize)
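# For example (hypothetical 80 GiB device): "default" resolves to
# 80 GiB / 32 = 2.5 GiB, blocksize=0.0625 resolves to 5 GiB, and explicit
# values such as "1GiB" or an integer byte count pass through unchanged.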

if (
isinstance(filesystem, pa_fs.FileSystem)
or isinstance(filesystem, str)
@@ -597,6 +789,7 @@ def read_parquet_expr(

if QUERY_PLANNING_ON:
read_parquet = read_parquet_expr
read_parquet.__doc__ = read_parquet_expr.__doc__
else:
read_parquet = _deprecated_api(
"dask_cudf.io.parquet.read_parquet",
