Skip to content
Draft
20 changes: 15 additions & 5 deletions bindings/rust/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::transaction::DropBehavior;
use crate::transaction::TransactionBehavior;
use crate::ConnectionPool;
use crate::Error;
use crate::IntoParams;
use crate::Row;
Expand Down Expand Up @@ -49,17 +50,16 @@ pub struct Connection {
///
/// By default, the value is [DropBehavior::Ignore] which effectively does nothing.
pub(crate) dangling_tx: AtomicDropBehavior,
connection_pool: Arc<ConnectionPool>,
}

impl Clone for Connection {
fn clone(&self) -> Self {
let i = self.inner.clone();

Self {
inner: i,
//inner: Arc::clone(&self.inner),
inner: self.inner.clone(),
transaction_behavior: self.transaction_behavior,
dangling_tx: AtomicDropBehavior::new(self.dangling_tx.load(Ordering::SeqCst)),
connection_pool: self.connection_pool.clone(),
}
}
}
Expand All @@ -68,12 +68,13 @@ unsafe impl Send for Connection {}
unsafe impl Sync for Connection {}

impl Connection {
pub fn create(conn: Arc<turso_core::Connection>) -> Self {
pub(crate) fn create(conn: Arc<turso_core::Connection>, cp: Arc<ConnectionPool>) -> Self {
#[allow(clippy::arc_with_non_send_sync)]
let connection = Connection {
inner: Some(Arc::new(Mutex::new(conn))),
transaction_behavior: TransactionBehavior::Deferred,
dangling_tx: AtomicDropBehavior::new(DropBehavior::Ignore),
connection_pool: cp,
};
connection
}
Expand Down Expand Up @@ -236,3 +237,12 @@ impl Debug for Connection {
f.debug_struct("Connection").finish()
}
}

impl Drop for Connection {
fn drop(&mut self) {
if self.connection_pool.is_enabled() {
self.connection_pool.add(self.clone());
self.inner = None;
}
}
}
86 changes: 86 additions & 0 deletions bindings/rust/src/connection_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Connection Pool for the Turso Rust Binding
//
// It is transparent, one ConnectionPool per Database
// if the ConnectionPool option is set the Database
// creates and holds the pool.
// If not set then Database behaves as before
//
// When connections are dropped (by going out of scope) they will be cleaned and
// returned to the pool.
//
// The pool will be filled lazily. If the ConnectionPool is enabled in the Database
// then it will try to get a Connection from the pool. If there is no Connection
// available then a new Connection will be created by the database. When that
// Connection goes out of scope, and is dropped, the connection gets cleaned and
// added to the pool.

use crate::Connection;
use std::sync::{Arc, Mutex};

const POOL_SIZE: usize = 10;

#[derive(Clone)]
struct InnerConnectionPool {
pool: Arc<Mutex<Vec<Connection>>>,
}

#[derive(Clone)]
pub(crate) struct ConnectionPool {
inner_pool: Option<InnerConnectionPool>,
}

impl ConnectionPool {
pub(crate) fn new(active_pool: bool) -> Self {
match active_pool {
true => {
let inner_pool = InnerConnectionPool {
pool: Arc::new(Mutex::new(Vec::with_capacity(POOL_SIZE))),
};
ConnectionPool {
inner_pool: Some(inner_pool),
}
}
false => ConnectionPool { inner_pool: None },
}
}

pub(crate) fn is_enabled(&self) -> bool {
match &self.inner_pool {
Some(_) => {
return true;
}
None => return false,
};
}

pub(crate) fn get(&self) -> Option<Connection> {
match &self.inner_pool {
Some(p) => {
let mut pool = p.pool.lock().unwrap();
return pool.pop();
}
None => return None,
};
}

pub(crate) fn add(&self, obj: Connection) {
if let Some(p) = &self.inner_pool {
let mut pool = p.pool.lock().unwrap();
//if &self.available_connections() >= &pool.capacity() {
// let _ = &pool.reserve(&self.available_connections() * 10);
//}

pool.push(obj);
}
}

// probably only used for testing and potentially for tuning the size
#[allow(dead_code)]
pub(crate) fn available_connections(&self) -> usize {
if let Some(p) = &self.inner_pool {
p.pool.lock().unwrap().len()
} else {
0
}
}
}
109 changes: 107 additions & 2 deletions bindings/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

pub mod connection;
mod connection_pool;
pub mod params;
mod rows;
pub mod transaction;
Expand All @@ -56,6 +57,7 @@ use std::task::Poll;
pub use turso_core::EncryptionOpts;
use turso_core::OpenFlags;

use crate::connection_pool::ConnectionPool;
// Re-exports rows
pub use crate::rows::{Row, Rows};

Expand Down Expand Up @@ -91,6 +93,7 @@ pub struct Builder {
enable_encryption: bool,
vfs: Option<String>,
encryption_opts: Option<EncryptionOpts>,
enable_connection_pool: bool,
}

impl Builder {
Expand All @@ -101,6 +104,7 @@ impl Builder {
enable_encryption: false,
vfs: None,
encryption_opts: None,
enable_connection_pool: false,
}
}

Expand All @@ -119,6 +123,11 @@ impl Builder {
self
}

pub fn with_connection_pool(mut self, enable_connection_pool: bool) -> Self {
self.enable_connection_pool = enable_connection_pool;
self
}

/// Build the database.
#[allow(unused_variables, clippy::arc_with_non_send_sync)]
pub async fn build(self) -> Result<Database> {
Expand All @@ -131,7 +140,12 @@ impl Builder {
opts,
self.encryption_opts.clone(),
)?;
Ok(Database { inner: db })
let cp = ConnectionPool::new(self.enable_connection_pool);

Ok(Database {
inner: db,
connection_pool: cp.into(),
})
}

fn get_io(&self) -> Result<Arc<dyn turso_core::IO>> {
Expand Down Expand Up @@ -193,6 +207,7 @@ impl Builder {
#[derive(Clone)]
pub struct Database {
inner: Arc<turso_core::Database>,
connection_pool: Arc<ConnectionPool>,
}

unsafe impl Send for Database {}
Expand All @@ -207,8 +222,18 @@ impl Debug for Database {
impl Database {
/// Connect to the database.
pub fn connect(&self) -> Result<Connection> {
match &self.connection_pool.is_enabled() {
true => match self.connection_pool.get() {
Some(c) => return Ok(c),
None => return self._connect(),
},
false => return self._connect(),
}
}

pub fn _connect(&self) -> Result<Connection> {
let conn = self.inner.connect()?;
Ok(Connection::create(conn))
Ok(Connection::create(conn, self.connection_pool.clone()))
}
}

Expand Down Expand Up @@ -399,6 +424,7 @@ pub struct Transaction {}
#[cfg(test)]
mod tests {
use super::*;
use connection_pool::ConnectionPool;
use tempfile::NamedTempFile;

#[tokio::test]
Expand Down Expand Up @@ -561,4 +587,83 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_connection_pool() -> Result<()> {
let db = Builder::new_local(":memory:")
.build()
.await
.expect("Turso Failed to Build memory db");

let cp = ConnectionPool::new(true);

// with a new connection pool we should get a None out
let no_connection = cp.get();
assert!(no_connection.is_none());
assert!(cp.available_connections() == 0);

let conn = db.connect()?;
cp.add(conn);
assert!(cp.available_connections() == 1);

let conn_r = cp.get();
assert!(conn_r.is_some());
match conn_r {
Some(c) => {
assert!(cp.available_connections() == 0);
cp.add(c);
assert!(cp.available_connections() == 1);
}
None => assert!(false),
}

let conn2 = db.connect()?;
cp.add(conn2);
assert!(cp.available_connections() == 2);

let conn3 = db.connect()?;
cp.add(conn3);
assert!(cp.available_connections() == 3);

let conn4 = db.connect()?;
cp.add(conn4);
assert!(cp.available_connections() == 4);

let _conn_r1 = cp.get();
assert!(cp.available_connections() == 3);
let _conn_r2 = cp.get();
assert!(cp.available_connections() == 2);
let _conn_r3 = cp.get();
assert!(cp.available_connections() == 1);
let _conn_r4 = cp.get();
assert!(cp.available_connections() == 0);

Ok(())
}

#[tokio::test]
async fn test_no_connection_pool() -> Result<()> {
let db = Builder::new_local(":memory:")
.build()
.await
.expect("Turso Failed to Build memory db");

let cp = ConnectionPool::new(false);

// with a new connection pool we should get a None out
let no_connection = cp.get();
assert!(no_connection.is_none());
assert!(cp.available_connections() == 0);

// if we add a conmnection to a non existant pool then the number of connections is 0
let conn = db.connect()?;
cp.add(conn);
assert!(cp.available_connections() == 0);

// no pool so still can't get a connection out
let conn_r = cp.get();
assert!(conn_r.is_none());

Ok(())
}
}