Skip to content

Commit

Permalink
[SNOW-1518791] Change concat axis=1 from join_on_index to align_on_in…
Browse files Browse the repository at this point in the history
…dex behavior (#2564)

<!---
Please answer these questions before creating your pull request. Thanks!
--->

1. Which Jira issue is this PR addressing? Make sure that there is an
accompanying issue to your PR.

   <!---
   In this section, please add a Snowflake Jira issue number.

Note that if a corresponding GitHub issue exists, you should still
include
   the Snowflake Jira issue number. For example, for GitHub issue
#1400, you should
   add "SNOW-1335071" here.
    --->

SNOW-1518791

2. Fill out the following pre-review checklist:

- [x] I am adding a new automated test(s) to verify correctness of my
new code
- [ ] If this test skips Local Testing mode, I'm requesting review from
@snowflakedb/local-testing
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency
- [ ] If this is a new feature/behavior, I'm adding the Local Testing
parity changes.
- [ ] I acknowledge that I have ensured my changes to be thread-safe.
Follow the link for more information: [Thread-safe Developer
Guidelines](https://docs.google.com/document/d/162d_i4zZ2AfcGRXojj0jByt8EUq-DrSHPPnTa4QvwbA/edit#bookmark=id.e82u4nekq80k)

3. Please describe how your code solves the related issue.
concat when axis = 1 succeeded when the dataframe/series comes from the
same dataframe with an align behavior. However, today we uses join which
gives a different result when duplication occurs.
For example: with following dataframe
```
    C  A  D
2  1  a  3
1  2  b  2
2  3  c  1
```
pd.concat([df['C'], df['A']], axis = 1) gives 
```
    C  A
2  1  a
1  2  b
2  3  c
```
but snowpark pandas returns 
```
     C  A
2  1  a
2  1  c
1  2  b
2  3  a
2  3  c
```
In this pr, we fixed the behavior by switching to align behavior.
Also relaxed the align utility to enable customized sorting behavior for
support the sort behavior of concat
  • Loading branch information
sfc-gh-yzou authored Nov 11, 2024
1 parent fb0fa7b commit ebc749d
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 10 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@
- Raise a `TypeError` for a scalar `subset` instead of filtering on just that column.
- Raise a `ValueError` for a `subset` of type `pandas.Index` instead of filtering on the columns in the index.
- Disable creation of scoped read only table to mitigate Disable creation of scoped read only table to mitigate `TableNotFoundError` when using dynamic pivot in notebook environment.
- Fixed a bug when concat dataframe or series objects are coming from the same dataframe when axis = 1.

#### Improvements

- Improve np.where with scalar x value by eliminating unnecessary join and temp table creation.


### Snowpark Local Testing Updates

#### New Features
Expand Down
26 changes: 23 additions & 3 deletions src/snowflake/snowpark/modin/plugin/_internal/join_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
append_columns,
extract_pandas_label_from_snowflake_quoted_identifier,
)
from snowflake.snowpark.modin.plugin._typing import AlignTypeLit, JoinTypeLit
from snowflake.snowpark.modin.plugin._typing import (
AlignSortLit,
AlignTypeLit,
JoinTypeLit,
)
from snowflake.snowpark.modin.plugin.compiler import snowflake_query_compiler
from snowflake.snowpark.types import VariantType

Expand Down Expand Up @@ -1231,6 +1235,7 @@ def align(
left_on: list[str],
right_on: list[str],
how: AlignTypeLit = "outer",
sort: AlignSortLit = "default_sort",
) -> JoinOrAlignInternalFrameResult:
"""
Align the left and the right frame on given columns 'left_on' and 'right_on' with
Expand All @@ -1246,8 +1251,13 @@ def align(
* left: use only index from left frame, preserve left order.
* coalesce: use only index from left frame, preserve left order. If left
frame is empty left_on columns are coalesced with right_on columns.
* outer: use union of index from both frames, sort index lexicographically.
* outer: use union of index from both frames.
* inner: use intersection of index from both frames, preserve left order.
sort: the sort strategy.
* default_sort, outer align result will sort the align key lexicographically
if the original frame is not aligned, no sort happen for others align methods.
* sort, always sort the result based on the align key
* no_sort, do not sort the result
Returns:
New aligned InternalFrame by aligning left frame with right frame.
"""
Expand Down Expand Up @@ -1278,6 +1288,7 @@ def align(
left_on_cols=left_on,
right_on_cols=right_on,
how=how,
enable_default_sort=(sort == "default_sort"),
)
# aligned_ordered_frame after aligning on row_position columns
# Example 1 (left is empty not empty):
Expand All @@ -1297,6 +1308,8 @@ def align(
if how == "outer":
coalesce_key_config = [JoinKeyCoalesceConfig.LEFT] * len(left_on)
inherit_join_index = InheritJoinIndex.FROM_BOTH

sort_result = sort == "sort"
(
aligned_frame,
result_column_mapper,
Expand All @@ -1307,7 +1320,7 @@ def align(
left_on=left_on,
right_on=right_on,
how=how,
sort=False,
sort=sort_result,
key_coalesce_config=coalesce_key_config,
inherit_index=inherit_join_index,
)
Expand All @@ -1318,6 +1331,7 @@ def align_on_index(
left: InternalFrame,
right: InternalFrame,
how: AlignTypeLit = "outer",
sort: AlignSortLit = "default_sort",
) -> JoinOrAlignInternalFrameResult:
"""
Align the left and the right frame on the index columns with given join method (`how`).
Expand All @@ -1338,6 +1352,11 @@ def align_on_index(
right order.
* outer: use union of index from both frames, sort index lexicographically.
* inner: use intersection of index from both frames, preserve left order.
sort: the sort strategy.
* default_sort, outer align result will sort the align key lexicographically
if the original frame is not aligned, no sort happen for others align methods.
* sort, always sort the result based on the align key
* no_sort, do not sort the result
Returns:
An InternalFrame for the aligned result.
A JoinOrAlignResultColumnMapper that provides quoted identifiers mapping from the
Expand All @@ -1358,6 +1377,7 @@ def align_on_index(
left_on=index_join_info.left_join_quoted_identifiers,
right_on=index_join_info.right_join_quoted_identifiers,
how=how,
sort=sort,
)
if how == "outer":
# index reorder should only be needed for outer join since this is the only method inherent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,7 @@ def align(
left_on_cols: list[str],
right_on_cols: list[str],
how: AlignTypeLit = "outer",
enable_default_sort: bool = True,
) -> "OrderedDataFrame":
"""
Performs align operation of the specified join type (``how``) with the current
Expand Down Expand Up @@ -1383,6 +1384,8 @@ def align(
left_on column is replaced with the right_on column in the result.
This method can only be used when left_on and right_on type are
compatible, otherwise an error will be thrown.
enable_default_sort: whether to enable the default sorting strategy for align.
Check for [AlignSortLit] under _typing.py for more details.
Returns:
Aligned OrderedDataFrame.
"""
Expand Down Expand Up @@ -1448,7 +1451,9 @@ def align(
)

sort = False
if how == "outer":
if how == "outer" and enable_default_sort:
# if default sort is enabled, and it is outer align, we
# sort the result based on align keys
sort = True
result_helper = JoinOrAlignOrderedDataframeResultHelper(
left,
Expand Down
11 changes: 11 additions & 0 deletions src/snowflake/snowpark/modin/plugin/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ class LabelIdentifierPair(NamedTuple):
"coalesce",
] # right and inner can also be supported if needed

AlignSortLit = [
# Align operator provides a default sorting capability, which sort the
# align key lexicographically when the align type is outer, and the original
# dataframe is not aligned. No sort will happen for other align types.
"default_sort",
# Always sort the align key lexicographically regardless of align type.
"sort",
# Do not sort the align key regardless of the align type.
"no_sort",
]

SnowflakeSupportedFileTypeLit = Union[
Literal["csv"], Literal["json"], Literal["parquet"]
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7351,13 +7351,17 @@ def concat(
if axis == 1:
result_frame = frames[0]
for other_frame in frames[1:]:
# Concat on axis = 1 is implemented using join operation. This is
# equivalent to joining on index columns when index labels are same for
# Concat on axis = 1 is implemented using align operation. This is
# equivalent to align on index columns when index labels are same for
# both the frames.
# We rename index labels to make sure index columns are joined level
# We rename index labels to make sure index columns are aligned level
# by level.
result_frame, _ = join_utils.join_on_index_columns(
result_frame, other_frame, how=join, sort=sort
if sort is True:
align_sort = "sort"
else:
align_sort = "no_sort"
result_frame, _ = join_utils.align_on_index(
result_frame, other_frame, how=join, sort=align_sort
)

qc = SnowflakeQueryCompiler(result_frame)
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/modin/frame/test_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_create_dataframe_from_object_with_name(sample):
)


@sql_count_checker(query_count=1, join_count=1, union_count=1)
@sql_count_checker(query_count=1, join_count=0, union_count=1)
def test_create_dataframe_from_snowpark_pandas_series():
df = pd.DataFrame([[2, 3, 4], [5, 6, 7]], columns=["X", "Y", "Z"])
df = pd.DataFrame([df.X, df.iloc[:, 2]])
Expand Down
48 changes: 48 additions & 0 deletions tests/integ/modin/test_concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,54 @@ def test_concat_keys():
assert_frame_equal(snow_df, native_df, check_dtype=False)


@sql_count_checker(query_count=1, join_count=0)
def test_concat_object_with_same_index_with_dup(join):
df = native_pd.DataFrame(
{
"C": [1, 2, 3],
"A": ["a", "b", "c"],
"D": [3, 2, 1],
},
index=native_pd.Index([2, 1, 2]),
)
native_objs = [df[["C", "A"]], df["D"], df["C"] + 1]
snow_df = pd.DataFrame(df)
snow_objs = [snow_df[["C", "A"]], snow_df["D"], snow_df["C"] + 1]
eval_snowpark_pandas_result(
"pd",
"native_pd",
_concat_operation(snow_objs, native_objs, join=join, axis=1),
)


@sql_count_checker(query_count=1, join_count=0)
def test_concat_object_with_same_index_with_dup_sort(join):
df = native_pd.DataFrame(
{
"C": [1, 2, 3],
"A": ["a", "b", "c"],
"D": [3, 2, 1],
},
index=native_pd.Index([2, 1, 2]),
)
snow_df = pd.DataFrame(df)
snow_objs = [snow_df["D"], snow_df["C"] + 1]

# Note this behavior is different from native pandas, native pandas
# throws ValueError: cannot reindex on an axis with duplicate labels.
# With snowpark pandas, the operation will be successful with an align
# behavior.
expected_result = native_pd.DataFrame(
{
"D": [2, 3, 1],
"C": [3, 2, 4],
},
index=native_pd.Index([1, 2, 2]),
)
snow_res = pd.concat(snow_objs, join=join, axis=1, sort=True)
assert_frame_equal(snow_res, expected_result)


@sql_count_checker(query_count=4, join_count=0)
def test_concat_series_from_same_df(join):
num_cols = 4
Expand Down

0 comments on commit ebc749d

Please sign in to comment.