Skip to content

Commit 6cd2a68

Browse files
committed
mvcc: table cursor must always use defined bound
because all tables are stored in the same skipmap, so omitting bound for search will find rows from wrong table
1 parent 4576795 commit 6cd2a68

File tree

2 files changed

+56
-22
lines changed

2 files changed

+56
-22
lines changed

core/mvcc/cursor.rs

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,10 @@ impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
252252
in_btree: false,
253253
btree_consumed: true,
254254
},
255-
(None, None) => CursorPosition::End,
255+
(None, None) => match direction {
256+
IterationDirection::Forwards => CursorPosition::BeforeFirst,
257+
IterationDirection::Backwards => CursorPosition::End,
258+
},
256259
}
257260
}
258261

@@ -319,14 +322,22 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
319322
let current_state = self.state.borrow().clone();
320323
if current_state.is_none() {
321324
let before_first = matches!(self.get_current_pos(), CursorPosition::BeforeFirst);
322-
let min_key_exclusive = match self.current_pos.borrow().clone() {
325+
let min_key = match self.current_pos.borrow().clone() {
323326
CursorPosition::Loaded {
324327
row_id,
325328
in_btree: _,
326329
btree_consumed: _,
327-
} => Some(row_id.row_id),
330+
} => Some(match &self.mv_cursor_type {
331+
MvccCursorType::Table => {
332+
(true, RowKey::Int(row_id.row_id.to_int_or_panic() + 1))
333+
}
334+
MvccCursorType::Index(_) => (false, row_id.row_id),
335+
}),
328336
// TODO: do we need to forward twice?
329-
CursorPosition::BeforeFirst => None,
337+
CursorPosition::BeforeFirst => match &self.mv_cursor_type {
338+
MvccCursorType::Table => Some((false, RowKey::Int(i64::MIN))),
339+
MvccCursorType::Index(_) => None,
340+
},
330341
CursorPosition::End => {
331342
// let's keep same state, we reached the end so no point in moving forward.
332343
return Ok(IOResult::Done(false));
@@ -335,7 +346,7 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
335346

336347
let new_position_in_mvcc = match self.db.get_next_row_id_for_table(
337348
self.table_id,
338-
min_key_exclusive.as_ref(),
349+
min_key.as_ref().map(|(inclusive, key)| (*inclusive, key)),
339350
self.tx_id,
340351
) {
341352
Some(id) => CursorPosition::Loaded {
@@ -455,21 +466,29 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
455466
fn prev(&mut self) -> Result<IOResult<bool>> {
456467
let current_state = self.state.borrow().clone();
457468
if current_state.is_none() {
458-
let max_id_exclusive = match self.current_pos.borrow().clone() {
469+
let max_key = match self.current_pos.borrow().clone() {
459470
CursorPosition::Loaded {
460471
row_id,
461472
in_btree: _,
462473
btree_consumed: _,
463-
} => Some(row_id.row_id.clone()),
474+
} => Some(match &self.mv_cursor_type {
475+
MvccCursorType::Table => {
476+
(true, RowKey::Int(row_id.row_id.to_int_or_panic() - 1))
477+
}
478+
MvccCursorType::Index(_) => (false, row_id.row_id),
479+
}),
464480
CursorPosition::BeforeFirst => {
465481
return Ok(IOResult::Done(false));
466482
}
467-
CursorPosition::End => None,
483+
CursorPosition::End => match &self.mv_cursor_type {
484+
MvccCursorType::Table => Some((true, RowKey::Int(i64::MAX))),
485+
MvccCursorType::Index(_) => None,
486+
},
468487
};
469488

470489
let new_position_in_mvcc = match self.db.get_prev_row_id_for_table(
471490
self.table_id,
472-
max_id_exclusive.as_ref(),
491+
max_key.as_ref().map(|(inclusive, key)| (*inclusive, key)),
473492
self.tx_id,
474493
) {
475494
Some(id) => CursorPosition::Loaded {
@@ -802,9 +821,18 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
802821
self.current_pos.replace(CursorPosition::BeforeFirst);
803822
}
804823

805-
let new_position_in_mvcc =
806-
self.db
807-
.get_next_row_id_for_table(self.table_id, None, self.tx_id);
824+
let start_key_mvcc = match &self.mv_cursor_type {
825+
MvccCursorType::Table => Some((true, RowKey::Int(i64::MIN))),
826+
MvccCursorType::Index(_) => None,
827+
};
828+
829+
let new_position_in_mvcc = self.db.get_next_row_id_for_table(
830+
self.table_id,
831+
start_key_mvcc
832+
.as_ref()
833+
.map(|(inclusive, key)| (*inclusive, key)),
834+
self.tx_id,
835+
);
808836

809837
let maybe_rowid_in_btree = match &self.mv_cursor_type {
810838
MvccCursorType::Table => {
@@ -828,7 +856,7 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
828856
}
829857
};
830858
let new_position = self.get_new_position_from_mvcc_and_btree(
831-
&new_position_in_mvcc.map(|r| r.row_id),
859+
&new_position_in_mvcc.clone().map(|r| r.row_id),
832860
&maybe_rowid_in_btree,
833861
IterationDirection::Forwards,
834862
);

core/mvcc/database/mod.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,7 +1401,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
14011401
pub fn get_next_row_id_for_table(
14021402
&self,
14031403
table_id: MVTableId,
1404-
start: Option<&RowKey>,
1404+
start: Option<(bool, &RowKey)>, // (inclusive, row_key)
14051405
tx_id: TxID,
14061406
) -> Option<RowID> {
14071407
let res = self.get_row_id_for_table_in_direction(
@@ -1423,7 +1423,7 @@ impl<Clock: LogicalClock> MvStore<Clock> {
14231423
pub fn get_prev_row_id_for_table(
14241424
&self,
14251425
table_id: MVTableId,
1426-
start: Option<&RowKey>,
1426+
start: Option<(bool, &RowKey)>, // (inclusive, row_key)
14271427
tx_id: TxID,
14281428
) -> Option<RowID> {
14291429
let res = self.get_row_id_for_table_in_direction(
@@ -1445,15 +1445,15 @@ impl<Clock: LogicalClock> MvStore<Clock> {
14451445
pub fn get_row_id_for_table_in_direction(
14461446
&self,
14471447
table_id: MVTableId,
1448-
start: Option<&RowKey>,
1448+
start: Option<(bool, &RowKey)>, // (inclusive, row_key)
14491449
tx_id: TxID,
14501450
direction: IterationDirection,
14511451
) -> Option<RowID> {
14521452
tracing::trace!(
14531453
"getting_row_id_for_table_in_direction(table_id={}, range_start={:?}, direction={:?})",
14541454
table_id,
14551455
start,
1456-
direction
1456+
direction,
14571457
);
14581458

14591459
let tx = self
@@ -1464,13 +1464,16 @@ impl<Clock: LogicalClock> MvStore<Clock> {
14641464
if direction == IterationDirection::Forwards {
14651465
let min_bound = start.map(|start| RowID {
14661466
table_id,
1467-
row_id: start.clone(),
1467+
row_id: start.1.clone(),
14681468
});
1469+
let inclusive = start.map(|start| start.0).unwrap_or(false);
14691470

14701471
match min_bound {
14711472
Some(min_bound) => {
14721473
let mut rows = self.rows.range(min_bound..);
1473-
rows.next(); // exclusive wrt. min bound
1474+
if !inclusive {
1475+
rows.next();
1476+
}
14741477
loop {
14751478
// We are moving forward, so if a row was deleted we just need to skip it. Therefore, we need
14761479
// to loop either until we find a row that is not deleted or until we reach the end of the table.
@@ -1519,13 +1522,16 @@ impl<Clock: LogicalClock> MvStore<Clock> {
15191522
} else {
15201523
let max_bound = start.map(|start| RowID {
15211524
table_id,
1522-
row_id: start.clone(),
1525+
row_id: start.1.clone(),
15231526
});
1527+
let inclusive = start.map(|start| start.0).unwrap_or(false);
15241528

15251529
match max_bound {
15261530
Some(max_bound) => {
1527-
let mut rows = self.rows.range(max_bound..).rev();
1528-
rows.next(); // exclusive wrt. max bound
1531+
let mut rows = self.rows.range(..=max_bound).rev();
1532+
if !inclusive {
1533+
rows.next();
1534+
}
15291535
loop {
15301536
// We are moving backwards, so if a row was deleted we just need to skip it. Therefore, we need
15311537
// to loop either until we find a row that is not deleted or until we reach the beginning of the table.

0 commit comments

Comments
 (0)