Skip to content

Commit 475b126

Browse files
committed
add proper handling for deffered transactions in shadow state
1 parent eb59bc6 commit 475b126

File tree

2 files changed

+159
-19
lines changed

2 files changed

+159
-19
lines changed

simulator/model/mod.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use sql_generation::model::{
1818
};
1919
use turso_parser::ast::Distinctness;
2020

21+
use crate::runner::env::TransactionMode;
2122
use crate::{generation::Shadow, runner::env::ShadowTablesMut};
2223

2324
pub mod interactions;
@@ -146,6 +147,16 @@ impl Query {
146147
pub fn is_dml(&self) -> bool {
147148
matches!(self, Self::Insert(..) | Self::Update(..) | Self::Delete(..))
148149
}
150+
151+
#[inline]
152+
pub fn is_write(&self) -> bool {
153+
self.is_ddl() || self.is_dml()
154+
}
155+
156+
#[inline]
157+
pub fn is_select(&self) -> bool {
158+
matches!(self, Self::Select(_))
159+
}
149160
}
150161

151162
impl Display for Query {
@@ -173,6 +184,9 @@ impl Shadow for Query {
173184
type Result = anyhow::Result<Vec<Vec<SimValue>>>;
174185

175186
fn shadow(&self, env: &mut ShadowTablesMut) -> Self::Result {
187+
// First check if we are not in a deffered transaction, if we are create a snapshot
188+
env.upgrade_transaction(self);
189+
176190
match self {
177191
Query::Create(create) => create.shadow(env),
178192
Query::Insert(insert) => insert.shadow(env),
@@ -567,9 +581,18 @@ impl Shadow for Select {
567581
impl Shadow for Begin {
568582
type Result = Vec<Vec<SimValue>>;
569583
fn shadow(&self, tables: &mut ShadowTablesMut) -> Self::Result {
570-
// FIXME: currently the snapshot is taken eagerly
571-
// this is wrong for Deffered transactions
572-
tables.create_snapshot();
584+
match self {
585+
Begin::Deferred => {
586+
tables.create_deffered_snapshot();
587+
}
588+
Begin::Immediate => {
589+
tables.create_snapshot(TransactionMode::Write);
590+
}
591+
Begin::Concurrent => {
592+
tables.create_snapshot(TransactionMode::Concurrent);
593+
}
594+
};
595+
573596
vec![]
574597
}
575598
}

simulator/runner/env.rs

Lines changed: 133 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ pub(crate) enum SimulationPhase {
3737
Shrink,
3838
}
3939

40+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
41+
pub enum TransactionMode {
42+
Read = 0,
43+
Concurrent = 1,
44+
Write = 2,
45+
}
46+
4047
/// Represents a single operation during a transaction, applied in order.
4148
/// TODO: encode table/index creations etc. as operations too (although
4249
/// in both WAL mode and MVCC DDL requires an exclusive transaction)
@@ -59,38 +66,93 @@ pub enum RowOperation {
5966
},
6067
}
6168

69+
/// Database snapshot
6270
#[derive(Debug, Clone)]
63-
pub struct TransactionTables {
71+
pub struct Snapshot {
6472
/// The current state after applying transaction's changes (used for reads within the transaction)
6573
current_tables: Vec<Table>,
6674
/// Operations recorded during this transaction, in order
6775
operations: Vec<RowOperation>,
76+
77+
transaction_mode: TransactionMode,
78+
}
79+
80+
impl Snapshot {
81+
#[inline]
82+
fn set_transaction_mode(&mut self, transaction_mode: TransactionMode) {
83+
self.transaction_mode = transaction_mode;
84+
}
85+
}
86+
87+
#[derive(Debug, Clone)]
88+
pub enum TransactionTables {
89+
/// Signifies a Deffered transaction
90+
Deffered,
91+
Snapshot(Snapshot),
6892
}
6993

7094
impl TransactionTables {
71-
pub fn new(tables: Vec<Table>) -> Self {
72-
Self {
73-
current_tables: tables,
74-
operations: Vec::new(),
95+
#[inline]
96+
fn into_snapshot(self) -> Option<Snapshot> {
97+
match self {
98+
TransactionTables::Deffered => None,
99+
TransactionTables::Snapshot(snapshot) => Some(snapshot),
100+
}
101+
}
102+
103+
#[inline]
104+
fn as_snaphot_opt(&self) -> Option<&Snapshot> {
105+
match self {
106+
TransactionTables::Deffered => None,
107+
TransactionTables::Snapshot(snapshot) => Some(snapshot),
108+
}
109+
}
110+
111+
#[inline]
112+
fn as_snaphot_mut_opt(&mut self) -> Option<&mut Snapshot> {
113+
match self {
114+
TransactionTables::Deffered => None,
115+
TransactionTables::Snapshot(snapshot) => Some(snapshot),
75116
}
76117
}
77118

119+
#[inline]
120+
fn as_snaphot(&self) -> &Snapshot {
121+
self.as_snaphot_opt()
122+
.expect("called as_snaphot on deffered transaction")
123+
}
124+
125+
#[inline]
126+
fn as_snaphot_mut(&mut self) -> &mut Snapshot {
127+
self.as_snaphot_mut_opt()
128+
.expect("called as_snaphot_mut on deffered transaction")
129+
}
130+
131+
#[inline]
78132
pub fn record_insert(&mut self, table_name: String, row: Vec<SimValue>) {
79-
self.operations
133+
self.as_snaphot_mut()
134+
.operations
80135
.push(RowOperation::Insert { table_name, row });
81136
}
82137

138+
#[inline]
83139
pub fn record_delete(&mut self, table_name: String, row: Vec<SimValue>) {
84-
self.operations
140+
self.as_snaphot_mut()
141+
.operations
85142
.push(RowOperation::Delete { table_name, row });
86143
}
87144

145+
#[inline]
88146
pub fn record_drop_table(&mut self, table_name: String) {
89-
self.operations.push(RowOperation::DropTable { table_name });
147+
self.as_snaphot_mut()
148+
.operations
149+
.push(RowOperation::DropTable { table_name });
90150
}
91151

152+
#[inline]
92153
pub fn record_rename_table(&mut self, old_name: String, new_name: String) {
93-
self.operations
154+
self.as_snaphot_mut()
155+
.operations
94156
.push(RowOperation::RenameTable { old_name, new_name });
95157
}
96158
}
@@ -110,6 +172,7 @@ pub struct ShadowTablesMut<'a> {
110172
impl<'a> ShadowTables<'a> {
111173
fn tables(&self) -> &'a Vec<Table> {
112174
self.transaction_tables
175+
.and_then(|v| v.as_snaphot_opt())
113176
.map_or(self.commited_tables, |v| &v.current_tables)
114177
}
115178
}
@@ -129,15 +192,21 @@ where
129192
fn tables(&'a self) -> &'a Vec<Table> {
130193
self.transaction_tables
131194
.as_ref()
132-
.map(|t| &t.current_tables)
133-
.unwrap_or(self.commited_tables)
195+
.map_or(self.commited_tables, |v| {
196+
// Want to panic here with `as_snapshot_mut` because
197+
// we don't want to accidently return a the commited tables on deffered transactions
198+
&v.as_snaphot().current_tables
199+
})
134200
}
135201

136202
fn tables_mut(&'b mut self) -> &'b mut Vec<Table> {
137203
self.transaction_tables
138204
.as_mut()
139-
.map(|t| &mut t.current_tables)
140-
.unwrap_or(self.commited_tables)
205+
.map_or(self.commited_tables, |v| {
206+
// Want to panic here with `as_snapshot_mut` because
207+
// we don't want to accidently return a the commited tables on deffered transactions
208+
&mut v.as_snaphot_mut().current_tables
209+
})
141210
}
142211

143212
/// Record that a row was inserted during the current transaction
@@ -168,12 +237,60 @@ where
168237
}
169238
}
170239

171-
pub fn create_snapshot(&mut self) {
172-
*self.transaction_tables = Some(TransactionTables::new(self.commited_tables.clone()));
240+
/// Tries to upgrade the Transaction Mode
241+
#[inline]
242+
pub fn upgrade_transaction(&mut self, query: &Query) {
243+
let transaction_mode = if query.is_write() {
244+
TransactionMode::Write
245+
} else if query.is_select() {
246+
TransactionMode::Read
247+
} else {
248+
return;
249+
};
250+
if let Some(txn) = self.transaction_tables.as_mut() {
251+
match txn {
252+
TransactionTables::Deffered => self.create_snapshot(transaction_mode),
253+
TransactionTables::Snapshot(snapshot) => {
254+
match (snapshot.transaction_mode, transaction_mode) {
255+
(_, TransactionMode::Concurrent) => {
256+
unreachable!();
257+
}
258+
(TransactionMode::Read, TransactionMode::Write) => {
259+
snapshot.set_transaction_mode(transaction_mode)
260+
}
261+
(TransactionMode::Concurrent, TransactionMode::Write) => {
262+
if query.is_ddl() {
263+
// Only upgrade on DDL for MVCC
264+
snapshot.set_transaction_mode(transaction_mode)
265+
}
266+
}
267+
_ => {}
268+
};
269+
}
270+
}
271+
}
272+
}
273+
274+
#[inline]
275+
pub fn create_deffered_snapshot(&mut self) {
276+
*self.transaction_tables = Some(TransactionTables::Deffered);
277+
}
278+
279+
#[inline]
280+
pub fn create_snapshot(&mut self, transaction_mode: TransactionMode) {
281+
*self.transaction_tables = Some(TransactionTables::Snapshot(Snapshot {
282+
current_tables: self.commited_tables.clone(),
283+
operations: Vec::new(),
284+
transaction_mode,
285+
}));
173286
}
174287

175288
pub fn apply_snapshot(&mut self) {
176-
if let Some(transaction_tables) = self.transaction_tables.take() {
289+
if let Some(transaction_tables) = self
290+
.transaction_tables
291+
.take()
292+
.and_then(|transaction_tables| transaction_tables.into_snapshot())
293+
{
177294
// Apply all operations in recorded order, including renames.
178295
// This ensures operations like INSERT foo, RENAME foo->bar, INSERT bar
179296
// are applied correctly.

0 commit comments

Comments
 (0)