Skip to content

Commit ea4473c

Browse files
committed
planner: activate stale-read txn when autocommit=0
When `autocommit=0` and stale read is enabled, the first SELECT now activates a proper stale-read transaction instead of just replacing the provider. This ensures: 1. `InTxn()` becomes true after the first SELECT 2. Subsequent reads reuse the same snapshot (StartTS) 3. Write statements are rejected (existing behavior) 4. COMMIT/ROLLBACK ends the transaction normally
1 parent 0a79a3e commit ea4473c

File tree

3 files changed

+103
-5
lines changed

3 files changed

+103
-5
lines changed

pkg/planner/core/preprocess.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1944,8 +1944,13 @@ func (p *preprocessor) updateStateFromStaleReadProcessor() error {
19441944
p.sctx.GetSessionVars().StmtCtx.IsStaleness = true
19451945
if !p.sctx.GetSessionVars().InTxn() {
19461946
txnManager := sessiontxn.GetTxnManager(p.sctx)
1947+
enterType := sessiontxn.EnterNewTxnWithReplaceProvider
1948+
if !p.sctx.GetSessionVars().IsAutocommit() {
1949+
// start a stale-read transaction so that subsequent reads reuse the same snapshot.
1950+
enterType = sessiontxn.EnterNewTxnWithBeginStmt
1951+
}
19471952
newTxnRequest := &sessiontxn.EnterNewTxnRequest{
1948-
Type: sessiontxn.EnterNewTxnWithReplaceProvider,
1953+
Type: enterType,
19491954
Provider: staleread.NewStalenessTxnContextProvider(p.sctx, p.LastSnapshotTS, p.InfoSchema),
19501955
}
19511956
if err := txnManager.EnterNewTxn(context.TODO(), newTxnRequest); err != nil {

pkg/sessiontxn/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ go_test(
2727
"txn_rc_tso_optimize_test.go",
2828
],
2929
flaky = True,
30-
shard_count = 25,
30+
shard_count = 26,
3131
deps = [
3232
":sessiontxn",
3333
"//pkg/config",

pkg/sessiontxn/txn_context_test.go

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ func TestTxnContextForPrepareExecute(t *testing.T) {
535535
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
536536
tk.MustExec("begin")
537537

538-
//change schema
538+
// change schema
539539
tk2 := testkit.NewTestKit(t, store)
540540
tk2.MustExec("use test")
541541
tk2.MustExec("alter table t2 add column(c1 int)")
@@ -611,7 +611,7 @@ func TestTxnContextForStaleReadInPrepare(t *testing.T) {
611611
stmtID2, _, _, err := se.PrepareStmt("select * from t1 as of timestamp @a where id=1 ")
612612
require.NoError(t, err)
613613

614-
//change schema
614+
// change schema
615615
tk.MustExec("use test")
616616
tk.MustExec("alter table t2 add column(c1 int)")
617617
tk.MustExec("update t1 set v=11 where id=1")
@@ -711,7 +711,7 @@ func TestTxnContextPreparedStmtWithForUpdate(t *testing.T) {
711711
tk.MustExec("prepare s from 'select * from t1 where id=1 for update'")
712712
tk.MustExec("begin pessimistic")
713713

714-
//change schema
714+
// change schema
715715
tk2 := testkit.NewTestKit(t, store)
716716
tk2.MustExec("use test")
717717
tk2.MustExec("alter table t1 add column(c int default 100)")
@@ -993,3 +993,96 @@ func TestTSOCmdCountForTextSql(t *testing.T) {
993993
count := sctx.Value(sessiontxn.TsoRequestCount)
994994
require.Equal(t, uint64(99), count)
995995
}
996+
997+
func TestStaleReadTxnWithAutocommitOff(t *testing.T) {
998+
store := testkit.CreateMockStore(t)
999+
tk := testkit.NewTestKit(t, store)
1000+
tk.MustExec("use test")
1001+
tk.MustExec("drop table if exists t")
1002+
tk.MustExec("create table t (id int primary key, v int)")
1003+
tk.MustExec("insert into t values (1, 10)")
1004+
1005+
time.Sleep(2 * time.Second)
1006+
1007+
se := tk.Session()
1008+
sessVars := se.GetSessionVars()
1009+
1010+
// Test 1: autocommit=0 should activate a stale-read txn with snapshot isolation
1011+
tk.MustExec("set @@tidb_read_staleness = -1")
1012+
tk.MustExec("set @@autocommit = 0")
1013+
1014+
// First SELECT should activate a stale-read transaction
1015+
require.False(t, sessVars.InTxn(), "should not be in txn before first SELECT")
1016+
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10"))
1017+
require.True(t, sessVars.InTxn(), "should be in txn after first SELECT with autocommit=0")
1018+
require.True(t, sessVars.TxnCtx.IsStaleness, "txn should be staleness")
1019+
startTS1 := sessVars.TxnCtx.StartTS
1020+
require.Greater(t, startTS1, uint64(0), "StartTS should be set")
1021+
1022+
// Second SELECT should reuse the same snapshot
1023+
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10"))
1024+
startTS2 := sessVars.TxnCtx.StartTS
1025+
require.Equal(t, startTS1, startTS2, "second SELECT should reuse the same StartTS")
1026+
1027+
// Update data in another session while stale-read txn is active
1028+
tk2 := testkit.NewTestKit(t, store)
1029+
tk2.MustExec("use test")
1030+
tk2.MustExec("update t set v = 99 where id = 1")
1031+
1032+
// Current stale-read txn should still see the same value as before
1033+
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10"))
1034+
startTS3 := sessVars.TxnCtx.StartTS
1035+
require.Equal(t, startTS1, startTS3, "StartTS should remain unchanged after external update")
1036+
1037+
// COMMIT should end the transaction
1038+
tk.MustExec("commit")
1039+
require.False(t, sessVars.InTxn(), "should not be in txn after COMMIT")
1040+
1041+
// Test 2: Write statements are rejected in stale-read txn
1042+
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10"))
1043+
require.True(t, sessVars.InTxn(), "should be in txn")
1044+
1045+
tk.MustGetErrMsg("insert into t values (2, 20)",
1046+
"only support read-only statement during read-only staleness transactions")
1047+
tk.MustGetErrMsg("update t set v = 11 where id = 1",
1048+
"only support read-only statement during read-only staleness transactions")
1049+
tk.MustGetErrMsg("delete from t where id = 1",
1050+
"only support read-only statement during read-only staleness transactions")
1051+
1052+
// Txn should still be valid after failed write
1053+
require.True(t, sessVars.InTxn(), "should still be in txn after failed write")
1054+
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10"))
1055+
tk.MustExec("rollback")
1056+
1057+
// Test 3: tidb_read_staleness should be ignored when txn is started by a write
1058+
tk.MustExec("update t set v = 200 where id = 1")
1059+
require.True(t, sessVars.InTxn(), "should be in txn after UPDATE")
1060+
require.False(t, sessVars.TxnCtx.IsStaleness, "txn started by write should not be staleness")
1061+
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 200"))
1062+
tk.MustExec("rollback")
1063+
// Verify rollback worked
1064+
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10"))
1065+
1066+
// Test 4: SELECT ... FOR UPDATE is also rejected
1067+
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10"))
1068+
tk.MustGetErrMsg("select * from t where id = 1 for update",
1069+
"select lock hasn't been supported in stale read yet")
1070+
tk.MustExec("rollback")
1071+
1072+
// Test 5: DDL implicitly end the current stale read transaction
1073+
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 10"))
1074+
require.True(t, sessVars.InTxn(), "should be in stale-read txn")
1075+
require.True(t, sessVars.TxnCtx.IsStaleness, "txn should be staleness")
1076+
tk.MustExec("create table t2 (id int)")
1077+
require.False(t, sessVars.InTxn(), "DDL should implicitly end the stale-read txn")
1078+
tk.MustExec("drop table t2")
1079+
1080+
// Test 6: autocommit=1 should NOT activate a persistent txn
1081+
time.Sleep(1 * time.Second)
1082+
1083+
tk.MustExec("set @@autocommit = 1")
1084+
tk.MustExec("set @@tidb_read_staleness = 0")
1085+
tk.MustExec("set @@tidb_read_staleness = -1")
1086+
tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 99"))
1087+
require.False(t, sessVars.InTxn(), "autocommit=1 should not keep session in txn")
1088+
}

0 commit comments

Comments
 (0)