Skip to content

Commit

Permalink
SNOW-1447365: Add support for to_csv (#1832)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-nkumar authored Jul 8, 2024
1 parent 6cd31f1 commit a881c7e
Show file tree
Hide file tree
Showing 14 changed files with 792 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
- Added support for `DataFrame.assign`.
- Added support for `DataFrame.stack`.
- Added support for `DataFrame.pivot` and `pd.pivot`.
- Added support for `DataFrame.to_csv` and `Series.to_csv`.

#### Bug Fixes

Expand Down
7 changes: 7 additions & 0 deletions docs/source/modin/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,10 @@ DataFrame
DataFrame.first_valid_index
DataFrame.last_valid_index
DataFrame.resample

.. rubric:: Serialization / IO / conversion

.. autosummary::
:toctree: pandas_api/

DataFrame.to_csv
7 changes: 7 additions & 0 deletions docs/source/modin/series.rst
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,10 @@ Series
Series.str.strip
Series.str.translate
Series.str.upper

.. rubric:: Serialization / IO / conversion

.. autosummary::
:toctree: pandas_api/

Series.to_csv
9 changes: 8 additions & 1 deletion docs/source/modin/supported/dataframe_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,14 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``to_clipboard`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``to_csv`` | N | | |
| ``to_csv`` | P | | Supports writing to both local and snowflake stage.|
| | | | Filepath starting with ``@`` is treated as |
| | | | snowflake stage location. |
| | | | Writing to local file supports all parameters. |
| | | | Writing to snowflake stage does not support |
| | | | ``float_format``, ``mode``, ``encoding``, |
| | | | ``quoting``, ``quotechar``, ``lineterminator``, |
| | | | ``doublequote`` and ``decimal`` parameters. |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``to_dict`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down
9 changes: 8 additions & 1 deletion docs/source/modin/supported/series_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,14 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``to_clipboard`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``to_csv`` | N | | |
| ``to_csv`` | P | | Supports writing to both local and snowflake stage.|
| | | | Filepath starting with ``@`` is treated as |
| | | | snowflake stage location. |
| | | | Writing to local file supports all parameters. |
| | | | Writing to snowflake stage does not support |
| | | | ``float_format``, ``mode``, ``encoding``, |
| | | | ``quoting``, ``quotechar``, ``lineterminator``, |
| | | | ``doublequote`` and ``decimal`` parameters. |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``to_dict`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down
3 changes: 1 addition & 2 deletions src/snowflake/snowpark/modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3323,7 +3323,6 @@ def to_clipboard(
# TODO: SNOW-1119855: Modin upgrade - modin.pandas.base.BasePandasDataset
return self._default_to_pandas("to_clipboard", excel=excel, sep=sep, **kwargs)

@base_not_implemented()
def to_csv(
self,
path_or_buf=None,
Expand All @@ -3348,7 +3347,7 @@ def to_csv(
errors: str = "strict",
storage_options: StorageOptions = None,
): # pragma: no cover
from snowflake.snowpark.modin.pandas.core.execution.dispatching.factories.dispatcher import (
from snowflake.snowpark.modin.core.execution.dispatching.factories.dispatcher import (
FactoryDispatcher,
)

Expand Down
111 changes: 110 additions & 1 deletion src/snowflake/snowpark/modin/plugin/_internal/io_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,113 @@
import glob
import os
from collections.abc import Hashable
from typing import Any, Callable, Union
from typing import Any, Callable, Optional, Union

import numpy as np
import pandas as native_pd
from pandas._typing import FilePath

import snowflake.snowpark.modin.pandas as pd
from snowflake.snowpark.session import Session

PANDAS_KWARGS = {"names", "index_col", "usecols", "dtype"}

# Series.to_csv and DataFrame.to_csv default values.
# This must be same as modin.pandas.base.py:to_csv.
# Used to detect which to_csv arguments a caller explicitly overrode by
# comparing the received kwargs against these defaults.
TO_CSV_DEFAULTS = {
    "path_or_buf": None,
    "sep": ",",
    "na_rep": "",
    "float_format": None,
    "columns": None,
    "header": True,
    "index": True,
    "index_label": None,
    "mode": "w",
    "encoding": None,
    "compression": "infer",
    "quoting": None,
    "quotechar": '"',
    "lineterminator": None,
    "chunksize": None,
    "date_format": None,
    "doublequote": True,
    "escapechar": None,
    "decimal": ".",
    "errors": "strict",
    "storage_options": None,
}

# Reference https://docs.snowflake.com/en/sql-reference/sql/copy-into-location#type-csv
# Compression algorithms accepted by Snowflake's COPY INTO <location> for CSV
# output; anything else is rejected by get_compression_algorithm_for_csv.
SUPPORTED_COMPRESSION_IN_SNOWFLAKE = [
    "auto",
    "brotli",
    "bz2",
    "deflate",
    "gzip",
    "raw_deflate",
    "zstd",
]


def infer_compression_algorithm(filepath: str) -> Optional[str]:
    """
    Try to infer compression algorithm from extension of given filepath.
    Return None, if we fail to map extension to any known compression algorithm.
    Args:
        filepath: path to file.
    Returns:
        Corresponding compression algorithm on success, None otherwise.
    """
    _, ext = os.path.splitext(filepath)
    if not ext:
        return None
    # Remove leading dot and convert to lower case.
    ext = ext[1:].lower()
    # Map from file extension to compression algorithm. Some algorithms here
    # ("tar", "xz", "zip") are not supported by Snowflake; mapping them anyway
    # lets the caller raise a clear "unsupported compression" error instead of
    # silently writing uncompressed output.
    ext_to_algo = {
        "br": "brotli",
        # Fixed typo: was "br2" -> "br2", which matched no real file
        # extension and made bz2 (a Snowflake-supported algorithm)
        # impossible to infer.
        "bz2": "bz2",
        "gz": "gzip",
        "tar": "tar",
        "xz": "xz",
        "zip": "zip",
        "zst": "zstd",
        "zz": "deflate",
    }
    return ext_to_algo.get(ext)


def get_compression_algorithm_for_csv(
    compression: Union[str, dict, None], filepath: str
) -> Optional[str]:
    """
    Resolve the compression algorithm to use for an output csv file.

    Args:
        compression: compression parameter value ("infer", an algorithm name,
            a dict with a "method" key, or None).
        filepath: path to write csv file to.
    Returns:
        Compression algorithm or None.
    Raises:
        ValueError: if the resolved algorithm is not supported by Snowflake.
    """
    # A dict carries the algorithm under its "method" key (pandas convention).
    if isinstance(compression, dict):
        algorithm = compression.get("method")
    elif compression == "infer":
        # Same as native pandas, try to infer compression from file extension.
        algorithm = infer_compression_algorithm(filepath)
    else:
        algorithm = compression

    if algorithm is None:
        return None

    # Check against supported compression algorithms in Snowflake.
    if algorithm.lower() not in SUPPORTED_COMPRESSION_IN_SNOWFLAKE:
        raise ValueError(
            f"Unrecognized compression type: {algorithm}\nValid "
            f"compression types are {SUPPORTED_COMPRESSION_IN_SNOWFLAKE}"
        )
    return algorithm


def upload_local_path_to_snowflake_stage(
session: Session, path: str, sf_stage: str
Expand Down Expand Up @@ -75,6 +172,18 @@ def is_local_filepath(filepath: str) -> bool:
return not filepath.startswith("@") or filepath.startswith(r"\@")


def is_snowflake_stage_path(filepath: FilePath) -> bool:
    """
    Returns whether a filepath refers to snowflake stage location.
    Args:
        filepath: File path to file.
    Returns:
        True if filepath is a string starting with "@" (the Snowflake stage
        prefix), False otherwise (including None and non-string buffers).
    """
    # isinstance already rejects None, so no separate None check is needed.
    return isinstance(filepath, str) and filepath.startswith("@")


def get_non_pandas_kwargs(kwargs: Any) -> Any:
"""
Returns a new dict without pandas keyword
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@
set_frame_2d_positional,
)
from snowflake.snowpark.modin.plugin._internal.io_utils import (
TO_CSV_DEFAULTS,
get_columns_to_keep_for_usecols,
get_compression_algorithm_for_csv,
get_non_pandas_kwargs,
is_local_filepath,
upload_local_path_to_snowflake_stage,
Expand Down Expand Up @@ -1087,10 +1089,12 @@ def _to_snowpark_dataframe_from_snowpark_pandas_dataframe(
self,
index: bool = True,
index_label: Optional[IndexLabel] = None,
data_column_labels: Optional[List[Hashable]] = None,
) -> SnowparkDataFrame:
"""
Convert the Snowpark pandas Dataframe to Snowpark Dataframe. The Snowpark Dataframe is created by selecting
all index columns of the Snowpark pandas Dataframe if index=True, and also all data columns.
all index columns of the Snowpark pandas Dataframe if index=True, and also all data columns
if data_column_labels is None.
For example:
With a Snowpark pandas Dataframe (df) has index=[`A`, `B`], columns = [`C`, `D`],
the result Snowpark Dataframe after calling _to_snowpark_dataframe_from_snowpark_pandas_dataframe(index=True),
Expand All @@ -1108,6 +1112,8 @@ def _to_snowpark_dataframe_from_snowpark_pandas_dataframe(
index_label: Optional[IndexLabel], default None
the new label used for the index columns, the length must be the same as the number of index column
of the current dataframe. If None, the original index name is used.
data_column_labels: Optional[Hashable], default None
Data columns to include. If none include all data columns.

Returns:
SnowparkDataFrame
Expand All @@ -1132,7 +1138,8 @@ def _to_snowpark_dataframe_from_snowpark_pandas_dataframe(
else:
index_column_labels = self._modin_frame.index_column_pandas_labels

data_column_labels = self._modin_frame.data_column_pandas_labels
if data_column_labels is None:
data_column_labels = self._modin_frame.data_column_pandas_labels
if self._modin_frame.is_unnamed_series():
# this is an unnamed Snowpark pandas series, there is no customer visible pandas
# label for the data column, set the label to be None
Expand Down Expand Up @@ -1172,7 +1179,12 @@ def _to_snowpark_dataframe_from_snowpark_pandas_dataframe(
self._modin_frame.index_column_snowflake_quoted_identifiers
)
identifiers_to_retain.extend(
self._modin_frame.data_column_snowflake_quoted_identifiers
[
t[0]
for t in self._modin_frame.get_snowflake_quoted_identifiers_group_by_pandas_labels(
data_column_labels, include_index=False
)
]
)
for pandas_label, snowflake_identifier in zip(
index_column_labels + data_column_labels,
Expand All @@ -1191,6 +1203,59 @@ def _to_snowpark_dataframe_from_snowpark_pandas_dataframe(
col_mapper=rename_mapper
)

def to_csv_with_snowflake(self, **kwargs: Any) -> None:
    """
    Write data to a csv file in snowflake stage.

    The filepath in ``path_or_buf`` is expected to be a stage location
    (leading ``@``); the write is performed server-side via Snowpark's
    ``DataFrameWriter.csv`` (COPY INTO <location>).

    Args:
        **kwargs: to_csv arguments.
    """
    # Raise not implemented error for unsupported parameters.
    unsupported_params = [
        "float_format",
        "mode",
        "encoding",
        "quoting",
        "quotechar",
        "lineterminator",
        "doublequote",
        "decimal",
    ]
    # NOTE(review): these checks use identity (`is not`) against the defaults
    # in TO_CSV_DEFAULTS. That is exact for None/True defaults, but for string
    # defaults (e.g. mode="w") it assumes callers pass through the literal
    # default objects from modin's to_csv signature — an equal-but-distinct
    # string would be flagged as overridden. TODO confirm this is intended.
    for param in unsupported_params:
        if kwargs.get(param) is not TO_CSV_DEFAULTS[param]:
            ErrorMessage.parameter_not_implemented_error(param, "to_csv")

    # These parameters have no effect for a stage write; warn instead of fail.
    ignored_params = ["chunksize", "errors", "storage_options"]
    for param in ignored_params:
        if kwargs.get(param) is not TO_CSV_DEFAULTS[param]:
            WarningMessage.ignored_argument("to_csv", param, "")

    def _get_param(param_name: str) -> Any:
        """
        Extract parameter value from kwargs. If missing return default value.
        """
        return kwargs.get(param_name, TO_CSV_DEFAULTS[param_name])

    path = _get_param("path_or_buf")
    # Validates the algorithm against Snowflake's supported set; raises
    # ValueError for unsupported ones.
    compression = get_compression_algorithm_for_csv(_get_param("compression"), path)

    index = _get_param("index")
    # Build a Snowpark DataFrame restricted to the requested columns
    # (and index columns when index=True).
    snowpark_df = self._to_snowpark_dataframe_from_snowpark_pandas_dataframe(
        index, _get_param("index_label"), _get_param("columns")
    )
    # na_sep holds the na_rep value (string written for NULLs).
    na_sep = _get_param("na_rep")
    snowpark_df.write.csv(
        location=path,
        format_type_options={
            "COMPRESSION": compression if compression else "NONE",
            "FIELD_DELIMITER": _get_param("sep"),
            # Empty tuple disables NULL_IF when na_rep is the default "".
            "NULL_IF": na_sep if na_sep else (),
            "ESCAPE": _get_param("escapechar"),
            "DATE_FORMAT": _get_param("date_format"),
        },
        header=_get_param("header"),
        # single=True produces one output file instead of per-partition files.
        single=True,
    )

def to_snowflake(
self,
name: Union[str, Iterable[str]],
Expand Down
Loading

0 comments on commit a881c7e

Please sign in to comment.