Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub use io::{
use parking_lot::{Mutex, RwLock};
use rustc_hash::FxHashMap;
use schema::Schema;
pub use statement::Statement;
pub use statement::{AsyncStatement, AsyncStepResult, BlockingStatement, Statement};
use std::collections::HashSet;
use std::time::Duration;
use std::{
Expand Down Expand Up @@ -2278,21 +2278,7 @@ impl Connection {
}
let pragma = format!("PRAGMA {pragma_name}({pragma_value})");
let mut stmt = self.prepare(pragma)?;
let mut results = Vec::new();
loop {
match stmt.step()? {
vdbe::StepResult::Row => {
let row: Vec<Value> = stmt.row().unwrap().get_values().cloned().collect();
results.push(row);
}
vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
return Err(LimboError::Busy);
}
_ => break,
}
}

Ok(results)
stmt.run_collect_rows()
}

#[inline]
Expand Down
214 changes: 204 additions & 10 deletions core/statement.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{
borrow::Cow,
future::Future,
num::NonZero,
ops::Deref,
sync::{atomic::Ordering, Arc},
task::Waker,
task::{Context, Poll, Waker},
};

use tracing::{instrument, Level};
Expand Down Expand Up @@ -108,14 +109,12 @@ impl Statement {
self.program.connection.mv_store()
}

fn _step(&mut self, waker: Option<&Waker>) -> Result<StepResult> {
fn _step(&mut self, waker: &Waker) -> Result<StepResult> {
// If we're waiting for a busy handler timeout, check if we can proceed
if let Some(busy_state) = self.busy_handler_state.as_ref() {
if self.pager.io.now() < busy_state.timeout() {
// Yield the query as the timeout has not been reached yet
if let Some(waker) = waker {
waker.wake_by_ref();
}
waker.wake_by_ref();
return Ok(StepResult::IO);
}
}
Expand Down Expand Up @@ -172,9 +171,7 @@ impl Statement {
// Invoke the busy handler to determine if we should retry
if busy_state.invoke(&handler, now) {
// Handler says retry, yield with IO to wait for timeout
if let Some(waker) = waker {
waker.wake_by_ref();
}
waker.wake_by_ref();
res = Ok(StepResult::IO);
}
// else: Handler says stop, res stays as Busy
Expand All @@ -184,11 +181,11 @@ impl Statement {
}

pub fn step(&mut self) -> Result<StepResult> {
self._step(None)
self._step(Waker::noop())
}

pub fn step_with_waker(&mut self, waker: &Waker) -> Result<StepResult> {
self._step(Some(waker))
self._step(waker)
}

pub fn run_ignore_rows(&mut self) -> Result<()> {
Expand Down Expand Up @@ -450,3 +447,200 @@ impl Statement {
self.pager.io.as_ref()
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AsyncStepResult {
Done,
Row,
Interrupt,
Busy,
}

impl AsyncStepResult {
#[inline]
pub fn from_step_result(step: StepResult) -> Poll<Self> {
let new_step = match step {
vdbe::StepResult::Done => Self::Done,
vdbe::StepResult::IO => return Poll::Pending,
vdbe::StepResult::Row => Self::Row,
vdbe::StepResult::Interrupt => Self::Interrupt,
vdbe::StepResult::Busy => Self::Busy,
};
Poll::Ready(new_step)
}
}

/// A future that represents a single step of statement execution.
/// This allows fine-grained async control over statement execution.
pub struct StepFuture<'a> {
stmt: &'a mut Statement,
}

impl<'a> Future for StepFuture<'a> {
type Output = Result<AsyncStepResult>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
self.stmt
.step_with_waker(cx.waker())
.map(AsyncStepResult::from_step_result)?
.map(Ok)
}
}

impl Statement {
/// Returns a future that completes when the next step result is available.
/// This allows fine-grained async control over statement execution.
#[inline]
pub fn step_async(&mut self) -> StepFuture<'_> {
StepFuture { stmt: self }
}
}

/// An async wrapper around Statement that implements Future for the entire execution.
pub struct AsyncStatement {
inner: Statement,
}

impl AsyncStatement {
#[inline]
pub fn new(statement: Statement) -> Self {
Self { inner: statement }
}

/// Returns a future that completes when the next step result is available.
#[inline]
pub async fn step(&mut self) -> Result<AsyncStepResult> {
self.inner.step_async().await
}

/// Get a reference to the inner Statement for advanced use cases.
#[inline]
pub fn statement(&self) -> &Statement {
&self.inner
}

/// Get a mutable reference to the inner Statement for advanced use cases.
#[inline]
pub fn statement_mut(&mut self) -> &mut Statement {
&mut self.inner
}

/// Consume self and return the inner Statement.
#[inline]
pub fn into_inner(self) -> Statement {
self.inner
}
}

/// A blocking wrapper around AsyncStatement that manually polls the future
/// and advances IO when pending.
/// Useful for synchronous code that wants automatic IO handling.
pub struct BlockingStatement {
inner: AsyncStatement,
}

impl BlockingStatement {
#[inline]
pub fn new(async_stmt: AsyncStatement) -> Self {
Self { inner: async_stmt }
}

/// Create a BlockingStatement directly from a Statement.
#[inline]
pub fn from_statement(statement: Statement) -> Self {
Self::new(AsyncStatement::new(statement))
}

/// Step and block on IO until a non-IO result is available.
/// This manually polls the async future and advances IO when pending.
pub fn step(&mut self) -> Result<StepResult> {
let waker = Waker::noop();
let mut cx = Context::from_waker(&waker);

loop {
// Create the future in a block so it's dropped before we access IO
let poll_result = {
let mut fut = std::pin::pin!(self.inner.step());
fut.as_mut().poll(&mut cx)
};

match poll_result {
Poll::Ready(Ok(async_result)) => {
return Ok(match async_result {
AsyncStepResult::Done => StepResult::Done,
AsyncStepResult::Row => StepResult::Row,
AsyncStepResult::Interrupt => StepResult::Interrupt,
AsyncStepResult::Busy => StepResult::Busy,
});
}
Poll::Ready(Err(e)) => return Err(e),
Poll::Pending => {
// Future is pending - advance IO and retry
self.inner.statement().pager.io.step()?;
}
}
}
}

/// Get a reference to the inner AsyncStatement.
#[inline]
pub fn async_statement(&self) -> &AsyncStatement {
&self.inner
}

/// Get a mutable reference to the inner AsyncStatement.
#[inline]
pub fn async_statement_mut(&mut self) -> &mut AsyncStatement {
&mut self.inner
}

/// Get a reference to the underlying Statement.
#[inline]
pub fn statement(&self) -> &Statement {
self.inner.statement()
}

/// Get a mutable reference to the underlying Statement.
#[inline]
pub fn statement_mut(&mut self) -> &mut Statement {
self.inner.statement_mut()
}

/// Consume self and return the inner AsyncStatement.
#[inline]
pub fn into_async(self) -> AsyncStatement {
self.inner
}

/// Consume self and return the underlying Statement.
#[inline]
pub fn into_statement(self) -> Statement {
self.inner.into_inner()
}
}

impl Statement {
/// Convert this statement into an AsyncStatement for async execution.
#[inline]
pub fn into_async(self) -> AsyncStatement {
AsyncStatement::new(self)
}

/// Convert this statement into a BlockingStatement for sync execution
/// with automatic IO handling.
#[inline]
pub fn into_blocking(self) -> BlockingStatement {
BlockingStatement::from_statement(self)
}
}

impl AsyncStatement {
/// Convert this async statement into a BlockingStatement for sync execution.
#[inline]
pub fn into_blocking(self) -> BlockingStatement {
BlockingStatement::new(self)
}
}
26 changes: 3 additions & 23 deletions core/storage/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7815,7 +7815,7 @@ mod tests {
},
types::Text,
vdbe::Register,
BufferPool, Completion, Connection, IOContext, StepResult, Wal, WalFile, WalFileShared,
BufferPool, Completion, Connection, IOContext, Wal, WalFile, WalFileShared,
};
use arc_swap::ArcSwapOption;
use std::{collections::HashSet, mem::transmute, ops::Deref, sync::Arc};
Expand Down Expand Up @@ -9800,17 +9800,7 @@ mod tests {

for query in queries {
let mut stmt = conn.query(query).unwrap().unwrap();
loop {
let row = stmt.step().expect("step");
match row {
StepResult::Done => {
break;
}
_ => {
tracing::debug!("row {:?}", row);
}
}
}
stmt.run_ignore_rows().expect("run_ignore_rows");
}
}

Expand Down Expand Up @@ -9859,17 +9849,7 @@ mod tests {

for query in queries {
let mut stmt = conn.query(query).unwrap().unwrap();
loop {
let row = stmt.step().expect("step");
match row {
StepResult::Done => {
break;
}
_ => {
tracing::debug!("row {:?}", row);
}
}
}
stmt.run_ignore_rows().expect("run_ignore_rows");
}
}

Expand Down
8 changes: 3 additions & 5 deletions core/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2581,11 +2581,9 @@ impl IOCompletions {
}
}

pub fn set_waker(&self, waker: Option<&Waker>) {
if let Some(waker) = waker {
match self {
IOCompletions::Single(c) => c.set_waker(waker),
}
pub fn set_waker(&self, waker: &Waker) {
match self {
IOCompletions::Single(c) => c.set_waker(waker),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/vdbe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ impl Program {
state: &mut ProgramState,
pager: Arc<Pager>,
query_mode: QueryMode,
waker: Option<&Waker>,
waker: &Waker,
) -> Result<StepResult> {
state.execution_state = ProgramExecutionState::Running;
let result = match query_mode {
Expand Down Expand Up @@ -844,7 +844,7 @@ impl Program {
// Process the subprogram - it will handle its own explain_step internally
// The subprogram's explain_step will process all its instructions (including any nested subprograms)
// and return StepResult::Row for each instruction, then StepResult::Done when finished
let result = p.step(state, pager.clone(), QueryMode::Explain, None)?;
let result = p.step(state, pager.clone(), QueryMode::Explain, Waker::noop())?;

match result {
StepResult::Done => {
Expand Down Expand Up @@ -977,7 +977,7 @@ impl Program {
&self,
state: &mut ProgramState,
pager: Arc<Pager>,
waker: Option<&Waker>,
waker: &Waker,
) -> Result<StepResult> {
let enable_tracing = tracing::enabled!(tracing::Level::TRACE);
loop {
Expand Down
Loading
Loading