Commit
Merge branch 'main' into lmukhopadhyay-SNOW-1752856-align-axis-1-none
sfc-gh-lmukhopadhyay committed Nov 15, 2024
2 parents 13df6b3 + d4e220a commit 8c72513
Showing 16 changed files with 568 additions and 59 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/python-publish.yml
@@ -31,6 +31,13 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install build
- name: Install protoc
shell: bash
run: .github/scripts/install_protoc.sh
- name: Install tox
run: python -m pip install tox
- name: Build protobuf Python files
run: python -m tox -e protoc
- name: Build package
run: python -m build
- name: List artifacts
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,11 @@
- Added the following new functions in `snowflake.snowpark.dataframe`:
  - `map`
- Added support for passing parameter `include_error` to `Session.query_history` to record queries that fail during execution.
- Added support for the following methods on `DataType`, its derived classes, and `StructField` (see the sketch after this list):
  - `type_name`
  - `simple_string`
  - `json_value`
  - `json`
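A minimal sketch of how these helpers might be called, assuming they behave like their PySpark counterparts (`typeName`/`simpleString`/`jsonValue`/`json`); the printed values are illustrative assumptions, not output captured from this commit:

```python
# Hedged sketch of the new DataType/StructField serialization helpers listed above.
# Return values shown in comments are assumptions, not captured output.
from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType

schema = StructType(
    [
        StructField("id", IntegerType(), nullable=False),
        StructField("name", StringType(), nullable=True),
    ]
)

print(schema.type_name())      # e.g. "struct"
print(schema.simple_string())  # e.g. "struct<id:int,name:string>"
print(schema.json())           # JSON string describing the full schema
print(schema.fields[0].json_value())  # dict form of a single StructField
```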

#### Improvements

@@ -17,6 +22,14 @@
- Added distributed tracing using open telemetry APIs for action functions in `DataFrame`:
  - `cache_result`
- Removed opentelemetry warning from logging.
- Added support for specifying the following to `DataFrame.create_or_replace_dynamic_table` (see the usage sketch after this list):
  - `iceberg_config`: a dictionary that can hold the following Iceberg configuration options:
    - `external_volume`
    - `catalog`
    - `base_location`
    - `catalog_sync`
    - `storage_serialization_policy`
- Added support for nested data types to `DataFrame.print_schema`
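A hedged usage sketch of the new `iceberg_config` argument on `DataFrame.create_or_replace_dynamic_table`; `session`, the table names, and the volume/catalog/location values are placeholders rather than values from this change:

```python
# Hypothetical example: create an Iceberg dynamic table from an existing DataFrame.
# All identifiers and option values here are placeholders.
df = session.table("source_table")
df.create_or_replace_dynamic_table(
    "my_iceberg_dynamic_table",
    warehouse="my_wh",
    lag="1 hour",
    iceberg_config={
        "external_volume": "my_external_volume",
        "catalog": "SNOWFLAKE",
        "base_location": "my/base/location/",
        "storage_serialization_policy": "OPTIMIZED",
    },
)
```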

#### Bug Fixes

2 changes: 2 additions & 0 deletions docs/source/snowpark/dataframe.rst
@@ -63,6 +63,8 @@ DataFrame
DataFrame.orderBy
DataFrame.order_by
DataFrame.pivot
DataFrame.print_schema
DataFrame.printSchema
DataFrame.randomSplit
DataFrame.random_split
DataFrame.rename
7 changes: 7 additions & 0 deletions setup.py
@@ -86,6 +86,13 @@
)
sys.exit(-1)

protoc_gen_mypy = shutil.which("protoc-gen-mypy")
if protoc_gen_mypy is None:
sys.stderr.write(
"protoc-gen-mypy is not installed nor found. Please install the binary package, e.g., `pip install mypy-protobuf`\n"
)
sys.exit(-1)

# Protobuf files need to compile
PROTOS = ("src/snowflake/snowpark/_internal/proto/ast.proto",)

1 change: 1 addition & 0 deletions src/snowflake/snowpark/_internal/analyzer/analyzer.py
@@ -1170,6 +1170,7 @@ def do_resolve_with_resolved_children(
max_data_extension_time=logical_plan.max_data_extension_time,
child=resolved_children[logical_plan.child],
source_plan=logical_plan,
iceberg_config=logical_plan.iceberg_config,
)

if isinstance(logical_plan, CopyIntoTableNode):
10 changes: 8 additions & 2 deletions src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
@@ -1162,6 +1162,7 @@ def create_or_replace_dynamic_table_statement(
data_retention_time: Optional[int],
max_data_extension_time: Optional[int],
child: str,
iceberg_config: Optional[dict] = None,
) -> str:
cluster_by_sql = (
f"{CLUSTER_BY}{LEFT_PARENTHESIS}{COMMA.join(clustering_keys)}{RIGHT_PARENTHESIS}"
@@ -1182,11 +1183,16 @@
}
)

iceberg_options = get_options_statement(
validate_iceberg_config(iceberg_config)
).strip()

return (
f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}{TRANSIENT if is_transient else EMPTY_STRING}"
f"{DYNAMIC}{TABLE}{IF + NOT + EXISTS if if_not_exists else EMPTY_STRING}{name}{LAG}{EQUALS}"
f"{DYNAMIC}{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}"
f"{IF + NOT + EXISTS if if_not_exists else EMPTY_STRING}{name}{LAG}{EQUALS}"
f"{convert_value_to_sql_option(lag)}{WAREHOUSE}{EQUALS}{warehouse}"
f"{refresh_and_initialize_options}{cluster_by_sql}{data_retention_options}"
f"{refresh_and_initialize_options}{cluster_by_sql}{data_retention_options}{iceberg_options}"
f"{comment_sql}{AS}{project_statement([], child)}"
)
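For orientation, a rough sketch of the statement shape this function emits once `iceberg_config` is supplied; option order, quoting, and whitespace are assumptions inferred from the f-string above, not a captured query:

```python
# Approximate shape of the generated SQL (illustrative only; not captured output).
expected_sql = (
    "CREATE OR REPLACE DYNAMIC ICEBERG TABLE my_table"
    " LAG = '1 hour' WAREHOUSE = my_wh"
    " EXTERNAL_VOLUME = 'my_external_volume' CATALOG = 'SNOWFLAKE'"
    " BASE_LOCATION = 'my/base/location/'"
    " AS SELECT * FROM (...)"
)
```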

2 changes: 2 additions & 0 deletions src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py
@@ -1128,6 +1128,7 @@ def create_or_replace_dynamic_table(
max_data_extension_time: Optional[int],
child: SnowflakePlan,
source_plan: Optional[LogicalPlan],
iceberg_config: Optional[dict] = None,
) -> SnowflakePlan:
if len(child.queries) != 1:
raise SnowparkClientExceptionMessages.PLAN_CREATE_DYNAMIC_TABLE_FROM_DDL_DML_OPERATIONS()
@@ -1163,6 +1164,7 @@ def create_or_replace_dynamic_table(
data_retention_time=data_retention_time,
max_data_extension_time=max_data_extension_time,
child=x,
iceberg_config=iceberg_config,
),
child,
source_plan,
2 changes: 2 additions & 0 deletions src/snowflake/snowpark/_internal/analyzer/unary_plan_node.py
@@ -281,6 +281,7 @@ def __init__(
data_retention_time: Optional[int],
max_data_extension_time: Optional[int],
child: LogicalPlan,
iceberg_config: Optional[dict] = None,
) -> None:
super().__init__(child)
self.name = name
@@ -294,3 +295,4 @@ def __init__(
self.is_transient = is_transient
self.data_retention_time = data_retention_time
self.max_data_extension_time = max_data_extension_time
self.iceberg_config = iceberg_config
67 changes: 66 additions & 1 deletion src/snowflake/snowpark/dataframe.py
@@ -161,6 +161,8 @@
_get_cols_after_join_table,
)
from snowflake.snowpark.types import (
ArrayType,
MapType,
PandasDataFrameType,
StringType,
StructField,
@@ -3464,6 +3466,7 @@ def create_or_replace_dynamic_table(
data_retention_time: Optional[int] = None,
max_data_extension_time: Optional[int] = None,
statement_params: Optional[Dict[str, str]] = None,
iceberg_config: Optional[dict] = None,
) -> List[Row]:
"""Creates a dynamic table that captures the computation expressed by this DataFrame.
@@ -3498,6 +3501,15 @@
the data retention period of the dynamic table to prevent streams on the dynamic table
from becoming stale.
statement_params: Dictionary of statement level parameters to be set while executing this action.
iceberg_config: A dictionary that can contain the following Iceberg configuration values:
  - external_volume: specifies the identifier for the external volume where
    the Iceberg table stores its metadata files and data in Parquet format.
  - catalog: specifies either Snowflake or a catalog integration to use for this table.
  - base_location: the base directory where Snowflake writes Iceberg metadata and data files.
  - catalog_sync: optionally sets the catalog integration configured for Polaris Catalog.
  - storage_serialization_policy: specifies the storage serialization policy for the table.
Note:
See `understanding dynamic table refresh <https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh>`_.
@@ -3539,6 +3551,7 @@ def create_or_replace_dynamic_table(
_statement_params=create_or_update_statement_params_with_query_tag(
statement_params, self._session.query_tag, SKIP_LEVELS_TWO
),
iceberg_config=iceberg_config,
)

@df_collect_api_telemetry
@@ -3616,6 +3629,7 @@ def _do_create_or_replace_dynamic_table(
is_transient: bool = False,
data_retention_time: Optional[int] = None,
max_data_extension_time: Optional[int] = None,
iceberg_config: Optional[dict] = None,
**kwargs,
):
validate_object_name(name)
@@ -3642,6 +3656,7 @@
data_retention_time=data_retention_time,
max_data_extension_time=max_data_extension_time,
child=self._plan,
iceberg_config=iceberg_config,
)

return self._session._conn.execute(
@@ -4303,9 +4318,59 @@ def convert(col: ColumnOrName) -> Expression:
return exprs

def print_schema(self) -> None:
"""
Prints the schema of a dataframe in tree format.
Examples::
>>> df = session.create_dataframe([(1, "a"), (2, "b")], schema=["a", "b"])
>>> df.print_schema()
root
|-- "A": LongType() (nullable = False)
|-- "B": StringType() (nullable = False)
"""

def _format_datatype(name, dtype, nullable=None, prefix=""):
nullable_str = (
f" (nullable = {str(nullable)})" if nullable is not None else ""
)
indent = f" | {prefix}"
extra_lines = []
type_str = dtype.__class__.__name__

# Structured types format their parameters on multiple lines.
if isinstance(dtype, ArrayType):
extra_lines = [
_format_datatype("element", dtype.element_type, prefix=indent),
]
elif isinstance(dtype, MapType):
extra_lines = [
_format_datatype("key", dtype.key_type, prefix=indent),
_format_datatype("value", dtype.value_type, prefix=indent),
]
elif isinstance(dtype, StructType):
extra_lines = [
_format_datatype(
quote_name(field.name, keep_case=True),
field.datatype,
field.nullable,
indent,
)
for field in dtype.fields
]
else:
# By default include all parameters in type string instead
type_str = str(dtype)

return "\n".join(
[
f"{prefix} |-- {name}: {type_str}{nullable_str}",
]
+ extra_lines
)

schema_tmp_str = "\n".join(
[
f" |-- {attr.name}: {attr.datatype} (nullable = {str(attr.nullable)})"
_format_datatype(attr.name, attr.datatype, attr.nullable)
for attr in self._plan.attributes
]
)
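To make the nested handling concrete, a hedged sketch of the tree output for a structured map column; the DataFrame construction, quoting, and indentation are assumptions rather than output from this commit's tests:

```python
# Hypothetical example; assumes structured types are available to the session.
from snowflake.snowpark.types import (
    LongType,
    MapType,
    StringType,
    StructField,
    StructType,
)

schema = StructType(
    [
        StructField("id", LongType(), nullable=False),
        StructField("attrs", MapType(StringType(), StringType()), nullable=True),
    ]
)
df = session.create_dataframe([(1, {"k": "v"})], schema=schema)
df.print_schema()
# Roughly (indentation approximate):
# root
#  |-- "ID": LongType() (nullable = False)
#  |-- "ATTRS": MapType (nullable = True)
#  |   |-- key: StringType()
#  |   |-- value: StringType()
```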