Skip to content

Commit ce6719d

Browse files
committed
create async and blocking statements
1 parent 6db50b3 commit ce6719d

File tree

2 files changed

+200
-2
lines changed

2 files changed

+200
-2
lines changed

core/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub use io::{
7575
use parking_lot::{Mutex, RwLock};
7676
use rustc_hash::FxHashMap;
7777
use schema::Schema;
78-
pub use statement::Statement;
78+
pub use statement::{AsyncStatement, AsyncStepResult, BlockingStatement, Statement};
7979
use std::collections::HashSet;
8080
use std::time::Duration;
8181
use std::{

core/statement.rs

Lines changed: 199 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use std::{
22
borrow::Cow,
3+
future::Future,
34
num::NonZero,
45
ops::Deref,
56
sync::{atomic::Ordering, Arc},
6-
task::Waker,
7+
task::{Context, Poll, Waker},
78
};
89

910
use tracing::{instrument, Level};
@@ -446,3 +447,200 @@ impl Statement {
446447
self.pager.io.as_ref()
447448
}
448449
}
450+
451+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
452+
pub enum AsyncStepResult {
453+
Done,
454+
Row,
455+
Interrupt,
456+
Busy,
457+
}
458+
459+
impl AsyncStepResult {
460+
#[inline]
461+
pub fn from_step_result(step: StepResult) -> Poll<Self> {
462+
let new_step = match step {
463+
vdbe::StepResult::Done => Self::Done,
464+
vdbe::StepResult::IO => return Poll::Pending,
465+
vdbe::StepResult::Row => Self::Row,
466+
vdbe::StepResult::Interrupt => Self::Interrupt,
467+
vdbe::StepResult::Busy => Self::Busy,
468+
};
469+
Poll::Ready(new_step)
470+
}
471+
}
472+
473+
/// A future that represents a single step of statement execution.
474+
/// This allows fine-grained async control over statement execution.
475+
pub struct StepFuture<'a> {
476+
stmt: &'a mut Statement,
477+
}
478+
479+
impl<'a> Future for StepFuture<'a> {
480+
type Output = Result<AsyncStepResult>;
481+
482+
fn poll(
483+
mut self: std::pin::Pin<&mut Self>,
484+
cx: &mut std::task::Context<'_>,
485+
) -> Poll<Self::Output> {
486+
self.stmt
487+
.step_with_waker(cx.waker())
488+
.map(AsyncStepResult::from_step_result)?
489+
.map(Ok)
490+
}
491+
}
492+
493+
impl Statement {
494+
/// Returns a future that completes when the next step result is available.
495+
/// This allows fine-grained async control over statement execution.
496+
#[inline]
497+
pub fn step_async(&mut self) -> StepFuture<'_> {
498+
StepFuture { stmt: self }
499+
}
500+
}
501+
502+
/// An async wrapper around Statement that implements Future for the entire execution.
503+
pub struct AsyncStatement {
504+
inner: Statement,
505+
}
506+
507+
impl AsyncStatement {
508+
#[inline]
509+
pub fn new(statement: Statement) -> Self {
510+
Self { inner: statement }
511+
}
512+
513+
/// Returns a future that completes when the next step result is available.
514+
#[inline]
515+
pub async fn step(&mut self) -> Result<AsyncStepResult> {
516+
self.inner.step_async().await
517+
}
518+
519+
/// Get a reference to the inner Statement for advanced use cases.
520+
#[inline]
521+
pub fn statement(&self) -> &Statement {
522+
&self.inner
523+
}
524+
525+
/// Get a mutable reference to the inner Statement for advanced use cases.
526+
#[inline]
527+
pub fn statement_mut(&mut self) -> &mut Statement {
528+
&mut self.inner
529+
}
530+
531+
/// Consume self and return the inner Statement.
532+
#[inline]
533+
pub fn into_inner(self) -> Statement {
534+
self.inner
535+
}
536+
}
537+
538+
/// A blocking wrapper around AsyncStatement that manually polls the future
539+
/// and advances IO when pending.
540+
/// Useful for synchronous code that wants automatic IO handling.
541+
pub struct BlockingStatement {
542+
inner: AsyncStatement,
543+
}
544+
545+
impl BlockingStatement {
546+
#[inline]
547+
pub fn new(async_stmt: AsyncStatement) -> Self {
548+
Self { inner: async_stmt }
549+
}
550+
551+
/// Create a BlockingStatement directly from a Statement.
552+
#[inline]
553+
pub fn from_statement(statement: Statement) -> Self {
554+
Self::new(AsyncStatement::new(statement))
555+
}
556+
557+
/// Step and block on IO until a non-IO result is available.
558+
/// This manually polls the async future and advances IO when pending.
559+
pub fn step(&mut self) -> Result<StepResult> {
560+
let waker = Waker::noop();
561+
let mut cx = Context::from_waker(&waker);
562+
563+
loop {
564+
// Create the future in a block so it's dropped before we access IO
565+
let poll_result = {
566+
let mut fut = std::pin::pin!(self.inner.step());
567+
fut.as_mut().poll(&mut cx)
568+
};
569+
570+
match poll_result {
571+
Poll::Ready(Ok(async_result)) => {
572+
return Ok(match async_result {
573+
AsyncStepResult::Done => StepResult::Done,
574+
AsyncStepResult::Row => StepResult::Row,
575+
AsyncStepResult::Interrupt => StepResult::Interrupt,
576+
AsyncStepResult::Busy => StepResult::Busy,
577+
});
578+
}
579+
Poll::Ready(Err(e)) => return Err(e),
580+
Poll::Pending => {
581+
// Future is pending - advance IO and retry
582+
self.inner.statement().pager.io.step()?;
583+
}
584+
}
585+
}
586+
}
587+
588+
/// Get a reference to the inner AsyncStatement.
589+
#[inline]
590+
pub fn async_statement(&self) -> &AsyncStatement {
591+
&self.inner
592+
}
593+
594+
/// Get a mutable reference to the inner AsyncStatement.
595+
#[inline]
596+
pub fn async_statement_mut(&mut self) -> &mut AsyncStatement {
597+
&mut self.inner
598+
}
599+
600+
/// Get a reference to the underlying Statement.
601+
#[inline]
602+
pub fn statement(&self) -> &Statement {
603+
self.inner.statement()
604+
}
605+
606+
/// Get a mutable reference to the underlying Statement.
607+
#[inline]
608+
pub fn statement_mut(&mut self) -> &mut Statement {
609+
self.inner.statement_mut()
610+
}
611+
612+
/// Consume self and return the inner AsyncStatement.
613+
#[inline]
614+
pub fn into_async(self) -> AsyncStatement {
615+
self.inner
616+
}
617+
618+
/// Consume self and return the underlying Statement.
619+
#[inline]
620+
pub fn into_statement(self) -> Statement {
621+
self.inner.into_inner()
622+
}
623+
}
624+
625+
impl Statement {
626+
/// Convert this statement into an AsyncStatement for async execution.
627+
#[inline]
628+
pub fn into_async(self) -> AsyncStatement {
629+
AsyncStatement::new(self)
630+
}
631+
632+
/// Convert this statement into a BlockingStatement for sync execution
633+
/// with automatic IO handling.
634+
#[inline]
635+
pub fn into_blocking(self) -> BlockingStatement {
636+
BlockingStatement::from_statement(self)
637+
}
638+
}
639+
640+
impl AsyncStatement {
641+
/// Convert this async statement into a BlockingStatement for sync execution.
642+
#[inline]
643+
pub fn into_blocking(self) -> BlockingStatement {
644+
BlockingStatement::new(self)
645+
}
646+
}

0 commit comments

Comments
 (0)