SNOW-1692064: Implement DataFrame/Series align for axis = 0 (#2483)
<!---
Please answer these questions before creating your pull request. Thanks!
--->

1. Which Jira issue is this PR addressing? Make sure that there is an
accompanying issue to your PR.

   <!---
   In this section, please add a Snowflake Jira issue number.

Note that if a corresponding GitHub issue exists, you should still
include
   the Snowflake Jira issue number. For example, for GitHub issue
#1400, you should
   add "SNOW-1335071" here.
    --->

   Fixes SNOW-1692064

2. Fill out the following pre-review checklist:

- [x] I am adding a new automated test(s) to verify correctness of my
new code
- [ ] If this test skips Local Testing mode, I'm requesting review from
@snowflakedb/local-testing
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency
- [ ] If this is a new feature/behavior, I'm adding the Local Testing
parity changes.
- [x] I acknowledge that I have ensured my changes to be thread-safe.
Follow the link for more information: [Thread-safe Developer
Guidelines](https://docs.google.com/document/d/162d_i4zZ2AfcGRXojj0jByt8EUq-DrSHPPnTa4QvwbA/edit#bookmark=id.e82u4nekq80k)

3. Please describe how your code solves the related issue.

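Adds support for ``DataFrame.align`` and ``Series.align`` with ``axis=0`` and
the default ``fill_value`` of np.nan. Calls that pass ``axis=1`` or
``axis=None``, a non-default ``fill_value``, ``copy=False``, ``level``, or a
MultiIndex are rejected as not implemented.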
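
As a rough illustration of the behavior described above, the sketch below models `align` for `axis=0` with plain Python dictionaries in place of Series. The function name `align_axis0` and the dictionary representation are assumptions made for this example only; they are not part of Snowpark pandas, and the join semantics are taken from the docstrings in this commit.

```python
import math


def align_axis0(left: dict, right: dict, join: str = "outer"):
    """Toy model of align(axis=0): both results share one index, gaps become NaN."""
    if join == "outer":
        labels = sorted(set(left) | set(right))   # union of keys, sorted
    elif join == "inner":
        labels = [k for k in left if k in right]  # intersection, left order
    elif join == "left":
        labels = list(left)                       # left keys, left order
    elif join == "right":
        labels = list(right)                      # right keys, right order
    else:
        raise ValueError(f"unsupported join: {join!r}")
    nan = float("nan")                            # default fill value
    new_left = {k: left.get(k, nan) for k in labels}
    new_right = {k: right.get(k, nan) for k in labels}
    return new_left, new_right


left = {1: 1, 2: 6}
right = {2: 10, 3: 60, 4: 600}
new_left, new_right = align_axis0(left, right, join="outer")
print(list(new_left))             # [1, 2, 3, 4]
print(math.isnan(new_right[1]))   # True: label 1 is missing on the right
```

Both returned objects always carry the same labels; only the values differ, with NaN standing in wherever one side had no entry.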

---------

Signed-off-by: Labanya Mukhopadhyay <[email protected]>
sfc-gh-lmukhopadhyay authored Oct 28, 2024
1 parent db1b634 commit fb39324
Showing 14 changed files with 820 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -59,6 +59,7 @@
- Added support for `DataFrame.attrs` and `Series.attrs`.
- Added support for `DataFrame.style`.
- Added support for `Index.to_numpy`.
- Added support for `DataFrame.align` and `Series.align` for `axis=0`.

#### Improvements

5 changes: 4 additions & 1 deletion docs/source/modin/supported/dataframe_supported.rst
@@ -78,7 +78,10 @@ Methods
| ``aggregate``               | P                               | ``margins``, ``observed``,       | See ``agg``                                        |
|                             |                                 | ``sort``                         |                                                    |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``align``                   | N                               |                                  |                                                    |
| ``align``                   | P                               | ``copy``, ``level``,             | ``N`` for MultiIndex, for deprecated parameters    |
|                             |                                 | ``fill_value``                   | ``method``, ``limit``, ``fill_axis``,              |
|                             |                                 |                                  | ``broadcast_axis``, if ``axis`` == 1 or None, or   |
|                             |                                 |                                  | if ``fill_value`` is not default of np.nan         |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``all``                     | P                               |                                  | ``N`` for non-integer/boolean types                |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
5 changes: 4 additions & 1 deletion docs/source/modin/supported/series_supported.rst
@@ -84,7 +84,10 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``aggregate``               | P                               |                                  | See ``agg``                                        |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``align``                   | N                               |                                  |                                                    |
| ``align``                   | P                               | ``copy``, ``level``,             | ``N`` for MultiIndex, for deprecated parameters    |
|                             |                                 | ``fill_value``                   | ``method``, ``limit``, ``fill_axis``,              |
|                             |                                 |                                  | ``broadcast_axis``, if ``axis`` == 1 or None, or   |
|                             |                                 |                                  | if ``fill_value`` is not default of np.nan         |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``all``                     | P                               |                                  | ``N`` for non-integer/boolean types                |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
95 changes: 95 additions & 0 deletions src/snowflake/snowpark/modin/plugin/_internal/align_utils.py
@@ -0,0 +1,95 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

from snowflake.snowpark.modin.plugin._internal.frame import InternalFrame
from snowflake.snowpark.modin.plugin._internal.join_utils import align_on_index


def align_axis_0_left(
    frame: InternalFrame, other_frame: InternalFrame, join: str
) -> tuple[InternalFrame, InternalFrame, list[str], list[str]]:
    """
    Gets the left align results.

    Args:
        frame: original frame
        other_frame: other frame
        join: type of alignment to be performed.

    Returns:
        Tuple containing:
            InternalFrame result of join_utils.align_on_index,
            final left_frame,
            list of left_frame_data_ids,
            list of left_index_ids
    """
    if join == "right":
        left_result, left_column_mapper = align_on_index(other_frame, frame, how="left")
        left_frame_data_ids = left_column_mapper.map_right_quoted_identifiers(
            frame.data_column_snowflake_quoted_identifiers
        )
        left_index_ids = left_result.index_column_snowflake_quoted_identifiers
        left_frame = left_result.ordered_dataframe.select(
            left_frame_data_ids + left_index_ids
        )
    else:
        left_result, left_column_mapper = align_on_index(frame, other_frame, how=join)
        left_frame_data_ids = left_column_mapper.map_left_quoted_identifiers(
            frame.data_column_snowflake_quoted_identifiers
        )
        left_index_ids = left_result.index_column_snowflake_quoted_identifiers
        left_frame = left_result.ordered_dataframe.select(
            left_frame_data_ids + left_index_ids
        )
    return left_result, left_frame, left_frame_data_ids, left_index_ids


def align_axis_0_right(
    frame: InternalFrame, other_frame: InternalFrame, join: str
) -> tuple[InternalFrame, InternalFrame, list[str], list[str]]:
    """
    Gets the right align results.

    Args:
        frame: original frame
        other_frame: other frame
        join: type of alignment to be performed.

    Returns:
        Tuple containing:
            InternalFrame result of join_utils.align_on_index,
            final right_frame,
            list of right_frame_data_ids,
            list of right_index_ids
    """
    if join == "left":
        right_result, right_column_mapper = align_on_index(frame, other_frame, how=join)
        right_frame_data_ids = right_column_mapper.map_right_quoted_identifiers(
            other_frame.data_column_snowflake_quoted_identifiers
        )
        right_index_ids = right_result.index_column_snowflake_quoted_identifiers
        right_frame = right_result.ordered_dataframe.select(
            right_frame_data_ids + right_index_ids
        )
    elif join == "right":
        right_result, right_column_mapper = align_on_index(
            other_frame, frame, how="left"
        )
        right_frame_data_ids = right_column_mapper.map_left_quoted_identifiers(
            other_frame.data_column_snowflake_quoted_identifiers
        )
        right_index_ids = right_result.index_column_snowflake_quoted_identifiers
        right_frame = right_result.ordered_dataframe.select(
            right_frame_data_ids + right_index_ids
        )
    else:
        right_result, right_column_mapper = align_on_index(other_frame, frame, how=join)
        right_frame_data_ids = right_column_mapper.map_left_quoted_identifiers(
            other_frame.data_column_snowflake_quoted_identifiers
        )
        right_index_ids = right_result.index_column_snowflake_quoted_identifiers
        right_frame = right_result.ordered_dataframe.select(
            right_frame_data_ids + right_index_ids
        )
    return right_result, right_frame, right_frame_data_ids, right_index_ids
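
The two helpers above never pass `how="right"` to `align_on_index`; for `join="right"` they swap the operands and request a `"left"` align instead. The sketch below shows that trick on plain label lists. `align_labels` is a hypothetical stand-in for `align_on_index` that returns only the resulting labels, and it assumes unique labels; it is not the library's implementation.

```python
def align_labels(left: list, right: list, how: str) -> list:
    """Hypothetical stand-in for align_on_index: returns only the aligned labels."""
    if how == "left":
        return list(left)                        # left labels, left order
    if how == "inner":
        return [k for k in left if k in right]   # intersection, left order
    if how == "outer":
        return sorted(set(left) | set(right))    # union, sorted
    raise ValueError(f"how={how} is not supported")


def align_labels_any(frame: list, other: list, join: str) -> list:
    """Support join='right' without a 'right' primitive."""
    if join == "right":
        # Swap the operands so that `other` plays the role of the left frame.
        return align_labels(other, frame, how="left")
    return align_labels(frame, other, how=join)


print(align_labels_any(["a", "b", "c"], ["c", "d"], "right"))  # ['c', 'd']
print(align_labels_any(["a", "b", "c"], ["c", "d"], "inner"))  # ['c']
```

Because the operands are swapped, the caller has to look up its own columns on the opposite side of the result, which is why the helpers switch between `map_left_quoted_identifiers` and `map_right_quoted_identifiers`.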
4 changes: 3 additions & 1 deletion src/snowflake/snowpark/modin/plugin/_internal/join_utils.py
@@ -1247,6 +1247,7 @@ def align(
            * coalesce: use only index from left frame, preserve left order. If left
              frame is empty left_on columns are coalesced with right_on columns.
            * outer: use union of index from both frames, sort index lexicographically.
            * inner: use intersection of index from both frames, preserve left order.
    Returns:
        New aligned InternalFrame by aligning left frame with right frame.
    """
@@ -1330,12 +1331,13 @@ def align_on_index(
    Args:
        left: Left DataFrame.
        right: right DataFrame.
        how: the align method {{'left', 'coalesce', 'outer'}}, by default is outer
        how: the align method {{'left', 'coalesce', 'outer', 'inner'}}, by default is outer
            * left: use only index from left frame, preserve left order.
            * coalesce: if left frame has non-zero rows use only index from left
              frame, preserve left order otherwise use only right index and preserve
              right order.
            * outer: use union of index from both frames, sort index lexicographically.
            * inner: use intersection of index from both frames, preserve left order.
    Returns:
        An InternalFrame for the aligned result.
        A JoinOrAlignResultColumnMapper that provides quoted identifiers mapping from the
15 changes: 13 additions & 2 deletions src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py
@@ -1377,6 +1377,7 @@ def align(
            how: We support the following align/join types:
                - "outer": Full outer align (default value)
                - "left": Left outer align
                - "inner": Inner align
                - "coalesce": If left frame is not empty perform left outer align
                  otherwise perform right outer align. When left frame is empty, the
                  left_on column is replaced with the right_on column in the result.
@@ -1672,9 +1673,19 @@ def align(
        elif how == "left":
            filter_expression = filter_expression & left_row_pos.is_not_null()
            select_list = result_projected_column_snowflake_quoted_identifiers
        else: # outer
        elif how == "inner":
            filter_expression = (
                filter_expression
                & left_row_pos.is_not_null()
                & right_row_pos.is_not_null()
            )
            select_list = result_projected_column_snowflake_quoted_identifiers

        elif how == "outer":
            select_list = result_projected_column_snowflake_quoted_identifiers
        else:
            raise ValueError(
                f"how={how} is not valid argument for ordered_dataframe.align."
            )
        joined_ordered_frame = joined_ordered_frame.filter(filter_expression).sort(
            ordering_columns
        )
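
The hunk above selects rows by testing the left and right row positions for null after the two frames have been joined. A minimal sketch of that idea follows, assuming unique keys and representing each joined row as a `(key, left_row_pos, right_row_pos)` tuple. The names `full_outer` and `keep` are hypothetical, and the sketch does not model the ordering columns or the `coalesce` mode.

```python
from typing import Optional

# (key, left_row_pos, right_row_pos); a position is None when that side lacks the key
Row = tuple[str, Optional[int], Optional[int]]


def full_outer(left_keys: list[str], right_keys: list[str]) -> list[Row]:
    """Pair every key with its row position on each side."""
    left_pos = {k: i for i, k in enumerate(left_keys)}
    right_pos = {k: i for i, k in enumerate(right_keys)}
    keys = sorted(set(left_pos) | set(right_pos))
    return [(k, left_pos.get(k), right_pos.get(k)) for k in keys]


def keep(row: Row, how: str) -> bool:
    """Row filter corresponding to each align mode."""
    _, lpos, rpos = row
    if how == "left":
        return lpos is not None                       # key exists on the left
    if how == "inner":
        return lpos is not None and rpos is not None  # key exists on both sides
    if how == "outer":
        return True                                   # keep every key
    raise ValueError(f"how={how} is not a valid argument")


rows = full_outer(["a", "b"], ["b", "c"])
print([r[0] for r in rows if keep(r, "inner")])  # ['b']
print([r[0] for r in rows if keep(r, "left")])   # ['a', 'b']
```

Under this model, `inner` is simply the `left` filter with one extra non-null test on the right position, which matches the shape of the added branch.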
4 changes: 4 additions & 0 deletions src/snowflake/snowpark/modin/plugin/_typing.py
@@ -44,6 +44,10 @@ class LabelIdentifierPair(NamedTuple):
    # align columns.
    "outer",
    # If align column values matches exactly, merge frames line by line (this is
    # equivalent to joining on row position) otherwise perform INNER JOIN on
    # align columns
    "inner",
    # If align column values matches exactly, merge frames line by line (this is
    # equivalent to joining on row position) otherwise
    # - perform LEFT OUTER JOIN if left frame is non-empty
    # - perform RIGHT OUTER JOIN if left frame is empty
@@ -171,6 +171,10 @@
    repr_aggregate_function,
    using_named_aggregations_for_func,
)
from snowflake.snowpark.modin.plugin._internal.align_utils import (
    align_axis_0_left,
    align_axis_0_right,
)
from snowflake.snowpark.modin.plugin._internal.apply_utils import (
    APPLY_LABEL_COLUMN_QUOTED_IDENTIFIER,
    APPLY_VALUE_COLUMN_QUOTED_IDENTIFIER,
@@ -8400,6 +8404,117 @@ def vectorized_udf(df: pandas.DataFrame) -> pandas.Series: # pragma: no cover
        )
        return SnowflakeQueryCompiler(new_frame)

    def align(
        self,
        other: SnowparkDataFrame = None,
        join: str = "outer",
        axis: int = 0,
        level: Level = None,
        copy: bool = True,
        fill_value: Scalar = None,
    ) -> tuple["SnowflakeQueryCompiler", "SnowflakeQueryCompiler"]:
        """
        Align two objects on their axes with the specified join method.

        Join method is specified for each axis Index.

        Args:
            other: DataFrame or Series
            join: {‘outer’, ‘inner’, ‘left’, ‘right’}, default ‘outer’
                Type of alignment to be performed.
                left: use only keys from left frame, preserve key order.
                right: use only keys from right frame, preserve key order.
                outer: use union of keys from both frames, sort keys lexicographically.
                inner: use intersection of keys from both frames, preserve the order of the left keys.
            axis: allowed axis of the other object, default None
                Align on index (0), columns (1), or both (None).
            level: int or level name, default None
                Broadcast across a level, matching Index values on the passed MultiIndex level.
            copy: bool, default True
                Always returns new objects. If copy=False and no reindexing is required then original objects are returned.
            fill_value: scalar, default np.nan
                Value to use for missing values. Only the default (np.nan) is currently supported.

        Returns:
            tuple of SnowflakeQueryCompilers
            Aligned objects.

        """
        if copy is not True:
            ErrorMessage.not_implemented(
                "Snowpark pandas 'align' method doesn't support 'copy=False'"
            )
        if level is not None:
            ErrorMessage.not_implemented(
                "Snowpark pandas 'align' method doesn't support 'level'"
            )
        if fill_value is not None:
            # TODO: SNOW-1752860
            ErrorMessage.not_implemented(
                "Snowpark pandas 'align' method doesn't support 'fill_value'"
            )
        if axis != 0:
            # TODO: SNOW-1752856
            ErrorMessage.not_implemented(
                f"Snowpark pandas 'align' method doesn't support 'axis={axis}'"
            )
        frame = self._modin_frame
        other_frame = other._query_compiler._modin_frame

        if self.is_multiindex(axis=axis) or other._query_compiler.is_multiindex(
            axis=axis
        ):
            raise NotImplementedError(
                "Snowpark pandas doesn't support `align` with MultiIndex"
            )

        # convert frames to variant type if index is incompatible for join
        frame, other_frame = join_utils.convert_incompatible_types_to_variant(
            frame,
            other_frame,
            frame.index_column_snowflake_quoted_identifiers,
            other_frame.index_column_snowflake_quoted_identifiers,
        )

        (
            left_result,
            left_frame,
            left_frame_data_ids,
            left_index_ids,
        ) = align_axis_0_left(frame, other_frame, join)
        (
            right_result,
            right_frame,
            right_frame_data_ids,
            right_index_ids,
        ) = align_axis_0_right(frame, other_frame, join)

        left_qc = SnowflakeQueryCompiler(
            InternalFrame.create(
                ordered_dataframe=left_frame,
                data_column_snowflake_quoted_identifiers=left_frame_data_ids,
                data_column_pandas_labels=frame.data_column_pandas_labels,
                data_column_pandas_index_names=frame.data_column_pandas_index_names,
                data_column_types=frame.cached_data_column_snowpark_pandas_types,
                index_column_snowflake_quoted_identifiers=left_index_ids,
                index_column_pandas_labels=left_result.index_column_pandas_labels,
                index_column_types=left_result.cached_index_column_snowpark_pandas_types,
            )
        )
        right_qc = SnowflakeQueryCompiler(
            InternalFrame.create(
                ordered_dataframe=right_frame,
                data_column_snowflake_quoted_identifiers=right_frame_data_ids,
                data_column_pandas_labels=other_frame.data_column_pandas_labels,
                data_column_pandas_index_names=other_frame.data_column_pandas_index_names,
                data_column_types=other_frame.cached_data_column_snowpark_pandas_types,
                index_column_snowflake_quoted_identifiers=right_index_ids,
                index_column_pandas_labels=right_result.index_column_pandas_labels,
                index_column_types=right_result.cached_index_column_snowpark_pandas_types,
            )
        )
        return left_qc, right_qc

    def apply(
        self,
        func: Union[AggFuncType, UserDefinedFunction],
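
The guard clauses at the top of the new `align` method reject every argument combination outside the supported `axis=0` path. The sketch below restates those checks as a standalone function. `check_align_args` and `is_supported` are hypothetical names, and raising `NotImplementedError` directly is an assumption about what `ErrorMessage.not_implemented` does; the messages are shortened for the example.

```python
from typing import Any, Optional


def check_align_args(
    axis: Optional[int] = 0,
    level: Any = None,
    copy: bool = True,
    fill_value: Any = None,
) -> None:
    """Raise NotImplementedError for argument combinations the commit rejects."""
    if copy is not True:
        raise NotImplementedError("'align' doesn't support 'copy=False'")
    if level is not None:
        raise NotImplementedError("'align' doesn't support 'level'")
    if fill_value is not None:
        raise NotImplementedError("'align' doesn't support 'fill_value'")
    if axis != 0:
        raise NotImplementedError(f"'align' doesn't support 'axis={axis}'")


def is_supported(**kwargs: Any) -> bool:
    """Convenience wrapper: True when the arguments pass every guard."""
    try:
        check_align_args(**kwargs)
    except NotImplementedError:
        return False
    return True


print(is_supported())              # True
print(is_supported(axis=1))        # False
print(is_supported(fill_value=0))  # False
```

Note that `fill_value=None` stands for the default here, so any explicit value, including `0`, falls into the unsupported branch.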
59 changes: 59 additions & 0 deletions src/snowflake/snowpark/modin/plugin/docstrings/base.py
@@ -443,6 +443,65 @@ def aggregate():
    def align():
        """
        Align two objects on their axes with the specified join method.

        Join method is specified for each axis Index.

        Args:
            other: DataFrame or Series
            join: {‘outer’, ‘inner’, ‘left’, ‘right’}, default ‘outer’
                Type of alignment to be performed.
                left: use only keys from left frame, preserve key order.
                right: use only keys from right frame, preserve key order.
                outer: use union of keys from both frames, sort keys lexicographically.
                inner: use intersection of keys from both frames, preserve the order of the left keys.
            axis: allowed axis of the other object, default None
                Align on index (0), columns (1), or both (None).
            level: int or level name, default None
                Broadcast across a level, matching Index values on the passed MultiIndex level.
            copy: bool, default True
                Always returns new objects. If copy=False and no reindexing is required then original objects are returned.
            fill_value: scalar, default np.nan
                Value to use for missing values. Only the default (np.nan) is currently supported.

        Returns:
            tuple of (Series/DataFrame, type of other)

        Notes
        -----
        Snowpark pandas DataFrame/Series.align currently does not support `axis = 1 or None`, non-default `fill_value`,
        `copy`, `level`, and MultiIndex.

        Examples::

            >>> df = pd.DataFrame(
            ...     [[1, 2, 3, 4], [6, 7, 8, 9]], columns=["D", "B", "E", "A"], index=[1, 2]
            ... )
            >>> other = pd.DataFrame(
            ...     [[10, 20, 30, 40], [60, 70, 80, 90], [600, 700, 800, 900]],
            ...     columns=["A", "B", "C", "D"],
            ...     index=[2, 3, 4],
            ... )
            >>> df
               D  B  E  A
            1  1  2  3  4
            2  6  7  8  9
            >>> other
                 A    B    C    D
            2   10   20   30   40
            3   60   70   80   90
            4  600  700  800  900
            >>> left, right = df.align(other, join="outer", axis=0)
            >>> left
                 D    B    E    A
            1  1.0  2.0  3.0  4.0
            2  6.0  7.0  8.0  9.0
            3  NaN  NaN  NaN  NaN
            4  NaN  NaN  NaN  NaN
            >>> right
                   A      B      C      D
            1    NaN    NaN    NaN    NaN
            2   10.0   20.0   30.0   40.0
            3   60.0   70.0   80.0   90.0
            4  600.0  700.0  800.0  900.0
        """

    @doc(