SNOW-1737940: Support keepalive parameters in LibSfClient#841
SNOW-1737940: Support keepalive parameters in LibSfClient#841sfc-gh-ext-simba-jy wants to merge 106 commits intomasterfrom
Conversation
cpp/lib/HeartbeatBackground.cpp
Outdated
| if (!renew_session(curl, sf, err)) | ||
| { | ||
| CXX_LOG_TRACE("sf::HeartbeatBackrgound::renew_session_sync::Failed to renew session"); | ||
| stop_heart_beat_for_this_session(sf); |
There was a problem hiding this comment.
There's a deadlock here as the m_lock is used in the renewSession and then the same lock is used in the underlying removeConnection method.
This could be potentially fixed by the usage of RecursiveMutex instead of the MutexUnique
There was a problem hiding this comment.
Since these two methods don’t call each other and are handled by separate threads, I’m trying to understand the reasoning behind the deadlock concern. Could you clarify why you think a deadlock might happen here?
There was a problem hiding this comment.
There is a call chain on the same thread:
HeartbeatBackground::renewSession()acquiresMutexUnique guard(m_lock)- While holding
m_lock, it callsrenew_session_sync(itr->second) renew_session_sync(), ifrenew_session()fails, callsstop_heart_beat_for_this_session(sf)stop_heart_beat_for_this_session()callsbg.removeConnection(sf)removeConnection()tries to acquireMutexGuard m_guard(m_lock)- deadlock
There was a problem hiding this comment.
Hello @sfc-gh-pbulawa, thank you for your feedback. After investigating the issue, I confirmed that this workflow can indeed cause a deadlock. Upon comparing the code flow with the ODBC implementation, I realized that stop_heartbeat is not required in renew_session_sync. Therefore, I have removed the stop_heartbeat_for_this_session call from renew_session_sync, which should resolve this issue.
| } | ||
| } | ||
|
|
||
| heartbeatReq HeartbeatBackground::genHeartBeatReq(SF_CONNECT* connection) |
There was a problem hiding this comment.
Let's acquire the sf->mutex_tokens here, as the Connection parameters read is unprotected right now, which could cause a race condition, e.g., when the token is updated during the query execution.
There was a problem hiding this comment.
I’m not entirely sure which tokens you’re referring to. The heartbeat request doesn’t include any body content, and it only needs the protocol, host, and port to construct the URL. You can find the POST request here: https://github.com/snowflakedb/libsnowflakeclient/pull/841/changes#diff-c11feb72994cdbbcf8f21da1a069803f0abbf5073e068b4ec4f793b5cef71791R176
There was a problem hiding this comment.
genHeartBeatReq calls create_header:
And create_header in connection.c does read the token without any lock:
libsnowflakeclient/lib/connection.c
Lines 182 to 188 in 3d1974b
It reads sf->token (or sf->master_token), which is exactly the kind of field protected by sf->mutex_tokens. Another thread performing query execution or session renewal can update these tokens concurrently.
There was a problem hiding this comment.
Thank you for the clarification @sfc-gh-pbulawa. I applied to this. Thank you!
In this case, I believe the ODBC may need to apply this logic. WDYT?
There was a problem hiding this comment.
Yes, if you could update it in odbc that would be great.
sfc-gh-pbulawa
left a comment
There was a problem hiding this comment.
PTAL at the comments
|
|
||
| void start_heart_beat_for_this_session(SF_CONNECT* sf) | ||
| { | ||
| _mutex_lock(&sf->mutex_heart_beat); |
There was a problem hiding this comment.
Lock ordering risks deadlock start_heart_beat_for_this_session and stop_heart_beat_for_this_session lock sf->mutex_heart_beat then call into HeartbeatBackground which acquires m_lock. However, renewSession iterates failedConnection under m_lock and then acquires conn->mutex_heart_beat. If these two orderings ever overlap, a deadlock occurs. Lock ordering must be documented and enforced (e.g., always acquire m_lock before mutex_heart_beat).
| // join | ||
| if (m_worker != NULL) | ||
| { | ||
| m_worker->join(); |
There was a problem hiding this comment.
Exception can escape. m_worker->join() can throw std::system_error. If it does, the exception propagates out of the destructor, causing std::terminate. Let's wrap in try/catch and log.
| test_heartbeat(sf); | ||
|
|
||
| assert_false(sf_strncasecmp(previous_sessiontoken, sf->token, strlen(sf->token)) == 0); | ||
| assert_false(sf_strncasecmp(previous_masterToken, sf->master_token, strlen(sf->token)) == 0); |
There was a problem hiding this comment.
| assert_false(sf_strncasecmp(previous_masterToken, sf->master_token, strlen(sf->token)) == 0); | |
| assert_false(sf_strncasecmp(previous_masterToken, sf->master_token, strlen(sf->master_token)) == 0); |
| void create_recursive_mutex(void** mutex, uint64_t id) | ||
| { | ||
| try { | ||
| *mutex = (void*) new Snowflake::Client::RecursiveMutex(id); |
There was a problem hiding this comment.
What if creation fails? The mutex will never be set.
| void free_recursive_mutex(void** mutex) | ||
| { | ||
| try { | ||
| delete static_cast<Snowflake::Client::RecursiveMutex*>(*mutex); |
There was a problem hiding this comment.
We should have a null check here for safety.
There was a problem hiding this comment.
Pull request overview
Adds client-session keepalive support to LibSfClient by sending keepalive session parameters during login and running a background heartbeat/renewal loop when enabled.
Changes:
- Introduces new connection attributes and session parameters:
CLIENT_SESSION_KEEP_ALIVEandCLIENT_SESSION_KEEP_ALIVE_HEARTBEAT_FREQUENCY. - Adds a C++ heartbeat background worker (plus C-callable wrappers) and recursive mutex plumbing to renew tokens safely.
- Adds/updates tests and mock expected JSON to cover the new session parameters and heartbeat behavior.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_heartbeat.cpp | New heartbeat/keepalive integration tests (threaded + manual). |
| tests/mock/test_mock_mfa_token_caching.c | Forces keepalive off in MFA mock tests to match new request payload. |
| tests/mock/test_data/expected_second_mfa_request.json | Updates expected request JSON to include keepalive session parameters. |
| tests/mock/test_data/expected_first_mfa_request.json | Updates expected request JSON to include keepalive session parameters. |
| tests/CMakeLists.txt | Adds test_heartbeat and optional HEARTBEAT_DEBUG compile definitions. |
| lib/mutex.h | C API for creating/freeing a recursive mutex implemented in C++. |
| lib/heart_beat_background.h | C API for starting/stopping heartbeat and synchronously renewing session. |
| lib/connection.c | Adds keepalive session parameters to the login request body. |
| lib/client_int.h | Adds heartbeat URL/constants and extends error code constants; wraps uuid APIs for C++. |
| lib/client.c | Stores keepalive settings, session ID, starts/stops heartbeat on connect/term, and adds new set/get attributes. |
| include/snowflake/client.h | Exposes new attributes and adds heartbeat-related fields to SF_CONNECT. |
| include/snowflake/SFURL.hpp | Adds a new SFURL constructor and reorders some members. |
| cpp/util/SnowflakeCppUtil.cpp | Adjusts cJSON→picojson conversion to avoid leaking the printed JSON string. |
| cpp/util/SFURL.cpp | Implements the new SFURL constructor. |
| cpp/lib/SnowflakeUtil.cpp | Removes old duplicate C wrapper utilities (moved to cpp/util). |
| cpp/lib/Mutex.hpp | Adds C++ Mutex/RecursiveMutex wrappers (used by heartbeat + C bridge). |
| cpp/lib/Mutex.cpp | Implements recursive mutex bridge functions and mutex wrappers. |
| cpp/lib/IAuth.cpp | Uses URL constants and new SFURL constructor when building authenticator URL. |
| cpp/lib/HeartbeatBackground.hpp | Declares the singleton heartbeat worker and heartbeat request snapshot struct. |
| cpp/lib/HeartbeatBackground.cpp | Implements heartbeat worker thread, heartbeat POSTs, and session renew logic + C wrappers. |
| CMakeLists.txt | Adds HEARTBEAT option and includes new heartbeat/mutex sources/headers in builds. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| MutexGuard m_guard(m_lock); | ||
|
|
||
| m_connections[connection->session_id] = connection; | ||
|
|
||
| if (m_worker == NULL) |
| cJSON* resp_data = NULL; | ||
| SF_ERROR_STRUCT* err = NULL; | ||
|
|
||
| CXX_LOG_TRACE("sf::HeartbeatBackground::sendQueuedHeartBeatReq::sending heartbeat for session %s", conn.sessionId.c_str()); | ||
|
|
||
| if (http_perform(curl, POST_REQUEST_TYPE, (char*)destination.c_str(), httpExtraHeaders, NULL, NULL, &resp_data, | ||
| NULL, NULL, conn.retryTimeout, conn.networkTimeout, SF_BOOLEAN_FALSE, err, | ||
| conn.isInsecuremode, conn.isOcspOpen, conn.crlCheck, conn.crlAdvisory, conn.crlAllowNoCrl, | ||
| conn.crlDiskCaching, conn.crlMemoryCaching, conn.crlDownloadTimeout, | ||
| conn.retryCurlCount, 0, conn.maxRetryCount, &elapsedTime, &retried_count, NULL, SF_BOOLEAN_FALSE, | ||
| proxy, noProxy, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE)) |
| #ifndef HEARTBEAT_DEBUG | ||
| return; | ||
| #endif // | ||
|
|
||
| SF_UNUSED(unused); |
| sf_sleep_ms(30 * 1000); | ||
| assert_false(sf_strncasecmp(previous_sessiontoken, sf->token, strlen(sf->token)) == 0); | ||
| assert_false(sf_strncasecmp(previous_masterToken, sf->master_token, strlen(sf->token)) == 0); | ||
|
|
| test_heartbeat(sf); | ||
|
|
||
| assert_false(sf_strncasecmp(previous_sessiontoken, sf->token, strlen(sf->token)) == 0); | ||
| assert_false(sf_strncasecmp(previous_masterToken, sf->master_token, strlen(sf->token)) == 0); |
|
|
||
| // trace to get the correlate OS reference with our internal mutex, | ||
| CXX_LOG_TRACE("sf::RecursiveMutex,/%s mutex=%lu", mutexName, m_id); | ||
| } |
| sf_bool client_session_keep_alive = SF_BOOLEAN_TRUE; | ||
| snowflake_set_attribute(sf, SF_CON_CLIENT_SESSION_KEEP_ALIVE, &client_session_keep_alive); | ||
| snowflake_connect(sf); | ||
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); | ||
| //Make sure renew_session_sync work well | ||
| *result = renew_session_sync(sf); | ||
|
|
| if (sf->client_session_keep_alive) | ||
| { | ||
| start_heart_beat_for_this_session(sf); | ||
| } | ||
| else | ||
| { | ||
| stop_heart_beat_for_this_session(sf); |
| std::string dataStr = std::string(jsonStr); | ||
| strToPicoJson(picojson, dataStr); | ||
| SF_FREE(jsonStr); |
| */ | ||
| SFURL(const SFURL ©); | ||
|
|
||
| SFURL(std::string& protocol, std::string& host, std::string& port); |
No description provided.