diff --git a/CHANGELOG.md b/CHANGELOG.md index bb43f15d200..34efb8bea39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,10 +55,13 @@ - Raise a `TypeError` for a scalar `subset` instead of filtering on just that column. - Raise a `ValueError` for a `subset` of type `pandas.Index` instead of filtering on the columns in the index. - Disable creation of scoped read only table to mitigate Disable creation of scoped read only table to mitigate `TableNotFoundError` when using dynamic pivot in notebook environment. +- Fixed a bug when concat dataframe or series objects are coming from the same dataframe when axis = 1. #### Improvements + - Improve np.where with scalar x value by eliminating unnecessary join and temp table creation. + ### Snowpark Local Testing Updates #### New Features diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index db01e62747c..fcafef0de99 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -23,7 +23,11 @@ append_columns, extract_pandas_label_from_snowflake_quoted_identifier, ) -from snowflake.snowpark.modin.plugin._typing import AlignTypeLit, JoinTypeLit +from snowflake.snowpark.modin.plugin._typing import ( + AlignSortLit, + AlignTypeLit, + JoinTypeLit, +) from snowflake.snowpark.modin.plugin.compiler import snowflake_query_compiler from snowflake.snowpark.types import VariantType @@ -1231,6 +1235,7 @@ def align( left_on: list[str], right_on: list[str], how: AlignTypeLit = "outer", + sort: AlignSortLit = "default_sort", ) -> JoinOrAlignInternalFrameResult: """ Align the left and the right frame on given columns 'left_on' and 'right_on' with @@ -1246,8 +1251,13 @@ def align( * left: use only index from left frame, preserve left order. * coalesce: use only index from left frame, preserve left order. If left frame is empty left_on columns are coalesced with right_on columns. - * outer: use union of index from both frames, sort index lexicographically. + * outer: use union of index from both frames. * inner: use intersection of index from both frames, preserve left order. + sort: the sort strategy. + * default_sort, outer align result will sort the align key lexicographically + if the original frame is not aligned, no sort happen for others align methods. + * sort, always sort the result based on the align key + * no_sort, do not sort the result Returns: New aligned InternalFrame by aligning left frame with right frame. """ @@ -1278,6 +1288,7 @@ def align( left_on_cols=left_on, right_on_cols=right_on, how=how, + enable_default_sort=(sort == "default_sort"), ) # aligned_ordered_frame after aligning on row_position columns # Example 1 (left is empty not empty): @@ -1297,6 +1308,8 @@ def align( if how == "outer": coalesce_key_config = [JoinKeyCoalesceConfig.LEFT] * len(left_on) inherit_join_index = InheritJoinIndex.FROM_BOTH + + sort_result = sort == "sort" ( aligned_frame, result_column_mapper, @@ -1307,7 +1320,7 @@ def align( left_on=left_on, right_on=right_on, how=how, - sort=False, + sort=sort_result, key_coalesce_config=coalesce_key_config, inherit_index=inherit_join_index, ) @@ -1318,6 +1331,7 @@ def align_on_index( left: InternalFrame, right: InternalFrame, how: AlignTypeLit = "outer", + sort: AlignSortLit = "default_sort", ) -> JoinOrAlignInternalFrameResult: """ Align the left and the right frame on the index columns with given join method (`how`). @@ -1338,6 +1352,11 @@ def align_on_index( right order. * outer: use union of index from both frames, sort index lexicographically. * inner: use intersection of index from both frames, preserve left order. + sort: the sort strategy. + * default_sort, outer align result will sort the align key lexicographically + if the original frame is not aligned, no sort happen for others align methods. + * sort, always sort the result based on the align key + * no_sort, do not sort the result Returns: An InternalFrame for the aligned result. A JoinOrAlignResultColumnMapper that provides quoted identifiers mapping from the @@ -1358,6 +1377,7 @@ def align_on_index( left_on=index_join_info.left_join_quoted_identifiers, right_on=index_join_info.right_join_quoted_identifiers, how=how, + sort=sort, ) if how == "outer": # index reorder should only be needed for outer join since this is the only method inherent diff --git a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py index b1ba815e5a6..90cd72fbe2b 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py @@ -1334,6 +1334,7 @@ def align( left_on_cols: list[str], right_on_cols: list[str], how: AlignTypeLit = "outer", + enable_default_sort: bool = True, ) -> "OrderedDataFrame": """ Performs align operation of the specified join type (``how``) with the current @@ -1383,6 +1384,8 @@ def align( left_on column is replaced with the right_on column in the result. This method can only be used when left_on and right_on type are compatible, otherwise an error will be thrown. + enable_default_sort: whether to enable the default sorting strategy for align. + Check for [AlignSortLit] under _typing.py for more details. Returns: Aligned OrderedDataFrame. """ @@ -1448,7 +1451,9 @@ def align( ) sort = False - if how == "outer": + if how == "outer" and enable_default_sort: + # if default sort is enabled, and it is outer align, we + # sort the result based on align keys sort = True result_helper = JoinOrAlignOrderedDataframeResultHelper( left, diff --git a/src/snowflake/snowpark/modin/plugin/_typing.py b/src/snowflake/snowpark/modin/plugin/_typing.py index da2d52b9efd..32be7eee050 100644 --- a/src/snowflake/snowpark/modin/plugin/_typing.py +++ b/src/snowflake/snowpark/modin/plugin/_typing.py @@ -54,6 +54,17 @@ class LabelIdentifierPair(NamedTuple): "coalesce", ] # right and inner can also be supported if needed +AlignSortLit = [ + # Align operator provides a default sorting capability, which sort the + # align key lexicographically when the align type is outer, and the original + # dataframe is not aligned. No sort will happen for other align types. + "default_sort", + # Always sort the align key lexicographically regardless of align type. + "sort", + # Do not sort the align key regardless of the align type. + "no_sort", +] + SnowflakeSupportedFileTypeLit = Union[ Literal["csv"], Literal["json"], Literal["parquet"] ] diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index f06713bb970..c4df86cf793 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -7351,13 +7351,17 @@ def concat( if axis == 1: result_frame = frames[0] for other_frame in frames[1:]: - # Concat on axis = 1 is implemented using join operation. This is - # equivalent to joining on index columns when index labels are same for + # Concat on axis = 1 is implemented using align operation. This is + # equivalent to align on index columns when index labels are same for # both the frames. - # We rename index labels to make sure index columns are joined level + # We rename index labels to make sure index columns are aligned level # by level. - result_frame, _ = join_utils.join_on_index_columns( - result_frame, other_frame, how=join, sort=sort + if sort is True: + align_sort = "sort" + else: + align_sort = "no_sort" + result_frame, _ = join_utils.align_on_index( + result_frame, other_frame, how=join, sort=align_sort ) qc = SnowflakeQueryCompiler(result_frame) diff --git a/tests/integ/modin/frame/test_name.py b/tests/integ/modin/frame/test_name.py index 54c053e0cf5..6d5a199c79a 100644 --- a/tests/integ/modin/frame/test_name.py +++ b/tests/integ/modin/frame/test_name.py @@ -39,7 +39,7 @@ def test_create_dataframe_from_object_with_name(sample): ) -@sql_count_checker(query_count=1, join_count=1, union_count=1) +@sql_count_checker(query_count=1, join_count=0, union_count=1) def test_create_dataframe_from_snowpark_pandas_series(): df = pd.DataFrame([[2, 3, 4], [5, 6, 7]], columns=["X", "Y", "Z"]) df = pd.DataFrame([df.X, df.iloc[:, 2]]) diff --git a/tests/integ/modin/test_concat.py b/tests/integ/modin/test_concat.py index 0e89c148077..84b343a879b 100644 --- a/tests/integ/modin/test_concat.py +++ b/tests/integ/modin/test_concat.py @@ -1142,6 +1142,54 @@ def test_concat_keys(): assert_frame_equal(snow_df, native_df, check_dtype=False) +@sql_count_checker(query_count=1, join_count=0) +def test_concat_object_with_same_index_with_dup(join): + df = native_pd.DataFrame( + { + "C": [1, 2, 3], + "A": ["a", "b", "c"], + "D": [3, 2, 1], + }, + index=native_pd.Index([2, 1, 2]), + ) + native_objs = [df[["C", "A"]], df["D"], df["C"] + 1] + snow_df = pd.DataFrame(df) + snow_objs = [snow_df[["C", "A"]], snow_df["D"], snow_df["C"] + 1] + eval_snowpark_pandas_result( + "pd", + "native_pd", + _concat_operation(snow_objs, native_objs, join=join, axis=1), + ) + + +@sql_count_checker(query_count=1, join_count=0) +def test_concat_object_with_same_index_with_dup_sort(join): + df = native_pd.DataFrame( + { + "C": [1, 2, 3], + "A": ["a", "b", "c"], + "D": [3, 2, 1], + }, + index=native_pd.Index([2, 1, 2]), + ) + snow_df = pd.DataFrame(df) + snow_objs = [snow_df["D"], snow_df["C"] + 1] + + # Note this behavior is different from native pandas, native pandas + # throws ValueError: cannot reindex on an axis with duplicate labels. + # With snowpark pandas, the operation will be successful with an align + # behavior. + expected_result = native_pd.DataFrame( + { + "D": [2, 3, 1], + "C": [3, 2, 4], + }, + index=native_pd.Index([1, 2, 2]), + ) + snow_res = pd.concat(snow_objs, join=join, axis=1, sort=True) + assert_frame_equal(snow_res, expected_result) + + @sql_count_checker(query_count=4, join_count=0) def test_concat_series_from_same_df(join): num_cols = 4