diff --git a/pkg/sessiontxn/BUILD.bazel b/pkg/sessiontxn/BUILD.bazel index db040c84b0bfe..9ea0d11f89055 100644 --- a/pkg/sessiontxn/BUILD.bazel +++ b/pkg/sessiontxn/BUILD.bazel @@ -27,7 +27,7 @@ go_test( "txn_rc_tso_optimize_test.go", ], flaky = True, - shard_count = 25, + shard_count = 26, deps = [ ":sessiontxn", "//pkg/config", diff --git a/pkg/sessiontxn/staleread/provider.go b/pkg/sessiontxn/staleread/provider.go index fd12013f82428..5f024ac4897b9 100644 --- a/pkg/sessiontxn/staleread/provider.go +++ b/pkg/sessiontxn/staleread/provider.go @@ -64,8 +64,22 @@ func (p *StalenessTxnContextProvider) GetReadReplicaScope() string { return config.GetTxnScopeFromConfig() } -// GetStmtReadTS returns the read timestamp +// GetStmtReadTS returns the read timestamp. func (p *StalenessTxnContextProvider) GetStmtReadTS() (uint64, error) { + // When autocommit is disabled and in txn flag is flase, activate the txn here to make sure + // subsequent reads can reuse the snapshot. + // This is to suport usage like: + // set autocommit = 0; + // select * fromt as of timestamp; + // select statments; // using the same stale read ts + // commit; + // Related issue: https://github.com/pingcap/tidb/issues/64198 + if !p.sctx.GetSessionVars().IsAutocommit() && !p.sctx.GetSessionVars().InTxn() { + if _, err := p.ActivateTxn(); err != nil { + return 0, err + } + p.sctx.GetSessionVars().SetInTxn(true) + } return p.ts, nil } diff --git a/pkg/sessiontxn/txn_context_test.go b/pkg/sessiontxn/txn_context_test.go index e57ee8581110d..4e92f35cd3f23 100644 --- a/pkg/sessiontxn/txn_context_test.go +++ b/pkg/sessiontxn/txn_context_test.go @@ -535,7 +535,7 @@ func TestTxnContextForPrepareExecute(t *testing.T) { se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) tk.MustExec("begin") - //change schema + // change schema tk2 := testkit.NewTestKit(t, store) tk2.MustExec("use test") tk2.MustExec("alter table t2 add column(c1 int)") @@ -611,7 +611,7 @@ func TestTxnContextForStaleReadInPrepare(t *testing.T) { stmtID2, _, _, err := se.PrepareStmt("select * from t1 as of timestamp @a where id=1 ") require.NoError(t, err) - //change schema + // change schema tk.MustExec("use test") tk.MustExec("alter table t2 add column(c1 int)") tk.MustExec("update t1 set v=11 where id=1") @@ -711,7 +711,7 @@ func TestTxnContextPreparedStmtWithForUpdate(t *testing.T) { tk.MustExec("prepare s from 'select * from t1 where id=1 for update'") tk.MustExec("begin pessimistic") - //change schema + // change schema tk2 := testkit.NewTestKit(t, store) tk2.MustExec("use test") tk2.MustExec("alter table t1 add column(c int default 100)") @@ -993,3 +993,151 @@ func TestTSOCmdCountForTextSql(t *testing.T) { count := sctx.Value(sessiontxn.TsoRequestCount) require.Equal(t, uint64(99), count) } + +func TestStaleReadTxnWithAutocommitOff(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int primary key, v int)") + tk.MustExec("insert into t values (1, 10)") + + time.Sleep(1200 * time.Millisecond) + + se := tk.Session() + sessVars := se.GetSessionVars() + + // Test 1: autocommit=0 should activate a stale-read txn with snapshot isolation + tk.MustExec("set @@tidb_read_staleness = -1") + tk.MustExec("set @@autocommit = 0") + + // First SELECT should activate a stale-read transaction + require.False(t, sessVars.InTxn(), "should not be in txn before first SELECT") + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10")) + require.True(t, sessVars.InTxn(), "should be in txn after first SELECT with autocommit=0") + require.True(t, sessVars.TxnCtx.IsStaleness, "txn should be staleness") + startTS1 := sessVars.TxnCtx.StartTS + require.Greater(t, startTS1, uint64(0), "StartTS should be set") + + // Second SELECT should reuse the same snapshot + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10")) + startTS2 := sessVars.TxnCtx.StartTS + require.Equal(t, startTS1, startTS2, "second SELECT should reuse the same StartTS") + + // Update data in another session while stale-read txn is active + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("update t set v = 99 where id = 1") + + // Current stale-read txn should still see the same value as before + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10")) + startTS3 := sessVars.TxnCtx.StartTS + require.Equal(t, startTS1, startTS3, "StartTS should remain unchanged after external update") + + // COMMIT should end the transaction + tk.MustExec("commit") + require.False(t, sessVars.InTxn(), "should not be in txn after COMMIT") + + // Test 2: Write statements are rejected in stale-read txn + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10")) + require.True(t, sessVars.InTxn(), "should be in txn") + + tk.MustGetErrMsg("insert into t values (2, 20)", + "only support read-only statement during read-only staleness transactions") + tk.MustGetErrMsg("update t set v = 11 where id = 1", + "only support read-only statement during read-only staleness transactions") + tk.MustGetErrMsg("delete from t where id = 1", + "only support read-only statement during read-only staleness transactions") + + // Txn should still be valid after failed write + require.True(t, sessVars.InTxn(), "should still be in txn after failed write") + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10")) + tk.MustExec("rollback") + + // Test 3: tidb_read_staleness should be ignored when txn is started by a write + tk.MustExec("update t set v = 200 where id = 1") + require.True(t, sessVars.InTxn(), "should be in txn after UPDATE") + require.False(t, sessVars.TxnCtx.IsStaleness, "txn started by write should not be staleness") + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 200")) + tk.MustExec("rollback") + // Verify rollback worked + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10")) + + // Test 4: SELECT ... FOR UPDATE is also rejected + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10")) + tk.MustGetErrMsg("select * from t where id = 1 for update", + "select lock hasn't been supported in stale read yet") + tk.MustExec("rollback") + + // Test 5: Prepared statement in stale-read txn should NOT use plan cache + tk.MustExec("set tidb_enable_prepared_plan_cache=ON") + tk.MustExec("set @@tidb_read_staleness = ''") + tk.MustExec("set @@autocommit = 1") + stmtID, _, _, err := se.PrepareStmt("select * from t where id = ?") + require.NoError(t, err) + + // Verify plan cache works in normal mode (need two executions: first generates plan, second uses cache) + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID, expression.Args2Expressions4Test(1)) + require.NoError(t, err) + require.NoError(t, rs.Close()) + rs, err = se.ExecutePreparedStmt(context.TODO(), stmtID, expression.Args2Expressions4Test(1)) + require.NoError(t, err) + require.NoError(t, rs.Close()) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + // Enter stale-read mode with autocommit=0 + tk.MustExec("set @@tidb_read_staleness = -1") + tk.MustExec("set @@autocommit = 0") + + // Activate stale-read transaction + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10")) + require.True(t, sessVars.InTxn()) + require.True(t, sessVars.TxnCtx.IsStaleness) + + // Execute prepared statement in stale-read txn + rs, err = se.ExecutePreparedStmt(context.TODO(), stmtID, expression.Args2Expressions4Test(1)) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("commit") + + // Test 5b: PreparedStmt can activate stale-read transaction directly + tk.MustExec("set @@tidb_read_staleness = -1") + tk.MustExec("set @@autocommit = 0") + require.False(t, sessVars.InTxn(), "should not be in txn before PreparedStmt") + + // ExecutePreparedStmt should activate the stale-read transaction + rs, err = se.ExecutePreparedStmt(context.TODO(), stmtID, expression.Args2Expressions4Test(1)) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10")) + + // Verify stale-read transaction is now active + require.True(t, sessVars.InTxn(), "PreparedStmt should activate stale-read txn") + require.True(t, sessVars.TxnCtx.IsStaleness, "txn should be staleness") + + tk.MustExec("commit") + + // Test 6: After exiting stale-read txn, plan cache should work normally + tk.MustExec("set @@tidb_read_staleness = ''") + tk.MustExec("set @@autocommit = 1") + + rs, err = se.ExecutePreparedStmt(context.TODO(), stmtID, expression.Args2Expressions4Test(1)) + require.NoError(t, err) + require.NoError(t, rs.Close()) + rs, err = se.ExecutePreparedStmt(context.TODO(), stmtID, expression.Args2Expressions4Test(1)) + require.NoError(t, err) + require.NoError(t, rs.Close()) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk.MustExec("set @@tidb_read_staleness = -1") + tk.MustExec("set @@autocommit = 0") + + // Test 7: DDL implicitly ends the current stale read transaction + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10")) + require.True(t, sessVars.InTxn(), "should be in stale-read txn") + require.True(t, sessVars.TxnCtx.IsStaleness, "txn should be staleness") + tk.MustExec("create table t2 (id int)") + require.False(t, sessVars.InTxn(), "DDL should implicitly end the stale-read txn") + tk.MustExec("drop table t2") +}