SNOW-1674875: merge gate for multithreaded wrapper tests #2336

Merged
Changes from 125 commits
128 commits
56fb566
init
sfc-gh-aalam Sep 11, 2024
66003d1
make udf/sproc related files thread-safe
sfc-gh-aalam Sep 11, 2024
0e58205
Merge branch 'main' into aalam-SNOW-1418523-make-udf-sproc-thread-safe
sfc-gh-aalam Sep 11, 2024
fb4ecf6
add locks
sfc-gh-aalam Sep 11, 2024
e75dde1
init
sfc-gh-aalam Sep 11, 2024
68a8c1c
make query listener thread-safe
sfc-gh-aalam Sep 11, 2024
31a5734
Fix query_tag and last_action_id
sfc-gh-aalam Sep 11, 2024
b4dadda
core updates done
sfc-gh-aalam Sep 11, 2024
b8c6496
Add tests
sfc-gh-aalam Sep 12, 2024
f39837e
Fix local tests
sfc-gh-aalam Sep 12, 2024
f720701
Merge branch 'main' into aalam-SNOW-1418523-make-internal-session-var…
sfc-gh-aalam Sep 12, 2024
31a196f
Merge branch 'main' into aalam-SNOW-1418523-make-analyzer-server_conn…
sfc-gh-aalam Sep 12, 2024
723bdf7
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Sep 12, 2024
37c0419
add file IO tests
sfc-gh-aalam Sep 12, 2024
8a2d433
Merge branch 'aalam-SNOW-1418523-concurrent-file-operations' into aal…
sfc-gh-aalam Sep 12, 2024
a083989
make session._runtime_version_from_requirement safe
sfc-gh-aalam Sep 13, 2024
947d384
add sp/udf concurrent tests
sfc-gh-aalam Sep 13, 2024
fd51720
fix broken test
sfc-gh-aalam Sep 13, 2024
3077853
add udtf/udaf tests
sfc-gh-aalam Sep 13, 2024
65c3186
fix broken test
sfc-gh-aalam Sep 13, 2024
94412cf
sql_simplifier, cte_optimization, eliminate_numeric, query_compilatio…
sfc-gh-aalam Sep 13, 2024
638dd09
cover more configs
sfc-gh-aalam Sep 17, 2024
7ae2c33
fix SnowflakePlan copy
sfc-gh-aalam Sep 17, 2024
1689ebf
minor update
sfc-gh-aalam Sep 17, 2024
5e8a2d2
add description
sfc-gh-aalam Sep 17, 2024
e5b3f83
init
sfc-gh-aalam Sep 17, 2024
1c83ef2
use _package_lock to protect Session._packages
sfc-gh-aalam Sep 17, 2024
a649761
undo refactor
sfc-gh-aalam Sep 17, 2024
f03d618
undo refactor
sfc-gh-aalam Sep 17, 2024
5f398d5
fix test
sfc-gh-aalam Sep 17, 2024
3807087
fix test
sfc-gh-aalam Sep 17, 2024
eca13dc
Merge branch 'main' into aalam-SNOW-1418523-make-internal-session-var…
sfc-gh-aalam Sep 17, 2024
4eef3e9
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Sep 17, 2024
df3263c
add file IO tests
sfc-gh-aalam Sep 12, 2024
6769c54
merge with base
sfc-gh-aalam Sep 17, 2024
af86f67
merge with base
sfc-gh-aalam Sep 17, 2024
a737f33
fix test
sfc-gh-aalam Sep 17, 2024
9f2c707
merge with base
sfc-gh-aalam Sep 17, 2024
8ca2730
protect complexity bounds setter with lock
sfc-gh-aalam Sep 17, 2024
5c8389b
Merge branch 'aalam-SNOW-1663726-make-session-config-updates-thread-s…
sfc-gh-aalam Sep 18, 2024
39ea350
add tests
sfc-gh-aalam Sep 18, 2024
b616424
fix test
sfc-gh-aalam Sep 18, 2024
c10daf6
fix test
sfc-gh-aalam Sep 18, 2024
0140eee
Add telemetery for thread usage
sfc-gh-aalam Sep 18, 2024
c6d96c8
Add telemetery for file uplaods
sfc-gh-aalam Sep 18, 2024
96949be
Merge branch 'main' into aalam-SNOW-1418523-make-internal-session-var…
sfc-gh-aalam Sep 18, 2024
2e53cb4
init
sfc-gh-aalam Sep 18, 2024
734276a
trigger check for files in critical path
sfc-gh-aalam Sep 19, 2024
daadefc
try fix
sfc-gh-aalam Sep 19, 2024
8e63ced
update PR template
sfc-gh-aalam Sep 19, 2024
5fc16af
add link to future changes doc
sfc-gh-aalam Sep 19, 2024
b9c6951
try fix
sfc-gh-aalam Sep 19, 2024
8289829
test change for snowflake/snowpark/*.py
sfc-gh-aalam Sep 19, 2024
04329e5
test change for snowflake/snowpark/_internal/*
sfc-gh-aalam Sep 19, 2024
758fbd0
trigger merge gate on description change
sfc-gh-aalam Sep 19, 2024
8a5f275
test modin
sfc-gh-aalam Sep 19, 2024
54fe228
reset after tests
sfc-gh-aalam Sep 19, 2024
504eba1
run common tests in multithreading mode
sfc-gh-aalam Sep 20, 2024
5119749
run only tagged tests
sfc-gh-aalam Sep 20, 2024
743dde8
don't do check post test for sql counter if it is disabled
sfc-gh-aalam Sep 20, 2024
5f140ab
SNOW-1418523 make analyzer server connection thread safe (#2282)
sfc-gh-aalam Sep 25, 2024
0624824
SNOW-1418523: concurrent file operations (#2288)
sfc-gh-aalam Sep 25, 2024
42d6e19
SNOW-1418523: make udf and sproc registration thread safe (#2289)
sfc-gh-aalam Sep 25, 2024
801ad6e
merge with main
sfc-gh-aalam Oct 2, 2024
c7fa3ae
Merge branch 'main' into aalam-SNOW-1418523-make-internal-session-var…
sfc-gh-aalam Oct 3, 2024
5672a1d
SNOW-1663726 make session config updates thread safe (#2302)
sfc-gh-aalam Oct 4, 2024
bd0528d
SNOW-1663726 make temp table cleaner thread safe (#2309)
sfc-gh-aalam Oct 4, 2024
39a07d4
SNOW-1642189: collect telemetry about concurrency usage (#2316)
sfc-gh-aalam Oct 4, 2024
1e4dcad
merge
sfc-gh-aalam Oct 4, 2024
5392552
undo old changes
sfc-gh-aalam Oct 4, 2024
7977774
Merge branch 'aalam-SNOW-1546090-add-merge-gate-for-future-thread-saf…
sfc-gh-aalam Oct 4, 2024
cacf2cd
Update tests/integ/modin/test_modin_stored_procedures.py
sfc-gh-aalam Oct 4, 2024
4d4e257
SNOW-1546090 add merge gate for future thread safe updates (#2323)
sfc-gh-aalam Oct 4, 2024
5ecb0b4
param protect lock changes
sfc-gh-aalam Oct 4, 2024
66374ee
add plan-builder that was accidentally removed
sfc-gh-aalam Oct 4, 2024
3bec695
Add dummythreadlocal and protect server_connection
sfc-gh-aalam Oct 4, 2024
d41138a
add todo
sfc-gh-aalam Oct 4, 2024
42ca571
undo fixture
sfc-gh-aalam Oct 4, 2024
ee3ce32
fix init
sfc-gh-aalam Oct 4, 2024
24d5b82
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Oct 4, 2024
816b1d9
fix param read
sfc-gh-aalam Oct 4, 2024
f1ab835
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Oct 4, 2024
e45382b
fix lint
sfc-gh-aalam Oct 4, 2024
f0cff5c
address comment
sfc-gh-aalam Oct 4, 2024
75bb86c
fix test
sfc-gh-aalam Oct 5, 2024
5e447fd
fix test
sfc-gh-aalam Oct 5, 2024
7e7b47a
enable thread-safe session for tests
sfc-gh-aalam Oct 5, 2024
a0b259d
fix tests
sfc-gh-aalam Oct 6, 2024
97de868
fix option name
sfc-gh-aalam Oct 7, 2024
4d74fbc
fix test
sfc-gh-aalam Oct 7, 2024
eb1d918
address feedback
sfc-gh-aalam Oct 10, 2024
01c0204
merge
sfc-gh-aalam Oct 14, 2024
6a8a18f
Merge branch 'SNOW-1720835-param-protect-client-side-changes' into aa…
sfc-gh-aalam Oct 14, 2024
df1d2f6
merge with param protection changes
sfc-gh-aalam Oct 14, 2024
5f9200c
merge with main
sfc-gh-aalam Oct 14, 2024
312b5ad
add cursor created test
sfc-gh-aalam Oct 15, 2024
e0bcfb2
remove commented lines
sfc-gh-aalam Oct 15, 2024
b4fc798
fix unit test
sfc-gh-aalam Oct 15, 2024
4b83442
fix tests
sfc-gh-aalam Oct 15, 2024
d1592ba
address comments
sfc-gh-aalam Oct 16, 2024
1d63131
address comments
sfc-gh-aalam Oct 16, 2024
e7cef94
merge
sfc-gh-aalam Oct 16, 2024
1bde81e
minor updates
sfc-gh-aalam Oct 16, 2024
f61b526
Merge branch 'SNOW-1720835-param-protect-client-side-changes' into aa…
sfc-gh-aalam Oct 16, 2024
f494010
fix tests
sfc-gh-aalam Oct 16, 2024
1de5526
fix tests
sfc-gh-aalam Oct 16, 2024
dde5af3
Merge branch 'SNOW-1720835-param-protect-client-side-changes' into aa…
sfc-gh-aalam Oct 16, 2024
8ed4de4
fix tests
sfc-gh-aalam Oct 16, 2024
92213f7
Merge branch 'SNOW-1720835-param-protect-client-side-changes' into aa…
sfc-gh-aalam Oct 16, 2024
8d7a48a
fix test profile
sfc-gh-aalam Oct 16, 2024
a155c03
Merge branch 'main' into SNOW-1720835-param-protect-client-side-changes
sfc-gh-aalam Oct 16, 2024
a5810d6
Merge branch 'SNOW-1720835-param-protect-client-side-changes' into aa…
sfc-gh-aalam Oct 16, 2024
adbc395
disable multithreading mode by default
sfc-gh-aalam Oct 16, 2024
56dd2ce
comment upate
sfc-gh-aalam Oct 16, 2024
27a667d
fixture
sfc-gh-aalam Oct 16, 2024
1538a40
merge
sfc-gh-aalam Oct 16, 2024
eef3568
fix coverage
sfc-gh-aalam Oct 16, 2024
d22575c
address comments
sfc-gh-aalam Oct 16, 2024
46b3ca8
address comments
sfc-gh-aalam Oct 16, 2024
55844ea
address comments
sfc-gh-aalam Oct 16, 2024
4e481b2
Merge branch 'main' into SNOW-1720835-param-protect-client-side-changes
sfc-gh-aalam Oct 16, 2024
56169c5
fix integ
sfc-gh-aalam Oct 17, 2024
d491367
Merge branch 'SNOW-1720835-param-protect-client-side-changes' of gith…
sfc-gh-aalam Oct 17, 2024
7c6ab40
fix mock
sfc-gh-aalam Oct 17, 2024
33f3afe
merge
sfc-gh-aalam Oct 17, 2024
64b6aeb
Merge branch 'main' into aalam-SNOW-1674875-merge-gate-for-multithrea…
sfc-gh-aalam Oct 17, 2024
9017ee7
address comments
sfc-gh-aalam Oct 17, 2024
8b7a99b
fix merge
sfc-gh-aalam Oct 18, 2024
128 changes: 128 additions & 0 deletions .github/workflows/precommit.yml
@@ -472,6 +472,132 @@ jobs:
             .tox/.coverage
             .tox/coverage.xml
 
+  test-snowpark-multithreading-mode:
+    name: Test Snowpark Multithreading py-${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.cloud-provider }}
+    needs: build
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        os: [ubuntu-latest-64-cores]
+        python-version: ["3.9"]
+        cloud-provider: [aws]
+    steps:
+      - name: Checkout Code
+        uses: actions/checkout@v4
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Display Python version
+        run: python -c "import sys; print(sys.version)"
+      - name: Decrypt parameters.py
+        shell: bash
+        run: .github/scripts/decrypt_parameters.sh
+        env:
+          PARAMETER_PASSWORD: ${{ secrets.PARAMETER_PASSWORD }}
+          CLOUD_PROVIDER: ${{ matrix.cloud-provider }}
+      - name: Download wheel(s)
+        uses: actions/download-artifact@v4
+        with:
+          name: wheel
+          path: dist
+      - name: Show wheels downloaded
+        run: ls -lh dist
+        shell: bash
+      - name: Upgrade setuptools, pip and wheel
+        run: python -m pip install -U setuptools pip wheel
+      - name: Install tox
+        run: python -m pip install tox
+      - name: Run tests (excluding doctests)
+        run: python -m tox -e "py${PYTHON_VERSION/\./}-multithreaded-ci"
+        env:
+          PYTHON_VERSION: ${{ matrix.python-version }}
+          cloud_provider: ${{ matrix.cloud-provider }}
+          PYTEST_ADDOPTS: --color=yes --tb=short
+          TOX_PARALLEL_NO_SPINNER: 1
+        shell: bash
+      - name: Run local tests
+        run: python -m tox -e "py${PYTHON_VERSION/\./}-localmultithreaded-ci"
+        env:
+          PYTHON_VERSION: ${{ matrix.python-version }}
+          cloud_provider: ${{ matrix.cloud-provider }}
+          PYTEST_ADDOPTS: --color=yes --tb=short
+          TOX_PARALLEL_NO_SPINNER: 1
+        shell: bash
+      - name: Combine coverages
+        run: python -m tox -e coverage --skip-missing-interpreters false
+        shell: bash
+        env:
+          SNOWFLAKE_IS_PYTHON_RUNTIME_TEST: 1
+      - uses: actions/upload-artifact@v4
+        with:
+          include-hidden-files: true
+          name: coverage_${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.cloud-provider }}-snowpark-multithreading
+          path: |
+            .tox/.coverage
+            .tox/coverage.xml
+
+
+  test-snowpandas-multithreading-mode:
+    name: Test Snowpandas Multithreading py-${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.cloud-provider }}
+    needs: build
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        os: [ubuntu-latest-64-cores]
+        python-version: ["3.9"]
+        cloud-provider: [aws]
+    steps:
+      - name: Checkout Code
+        uses: actions/checkout@v4
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Display Python version
+        run: python -c "import sys; print(sys.version)"
+      - name: Decrypt parameters.py
+        shell: bash
+        run: .github/scripts/decrypt_parameters.sh
+        env:
+          PARAMETER_PASSWORD: ${{ secrets.PARAMETER_PASSWORD }}
+          CLOUD_PROVIDER: ${{ matrix.cloud-provider }}
+      - name: Download wheel(s)
+        uses: actions/download-artifact@v4
+        with:
+          name: wheel
+          path: dist
+      - name: Show wheels downloaded
+        run: ls -lh dist
+        shell: bash
+      - name: Upgrade setuptools, pip and wheel
+        run: python -m pip install -U setuptools pip wheel
+      - name: Install tox
+        run: python -m pip install tox
+      - name: Run Snowpark pandas API tests (excluding doctests)
+        run: python -m tox -e "py${PYTHON_VERSION/\./}-snowparkpandasmultithreaded-modin-ci"
+        env:
+          PYTHON_VERSION: ${{ matrix.python-version }}
+          cloud_provider: ${{ matrix.cloud-provider }}
+          PYTEST_ADDOPTS: --color=yes --tb=short
+          TOX_PARALLEL_NO_SPINNER: 1
+        shell: bash
+      - name: Combine coverages
+        run: python -m tox -e coverage --skip-missing-interpreters false
+        shell: bash
+        env:
+          SNOWFLAKE_IS_PYTHON_RUNTIME_TEST: 1
+      - uses: actions/upload-artifact@v4
+        with:
+          include-hidden-files: true
+          name: coverage_${{ matrix.os }}-${{ matrix.python-version }}-${{ matrix.cloud-provider }}-snowpandas-multithreading
+          path: |
+            .tox/.coverage
+            .tox/coverage.xml
+
+
   combine-coverage:
     if: ${{ success() || failure() }}
     name: Combine coverage
@@ -481,6 +607,8 @@ jobs:
       - test-enable-cte-optimization
       - test-snowpark-pandas
       - test-modin-extra-without-pandas-extra
+      - test-snowpark-multithreading-mode
+      - test-snowpandas-multithreading-mode
     runs-on: ubuntu-latest
     steps:
       - name: Checkout Code
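The `run` steps in both new jobs build the tox environment name with the bash pattern substitution `${PYTHON_VERSION/\./}`, which removes the first `.` from the version string. A minimal Python sketch of the same transformation follows; the helper name `tox_env_name` is hypothetical and exists only for illustration, it is not part of the workflow or the repository.

```python
def tox_env_name(python_version: str, suffix: str) -> str:
    """Build a tox env name the way the workflow's bash substitution does.

    Only the first '.' is removed, matching bash's single-slash
    pattern substitution (a double slash would remove every match).
    """
    return f"py{python_version.replace('.', '', 1)}-{suffix}"


if __name__ == "__main__":
    # With the matrix value "3.9" used in the workflow:
    print(tox_env_name("3.9", "multithreaded-ci"))       # py39-multithreaded-ci
    print(tox_env_name("3.9", "localmultithreaded-ci"))  # py39-localmultithreaded-ci
```

With the matrix pinned to Python 3.9, the three tox environments invoked are therefore `py39-multithreaded-ci`, `py39-localmultithreaded-ci`, and `py39-snowparkpandasmultithreaded-modin-ci`.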
1 change: 0 additions & 1 deletion CHANGELOG.md
@@ -5,7 +5,6 @@
 ### Snowpark Python API Updates
 
 - Added support for 'Service' domain to `session.lineage.trace` API.
-- Updated `Session` class to be thread-safe. This allows concurrent dataframe transformations, dataframe actions, UDF and store procedure registration, and concurrent file uploads.
 - Added support for `copy_grants` parameter when registering UDxF and stored procedures.
 
 #### New Features
11 changes: 9 additions & 2 deletions src/snowflake/snowpark/_internal/server_connection.py
@@ -49,6 +49,8 @@
 from snowflake.snowpark._internal.error_message import SnowparkClientExceptionMessages
 from snowflake.snowpark._internal.telemetry import TelemetryClient
 from snowflake.snowpark._internal.utils import (
+    create_rlock,
+    create_thread_local,
     escape_quotes,
     get_application_name,
     get_version,
@@ -155,8 +157,6 @@ def __init__(
         options: Dict[str, Union[int, str]],
         conn: Optional[SnowflakeConnection] = None,
     ) -> None:
-        self._lock = threading.RLock()
-        self._thread_store = threading.local()
         self._lower_case_parameters = {k.lower(): v for k, v in options.items()}
         self._add_application_parameters()
         self._conn = conn if conn else connect(**self._lower_case_parameters)
@@ -171,6 +171,13 @@ def __init__(
         except TypeError:
             pass
 
+        # thread safe param protection
+        self._thread_safe_session_enabled = self._get_client_side_session_parameter(
+            "PYTHON_SNOWPARK_ENABLE_THREAD_SAFE_SESSION", False
+        )
+        self._lock = create_rlock(self._thread_safe_session_enabled)
+        self._thread_store = create_thread_local(self._thread_safe_session_enabled)
+
         if "password" in self._lower_case_parameters:
             self._lower_case_parameters["password"] = None
         self._telemetry_client = TelemetryClient(self._conn)
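The connection now picks between `threading.local()` and a plain dummy object for `_thread_store`, depending on the session parameter. The practical difference is attribute visibility across threads. The sketch below illustrates this with standard-library calls only; `PlainStore` and `seen_by_worker` are hypothetical names standing in for the dummy class, not code from this PR.

```python
import threading


class PlainStore:
    """Stand-in for a dummy thread-local: an ordinary object shared by all threads."""


def seen_by_worker(store) -> bool:
    """Set an attribute on the calling thread, then report whether a worker thread sees it."""
    store.query_tag = "set-on-main-thread"
    result = []
    worker = threading.Thread(
        target=lambda: result.append(hasattr(store, "query_tag"))
    )
    worker.start()
    worker.join()
    return result[0]


if __name__ == "__main__":
    # threading.local keeps attributes per thread, so the worker sees nothing.
    print(seen_by_worker(threading.local()))  # False
    # A plain object is shared, so the worker sees the main thread's value.
    print(seen_by_worker(PlainStore()))       # True
```

This suggests that when the flag is off, state kept in `_thread_store` behaves like ordinary instance state, which matches single-threaded use.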
4 changes: 2 additions & 2 deletions src/snowflake/snowpark/_internal/temp_table_auto_cleaner.py
@@ -2,12 +2,12 @@
 # Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
 #
 import logging
-import threading
 import weakref
 from collections import defaultdict
 from typing import TYPE_CHECKING, Dict
 
 from snowflake.snowpark._internal.analyzer.snowflake_plan_node import SnowflakeTable
+from snowflake.snowpark._internal.utils import create_rlock
 
 if TYPE_CHECKING:
     from snowflake.snowpark.session import Session  # pragma: no cover
@@ -33,7 +33,7 @@ def __init__(self, session: "Session") -> None:
         # this dict will still be maintained even if the cleaner is stopped (`stop()` is called)
         self.ref_count_map: Dict[str, int] = defaultdict(int)
         # Lock to protect the ref_count_map
-        self.lock = threading.RLock()
+        self.lock = create_rlock(session._conn._thread_safe_session_enabled)
 
     def add(self, table: SnowflakeTable) -> None:
         with self.lock:
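The cleaner guards `ref_count_map` with a lock because incrementing a counter is a read-modify-write sequence that concurrent threads can interleave. A minimal sketch of the pattern, assuming a simplified `RefCounter` class that is hypothetical and much smaller than the real cleaner:

```python
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Dict


class RefCounter:
    """Hypothetical, simplified reference counter protected by a re-entrant lock."""

    def __init__(self) -> None:
        self.ref_count_map: Dict[str, int] = defaultdict(int)
        self.lock = threading.RLock()

    def add(self, name: str) -> None:
        # The increment happens entirely while holding the lock.
        with self.lock:
            self.ref_count_map[name] += 1

    def release(self, name: str) -> int:
        # Decrement and return the remaining count under the same lock.
        with self.lock:
            self.ref_count_map[name] -= 1
            return self.ref_count_map[name]


if __name__ == "__main__":
    counter = RefCounter()
    with ThreadPoolExecutor(max_workers=8) as pool:
        # 1000 concurrent increments of the same key.
        list(pool.map(lambda _: counter.add("temp_table"), range(1000)))
    print(counter.ref_count_map["temp_table"])  # 1000
```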
48 changes: 47 additions & 1 deletion src/snowflake/snowpark/_internal/utils.py
@@ -298,7 +298,12 @@ def normalize_path(path: str, is_local: bool) -> str:
     return f"'{path}'"
 
 
-def warn_session_config_update_in_multithreaded_mode(config) -> None:
+def warn_session_config_update_in_multithreaded_mode(
+    config: str, thread_safe_mode_enabled: bool
+) -> None:
+    if not thread_safe_mode_enabled:
+        return
+
     if threading.active_count() > 1:
         logger.warning(
             "You might have more than one threads sharing the Session object trying to update "
@@ -675,6 +680,47 @@ def warning(self, text: str) -> None:
         self.count += 1
 
 
+# TODO: SNOW-1720855: Remove DummyRLock and DummyThreadLocal after the rollout
+class DummyRLock:
+    """This is a dummy lock that is used in place of threading.RLock when multithreading is
+    disabled."""
+
+    def __enter__(self):
+        pass
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        pass
+
+    def acquire(self, *args, **kwargs):
+        pass  # pragma: no cover
+
+    def release(self, *args, **kwargs):
+        pass  # pragma: no cover
+
+
+class DummyThreadLocal:
+    """This is a dummy thread local class that is used in place of threading.local when
+    multithreading is disabled."""
+
+    pass
+
+
+def create_thread_local(
+    thread_safe_session_enabled: bool,
+) -> Union[threading.local, DummyThreadLocal]:
+    if thread_safe_session_enabled:
+        return threading.local()
+    return DummyThreadLocal()
+
+
+def create_rlock(
+    thread_safe_session_enabled: bool,
+) -> Union[threading.RLock, DummyRLock]:
+    if thread_safe_session_enabled:
+        return threading.RLock()
+    return DummyRLock()
+
+
 warning_dict: Dict[str, WarningHelper] = {}
 
 
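The factory functions in this file let callers write `with self._lock:` unconditionally and decide once, at construction time, whether the lock does anything. A self-contained sketch of that pattern follows. `NoOpLock`, `make_lock`, and `Registry` are hypothetical names written for this example; they mirror the idea but are not the PR's classes.

```python
import threading
from typing import Dict, List


class NoOpLock:
    """Hypothetical no-op lock exposing the same context-manager surface as a real lock."""

    def __enter__(self) -> "NoOpLock":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        return False  # never swallow exceptions

    def acquire(self, *args, **kwargs) -> bool:
        return True

    def release(self) -> None:
        pass


def make_lock(thread_safe: bool):
    """Return a real re-entrant lock when thread safety is enabled, else a no-op."""
    return threading.RLock() if thread_safe else NoOpLock()


class Registry:
    """Caller code is identical in both modes; only the lock object differs."""

    def __init__(self, thread_safe: bool) -> None:
        self._lock = make_lock(thread_safe)
        self._items: Dict[str, int] = {}

    def register(self, name: str, value: int) -> None:
        with self._lock:
            self._items[name] = value

    def names(self) -> List[str]:
        with self._lock:
            return sorted(self._items)


if __name__ == "__main__":
    for flag in (True, False):
        registry = Registry(thread_safe=flag)
        registry.register("udf_b", 2)
        registry.register("udf_a", 1)
        print(flag, registry.names())
```

One design note: `contextlib.nullcontext()` would also work as a no-op in `with` statements, but it has no `acquire` or `release` methods, so a small dedicated class is the safer drop-in when callers may use either style.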
23 changes: 15 additions & 8 deletions src/snowflake/snowpark/mock/_connection.py
@@ -6,7 +6,6 @@
 import functools
 import json
 import logging
-import threading
 import uuid
 from copy import copy
 from decimal import Decimal
@@ -30,6 +29,7 @@
 from snowflake.snowpark._internal.error_message import SnowparkClientExceptionMessages
 from snowflake.snowpark._internal.server_connection import DEFAULT_STRING_SIZE
 from snowflake.snowpark._internal.utils import (
+    create_rlock,
     is_in_stored_procedure,
     result_set_to_rows,
 )
@@ -281,19 +281,26 @@ def read_table_if_exists(
     def __init__(self, options: Optional[Dict[str, Any]] = None) -> None:
         self._conn = MockedSnowflakeConnection()
         self._cursor = Mock()
-        self._lock = threading.RLock()
+        self._options = options or {}
+        session_params = self._options.get("session_parameters", {})
+        # thread safe param protection
+        self._thread_safe_session_enabled = session_params.get(
+            "PYTHON_SNOWPARK_ENABLE_THREAD_SAFE_SESSION", False
+        )
+        self._lock = create_rlock(self._thread_safe_session_enabled)
         self._lower_case_parameters = {}
         self.remove_query_listener = Mock()
         self.add_query_listener = Mock()
         self._telemetry_client = Mock()
         self.entity_registry = MockServerConnection.TabularEntityRegistry(self)
         self.stage_registry = StageEntityRegistry(self)
-        self._conn._session_parameters = {
-            "ENABLE_ASYNC_QUERY_IN_PYTHON_STORED_PROCS": False,
-            "_PYTHON_SNOWPARK_USE_SCOPED_TEMP_OBJECTS_STRING": True,
-            "_PYTHON_SNOWPARK_USE_SQL_SIMPLIFIER_STRING": True,
-        }
-        self._options = options or {}
+        # dict.update() returns None, so build the merged dict explicitly
+        self._conn._session_parameters = {
+            **session_params,
+            "ENABLE_ASYNC_QUERY_IN_PYTHON_STORED_PROCS": False,
+            "_PYTHON_SNOWPARK_USE_SCOPED_TEMP_OBJECTS_STRING": True,
+            "_PYTHON_SNOWPARK_USE_SQL_SIMPLIFIER_STRING": True,
+        }
         self._active_account = self._options.get(
             "account", snowflake.snowpark.mock._constants.CURRENT_ACCOUNT
         )
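When user-supplied session parameters are merged with fixed defaults, as in the mock connection's `__init__`, it helps to remember that `dict.update` mutates its receiver and returns `None`, so its return value should not be assigned. A minimal sketch of a non-mutating merge follows; `merge_session_params` is a hypothetical helper for illustration, not a function from the repository.

```python
from typing import Any, Dict


def merge_session_params(
    user_params: Dict[str, Any], defaults: Dict[str, Any]
) -> Dict[str, Any]:
    """Return a new dict in which `defaults` override `user_params` on key clashes.

    Neither input is modified.
    """
    return {**user_params, **defaults}


if __name__ == "__main__":
    user = {"PYTHON_SNOWPARK_ENABLE_THREAD_SAFE_SESSION": True}
    defaults = {"ENABLE_ASYNC_QUERY_IN_PYTHON_STORED_PROCS": False}

    # dict.update works in place and returns None.
    print(dict(user).update(defaults))  # None

    merged = merge_session_params(user, defaults)
    print(merged["PYTHON_SNOWPARK_ENABLE_THREAD_SAFE_SESSION"])  # True
    print(merged["ENABLE_ASYNC_QUERY_IN_PYTHON_STORED_PROCS"])   # False
    print(user == {"PYTHON_SNOWPARK_ENABLE_THREAD_SAFE_SESSION": True})  # True, input untouched
```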