Skip to content

Commit fb3bf03

Browse files
committed
[#26666] docdb: Skip tracking readonly transactions in recently_applied_ map
Summary: We currently track all recently applied transactions in the recently_applied_ map in transaction participant in order to calculate the min_replay_txn_start_ht used to filter transactions during transaction loading in tablet bootstrap. It is posible to have a table where the only transactions performed on it are readonly, i.e. no WRITE_OPs with write pairs. In this case, since we do not replicate any write pairs, the retryable requests max_replicated_op_id is never updated. The op id we use to filter the recently_applied_ map with to clean it up is at most max_replicated_op_id, so we end up never removing entries from the map, and the size of this map grows indefinitely. This diff changes the behavior to never add transactions that have no batches with any write pairs to recently_applied_, to avoid this issue. Memory consumed by the recently_applied_ map was also added to mem tracker. Jira: DB-16044 Test Plan: Added tests: `./yb_build.sh --cxx-test pgwrapper_pg_mini-test --gtest_filter PgMiniTestSingleNode.TestAppliedTransactionsStateReadOnly` `./yb_build.sh --cxx-test pgwrapper_pg_mini-test --gtest_filter PgMiniTest.TestAppliedTransactionsStateInFlight` Reviewers: sergei, bkolagani Reviewed By: sergei, bkolagani Subscribers: bkolagani, rthallam, ybase, yql Differential Revision: https://phorge.dev.yugabyte.com/D43077
1 parent 1eed672 commit fb3bf03

File tree

6 files changed

+183
-6
lines changed

6 files changed

+183
-6
lines changed

src/yb/tablet/running_transaction.cc

+8
Original file line numberDiff line numberDiff line change
@@ -691,5 +691,13 @@ bool RunningTransaction::IsTxnLoadedWithCDC() const {
691691
return is_txn_loaded_with_cdc_;
692692
}
693693

694+
void RunningTransaction::MarkHasRetryableRequestsReplicated() {
695+
has_retryable_requests_replicated_ = true;
696+
}
697+
698+
bool RunningTransaction::HasRetryableRequestsReplicated() const {
699+
return has_retryable_requests_replicated_;
700+
}
701+
694702
} // namespace tablet
695703
} // namespace yb

src/yb/tablet/running_transaction.h

+7
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ class RunningTransaction : public std::enable_shared_from_this<RunningTransactio
154154

155155
bool IsTxnLoadedWithCDC() const;
156156

157+
void MarkHasRetryableRequestsReplicated();
158+
159+
bool HasRetryableRequestsReplicated() const;
160+
157161
private:
158162
static boost::optional<TransactionStatus> GetStatusAt(
159163
HybridTime time,
@@ -237,6 +241,9 @@ class RunningTransaction : public std::enable_shared_from_this<RunningTransactio
237241
// Identification marker for transactions that are loaded on tablet bootstrap with CDC
238242
// enbled.
239243
bool is_txn_loaded_with_cdc_ = false;
244+
245+
// Whether or not transaction has any batches replicated by retryable requests.
246+
bool has_retryable_requests_replicated_ = false;
240247
};
241248

242249
Status MakeAbortedStatus(const TransactionId& id);

src/yb/tablet/tablet.cc

+2-1
Original file line numberDiff line numberDiff line change
@@ -1707,7 +1707,8 @@ Status Tablet::WriteTransactionalBatch(
17071707
}
17081708
boost::container::small_vector<uint8_t, 16> encoded_replicated_batch_idx_set;
17091709
auto prepare_batch_data = VERIFY_RESULT(transaction_participant()->PrepareBatchData(
1710-
transaction_id, batch_idx, &encoded_replicated_batch_idx_set));
1710+
transaction_id, batch_idx, &encoded_replicated_batch_idx_set,
1711+
!put_batch.write_pairs().empty()));
17111712
if (!prepare_batch_data) {
17121713
// If metadata is missing it could be caused by aborted and removed transaction.
17131714
// In this case we should not add new intents for it.

src/yb/tablet/transaction_participant.cc

+18-4
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,8 @@ class TransactionParticipant::Impl
462462

463463
Result<boost::optional<std::pair<IsolationLevel, TransactionalBatchData>>> PrepareBatchData(
464464
const TransactionId& id, size_t batch_idx,
465-
boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) {
465+
boost::container::small_vector_base<uint8_t>* encoded_replicated_batches,
466+
bool has_write_pairs) {
466467
// We are not trying to cleanup intents here because we don't know whether this transaction
467468
// has intents of not.
468469
auto lock_and_iterator = VERIFY_RESULT(LockAndFind(
@@ -475,6 +476,9 @@ class TransactionParticipant::Impl
475476
}
476477
auto& transaction = lock_and_iterator.transaction();
477478
transaction.AddReplicatedBatch(batch_idx, encoded_replicated_batches);
479+
if (has_write_pairs) {
480+
transaction.MarkHasRetryableRequestsReplicated();
481+
}
478482
return std::make_pair(transaction.metadata().isolation, transaction.last_batch_data());
479483
}
480484

@@ -1957,6 +1961,9 @@ class TransactionParticipant::Impl
19571961
if (GetLatestCheckPointUnlocked() != OpId::Max()) {
19581962
txn->SetTxnLoadedWithCDC();
19591963
}
1964+
if (last_batch_data.hybrid_time) {
1965+
txn->MarkHasRetryableRequestsReplicated();
1966+
}
19601967
transactions_.insert(txn);
19611968
mem_tracker_->Consume(kRunningTransactionSize);
19621969
TransactionsModifiedUnlocked(&min_running_notifier);
@@ -1991,7 +1998,13 @@ class TransactionParticipant::Impl
19911998
recently_removed_transactions_cleanup_queue_.push_back({transaction.id(), now + 15s});
19921999
LOG_IF_WITH_PREFIX(DFATAL, !recently_removed_transactions_.insert(transaction.id()).second)
19932000
<< "Transaction removed twice: " << transaction.id();
1994-
AddRecentlyAppliedTransaction(transaction.start_ht(), transaction.GetApplyOpId());
2001+
if (transaction.HasRetryableRequestsReplicated()) {
2002+
AddRecentlyAppliedTransaction(transaction.start_ht(), transaction.GetApplyOpId());
2003+
} else {
2004+
VLOG_WITH_PREFIX(2)
2005+
<< "Transaction " << transaction.id() << " has no write pairs, not adding to recently "
2006+
<< "applied transactions map";
2007+
}
19952008
transactions_.erase(it);
19962009
mem_tracker_->Release(kRunningTransactionSize);
19972010
TransactionsModifiedUnlocked(min_running_notifier);
@@ -2581,8 +2594,9 @@ Result<TransactionMetadata> TransactionParticipant::PrepareMetadata(
25812594
Result<boost::optional<std::pair<IsolationLevel, TransactionalBatchData>>>
25822595
TransactionParticipant::PrepareBatchData(
25832596
const TransactionId& id, size_t batch_idx,
2584-
boost::container::small_vector_base<uint8_t>* encoded_replicated_batches) {
2585-
return impl_->PrepareBatchData(id, batch_idx, encoded_replicated_batches);
2597+
boost::container::small_vector_base<uint8_t>* encoded_replicated_batches,
2598+
bool has_write_pairs) {
2599+
return impl_->PrepareBatchData(id, batch_idx, encoded_replicated_batches, has_write_pairs);
25862600
}
25872601

25882602
void TransactionParticipant::BatchReplicated(

src/yb/tablet/transaction_participant.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ class TransactionParticipant : public TransactionStatusManager {
139139
// Returns boost::none when transaction is unknown.
140140
Result<boost::optional<std::pair<IsolationLevel, TransactionalBatchData>>> PrepareBatchData(
141141
const TransactionId& id, size_t batch_idx,
142-
boost::container::small_vector_base<uint8_t>* encoded_replicated_batches);
142+
boost::container::small_vector_base<uint8_t>* encoded_replicated_batches,
143+
bool has_write_pairs);
143144

144145
void BatchReplicated(const TransactionId& id, const TransactionalBatchData& data);
145146

src/yb/yql/pgwrapper/pg_mini-test.cc

+146
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ METRIC_DECLARE_entity(tablet);
134134
METRIC_DECLARE_gauge_uint64(aborted_transactions_pending_cleanup);
135135
METRIC_DECLARE_histogram(handler_latency_outbound_transfer);
136136
METRIC_DECLARE_gauge_int64(rpc_busy_reactors);
137+
METRIC_DECLARE_gauge_uint64(wal_replayable_applied_transactions);
137138

138139
namespace yb::pgwrapper {
139140
namespace {
@@ -2552,6 +2553,151 @@ TEST_F(PgMiniTestSingleNode, TestBootstrapOnAppliedTransactionWithIntents) {
25522553
ASSERT_EQ(res, 1);
25532554
}
25542555

2556+
TEST_F(PgMiniTestSingleNode, TestAppliedTransactionsStateReadOnly) {
2557+
constexpr size_t kIters = 100;
2558+
2559+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_delete_intents_sst_files) = false;
2560+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_use_bootstrap_intent_ht_filter) = true;
2561+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_no_schedule_remove_intents) = true;
2562+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_disable_flush_on_shutdown) = true;
2563+
2564+
auto conn = ASSERT_RESULT(Connect());
2565+
2566+
LOG(INFO) << "Creating tables";
2567+
ASSERT_OK(conn.Execute("CREATE TABLE test1(a int primary key) SPLIT INTO 1 TABLETS"));
2568+
2569+
tablet::TabletPeerPtr test1_peer = nullptr;
2570+
{
2571+
const auto& peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
2572+
for (auto peer : peers) {
2573+
if (peer->shared_tablet()->regular_db()) {
2574+
test1_peer = peer;
2575+
break;
2576+
}
2577+
}
2578+
ASSERT_NE(test1_peer, nullptr);
2579+
}
2580+
std::string test1_tablet_id = test1_peer->shared_tablet()->tablet_id();
2581+
2582+
ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
2583+
ASSERT_OK(conn.Execute("INSERT INTO test1(a) VALUES (0)"));
2584+
ASSERT_OK(conn.CommitTransaction());
2585+
2586+
ASSERT_OK(test1_peer->shared_tablet()->Flush(tablet::FlushMode::kSync));
2587+
ASSERT_OK(test1_peer->FlushBootstrapState());
2588+
2589+
ASSERT_OK(conn.Execute("CREATE TABLE test2(a int references test1(a)) SPLIT INTO 1 TABLETS"));
2590+
for (size_t i = 0; i < kIters; ++i) {
2591+
ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
2592+
ASSERT_OK(conn.Execute("INSERT INTO test2(a) VALUES (0)"));
2593+
ASSERT_OK(conn.CommitTransaction());
2594+
}
2595+
2596+
ASSERT_EQ(
2597+
GetMetricOpt<AtomicGauge<uint64_t>>(
2598+
*test1_peer->shared_tablet(), METRIC_wal_replayable_applied_transactions)->value(),
2599+
1);
2600+
2601+
LOG(INFO) << "Restarting cluster";
2602+
ASSERT_OK(RestartCluster());
2603+
2604+
for (auto peer : ListTabletPeers(cluster_.get(), ListPeersFilter::kAll)) {
2605+
if (!peer->shared_tablet()->regular_db()) {
2606+
continue;
2607+
}
2608+
auto metric_value = GetMetricOpt<AtomicGauge<uint64_t>>(
2609+
*peer->shared_tablet(), METRIC_wal_replayable_applied_transactions)->value();
2610+
if (peer->shared_tablet()->tablet_id() == test1_tablet_id) {
2611+
ASSERT_EQ(metric_value, 1);
2612+
} else {
2613+
ASSERT_EQ(metric_value, kIters);
2614+
}
2615+
}
2616+
2617+
conn = ASSERT_RESULT(Connect());
2618+
auto res = ASSERT_RESULT(conn.FetchRow<PGUint64>("SELECT COUNT(*) FROM test1"));
2619+
ASSERT_EQ(res, 1);
2620+
res = ASSERT_RESULT(conn.FetchRow<PGUint64>("SELECT COUNT(*) FROM test2"));
2621+
ASSERT_EQ(res, kIters);
2622+
}
2623+
2624+
TEST_F(PgMiniTest, TestAppliedTransactionsStateInFlight) {
2625+
const auto kApplyWait = 5s * kTimeMultiplier;
2626+
2627+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_delete_intents_sst_files) = false;
2628+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_use_bootstrap_intent_ht_filter) = true;
2629+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_no_schedule_remove_intents) = true;
2630+
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_disable_flush_on_shutdown) = true;
2631+
2632+
auto conn1 = ASSERT_RESULT(Connect());
2633+
auto conn2 = ASSERT_RESULT(Connect());
2634+
auto conn3 = ASSERT_RESULT(Connect());
2635+
2636+
LOG(INFO) << "Creating table";
2637+
ASSERT_OK(conn1.Execute("CREATE TABLE test(a int) SPLIT INTO 1 TABLETS"));
2638+
2639+
const auto& pg_ts_uuid = cluster_->mini_tablet_server(kPgTsIndex)->server()->permanent_uuid();
2640+
tablet::TabletPeerPtr tablet_peer = nullptr;
2641+
for (auto peer : ListTabletPeers(cluster_.get(), ListPeersFilter::kNonLeaders)) {
2642+
if (peer->shared_tablet()->regular_db() && peer->permanent_uuid() != pg_ts_uuid) {
2643+
tablet_peer = peer;
2644+
break;
2645+
}
2646+
}
2647+
ASSERT_NE(tablet_peer, nullptr);
2648+
2649+
tserver::MiniTabletServer* tablet_server = nullptr;
2650+
for (size_t i = 0; i < NumTabletServers(); ++i) {
2651+
auto* ts = cluster_->mini_tablet_server(i);
2652+
if (ts->server()->permanent_uuid() == tablet_peer->permanent_uuid()) {
2653+
tablet_server = ts;
2654+
break;
2655+
}
2656+
}
2657+
ASSERT_NE(tablet_server, nullptr);
2658+
2659+
ASSERT_OK(conn1.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
2660+
ASSERT_OK(conn1.Execute("INSERT INTO test(a) VALUES (0)"));
2661+
ASSERT_OK(conn1.CommitTransaction());
2662+
2663+
ASSERT_OK(conn1.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
2664+
ASSERT_OK(conn1.Execute("INSERT INTO test(a) VALUES (1)"));
2665+
2666+
ASSERT_OK(conn2.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
2667+
ASSERT_OK(conn2.Execute("INSERT INTO test(a) VALUES (2)"));
2668+
ASSERT_OK(conn2.FetchRow<PGUint32>("SELECT a FROM test WHERE a = 0 FOR KEY SHARE"));
2669+
2670+
ASSERT_OK(conn3.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
2671+
ASSERT_OK(conn3.FetchRow<PGUint32>("SELECT a FROM test WHERE a = 0 FOR KEY SHARE"));
2672+
2673+
ASSERT_OK(tablet_peer->shared_tablet()->Flush(tablet::FlushMode::kSync));
2674+
2675+
ASSERT_OK(tablet_server->Restart());
2676+
ASSERT_OK(tablet_server->WaitStarted());
2677+
2678+
ASSERT_OK(conn1.CommitTransaction());
2679+
ASSERT_OK(conn2.CommitTransaction());
2680+
ASSERT_OK(conn3.CommitTransaction());
2681+
2682+
// Wait for apply.
2683+
SleepFor(kApplyWait);
2684+
2685+
std::unordered_map<std::string, uint64_t> metric_values;
2686+
for (auto peer : ListTabletPeers(cluster_.get(), ListPeersFilter::kAll)) {
2687+
if (peer->shared_tablet()->regular_db()) {
2688+
metric_values[peer->permanent_uuid()] = GetMetricOpt<AtomicGauge<uint64_t>>(
2689+
*peer->shared_tablet(), METRIC_wal_replayable_applied_transactions)->value();
2690+
}
2691+
}
2692+
2693+
// Expecting metric value to be 3 on all peers: the intial insert + conn1/conn2 transactions.
2694+
// conn3 transaction is readonly and not added to map.
2695+
LOG(INFO) << "Metric values: " << CollectionToString(metric_values);
2696+
for (const auto& [_, value] : metric_values) {
2697+
ASSERT_EQ(value, 3);
2698+
}
2699+
}
2700+
25552701
Status MockAbortFailure(
25562702
const yb::tserver::PgFinishTransactionRequestPB* req,
25572703
yb::tserver::PgFinishTransactionResponsePB* resp, yb::rpc::RpcContext* context) {

0 commit comments

Comments
 (0)