Skip to content

Commit 6e80c56

Browse files
committed
[#26903] DocDB: TableLocks: Deadlock Detection at the Object Lock Manager
Summary: https://phorge.dev.yugabyte.com/D42862 introduced mechanism for registering waiters at the ObjectLockManager (OLM) with the deadlock detector. The deadlocks were being detected and the offending transaction was being aborted, but the obsolete waiting lock request at the OLM was being responded to only after the client deadline, which could be longer (default of 600s). Since the OLM doesn't poll for transaction statuses, there are 2 ways in which we can early abort such deadlocked waiters: 1. Notify tserver/master OLM on transaction heartbeat failure 2. Periodically timeout waiters and prevent retry if the transaction has already failed, and fail the lock request. This diff takes the second approach as opposed to 1. Since object locking is totally managed by `PgClientSession`, the first approach would require us to introduce an interface to notify `PgClientSession` on `YBTransaction` heartbeat failure. We would then need additional handling to clear state when `PgClientSession` switches `YBTransaction`s. With the second approach, the OLM times out waiters after `FLAGS_refresh_waiter_timeout_ms` (defaults to 30secs), and the caller side simply checks for the status of the transaction before retrying the lock call. The approach has the following downsides: 1. In case of non-deadlocking conflicts (conflicting DDLs and DMLs), we could go through multiple retries. This should be okay since it doesn't happen all the time. 2. Even upon detection of some deadlocks almost instantaneously, the lock call corresponding to the aborted waiter could take about FLAGS_refresh_waiter_timeout_ms to fail. And the blocked waiters remain blocked until the aborted blocker rollsback/finishes. **Upgrade/Downgrade safety** Removing a field in a proto message whose usage is guarded by test flag `TEST_enable_object_locking_for_table_locks` Jira: DB-16319 Test Plan: Jenkins ./yb_build.sh --cxx-test pg_object_locks-test --gtest_filter PgObjectLocksTestRF1.TestDeadlock Reviewers: amitanand, rthallam Reviewed By: amitanand Subscribers: yql, ybase Differential Revision: https://phorge.dev.yugabyte.com/D44228
1 parent 6a9d02a commit 6e80c56

13 files changed

+232
-124
lines changed

src/yb/client/client-internal.cc

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2316,8 +2316,9 @@ class AcquireObjectLocksGlobalRpc
23162316
public:
23172317
AcquireObjectLocksGlobalRpc(
23182318
YBClient* client, master::AcquireObjectLocksGlobalRequestPB request,
2319-
StdStatusCallback user_cb, CoarseTimePoint deadline)
2320-
: ClientMasterRpc(client, deadline), user_cb_(std::move(user_cb)) {
2319+
StdStatusCallback user_cb, CoarseTimePoint deadline, std::function<Status()>&& should_retry)
2320+
: ClientMasterRpc(client, deadline), user_cb_(std::move(user_cb)),
2321+
should_retry_(std::move(should_retry)) {
23212322
req_.CopyFrom(request);
23222323
}
23232324

@@ -2335,7 +2336,14 @@ class AcquireObjectLocksGlobalRpc
23352336
}
23362337

23372338
bool ShouldRetry(const Status& status) override {
2338-
return status.IsTryAgain();
2339+
if (!status.IsTryAgain()) {
2340+
return false;
2341+
}
2342+
if (auto s = should_retry_(); !s.ok()) {
2343+
error_ = s;
2344+
return false;
2345+
}
2346+
return true;
23392347
}
23402348

23412349
void CallRemoteMethod() override {
@@ -2349,10 +2357,12 @@ class AcquireObjectLocksGlobalRpc
23492357
if (!status.ok()) {
23502358
LOG(WARNING) << ToString() << " failed: " << status.ToString();
23512359
}
2352-
user_cb_(status);
2360+
user_cb_(error_.ok() ? status : status.CloneAndAppend(error_.message()));
23532361
}
23542362

23552363
StdStatusCallback user_cb_;
2364+
std::function<Status()> should_retry_;
2365+
Status error_ = Status::OK();
23562366
};
23572367

23582368
class ReleaseObjectLocksGlobalRpc
@@ -2663,8 +2673,9 @@ void YBClient::Data::DeleteNotServingTablet(
26632673

26642674
void YBClient::Data::AcquireObjectLocksGlobalAsync(
26652675
YBClient* client, master::AcquireObjectLocksGlobalRequestPB request, CoarseTimePoint deadline,
2666-
StdStatusCallback callback) {
2667-
auto rpc = StartRpc<internal::AcquireObjectLocksGlobalRpc>(client, request, callback, deadline);
2676+
StdStatusCallback callback, std::function<Status()>&& should_retry) {
2677+
auto rpc = StartRpc<internal::AcquireObjectLocksGlobalRpc>(
2678+
client, request, callback, deadline, std::move(should_retry));
26682679
}
26692680

26702681
void YBClient::Data::ReleaseObjectLocksGlobalAsync(

src/yb/client/client-internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ class YBClient::Data {
363363

364364
void AcquireObjectLocksGlobalAsync(
365365
YBClient* client, master::AcquireObjectLocksGlobalRequestPB request, CoarseTimePoint deadline,
366-
StdStatusCallback callback);
366+
StdStatusCallback callback, std::function<Status()>&& should_retry);
367367

368368
void ReleaseObjectLocksGlobalAsync(
369369
YBClient* client, master::ReleaseObjectLocksGlobalRequestPB request, CoarseTimePoint deadline,

src/yb/client/client.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2000,8 +2000,8 @@ void YBClient::DeleteNotServingTablet(const TabletId& tablet_id, StdStatusCallba
20002000

20012001
void YBClient::AcquireObjectLocksGlobalAsync(
20022002
const master::AcquireObjectLocksGlobalRequestPB& request, StdStatusCallback callback,
2003-
CoarseTimePoint deadline) {
2004-
data_->AcquireObjectLocksGlobalAsync(this, request, deadline, callback);
2003+
CoarseTimePoint deadline, std::function<Status()>&& should_retry) {
2004+
data_->AcquireObjectLocksGlobalAsync(this, request, deadline, callback, std::move(should_retry));
20052005
}
20062006

20072007
void YBClient::ReleaseObjectLocksGlobalAsync(

src/yb/client/client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,7 @@ class YBClient {
673673

674674
void AcquireObjectLocksGlobalAsync(
675675
const master::AcquireObjectLocksGlobalRequestPB& request, StdStatusCallback callback,
676-
CoarseTimePoint deadline);
676+
CoarseTimePoint deadline, std::function<Status()>&& should_retry);
677677
void ReleaseObjectLocksGlobalAsync(
678678
const master::ReleaseObjectLocksGlobalRequestPB& request, StdStatusCallback callback,
679679
CoarseTimePoint deadline);

src/yb/client/transaction.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,9 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
776776
Result<TransactionMetadata> metadata() EXCLUDES(mutex_) {
777777
{
778778
std::lock_guard lock(mutex_);
779-
RETURN_NOT_OK(status_);
779+
if (!status_.ok()) {
780+
return status_;
781+
}
780782
if (!ready_) {
781783
return STATUS_FORMAT(IllegalState, "Transaction not ready");
782784
}

src/yb/docdb/object_lock_manager.cc

Lines changed: 25 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,12 @@
4848
DEFINE_test_flag(bool, assert_olm_empty_locks_map, false,
4949
"When set, asserts that the local locks map is empty at shutdown. Used in tests "
5050
"to assert sanity, where tserver doesn't loose YSQL lease and all connections release "
51-
"acquired/timedout/errored locks.");
51+
"acquired/tryagain/errored locks.");
5252

5353
DEFINE_test_flag(bool, olm_skip_scheduling_waiter_resumption, false,
5454
"When set, don't signal potential waiters for resumption");
5555

56-
// TODO(bkolagani): Default flag to false once issues with deadlock detection are resolved.
57-
DEFINE_test_flag(bool, olm_skip_sending_wait_for_probes, true,
56+
DEFINE_test_flag(bool, olm_skip_sending_wait_for_probes, false,
5857
"When set, the lock manager doesn't send wait-for probres to the local waiting txn registry, "
5958
"essentially giving away deadlock detection.");
6059

@@ -71,8 +70,8 @@ namespace {
7170
const Status kShuttingDownError = STATUS(
7271
ShutdownInProgress, "Object Lock Manager shutting down");
7372

74-
const Status kTimedOut = STATUS(
75-
TimedOut, "Failed to acquire object locks within deadline");
73+
const Status kTryAgain = STATUS(
74+
TryAgain, "Failed to acquire object locks within deadline");
7675

7776
const Status kTxnExpired = STATUS(
7877
Expired, "Transaction expired, all acquired object locks have been released");
@@ -191,6 +190,10 @@ struct WaiterEntry {
191190
return lock_data.deadline;
192191
}
193192

193+
std::string ToString() const {
194+
return YB_STRUCT_TO_STRING(lock_data);
195+
}
196+
194197
TrackedTxnLockEntryPtr transaction_entry;
195198
LockData lock_data;
196199
size_t resume_it_offset;
@@ -209,7 +212,6 @@ using WaiterEntryPtr = std::shared_ptr<WaiterEntry>;
209212

210213
struct StartUsTag;
211214
struct DeadlineTag;
212-
struct OwnerTag;
213215
using Waiters = boost::multi_index_container<
214216
WaiterEntryPtr,
215217
boost::multi_index::indexed_by<
@@ -220,12 +222,6 @@ using Waiters = boost::multi_index_container<
220222
boost::multi_index::ordered_non_unique<
221223
boost::multi_index::tag<DeadlineTag>,
222224
boost::multi_index::const_mem_fun<WaiterEntry, CoarseTimePoint, &WaiterEntry::deadline>
223-
>,
224-
boost::multi_index::hashed_non_unique<
225-
boost::multi_index::tag<OwnerTag>,
226-
boost::multi_index::const_mem_fun<
227-
WaiterEntry, const TransactionId&, &WaiterEntry::txn_id
228-
>
229225
>
230226
>
231227
>;
@@ -270,7 +266,7 @@ class ObjectLockManagerImpl {
270266

271267
void Lock(LockData&& data);
272268

273-
void Unlock(const ObjectLockOwner& object_lock_owner, Status resume_with_status);
269+
void Unlock(const ObjectLockOwner& object_lock_owner);
274270

275271
void Poll() EXCLUDES(global_mutex_);
276272

@@ -328,13 +324,6 @@ class ObjectLockManagerImpl {
328324
TrackedTransactionLockEntry::LockEntryMap& locks_map,
329325
TrackedTxnLockEntryPtr& txn_entry) REQUIRES(global_mutex_, txn_entry->mutex);
330326

331-
void SignalTerminateFinishedWaiters(
332-
const ObjectLockOwner& object_lock_owner, TrackedTxnLockEntryPtr& txn_entry,
333-
Status resume_with_status) REQUIRES(global_mutex_, txn_entry->mutex);
334-
335-
void DoSignalTerminateFinishedWaiters(
336-
ObjectLockedBatchEntry* entry, TransactionId txn_id, Status resume_with_status);
337-
338327
bool UnlockSingleEntry(const LockBatchEntry<ObjectLockManager>& lock_entry);
339328

340329
bool DoUnlockSingleEntry(ObjectLockedBatchEntry& entry, LockState sub);
@@ -348,9 +337,6 @@ class ObjectLockManagerImpl {
348337

349338
void DoSignal(ObjectLockedBatchEntry* entry);
350339

351-
void DoTerminateFinishedWaiters(
352-
ObjectLockedBatchEntry* entry, TransactionId txn_id, Status resume_with_status);
353-
354340
void DoComputeBlockersWithinQueue(
355341
ObjectLockedBatchEntry* locked_batch_entry, std::optional<ObjectLockPrefix>& key,
356342
LockStateBlockersMap& lockstate_blocker_map);
@@ -518,7 +504,7 @@ Status ObjectLockManagerImpl::MakePrepareAcquireResult(
518504
const LockData& data, Status resume_with_status) {
519505
RETURN_NOT_OK(resume_with_status);
520506
if (data.deadline < CoarseMonoClock::Now()) {
521-
return kTimedOut;
507+
return kTryAgain;
522508
}
523509
if (shutdown_in_progress_) {
524510
return kShuttingDownError;
@@ -644,8 +630,7 @@ bool ObjectLockManagerImpl::DoLockSingleEntry(
644630
}
645631
}
646632

647-
void ObjectLockManagerImpl::Unlock(const ObjectLockOwner& object_lock_owner,
648-
Status resume_with_status) {
633+
void ObjectLockManagerImpl::Unlock(const ObjectLockOwner& object_lock_owner) {
649634
TRACE("Unlocking all keys for owner $0", AsString(object_lock_owner));
650635

651636
TrackedTxnLockEntryPtr txn_entry;
@@ -672,11 +657,17 @@ void ObjectLockManagerImpl::Unlock(const ObjectLockOwner& object_lock_owner,
672657
std::lock_guard lock(global_mutex_);
673658
UniqueLock txn_lock(txn_entry->mutex);
674659
DoUnlock(object_lock_owner, txn_entry->granted_locks, txn_entry);
675-
// Terminate any obsolete waiting lock request for this txn/subtxn. This could happen when
676-
// 1. txn gets aborted due to a deadlock and the pg backend issues a finish txn request
677-
// 2. txn times out due to conflict and pg backend issues a finish txn request before the
678-
// lock manager times out the waiting lock request.
679-
SignalTerminateFinishedWaiters(object_lock_owner, txn_entry, resume_with_status);
660+
// We let the obsolete waiting lock request for this txn/subtxn, if any, to timeout and be resumed
661+
// as part of ObjectLockManagerImpl::Poll. This should be okay since:
662+
// 1. Obsolete waiting request could exist when txn times out due to conflict and pg backend
663+
// issues a finish txn request before the lock manager times out the obsolete request. Since
664+
// the obsolete waiting request would anyways be past the deadline, it would be resumed soon.
665+
// 2. On abort due to txn deadlock, we anyways don't send an early release all request.
666+
// PgClientSession waits for the previous lock req deadline (FLAGS_refresh_waiter_timeout_ms)
667+
// and then drops the retry since the txn failed.
668+
//
669+
// If there's any requirement to early terminate obsolete waiters based on txn id, then we should
670+
// signal appropriately here.
680671
}
681672

682673
void ObjectLockManagerImpl::DoUnlock(
@@ -703,73 +694,6 @@ void ObjectLockManagerImpl::DoUnlock(
703694
}
704695
}
705696

706-
void ObjectLockManagerImpl::SignalTerminateFinishedWaiters(
707-
const ObjectLockOwner& object_lock_owner,
708-
TrackedTxnLockEntryPtr& txn_entry,
709-
Status resume_with_status) {
710-
auto& locks_map = txn_entry->waiting_locks;
711-
if (object_lock_owner.subtxn_id) {
712-
auto subtxn_itr = locks_map.find(object_lock_owner.subtxn_id);
713-
if (subtxn_itr == locks_map.end()) {
714-
return;
715-
}
716-
for (auto itr = subtxn_itr->second.begin(); itr != subtxn_itr->second.end(); itr++) {
717-
DoSignalTerminateFinishedWaiters(
718-
&itr->second.locked_batch_entry, object_lock_owner.txn_id, resume_with_status);
719-
}
720-
return;
721-
}
722-
for (auto locks_itr = locks_map.begin(); locks_itr != locks_map.end(); locks_itr++) {
723-
for (auto itr = locks_itr->second.begin(); itr != locks_itr->second.end(); itr++) {
724-
DoSignalTerminateFinishedWaiters(
725-
&itr->second.locked_batch_entry, object_lock_owner.txn_id, resume_with_status);
726-
}
727-
}
728-
}
729-
730-
void ObjectLockManagerImpl::DoSignalTerminateFinishedWaiters(
731-
ObjectLockedBatchEntry* entry, TransactionId txn_id, Status resume_with_status) {
732-
WARN_NOT_OK(
733-
thread_pool_token_->SubmitFunc(
734-
std::bind(&ObjectLockManagerImpl::DoTerminateFinishedWaiters, this, entry, txn_id,
735-
resume_with_status)),
736-
"Failure submitting task ObjectLockManagerImpl::DoTerminateFinishedWaiters");
737-
}
738-
739-
void ObjectLockManagerImpl::DoTerminateFinishedWaiters(
740-
ObjectLockedBatchEntry* entry, TransactionId txn_id, Status resume_with_status) {
741-
std::vector<WaiterEntryPtr> waiters_failed_to_schedule;
742-
{
743-
std::lock_guard l(entry->mutex);
744-
auto& index = entry->wait_queue.get<OwnerTag>();
745-
auto it_range = index.equal_range(txn_id);
746-
auto it = it_range.first;
747-
auto* messenger = server_.messenger();
748-
while (it != it_range.second) {
749-
auto waiter_entry = *it;
750-
it = index.erase(it);
751-
waiter_entry->waiter_registration.reset();
752-
entry->waiting_state -= IntentTypeSetAdd(waiter_entry->resume_it()->intent_types);
753-
VLOG(1) << "Resuming " << AsString(waiter_entry->object_lock_owner());
754-
if (PREDICT_TRUE(messenger)) {
755-
ScopedOperation resuming_waiter_op(&waiters_amidst_resumption_on_messenger_);
756-
messenger->ThreadPool().EnqueueFunctor(
757-
[operation = std::move(resuming_waiter_op), entry = std::move(waiter_entry),
758-
lock_manager = this, resume_with_status]() {
759-
entry->Resume(lock_manager, resume_with_status);
760-
});
761-
} else {
762-
// Don't schedule anything here on thread_pool_token_ as a shutdown could destroy tasks.
763-
LOG_WITH_FUNC(WARNING) << "Messenger not available";
764-
waiters_failed_to_schedule.push_back(std::move(waiter_entry));
765-
}
766-
}
767-
}
768-
for (auto& waiter : waiters_failed_to_schedule) {
769-
waiter->Resume(this, resume_with_status);
770-
}
771-
}
772-
773697
bool ObjectLockManagerImpl::UnlockSingleEntry(const LockBatchEntry<ObjectLockManager>& lock_entry) {
774698
TRACE_FUNC();
775699
return DoUnlockSingleEntry(*lock_entry.locked, IntentTypeSetAdd(lock_entry.intent_types));
@@ -797,7 +721,7 @@ void ObjectLockManagerImpl::Poll() {
797721
}
798722
}
799723
for (auto& waiter : timed_out_waiters) {
800-
waiter->Resume(this, kTimedOut);
724+
waiter->Resume(this, kTryAgain);
801725
}
802726
}
803727

@@ -1151,9 +1075,8 @@ void ObjectLockManager::Lock(LockData&& data) {
11511075
impl_->Lock(std::move(data));
11521076
}
11531077

1154-
void ObjectLockManager::Unlock(
1155-
const ObjectLockOwner& object_lock_owner, Status resume_with_status) {
1156-
impl_->Unlock(object_lock_owner, resume_with_status);
1078+
void ObjectLockManager::Unlock(const ObjectLockOwner& object_lock_owner) {
1079+
impl_->Unlock(object_lock_owner);
11571080
}
11581081

11591082
void ObjectLockManager::Poll() {

src/yb/docdb/object_lock_manager.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ struct LockData {
3333
TabletId status_tablet;
3434
MonoTime start_time;
3535
StdStatusCallback callback;
36+
37+
std::string ToString() const {
38+
return YB_STRUCT_TO_STRING(key_to_lock, deadline, object_lock_owner, status_tablet, start_time);
39+
}
3640
};
3741

3842
// Helper struct used for keying table/object locks of a transaction.
@@ -67,7 +71,7 @@ class ObjectLockManager {
6771
void Lock(LockData&& data);
6872

6973
// Release all locks held against the given object_lock_owner.
70-
void Unlock(const ObjectLockOwner& object_lock_owner, Status resume_with_status);
74+
void Unlock(const ObjectLockOwner& object_lock_owner);
7175

7276
void Poll();
7377

src/yb/integration-tests/object_lock-test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,8 @@ std::future<Status> AcquireLockGloballyAsync(
387387
lease_epoch, client->Clock(), opt_deadline);
388388
auto callback = [promise](const Status& s) { promise->set_value(s); };
389389
client->AcquireObjectLocksGlobalAsync(
390-
req, std::move(callback), ToCoarse(MonoTime::Now() + rpc_timeout));
390+
req, std::move(callback), ToCoarse(MonoTime::Now() + rpc_timeout),
391+
[]() { return Status::OK(); } /* should_retry */);
391392
return future;
392393
}
393394

0 commit comments

Comments
 (0)