Skip to content

Commit 6cedd96

Browse files
[#25105] YSQL: Clear pggate session state on query retry
Summary: During the course of its execution, a query accumulates state in its pggate session. Some of the state is stored in: - The foreign key buffer which holds a collections of keys to be inspected, as well as a collection of keys thus far seen by the transaction. - The explicit row lock buffer which holds a collection of rows that are to be locked explicitly. - The insert on conflict buffer which holds a collection of keys to be inspected as well a collection of rows thus far modified by the query. Currently, this state is cleared only at the transaction boundary by operations such as recreating, restarting and aborting a DML transaction. Notably, this state is NOT cleared when a sub-transaction is rolled back. An example of this is when a query is retried (such as due to a kConflict or a kReadRestart error) in read committed mode. This works fine with the row lock buffer which is largely idempotent (adding the same key/row to the buffer upon retry did not affect the state). However, it can cause issues with the foreign key buffer ([[ #25234 | GH-25234 ]]). D39023 introduced a new insert-on-conflict buffer which is not idempotent. One of the functions of this buffer is to ensure that a row is not modified twice within the same transaction. Since two iterations of the same query, are likely to modify the same row(s), residual state can lead to the buffer incorrectly deducing that the same row has been modified twice. This revision: - Introduces the distinction that some session state is sub-transaction-scoped ie. needs to be cleared at the sub-transaction boundary (such as for each retry/iteration of a query), while others are transaction-scoped ie. can be cleared at the transaction boundary only. - Explicitly clears sub-transaction-scoped session state when the subtransaction is rolled back. - Does not alter the behavior of the foreign key buffer. Jira: DB-14251 Test Plan: Run the following automated test: ``` ./yb_build.sh --java-test 'org.yb.pgsql.TestPgRegressIsolationWithoutWaitQueues#testPgRegress' ./yb_build.sh --cxx-test pgwrapper_pg_libpq-test --gtest_filter 'BatchInsertOnConflictTest.InsertOnConflictWithQueryRestart' ``` Jenkins: urgent Reviewers: patnaik.balivada, pjain, telgersma Reviewed By: patnaik.balivada, pjain Subscribers: smishra, yql Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D40460
1 parent a886b8d commit 6cedd96

File tree

3 files changed

+87
-14
lines changed

3 files changed

+87
-14
lines changed

src/yb/yql/pggate/pggate.cc

+26-12
Original file line numberDiff line numberDiff line change
@@ -1844,12 +1844,12 @@ Status PgApiImpl::BeginTransaction(int64_t start_time) {
18441844
}
18451845

18461846
Status PgApiImpl::RecreateTransaction() {
1847-
ClearSessionState();
1847+
RollbackTransactionScopedSessionState();
18481848
return pg_txn_manager_->RecreateTransaction();
18491849
}
18501850

18511851
Status PgApiImpl::RestartTransaction() {
1852-
ClearSessionState();
1852+
RollbackTransactionScopedSessionState();
18531853
return pg_txn_manager_->RestartTransaction();
18541854
}
18551855

@@ -1870,15 +1870,12 @@ bool PgApiImpl::IsRestartReadPointRequested() {
18701870
}
18711871

18721872
Status PgApiImpl::CommitPlainTransaction() {
1873-
DCHECK(pg_session_->explicit_row_lock_buffer().IsEmpty());
1874-
DCHECK(pg_session_->IsInsertOnConflictBufferEmpty());
1875-
pg_session_->InvalidateForeignKeyReferenceCache();
1876-
RETURN_NOT_OK(pg_session_->FlushBufferedOperations());
1873+
RETURN_NOT_OK(CommitTransactionScopedSessionState());
18771874
return pg_txn_manager_->CommitPlainTransaction();
18781875
}
18791876

18801877
Status PgApiImpl::AbortPlainTransaction() {
1881-
ClearSessionState();
1878+
RollbackTransactionScopedSessionState();
18821879
return pg_txn_manager_->AbortPlainTransaction();
18831880
}
18841881

@@ -1931,7 +1928,7 @@ Status PgApiImpl::ExitSeparateDdlTxnMode(PgOid db_oid, bool is_silent_modificati
19311928
}
19321929

19331930
Status PgApiImpl::ClearSeparateDdlTxnMode() {
1934-
ClearSessionState();
1931+
RollbackTransactionScopedSessionState();
19351932
return pg_txn_manager_->ExitSeparateDdlTxnModeWithAbort();
19361933
}
19371934

@@ -1941,8 +1938,7 @@ Status PgApiImpl::SetActiveSubTransaction(SubTransactionId id) {
19411938
}
19421939

19431940
Status PgApiImpl::RollbackToSubTransaction(SubTransactionId id) {
1944-
pg_session_->DropBufferedOperations();
1945-
pg_session_->explicit_row_lock_buffer().Clear();
1941+
RollbackSubTransactionScopedSessionState();
19461942
return pg_session_->RollbackToSubTransaction(id);
19471943
}
19481944

@@ -2236,13 +2232,31 @@ Result<tserver::PgServersMetricsResponsePB> PgApiImpl::ServersMetrics() {
22362232
return pg_session_->ServersMetrics();
22372233
}
22382234

2239-
void PgApiImpl::ClearSessionState() {
2240-
pg_session_->InvalidateForeignKeyReferenceCache();
2235+
void PgApiImpl::RollbackSubTransactionScopedSessionState() {
22412236
pg_session_->DropBufferedOperations();
22422237
pg_session_->explicit_row_lock_buffer().Clear();
22432238
pg_session_->ClearAllInsertOnConflictBuffers();
22442239
}
22452240

2241+
void PgApiImpl::RollbackTransactionScopedSessionState() {
2242+
RollbackSubTransactionScopedSessionState();
2243+
pg_session_->InvalidateForeignKeyReferenceCache();
2244+
}
2245+
2246+
Status PgApiImpl::CommitTransactionScopedSessionState() {
2247+
RSTATUS_DCHECK(
2248+
pg_session_->explicit_row_lock_buffer().IsEmpty(),
2249+
IllegalState,
2250+
"Expected row lock buffer to be empty");
2251+
RSTATUS_DCHECK(
2252+
pg_session_->IsInsertOnConflictBufferEmpty(),
2253+
IllegalState,
2254+
"Expected INSERT ... ON CONFLICT buffer to be empty");
2255+
RETURN_NOT_OK(pg_session_->FlushBufferedOperations());
2256+
pg_session_->InvalidateForeignKeyReferenceCache();
2257+
return Status::OK();
2258+
}
2259+
22462260
bool PgApiImpl::IsCronLeader() const { return tserver_shared_object_->IsCronLeader(); }
22472261

22482262
Status PgApiImpl::SetCronLastMinute(int64_t last_minute) {

src/yb/yql/pggate/pggate.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,10 @@ class PgApiImpl {
773773

774774
void RestoreSessionState(const YBCPgSessionState& session_data);
775775

776+
void RollbackSubTransactionScopedSessionState();
777+
void RollbackTransactionScopedSessionState();
778+
Status CommitTransactionScopedSessionState();
779+
776780
//------------------------------------------------------------------------------------------------
777781
// Replication Slots Functions.
778782

@@ -836,8 +840,6 @@ class PgApiImpl {
836840
Status ReleaseAllAdvisoryLocks(uint32_t db_oid);
837841

838842
private:
839-
void ClearSessionState();
840-
841843
Result<bool> RetrieveYbctidsImpl(
842844
PgDmlRead& dml_read, int natts, size_t max_mem_bytes, std::vector<Slice>& ybctids);
843845

src/yb/yql/pgwrapper/pg_libpq-test.cc

+57
Original file line numberDiff line numberDiff line change
@@ -4509,6 +4509,63 @@ TEST_F_EX(PgLibPqTest, BlockDangerousRoles, PgLibPqBlockDangerousRolesTest) {
45094509
YBPgErrorCode::YB_PG_RESERVED_NAME);
45104510
}
45114511

4512+
class BatchInsertOnConflictTest : public PgLibPqTestRF1 {
4513+
protected:
4514+
void RunConflictingIOCTxn(PGConn& conn, IsolationLevel isolation_level, int32_t expected_price) {
4515+
thread_holder_.AddThreadFunctor([this, &conn, isolation_level, expected_price]() -> void {
4516+
ASSERT_OK(conn.StartTransaction(isolation_level));
4517+
ASSERT_OK(conn.Execute(kIOCQuery));
4518+
const int32_t price = ASSERT_RESULT(conn.FetchRow<int32_t>(
4519+
"SELECT price FROM products WHERE id = 1"));
4520+
4521+
ASSERT_EQ(price, expected_price);
4522+
ASSERT_OK(conn.CommitTransaction());
4523+
});
4524+
}
4525+
4526+
void StartMainTxn(PGConn& conn, IsolationLevel isolation_level = READ_COMMITTED) {
4527+
ASSERT_OK(conn.StartTransaction(isolation_level));
4528+
ASSERT_OK(conn.Execute("UPDATE products SET price = price + 5 WHERE id = 1"));
4529+
}
4530+
4531+
void CommitMainTxnAfterWait(PGConn& conn) {
4532+
std::this_thread::sleep_for(RandomUniformInt(1, 10) * 100ms);
4533+
ASSERT_OK(conn.CommitTransaction());
4534+
}
4535+
4536+
constexpr static auto kInsertOnConflictBatchSize = 1024;
4537+
const std::vector<IsolationLevel> kIsolationLevels = {
4538+
READ_COMMITTED, SNAPSHOT_ISOLATION, SERIALIZABLE_ISOLATION
4539+
};
4540+
const std::string kIOCQuery = "INSERT INTO products VALUES (1, 'oats', 10) ON CONFLICT (id) "
4541+
"DO UPDATE SET price = products.price + 5";
4542+
TestThreadHolder thread_holder_;
4543+
};
4544+
4545+
TEST_F(BatchInsertOnConflictTest, InsertOnConflictWithQueryRestart) {
4546+
PGConn conn1 = ASSERT_RESULT(Connect());
4547+
PGConn conn2 = ASSERT_RESULT(Connect());
4548+
int32_t expected_price = 10;
4549+
4550+
// Setup
4551+
ASSERT_OK(conn1.Execute("DROP TABLE IF EXISTS products"));
4552+
ASSERT_OK(conn1.Execute("CREATE TABLE products (id INT PRIMARY KEY, name TEXT, price INT)"));
4553+
ASSERT_OK(conn1.Execute("INSERT INTO products VALUES (1, 'oats', 10)"));
4554+
ASSERT_OK(conn2.Execute(
4555+
Format("SET yb_insert_on_conflict_read_batch_size TO $0", kInsertOnConflictBatchSize)));
4556+
4557+
// Test that a query retried due to a kConflict error correctly clears the insert on conflict
4558+
// buffer between retries. Not clearing the buffer would result in an incorrect "command cannot
4559+
// affect row a second time" error.
4560+
for (IsolationLevel isolation_level : kIsolationLevels) {
4561+
expected_price += 10;
4562+
StartMainTxn(conn1);
4563+
RunConflictingIOCTxn(conn2, isolation_level, expected_price);
4564+
CommitMainTxnAfterWait(conn1);
4565+
thread_holder_.WaitAndStop(2s * kTimeMultiplier);
4566+
}
4567+
}
4568+
45124569
// https://github.com/yugabyte/yugabyte-db/issues/24320.
45134570
TEST_F(PgLibPqTest, TableRewriteOidCollision) {
45144571
PGConn conn = ASSERT_RESULT(Connect());

0 commit comments

Comments
 (0)