@@ -165,7 +165,12 @@ impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
165165 } if !in_btree => row_id,
166166 _ => panic ! ( "invalid position to read current mvcc row" ) ,
167167 } ;
168- self . db . read ( self . tx_id , row_id)
168+ let maybe_index_id = match & self . mv_cursor_type {
169+ MvccCursorType :: Index ( _) => Some ( self . table_id ) ,
170+ MvccCursorType :: Table => None ,
171+ } ;
172+ self . db
173+ . read_from_table_or_index ( self . tx_id , row_id, maybe_index_id)
169174 }
170175
171176 pub fn close ( self ) -> Result < ( ) > {
@@ -253,8 +258,8 @@ impl<Clock: LogicalClock + 'static> MvccLazyCursor<Clock> {
253258 btree_consumed : true ,
254259 } ,
255260 ( None , None ) => match direction {
256- IterationDirection :: Forwards => CursorPosition :: BeforeFirst ,
257- IterationDirection :: Backwards => CursorPosition :: End ,
261+ IterationDirection :: Forwards => CursorPosition :: End ,
262+ IterationDirection :: Backwards => CursorPosition :: BeforeFirst ,
258263 } ,
259264 }
260265 }
@@ -335,7 +340,7 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
335340 } ) ,
336341 // TODO: do we need to forward twice?
337342 CursorPosition :: BeforeFirst => match & self . mv_cursor_type {
338- MvccCursorType :: Table => Some ( ( false , RowKey :: Int ( i64:: MIN ) ) ) ,
343+ MvccCursorType :: Table => Some ( ( true , RowKey :: Int ( i64:: MIN ) ) ) ,
339344 MvccCursorType :: Index ( _) => None ,
340345 } ,
341346 CursorPosition :: End => {
@@ -603,7 +608,26 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
603608 row_id,
604609 in_btree : _,
605610 btree_consumed : _,
606- } => Some ( row_id. row_id . to_int_or_panic ( ) ) ,
611+ } => match & row_id. row_id {
612+ RowKey :: Int ( id) => Some ( * id) ,
613+ RowKey :: Record ( sortable_key) => {
614+ // For index cursors, the rowid is stored in the last column of the index record
615+ let MvccCursorType :: Index ( index_info) = & self . mv_cursor_type else {
616+ panic ! ( "RowKey::Record requires Index cursor type" ) ;
617+ } ;
618+ if index_info. has_rowid {
619+ let mut record_cursor = RecordCursor :: new ( ) ;
620+ match sortable_key. key . last_value ( & mut record_cursor) {
621+ Some ( Ok ( crate :: types:: ValueRef :: Integer ( rowid) ) ) => Some ( rowid) ,
622+ _ => {
623+ crate :: bail_parse_error!( "Failed to parse rowid from index record" )
624+ }
625+ }
626+ } else {
627+ crate :: bail_parse_error!( "Indexes without rowid are not supported in MVCC" ) ;
628+ }
629+ }
630+ } ,
607631 CursorPosition :: BeforeFirst => None ,
608632 CursorPosition :: End => None ,
609633 } ;
@@ -617,19 +641,26 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
617641 }
618642
619643 fn seek ( & mut self , seek_key : SeekKey < ' _ > , op : SeekOp ) -> Result < IOResult < SeekResult > > {
620- let row_id = match seek_key {
621- SeekKey :: TableRowId ( row_id) => row_id,
622- SeekKey :: IndexKey ( _) => {
623- panic ! ( "SeekKey::IndexKey is not supported for mvcc table cursor" ) ;
624- }
625- } ;
626644 // gt -> lower_bound bound excluded, we want first row after row_id
627645 // ge -> lower_bound bound included, we want first row equal to row_id or first row after row_id
628646 // lt -> upper_bound bound excluded, we want last row before row_id
629647 // le -> upper_bound bound included, we want last row equal to row_id or first row before row_id
630- let rowid = RowID {
631- table_id : self . table_id ,
632- row_id : RowKey :: Int ( row_id) ,
648+ let rowid = match seek_key {
649+ SeekKey :: TableRowId ( row_id) => RowID {
650+ table_id : self . table_id ,
651+ row_id : RowKey :: Int ( row_id) ,
652+ } ,
653+ SeekKey :: IndexKey ( index_key) => {
654+ let MvccCursorType :: Index ( index_info) = & self . mv_cursor_type else {
655+ panic ! ( "SeekKey::IndexKey requires Index cursor type" ) ;
656+ } ;
657+ let sortable_key =
658+ SortableIndexKey :: new_from_record ( index_key. clone ( ) , index_info. clone ( ) ) ;
659+ RowID {
660+ table_id : self . table_id ,
661+ row_id : RowKey :: Record ( sortable_key) ,
662+ }
663+ }
633664 } ;
634665 let ( bound, lower_bound) = match op {
635666 SeekOp :: GT => ( Bound :: Excluded ( & rowid) , true ) ,
@@ -638,15 +669,15 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
638669 SeekOp :: LE { eq_only : _ } => ( Bound :: Included ( & rowid) , false ) ,
639670 } ;
640671 self . invalidate_record ( ) ;
641- let rowid = self . db . seek_rowid ( bound, lower_bound, self . tx_id ) ;
642- if let Some ( rowid ) = rowid {
672+ let found_rowid = self . db . seek_rowid ( bound, lower_bound, self . tx_id ) ;
673+ if let Some ( found_rowid ) = found_rowid {
643674 self . current_pos . replace ( CursorPosition :: Loaded {
644- row_id : rowid . clone ( ) ,
675+ row_id : found_rowid . clone ( ) ,
645676 in_btree : false ,
646677 btree_consumed : false ,
647678 } ) ;
648679 if op. eq_only ( ) {
649- if rowid . row_id . to_int_or_panic ( ) == row_id {
680+ if found_rowid . row_id == rowid . row_id {
650681 Ok ( IOResult :: Done ( SeekResult :: Found ) )
651682 } else {
652683 Ok ( IOResult :: Done ( SeekResult :: NotFound ) )
@@ -665,13 +696,20 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
665696 }
666697 }
667698
668- /// Insert a row into the table.
699+ /// Insert a row into the table or index .
669700 /// Sets the cursor to the inserted row.
670701 fn insert ( & mut self , key : & BTreeKey ) -> Result < IOResult < ( ) > > {
671- let Some ( rowid) = key. maybe_rowid ( ) else {
672- panic ! ( "BTreeKey::maybe_rowid() should return Some(rowid) for table rowid keys" ) ;
702+ let row_id = match key {
703+ BTreeKey :: TableRowId ( ( rowid, _) ) => RowID :: new ( self . table_id , RowKey :: Int ( * rowid) ) ,
704+ BTreeKey :: IndexKey ( record) => {
705+ let MvccCursorType :: Index ( index_info) = & self . mv_cursor_type else {
706+ panic ! ( "BTreeKey::IndexKey requires Index cursor type" ) ;
707+ } ;
708+ let sortable_key =
709+ SortableIndexKey :: new_from_record ( ( * record) . clone ( ) , index_info. clone ( ) ) ;
710+ RowID :: new ( self . table_id , RowKey :: Record ( sortable_key) )
711+ }
673712 } ;
674- let row_id = RowID :: new ( self . table_id , RowKey :: Int ( rowid) ) ;
675713 let record_buf = key
676714 . get_record ( )
677715 . ok_or ( LimboError :: InternalError (
@@ -695,26 +733,43 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
695733 in_btree : false ,
696734 btree_consumed : true ,
697735 } ) ;
736+ let maybe_index_id = match & self . mv_cursor_type {
737+ MvccCursorType :: Index ( _) => Some ( self . table_id ) ,
738+ MvccCursorType :: Table => None ,
739+ } ;
698740 // FIXME: set btree to somewhere close to this rowid?
699- if self . db . read ( self . tx_id , row. id . clone ( ) ) ?. is_some ( ) {
700- self . db . update ( self . tx_id , row) . inspect_err ( |_| {
701- self . current_pos . replace ( CursorPosition :: BeforeFirst ) ;
702- } ) ?;
741+ if self
742+ . db
743+ . read_from_table_or_index ( self . tx_id , row. id . clone ( ) , maybe_index_id) ?
744+ . is_some ( )
745+ {
746+ self . db
747+ . update_to_table_or_index ( self . tx_id , row, maybe_index_id)
748+ . inspect_err ( |_| {
749+ self . current_pos . replace ( CursorPosition :: BeforeFirst ) ;
750+ } ) ?;
703751 } else {
704- self . db . insert ( self . tx_id , row) . inspect_err ( |_| {
705- self . current_pos . replace ( CursorPosition :: BeforeFirst ) ;
706- } ) ?;
752+ self . db
753+ . insert_to_table_or_index ( self . tx_id , row, maybe_index_id)
754+ . inspect_err ( |_| {
755+ self . current_pos . replace ( CursorPosition :: BeforeFirst ) ;
756+ } ) ?;
707757 }
708758 self . invalidate_record ( ) ;
709759 Ok ( IOResult :: Done ( ( ) ) )
710760 }
711761
712762 fn delete ( & mut self ) -> Result < IOResult < ( ) > > {
713- let IOResult :: Done ( Some ( rowid) ) = self . rowid ( ) ? else {
714- panic ! ( "Rowid should be Some(rowid) for mvcc table cursor" ) ;
763+ let rowid = match self . get_current_pos ( ) {
764+ CursorPosition :: Loaded { row_id, .. } => row_id,
765+ _ => panic ! ( "Cannot delete: no current row" ) ,
766+ } ;
767+ let maybe_index_id = match & self . mv_cursor_type {
768+ MvccCursorType :: Index ( _) => Some ( self . table_id ) ,
769+ MvccCursorType :: Table => None ,
715770 } ;
716- let rowid = RowID :: new ( self . table_id , RowKey :: Int ( rowid ) ) ;
717- self . db . delete ( self . tx_id , rowid) ?;
771+ self . db
772+ . delete_from_table_or_index ( self . tx_id , rowid, maybe_index_id ) ?;
718773 self . invalidate_record ( ) ;
719774 Ok ( IOResult :: Done ( ( ) ) )
720775 }
@@ -873,7 +928,10 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
873928 }
874929
875930 fn get_index_info ( & self ) -> & crate :: types:: IndexInfo {
876- todo ! ( )
931+ match & self . mv_cursor_type {
932+ MvccCursorType :: Index ( index_info) => index_info,
933+ MvccCursorType :: Table => panic ! ( "get_index_info called on table cursor" ) ,
934+ }
877935 }
878936
879937 fn seek_end ( & mut self ) -> Result < IOResult < ( ) > > {
@@ -911,7 +969,10 @@ impl<Clock: LogicalClock + 'static> CursorTrait for MvccLazyCursor<Clock> {
911969 }
912970
913971 fn has_rowid ( & self ) -> bool {
914- todo ! ( )
972+ match & self . mv_cursor_type {
973+ MvccCursorType :: Index ( index_info) => index_info. has_rowid ,
974+ MvccCursorType :: Table => true , // currently we don't support WITHOUT ROWID tables
975+ }
915976 }
916977
917978 fn record_cursor_mut ( & self ) -> std:: cell:: RefMut < ' _ , crate :: types:: RecordCursor > {
0 commit comments