-
Notifications
You must be signed in to change notification settings - Fork 94
SNOW-3138822: Add snow streamlit logs command for Streamlit apps live log streaming
#2773
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
cf76056
8116251
c2a642a
81c8474
7261f57
c6ee3a7
31942ee
854393b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| # Copyright (c) 2024 Snowflake Inc. | ||
| # Copyright (c) 2026 Snowflake Inc. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it needed to update the year? I'd rather keep it as previous value |
||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
|
|
@@ -24,6 +24,10 @@ | |
| add_object_command_aliases, | ||
| scope_option, | ||
| ) | ||
| from snowflake.cli._plugins.streamlit.log_streaming import ( | ||
| stream_logs, | ||
| validate_spcs_v2_runtime, | ||
| ) | ||
| from snowflake.cli._plugins.streamlit.manager import StreamlitManager | ||
| from snowflake.cli._plugins.streamlit.streamlit_entity import StreamlitEntity | ||
| from snowflake.cli._plugins.workspace.context import ActionContext, WorkspaceContext | ||
|
|
@@ -33,6 +37,7 @@ | |
| with_project_definition, | ||
| ) | ||
| from snowflake.cli.api.commands.flags import ( | ||
| IdentifierType, | ||
| PruneOption, | ||
| ReplaceOption, | ||
| entity_argument, | ||
|
|
@@ -215,6 +220,84 @@ def get_url( | |
| return MessageResult(url) | ||
|
|
||
|
|
||
| @app.command("logs", requires_connection=True) | ||
| @with_project_definition(is_optional=True) | ||
| def streamlit_logs( | ||
| entity_id: str = entity_argument("streamlit"), | ||
| name: FQN = typer.Option( | ||
| None, | ||
| "--name", | ||
| help="Fully qualified name of the Streamlit app (e.g. my_app, schema.my_app, or db.schema.my_app). " | ||
| "Overrides the project definition when provided.", | ||
| click_type=IdentifierType(), | ||
| ), | ||
| tail: int = typer.Option( | ||
| 100, | ||
| "--tail", | ||
| "-n", | ||
| min=0, | ||
| max=1000, # server-side buffer size limit (see logs_service.proto) | ||
| help="Number of historical log lines to fetch. Use 0 for live logs only.", | ||
| ), | ||
| **options, | ||
| ) -> CommandResult: | ||
| """ | ||
| Streams live logs from a deployed Streamlit app to your terminal. | ||
|
|
||
| Reads the Streamlit app name from the project definition file (snowflake.yml) | ||
| or from the --name option. Connects to the app's developer log service via | ||
| WebSocket and prints log entries in real time. Press Ctrl+C to stop streaming. | ||
|
|
||
| Log streaming requires SPCSv2 runtime. | ||
| """ | ||
| cli_context = get_cli_context() | ||
| conn = cli_context.connection | ||
|
|
||
| if name is not None: | ||
| if entity_id is not None: | ||
| raise ClickException( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use |
||
| "Cannot specify both --name and an entity ID. " | ||
| "Use --name to identify the app directly, or use an " | ||
| "entity ID to reference a snowflake.yml definition." | ||
| ) | ||
| # --name flag provided: resolve FQN and validate via server-side DESCRIBE | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you find those comments useful then maybe it would be better to convert those into log.debug messages? That way we could get additional data when debugging client issues |
||
| fqn = name.using_connection(conn) | ||
| validate_spcs_v2_runtime(conn, str(fqn)) | ||
| else: | ||
| # No --name: require project definition | ||
| pd = cli_context.project_definition | ||
| if pd is None: | ||
| raise ClickException( | ||
| "No Streamlit app specified. Provide --name or run from a " | ||
| "directory with a snowflake.yml project definition." | ||
| ) | ||
| if not pd.meets_version_requirement("2"): | ||
| if not pd.streamlit: | ||
| raise NoProjectDefinitionError( | ||
| project_type="streamlit", project_root=cli_context.project_root | ||
| ) | ||
| pd = convert_project_definition_to_v2(cli_context.project_root, pd) | ||
|
|
||
| entity_model = get_entity_for_operation( | ||
| cli_context=cli_context, | ||
| entity_id=entity_id, | ||
| project_definition=pd, | ||
| entity_type=ObjectType.STREAMLIT.value.cli_name, | ||
| ) | ||
|
|
||
| fqn = entity_model.fqn.using_connection(conn) | ||
| # Validate SPCSv2 runtime via server-side DESCRIBE (same path as --name) | ||
| validate_spcs_v2_runtime(conn, str(fqn)) | ||
|
|
||
| stream_logs( | ||
| conn=conn, | ||
| fqn=str(fqn), | ||
| tail_lines=tail, | ||
| json_output=cli_context.output_format.is_json, | ||
| ) | ||
| return MessageResult("Log streaming ended.") | ||
|
|
||
|
|
||
| def _get_current_workspace_context(): | ||
| ctx = get_cli_context() | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,228 @@ | ||
| # Copyright (c) 2026 Snowflake Inc. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """ | ||
| WebSocket log streaming client for Streamlit developer logs. | ||
|
|
||
| Connects to the Streamlit container runtime's developer log service | ||
| via WebSocket and streams log entries in real time. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import logging | ||
| import sys | ||
| from dataclasses import dataclass | ||
|
|
||
| import websocket | ||
| from click import ClickException | ||
| from google.protobuf.message import DecodeError | ||
| from snowflake.cli._plugins.streamlit.proto_codec import ( | ||
| decode_log_entry, | ||
| encode_stream_logs_request, | ||
| ) | ||
| from snowflake.cli._plugins.streamlit.streamlit_entity_model import ( | ||
| SPCS_RUNTIME_V2_NAME, | ||
| ) | ||
| from snowflake.cli.api.console import cli_console | ||
| from snowflake.connector import SnowflakeConnection | ||
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
||
| # Timeout for each ws.recv_data() call — mirrors the Go client's 90-second | ||
| # read deadline. When no log entry arrives within this window, we re-issue | ||
| # recv_data() so the loop stays responsive to KeyboardInterrupt. | ||
| _WS_RECV_TIMEOUT_SECONDS = 90 | ||
|
|
||
| _HANDSHAKE_TIMEOUT_SECONDS = 10 | ||
|
|
||
|
|
||
| @dataclass | ||
| class DeveloperApiToken: | ||
| token: str | ||
| resource_uri: str | ||
|
|
||
|
|
||
| def get_developer_api_token(conn: SnowflakeConnection, fqn: str) -> DeveloperApiToken: | ||
| """ | ||
| Calls SYSTEM$GET_STREAMLIT_DEVELOPER_API_TOKEN and returns a | ||
| DeveloperApiToken with the token and resource URI. | ||
| """ | ||
| if "'" in fqn: | ||
| raise ClickException( | ||
| f"Invalid Streamlit app name: {fqn}. Name must not contain single quotes." | ||
| ) | ||
|
|
||
| query = f"CALL SYSTEM$GET_STREAMLIT_DEVELOPER_API_TOKEN('{fqn}', false);" | ||
| log.debug("Fetching developer API token for %s", fqn) | ||
|
|
||
| cursor = conn.cursor() | ||
| try: | ||
| cursor.execute(query) | ||
| row = cursor.fetchone() | ||
| if not row: | ||
| raise ClickException( | ||
| "Empty response from SYSTEM$GET_STREAMLIT_DEVELOPER_API_TOKEN" | ||
| ) | ||
| raw = row[0] | ||
| finally: | ||
| cursor.close() | ||
|
|
||
| try: | ||
| resp = json.loads(raw) | ||
| except (json.JSONDecodeError, TypeError) as e: | ||
| raise ClickException(f"Failed to parse token response: {e}") from e | ||
|
|
||
| token = resp.get("token", "") | ||
| resource_uri = resp.get("resourceUri", "") | ||
|
|
||
| if not token: | ||
| raise ClickException("Empty token in developer API response") | ||
| if not resource_uri: | ||
| raise ClickException("Empty resourceUri in developer API response") | ||
|
|
||
| log.debug("Resource URI: %s", resource_uri) | ||
| return DeveloperApiToken(token=token, resource_uri=resource_uri) | ||
|
|
||
|
|
||
| def build_ws_url(resource_uri: str) -> str: | ||
| """Convert resource URI to WebSocket URL and append /logs path.""" | ||
| ws_url = resource_uri.replace("https://", "wss://", 1).replace( | ||
| "http://", "ws://", 1 | ||
| ) | ||
| return ws_url.rstrip("/") + "/logs" | ||
|
|
||
|
|
||
| def validate_spcs_v2_runtime(conn: SnowflakeConnection, fqn: str) -> None: | ||
| """ | ||
| Run DESCRIBE STREAMLIT and verify the app uses SPCSv2 runtime. | ||
|
|
||
| Raises ClickException if the app does not use the SPCS Runtime V2 | ||
| (required for log streaming). | ||
| """ | ||
| cursor = conn.cursor() | ||
| try: | ||
| # fqn is already validated by IdentifierType / FQN.using_connection — | ||
| # DESCRIBE uses identifier syntax, not string literals, so no | ||
| # single-quote injection risk. | ||
| cursor.execute(f"DESCRIBE STREAMLIT {fqn}") | ||
| row = cursor.fetchone() | ||
| description = cursor.description | ||
| finally: | ||
| cursor.close() | ||
|
|
||
| if not row or not description: | ||
| raise ClickException( | ||
| f"Could not describe Streamlit app {fqn}. " | ||
| "Verify the app exists and you have access." | ||
| ) | ||
|
|
||
| # Build column-name -> value mapping from cursor.description | ||
| columns = {desc[0].lower(): val for desc, val in zip(description, row)} | ||
| runtime_name = columns.get("runtime_name") | ||
|
|
||
| if runtime_name != SPCS_RUNTIME_V2_NAME: | ||
| raise ClickException( | ||
| f"Log streaming is only supported for Streamlit apps running on " | ||
| f"SPCSv2 runtime ({SPCS_RUNTIME_V2_NAME}). " | ||
| f"App '{fqn}' has runtime_name='{runtime_name}'." | ||
| ) | ||
|
|
||
|
|
||
| def stream_logs( | ||
| conn: SnowflakeConnection, | ||
| fqn: str, | ||
| tail_lines: int = 100, | ||
| json_output: bool = False, | ||
| ) -> None: | ||
| """ | ||
| Connect to the Streamlit developer log streaming WebSocket and print | ||
| log entries to stdout until interrupted. | ||
|
|
||
| When *json_output* is True each log entry is emitted as a single-line | ||
| JSON object (JSONL), suitable for piping to ``jq`` or other tools. | ||
| """ | ||
| # 1. Get token | ||
| cli_console.step("Fetching developer API token...") | ||
| token_info = get_developer_api_token(conn, fqn) | ||
|
|
||
| # 2. Build WebSocket URL | ||
| ws_url = build_ws_url(token_info.resource_uri) | ||
| cli_console.step(f"Connecting to log stream: {ws_url}") | ||
|
|
||
| # 3. Connect | ||
| # NOTE: Do not log `header` — it contains the auth token. Also be aware | ||
| # that websocket.enableTrace(True) will dump headers to stderr. | ||
| header = [f'Authorization: Snowflake Token="{token_info.token}"'] | ||
| ws = websocket.WebSocket() | ||
| ws.timeout = _WS_RECV_TIMEOUT_SECONDS | ||
| streaming = False | ||
|
|
||
| try: | ||
| try: | ||
| ws.connect(ws_url, header=header, timeout=_HANDSHAKE_TIMEOUT_SECONDS) | ||
| except Exception as e: | ||
| raise ClickException(f"Failed to connect to log stream: {e}") from e | ||
|
|
||
| # 4. Send StreamLogsRequest | ||
| ws.send_binary(encode_stream_logs_request(tail_lines)) | ||
| log.debug("Sent StreamLogsRequest with tail_lines=%d", tail_lines) | ||
|
|
||
| cli_console.step(f"Streaming logs (tail={tail_lines}). Press Ctrl+C to stop.") | ||
| sys.stdout.write("---\n") | ||
| sys.stdout.flush() | ||
| streaming = True | ||
|
|
||
| # 5. Read loop | ||
| while True: | ||
| try: | ||
| opcode, data = ws.recv_data() | ||
| except websocket.WebSocketTimeoutException: | ||
| # No message within the timeout window — loop back so we | ||
| # stay responsive to KeyboardInterrupt. | ||
| continue | ||
| except websocket.WebSocketConnectionClosedException: | ||
| log.debug("WebSocket connection closed by server") | ||
| break | ||
| except (websocket.WebSocketException, OSError) as e: | ||
| log.debug("WebSocket recv error: %s", e) | ||
| break | ||
|
|
||
| if opcode == websocket.ABNF.OPCODE_BINARY: | ||
| try: | ||
| entry = decode_log_entry(data) | ||
| except (DecodeError, ValueError) as e: | ||
| log.warning("Failed to decode log entry: %s", e) | ||
| continue | ||
| if json_output: | ||
| sys.stdout.write(json.dumps(entry.to_dict()) + "\n") | ||
| else: | ||
| sys.stdout.write(entry.format_line() + "\n") | ||
| sys.stdout.flush() | ||
| elif opcode == websocket.ABNF.OPCODE_CLOSE: | ||
| break | ||
| elif opcode == websocket.ABNF.OPCODE_PING: | ||
| ws.pong(data) | ||
|
|
||
| except KeyboardInterrupt: | ||
| pass | ||
| finally: | ||
| try: | ||
| ws.close(status=websocket.STATUS_NORMAL) | ||
| except Exception as e: | ||
| log.debug("Error closing WebSocket: %s", e) | ||
| if streaming: | ||
| sys.stdout.write("\n--- Log streaming stopped.\n") | ||
| sys.stdout.flush() | ||
|
Comment on lines
+183
to
+228
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're not sanitizing the data coming server with |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| # Copyright (c) 2026 Snowflake Inc. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will need to ask around if we need a formal approval to add a new package