Skip to content

Commit c815c2c

Browse files
committed
add proper handling for deffered transactions in shadow state
1 parent 2e79493 commit c815c2c

File tree

2 files changed

+181
-34
lines changed

2 files changed

+181
-34
lines changed

simulator/model/mod.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use sql_generation::model::{
1818
};
1919
use turso_parser::ast::Distinctness;
2020

21+
use crate::runner::env::TransactionMode;
2122
use crate::{generation::Shadow, runner::env::ShadowTablesMut};
2223

2324
pub mod interactions;
@@ -146,6 +147,16 @@ impl Query {
146147
pub fn is_dml(&self) -> bool {
147148
matches!(self, Self::Insert(..) | Self::Update(..) | Self::Delete(..))
148149
}
150+
151+
#[inline]
152+
pub fn is_write(&self) -> bool {
153+
self.is_ddl() || self.is_dml()
154+
}
155+
156+
#[inline]
157+
pub fn is_select(&self) -> bool {
158+
matches!(self, Self::Select(_))
159+
}
149160
}
150161

151162
impl Display for Query {
@@ -173,6 +184,9 @@ impl Shadow for Query {
173184
type Result = anyhow::Result<Vec<Vec<SimValue>>>;
174185

175186
fn shadow(&self, env: &mut ShadowTablesMut) -> Self::Result {
187+
// First check if we are not in a deffered transaction, if we are create a snapshot
188+
env.upgrade_transaction(self);
189+
176190
match self {
177191
Query::Create(create) => create.shadow(env),
178192
Query::Insert(insert) => insert.shadow(env),
@@ -567,9 +581,18 @@ impl Shadow for Select {
567581
impl Shadow for Begin {
568582
type Result = Vec<Vec<SimValue>>;
569583
fn shadow(&self, tables: &mut ShadowTablesMut) -> Self::Result {
570-
// FIXME: currently the snapshot is taken eagerly
571-
// this is wrong for Deffered transactions
572-
tables.create_snapshot();
584+
match self {
585+
Begin::Deferred => {
586+
tables.create_deffered_snapshot();
587+
}
588+
Begin::Immediate => {
589+
tables.create_snapshot(TransactionMode::Write);
590+
}
591+
Begin::Concurrent => {
592+
tables.create_snapshot(TransactionMode::Concurrent);
593+
}
594+
};
595+
573596
vec![]
574597
}
575598
}

simulator/runner/env.rs

Lines changed: 155 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ pub(crate) enum SimulationPhase {
3838
Shrink,
3939
}
4040

41+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
42+
pub enum TransactionMode {
43+
Read = 0,
44+
Concurrent = 1,
45+
Write = 2,
46+
}
47+
4148
/// Represents a single operation during a transaction, applied in order.
4249
#[derive(Debug, Clone)]
4350
pub enum RowOperation {
@@ -76,38 +83,93 @@ pub enum RowOperation {
7683
},
7784
}
7885

86+
/// Database snapshot
7987
#[derive(Debug, Clone)]
80-
pub struct TransactionTables {
88+
pub struct Snapshot {
8189
/// The current state after applying transaction's changes (used for reads within the transaction)
8290
current_tables: Vec<Table>,
8391
/// Operations recorded during this transaction, in order
8492
operations: Vec<RowOperation>,
93+
94+
transaction_mode: TransactionMode,
95+
}
96+
97+
impl Snapshot {
98+
#[inline]
99+
fn set_transaction_mode(&mut self, transaction_mode: TransactionMode) {
100+
self.transaction_mode = transaction_mode;
101+
}
102+
}
103+
104+
#[derive(Debug, Clone)]
105+
pub enum TransactionTables {
106+
/// Signifies a Deffered transaction
107+
Deffered,
108+
Snapshot(Snapshot),
85109
}
86110

87111
impl TransactionTables {
88-
pub fn new(tables: Vec<Table>) -> Self {
89-
Self {
90-
current_tables: tables,
91-
operations: Vec::new(),
112+
#[inline]
113+
fn into_snapshot(self) -> Option<Snapshot> {
114+
match self {
115+
TransactionTables::Deffered => None,
116+
TransactionTables::Snapshot(snapshot) => Some(snapshot),
117+
}
118+
}
119+
120+
#[inline]
121+
fn as_snaphot_opt(&self) -> Option<&Snapshot> {
122+
match self {
123+
TransactionTables::Deffered => None,
124+
TransactionTables::Snapshot(snapshot) => Some(snapshot),
92125
}
93126
}
94127

128+
#[inline]
129+
fn as_snaphot_mut_opt(&mut self) -> Option<&mut Snapshot> {
130+
match self {
131+
TransactionTables::Deffered => None,
132+
TransactionTables::Snapshot(snapshot) => Some(snapshot),
133+
}
134+
}
135+
136+
#[inline]
137+
fn as_snaphot(&self) -> &Snapshot {
138+
self.as_snaphot_opt()
139+
.expect("called as_snaphot on deffered transaction")
140+
}
141+
142+
#[inline]
143+
fn as_snaphot_mut(&mut self) -> &mut Snapshot {
144+
self.as_snaphot_mut_opt()
145+
.expect("called as_snaphot_mut on deffered transaction")
146+
}
147+
148+
#[inline]
95149
pub fn record_insert(&mut self, table_name: String, row: Vec<SimValue>) {
96-
self.operations
150+
self.as_snaphot_mut()
151+
.operations
97152
.push(RowOperation::Insert { table_name, row });
98153
}
99154

155+
#[inline]
100156
pub fn record_delete(&mut self, table_name: String, row: Vec<SimValue>) {
101-
self.operations
157+
self.as_snaphot_mut()
158+
.operations
102159
.push(RowOperation::Delete { table_name, row });
103160
}
104161

162+
#[inline]
105163
pub fn record_drop_table(&mut self, table_name: String) {
106-
self.operations.push(RowOperation::DropTable { table_name });
164+
self.as_snaphot_mut()
165+
.operations
166+
.push(RowOperation::DropTable { table_name });
107167
}
108168

169+
#[inline]
109170
pub fn record_rename_table(&mut self, old_name: String, new_name: String) {
110-
self.operations
171+
self.as_snaphot_mut()
172+
.operations
111173
.push(RowOperation::RenameTable { old_name, new_name });
112174
}
113175

@@ -116,23 +178,28 @@ impl TransactionTables {
116178
table_name: String,
117179
column: sql_generation::model::table::Column,
118180
) {
119-
self.operations
181+
self.as_snaphot_mut()
182+
.operations
120183
.push(RowOperation::AddColumn { table_name, column });
121184
}
122185

123186
pub fn record_drop_column(&mut self, table_name: String, column_index: usize) {
124-
self.operations.push(RowOperation::DropColumn {
125-
table_name,
126-
column_index,
127-
});
187+
self.as_snaphot_mut()
188+
.operations
189+
.push(RowOperation::DropColumn {
190+
table_name,
191+
column_index,
192+
});
128193
}
129194

130195
pub fn record_rename_column(&mut self, table_name: String, old_name: String, new_name: String) {
131-
self.operations.push(RowOperation::RenameColumn {
132-
table_name,
133-
old_name,
134-
new_name,
135-
});
196+
self.as_snaphot_mut()
197+
.operations
198+
.push(RowOperation::RenameColumn {
199+
table_name,
200+
old_name,
201+
new_name,
202+
});
136203
}
137204

138205
pub fn record_alter_column(
@@ -141,11 +208,13 @@ impl TransactionTables {
141208
old_name: String,
142209
new_column: sql_generation::model::table::Column,
143210
) {
144-
self.operations.push(RowOperation::AlterColumn {
145-
table_name,
146-
old_name,
147-
new_column,
148-
});
211+
self.as_snaphot_mut()
212+
.operations
213+
.push(RowOperation::AlterColumn {
214+
table_name,
215+
old_name,
216+
new_column,
217+
});
149218
}
150219
}
151220

@@ -164,6 +233,7 @@ pub struct ShadowTablesMut<'a> {
164233
impl<'a> ShadowTables<'a> {
165234
fn tables(&self) -> &'a Vec<Table> {
166235
self.transaction_tables
236+
.and_then(|v| v.as_snaphot_opt())
167237
.map_or(self.commited_tables, |v| &v.current_tables)
168238
}
169239
}
@@ -183,15 +253,21 @@ where
183253
fn tables(&'a self) -> &'a Vec<Table> {
184254
self.transaction_tables
185255
.as_ref()
186-
.map(|t| &t.current_tables)
187-
.unwrap_or(self.commited_tables)
256+
.map_or(self.commited_tables, |v| {
257+
// Want to panic here with `as_snapshot_mut` because
258+
// we don't want to accidently return a the commited tables on deffered transactions
259+
&v.as_snaphot().current_tables
260+
})
188261
}
189262

190263
fn tables_mut(&'b mut self) -> &'b mut Vec<Table> {
191264
self.transaction_tables
192265
.as_mut()
193-
.map(|t| &mut t.current_tables)
194-
.unwrap_or(self.commited_tables)
266+
.map_or(self.commited_tables, |v| {
267+
// Want to panic here with `as_snapshot_mut` because
268+
// we don't want to accidently return a the commited tables on deffered transactions
269+
&mut v.as_snaphot_mut().current_tables
270+
})
195271
}
196272

197273
/// Record that a row was inserted during the current transaction
@@ -259,12 +335,60 @@ where
259335
}
260336
}
261337

262-
pub fn create_snapshot(&mut self) {
263-
*self.transaction_tables = Some(TransactionTables::new(self.commited_tables.clone()));
338+
/// Tries to upgrade the Transaction Mode
339+
#[inline]
340+
pub fn upgrade_transaction(&mut self, query: &Query) {
341+
let transaction_mode = if query.is_write() {
342+
TransactionMode::Write
343+
} else if query.is_select() {
344+
TransactionMode::Read
345+
} else {
346+
return;
347+
};
348+
if let Some(txn) = self.transaction_tables.as_mut() {
349+
match txn {
350+
TransactionTables::Deffered => self.create_snapshot(transaction_mode),
351+
TransactionTables::Snapshot(snapshot) => {
352+
match (snapshot.transaction_mode, transaction_mode) {
353+
(_, TransactionMode::Concurrent) => {
354+
unreachable!();
355+
}
356+
(TransactionMode::Read, TransactionMode::Write) => {
357+
snapshot.set_transaction_mode(transaction_mode)
358+
}
359+
(TransactionMode::Concurrent, TransactionMode::Write) => {
360+
if query.is_ddl() {
361+
// Only upgrade on DDL for MVCC
362+
snapshot.set_transaction_mode(transaction_mode)
363+
}
364+
}
365+
_ => {}
366+
};
367+
}
368+
}
369+
}
370+
}
371+
372+
#[inline]
373+
pub fn create_deffered_snapshot(&mut self) {
374+
*self.transaction_tables = Some(TransactionTables::Deffered);
375+
}
376+
377+
#[inline]
378+
pub fn create_snapshot(&mut self, transaction_mode: TransactionMode) {
379+
*self.transaction_tables = Some(TransactionTables::Snapshot(Snapshot {
380+
current_tables: self.commited_tables.clone(),
381+
operations: Vec::new(),
382+
transaction_mode,
383+
}));
264384
}
265385

266386
pub fn apply_snapshot(&mut self) {
267-
if let Some(transaction_tables) = self.transaction_tables.take() {
387+
if let Some(transaction_tables) = self
388+
.transaction_tables
389+
.take()
390+
.and_then(|transaction_tables| transaction_tables.into_snapshot())
391+
{
268392
// Apply all operations in recorded order.
269393
// This ensures operations like ADD COLUMN, DELETE are applied correctly
270394
// where DELETE sees rows with the same shape as when it was recorded.

0 commit comments

Comments
 (0)