Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

from enum import Enum
from dataclasses import dataclass
from typing import TYPE_CHECKING, DefaultDict, Dict, List, Optional

Expand All @@ -17,6 +17,13 @@
from snowflake.snowpark._internal.analyzer.analyzer import Analyzer


class DescribeQueryTelemetryField(Enum):
    """Telemetry keys used when reporting describe-query details.

    ``TYPE_DESCRIBE_QUERY_DETAILS`` is the telemetry message type; the
    remaining members are the field names placed in the message payload.
    """

    # Message type identifying a describe-query-details telemetry event.
    TYPE_DESCRIBE_QUERY_DETAILS = "snowpark_describe_query_details"
    # SQL text of the describe query that was executed.
    SQL_TEXT = "sql_text"
    # End-to-end wall-clock time spent on the describe query (seconds,
    # measured via time.time() deltas at the call site).
    E2E_TIME = "e2e_time"
    # Client-side stack trace lines captured where the describe query was
    # triggered (may be None when no frames were collected).
    STACK_TRACE = "stack_trace"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's good that we're capturing the stack trace. But I imagine we'll need to further process it to extract the specific operations resulting in describe queries?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, we can do the parsing on the dashboard side



@dataclass(frozen=True)
class PlanMetadata:
"""
Expand Down
15 changes: 13 additions & 2 deletions src/snowflake/snowpark/_internal/analyzer/schema_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

import time
import traceback
from typing import TYPE_CHECKING, List, Union

import snowflake.snowpark
Expand Down Expand Up @@ -90,7 +91,17 @@ def analyze_attributes(
session._conn._cursor.description, session._conn.max_string_size
)

return session._get_result_attributes(sql)
# collect describe query details for telemetry
stack = traceback.extract_stack(limit=10)[:-1]
stack_trace = [frame.line for frame in stack] if len(stack) > 0 else None
start_time = time.time()
attributes = session._get_result_attributes(sql)
e2e_time = time.time() - start_time
session._conn._telemetry_client.send_describe_query_details(
session._session_id, sql, e2e_time, stack_trace
)

return attributes


def convert_result_meta_to_attribute(
Expand Down
23 changes: 23 additions & 0 deletions src/snowflake/snowpark/_internal/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
from snowflake.snowpark._internal.compiler.telemetry_constants import (
CompilationStageTelemetryField,
)
from snowflake.snowpark._internal.analyzer.metadata_utils import (
DescribeQueryTelemetryField,
)
from snowflake.snowpark._internal.utils import (
get_application_name,
get_os_name,
Expand Down Expand Up @@ -561,3 +564,23 @@ def send_reduce_describe_query_telemetry(
},
}
self.send(message)

def send_describe_query_details(
    self,
    session_id: int,
    sql_text: str,
    e2e_time: float,
    stack_trace: Optional[List[Optional[str]]],
):
    """Emit a telemetry message with the details of one describe query.

    Args:
        session_id: identifier of the session that issued the query.
        sql_text: text of the describe query.
        e2e_time: end-to-end time spent on the query, in seconds.
        stack_trace: captured client-side stack lines, or None.
    """
    # Assemble the payload first, then merge it under KEY_DATA next to the
    # basic telemetry fields shared by all message types.
    payload = {
        TelemetryField.SESSION_ID.value: session_id,
        DescribeQueryTelemetryField.SQL_TEXT.value: sql_text,
        DescribeQueryTelemetryField.E2E_TIME.value: e2e_time,
        DescribeQueryTelemetryField.STACK_TRACE.value: stack_trace,
    }
    message = {
        **self._create_basic_telemetry_data(
            DescribeQueryTelemetryField.TYPE_DESCRIBE_QUERY_DETAILS.value
        ),
        TelemetryField.KEY_DATA.value: payload,
    }
    self.send(message)
41 changes: 33 additions & 8 deletions tests/integ/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1098,8 +1098,8 @@ def process(self, a, b) -> Iterable[Tuple[int]]:

expected_data = {"func_name": "UDTFRegistration.register", "category": "create"}
assert telemetry_tracker.find_message_in_log_data(
2, sum_udtf_partial, expected_data
), f"could not find expected message: {expected_data} in the last 2 message log entries"
3, sum_udtf_partial, expected_data
), f"could not find expected message: {expected_data} in the last 3 message log entries"

sum_udtf = sum_udtf_partial()
select_partial = partial(df.select, sum_udtf(df.a, df.b))
Expand All @@ -1108,8 +1108,8 @@ def process(self, a, b) -> Iterable[Tuple[int]]:
"category": "usage",
}
assert telemetry_tracker.find_message_in_log_data(
2, select_partial, expected_data
), f"could not find expected message: {expected_data} in the last 2 message log entries"
3, select_partial, expected_data
), f"could not find expected message: {expected_data} in the last 3 message log entries"

# udtf register from file
test_files = TestFiles(resources_path)
Expand All @@ -1127,8 +1127,8 @@ def process(self, a, b) -> Iterable[Tuple[int]]:
"category": "create",
}
assert telemetry_tracker.find_message_in_log_data(
2, my_udtf_partial, expected_data
), f"could not find expected message: {expected_data} in the last 2 message log entries"
3, my_udtf_partial, expected_data
), f"could not find expected message: {expected_data} in the last 3 message log entries"
my_udtf = my_udtf_partial()

invoke_partial = partial(
Expand All @@ -1149,8 +1149,8 @@ def process(self, a, b) -> Iterable[Tuple[int]]:
"category": "usage",
}
assert telemetry_tracker.find_message_in_log_data(
2, invoke_partial, expected_data
), f"could not find expected message: {expected_data} in the last 2 message log entries"
3, invoke_partial, expected_data
), f"could not find expected message: {expected_data} in the last 3 message log entries"


@pytest.mark.skip(
Expand Down Expand Up @@ -1284,3 +1284,28 @@ def send_telemetry():
data, type_, _ = telemetry_tracker.extract_telemetry_log_data(-1, send_telemetry)
assert data == expected_data
assert type_ == "snowpark_cursor_created"


def test_describe_query_details(session):
    """send_describe_query_details produces the expected payload and type."""
    sql_text = "select 1 as a, 2 as b"
    e2e_time = 0.01
    stack_trace = ["line1", "line2"]

    tracker = TelemetryDataTracker(session)
    telemetry_client = session._conn._telemetry_client

    def emit():
        telemetry_client.send_describe_query_details(
            session.session_id,
            sql_text=sql_text,
            e2e_time=e2e_time,
            stack_trace=stack_trace,
        )

    data, message_type, _ = tracker.extract_telemetry_log_data(-1, emit)
    assert data == {
        "session_id": session.session_id,
        "sql_text": sql_text,
        "e2e_time": e2e_time,
        "stack_trace": stack_trace,
    }
    assert message_type == "snowpark_describe_query_details"