-
Notifications
You must be signed in to change notification settings - Fork 44
[FSTORE-1382] fix for async concurrency issues for sql client #1343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -59,6 +57,3 @@ def get_sdk_info(): | |||
|
|||
|
|||
__all__ = ["connection", "disable_usage_logging", "get_sdk_info"] | |||
# running async code in jupyter throws "RuntimeError: This event loop is already running" | |||
# with tornado 6. This fixes the issue without downgrade to tornado==4.5.3 | |||
nest_asyncio.apply() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure that we want to stop providing support for older version of tornado? Or is it enough to move it to the sql engine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After digging deeper, I realised some of the problems were caused by nest_asyncio
as it makes the actual async errors difficult to debug. So I tried to remove its dependency, but its still needed in case of jupyter, otherwise the user has to explicitly call nest_asyncio.apply()
in each notebook. It is not needed for python runtimes. So it is needed for below cases:
- jupyter -> added check in init_async methods
- locust -> added into the
locustfile
@@ -205,18 +209,30 @@ def _parametrize_prepared_statements( | |||
return prepared_statements_dict | |||
|
|||
def init_async_mysql_connection(self, options=None): | |||
def is_runtime_notebook(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this a local function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved to the utils class
from typing import Any, Dict, List, Optional, Set, Tuple, Union | ||
|
||
import nest_asyncio |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you make this an optional import by importing only if is_runtime_notebook?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
) | ||
self._async_pool = await util.create_async_engine( | ||
online_connector, self._external, default_min_size, options=options | ||
async def _get_connection_pool(self, default_min_size: int, loop=None) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
loop
is not used by any caller
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Loop argument is not strictly required as the engine calls its asyncio built-in method, but good to keep it as optional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure I understand why we would not always pass loop?
python/hsfs/util.py
Outdated
loop = asyncio.get_running_loop() | ||
except RuntimeError as er: | ||
raise RuntimeError( | ||
"Event loop is not running. Please provide an event loop to create the engine." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how can user provide the loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it gets the loop by the asyncio.get_running_loop
so there should be running loop first to call this co-routine. I modified the error message a bit to make it clear. Loop argument is not strictly required but good to keep it as optional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no loop
argument in init_serving
or get_feature_vector
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed loop argument to avoid confusion @kennethmhc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to understand the optional loop use case. It feels like always having a per serving FV loop is the way to go and keeping it optional is error-prone/not clear to me
@@ -698,3 +726,11 @@ def feature_view_api(self) -> feature_view_api.FeatureViewApi: | |||
@property | |||
def storage_connector_api(self) -> storage_connector_api.StorageConnectorApi: | |||
return self._storage_connector_api | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about online_connector and connection_pool?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
) | ||
self._async_pool = await util.create_async_engine( | ||
online_connector, self._external, default_min_size, options=options | ||
async def _get_connection_pool(self, default_min_size: int, loop=None) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure I understand why we would not always pass loop?
self._hostname = util.get_host_name() if self._external else None | ||
|
||
if util.is_runtime_notebook(): | ||
_logger.debug("Running in Jupyter notebook, applying nest_asyncio") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a warning rather than debug?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think debug is fine as it is not really something user needs to do
…lclocks#1343) * init-working * add lock changes * move lock to execute method * update lock * pass pool as arg * loop changes and refactoring * make new connection pool for each call * make exec_prep async and add lock * use context manager and refactor init connection * use create task * add semaphore * remove nest_async and use manual loop * make nest asyncio only if jupyter * refactoring and move hostname retrieving to init serving * minor clean * minor cleanup * add unit test * remove return of pool * revert locust changes and refactoring * revert locust changes and refactoring * fix review comments * fix review comments * remove loop method argument --------- Co-authored-by: Victor Jouffrey <[email protected]>
…#1372) * init-working * add lock changes * move lock to execute method * update lock * pass pool as arg * loop changes and refactoring * make new connection pool for each call * make exec_prep async and add lock * use context manager and refactor init connection * use create task * add semaphore * remove nest_async and use manual loop * make nest asyncio only if jupyter * refactoring and move hostname retrieving to init serving * minor clean * minor cleanup * add unit test * remove return of pool * revert locust changes and refactoring * revert locust changes and refactoring * fix review comments * fix review comments * remove loop method argument --------- Co-authored-by: Victor Jouffrey <[email protected]>
This PR adds/fixes/changes...
JIRA Issue: -
Priority for Review: -
Related PRs: -
How Has This Been Tested?
Checklist For The Assigned Reviewer: