Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract backend storage #13

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::models::MIGRATIONS;
use crate::routes::*;
use axum::extract::DefaultBodyLimit;
Expand Down Expand Up @@ -36,7 +38,7 @@ const API_VERSION: &str = "v2";

#[derive(Clone)]
pub struct State {
db_pool: Pool<ConnectionManager<PgConnection>>,
backend: Arc<dyn models::backend::VssBackend>,
pub auth_key: Option<PublicKey>,
pub self_hosted: bool,
pub secp: Secp256k1<All>,
Expand Down Expand Up @@ -88,8 +90,9 @@ async fn main() -> anyhow::Result<()> {
.expect("migrations could not run");
}

let backend = Arc::new(models::backend::Postgres::new(db_pool));
let state = State {
db_pool,
backend,
auth_key,
self_hosted,
secp,
Expand Down
36 changes: 23 additions & 13 deletions src/migration.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::kv::KeyValue;
use crate::models::VssItem;
use crate::State;
use anyhow::anyhow;
Expand All @@ -6,7 +7,6 @@ use axum::headers::Authorization;
use axum::http::StatusCode;
use axum::{Extension, Json, TypedHeader};
use chrono::{DateTime, NaiveDateTime, Utc};
use diesel::Connection;
use log::{error, info};
use serde::{Deserialize, Deserializer};
use serde_json::json;
Expand Down Expand Up @@ -46,6 +46,12 @@ where
})
}

/// Migration assumes a v1 `get_object` response, which returns value as a base64-encoded string
///
/// Environment:
/// - MIGRATION_URL
/// - MIGRATION_BATCH_SIZE (default 100)
/// - MIGRATION_START_INDEX (default 0)
pub async fn migration_impl(admin_key: String, state: &State) -> anyhow::Result<()> {
let client = Agent::new();
let Ok(url) = std::env::var("MIGRATION_URL") else {
Expand Down Expand Up @@ -77,21 +83,25 @@ pub async fn migration_impl(admin_key: String, state: &State) -> anyhow::Result<
.set("x-api-key", &admin_key)
.send_string(&payload.to_string())?;
let items: Vec<Item> = resp.into_json()?;

let mut conn = state.db_pool.get().unwrap();

// Insert values into DB
conn.transaction::<_, anyhow::Error, _>(|conn| {
for item in items.iter() {
if let Ok(value) = base64::decode(&item.value) {
VssItem::put_item(conn, &item.store_id, &item.key, &value, item.version)?;
}
let nitems = items.len(); // we'll need this later and plan to consume items

let backend = state.backend.clone();

// Original migration code didn't preserve timestamps
// neither does [`VssItem::from_kv`]
let vss_items: Vec<_> = items.into_iter().filter_map(|item| {
if let Ok(value) = base64::decode(&item.value) {
Some(VssItem::from_kv(&item.store_id, KeyValue::new(item.key, value, item.version)))
} else {
log::warn!("Failed to decode value during migration: {}", item.value);
None
}
}).collect();

Ok(())
})?;
// Insert values into DB
backend.put_items(vss_items)?;

if items.len() < limit {
if nitems < limit {
finished = true;
} else {
offset += limit;
Expand Down
236 changes: 236 additions & 0 deletions src/models/backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
//! Selectable backend datastore
//!
//! Implementation status:
//! - [x] Postgres
//! - [ ] Sqlite
//! - [ ] Rocksdb
use crate::kv::KeyValue;
use crate::models::VssItem;
use diesel::prelude::*;
use diesel::r2d2::PooledConnection;
use diesel::r2d2::{ConnectionManager, Pool};
use diesel::sql_query;
use diesel::sql_types::{BigInt, Bytea, Text};
use diesel::PgConnection;

/// Discriminator for introspection on a `dyn VssBackend` trait object
/// (const generics for ADTs are still nightly-only, so a plain enum is used).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendType {
    Postgres,
    Rocksdb,
}

/// Postgres-backed [`VssBackend`] implementation.
///
/// Wraps an r2d2 connection pool; a connection is checked out of the pool
/// per operation via the private `conn` helper.
pub struct Postgres {
    // Shared pool of diesel Postgres connections
    db_pool: Pool<ConnectionManager<PgConnection>>,
}

impl Postgres {
    /// Build a Postgres backend from an existing r2d2 connection pool.
    pub fn new(db_pool: Pool<ConnectionManager<PgConnection>>) -> Self {
        Self { db_pool }
    }

    /// Check a connection out of the pool.
    ///
    /// # Errors
    /// Returns an error when the pool is exhausted or a connection cannot
    /// be established.
    // `anyhow::Result<T>` already defaults the error type to `anyhow::Error`,
    // so the redundant second type parameter is dropped.
    fn conn(&self) -> anyhow::Result<PooledConnection<ConnectionManager<PgConnection>>> {
        Ok(self.db_pool.get()?)
    }
}

/// Placeholder RocksDB backend; every [`VssBackend`] method on it is
/// currently `todo!()` (see the impl below).
pub struct Rocksdb;

/// Object-safe storage abstraction over the VSS key/value store, so the
/// server can hold any backend behind `Arc<dyn VssBackend>`.
pub trait VssBackend: Send + Sync {
    /// Identify the concrete backend behind this trait object.
    fn backend_type(&self) -> BackendType;

    /// Fetch one item by `store_id` and `key`; `Ok(None)` when it does not exist.
    fn get_item(&self, store_id: &str, key: &str) -> anyhow::Result<Option<VssItem>>;

    #[cfg(test)]
    /// Upsert a single item.
    ///
    /// This method fetches its own connection even if called from within a tx,
    /// so it is moved to cfg(test) to prevent errors. It is manually inlined
    /// into [`VssBackend::put_items`] and [`VssBackend::put_items_in_store`].
    fn put_item(&self, store_id: &str, key: &str, value: &[u8], version: i64)
        -> anyhow::Result<()>;

    /// Wrap multiple item writes in a transaction.
    /// Takes `Vec<[VssItem]>`, so items may go to different `store_id`s.
    /// If you need an atomic put for items that all share one `store_id`,
    /// see [`VssBackend::put_items_in_store`].
    fn put_items(&self, items: Vec<VssItem>) -> anyhow::Result<()>;

    /// Wrap multiple item writes in a transaction.
    /// Items are all placed in the same `store_id`.
    /// If you need an atomic put to potentially different `store_id`s,
    /// see [`VssBackend::put_items`].
    fn put_items_in_store(&self, store_id: &str, items: Vec<KeyValue>) -> anyhow::Result<()>;

    /// List `(key, version)` pairs in a store, optionally restricted to keys
    /// matching `prefix`.
    fn list_key_versions(
        &self,
        store_id: &str,
        prefix: Option<&str>,
    ) -> anyhow::Result<Vec<(String, i64)>>;

    #[cfg(test)]
    /// THIS WILL NUKE YOUR DATA.
    /// Make sure `DATABASE_URL` is not the same as your dev/stage/prod
    /// connection when running `cargo test`.
    fn clear_database(&self);
}

impl VssBackend for Postgres {
fn backend_type(&self) -> BackendType {
BackendType::Postgres
}

fn get_item(&self, store_id: &str, key: &str) -> anyhow::Result<Option<VssItem>> {
use super::schema::vss_db;

let mut conn = self.conn()?;

Ok(vss_db::table
.filter(vss_db::store_id.eq(store_id))
.filter(vss_db::key.eq(key))
.first::<VssItem>(&mut conn)
.optional()?)
}

#[cfg(test)]
fn put_item(
&self,
store_id: &str,
key: &str,
value: &[u8],
version: i64,
) -> anyhow::Result<()> {
let mut conn = self.conn()?;

sql_query("SELECT upsert_vss_db($1, $2, $3, $4)")
.bind::<Text, _>(store_id)
.bind::<Text, _>(key)
.bind::<Bytea, _>(value)
.bind::<BigInt, _>(version)
.execute(&mut conn)?;

Ok(())
}

fn put_items(&self, items: Vec<VssItem>) -> anyhow::Result<()> {
let mut conn = self.conn()?;

conn.transaction::<_, anyhow::Error, _>(|conn| {
for item in items {
// VssItem.value is Option for unclear reasons
let value = match item.value {
None => vec![],
Some(v) => v,
};
// Inline VssBackend::put_item which was moved to #[cfg(test)] only
sql_query("SELECT upsert_vss_db($1, $2, $3, $4)")
.bind::<Text, _>(item.store_id)
.bind::<Text, _>(item.key)
.bind::<Bytea, _>(value)
.bind::<BigInt, _>(item.version)
.execute(conn)?;
}

Ok(())
})?;

Ok(())
}

fn put_items_in_store(&self, store_id: &str, items: Vec<KeyValue>) -> anyhow::Result<()> {
let mut conn = self.conn()?;

conn.transaction::<_, anyhow::Error, _>(|conn| {
for kv in items {
// Inline VssBackend::put_item which was moved to #[cfg(test)] only
sql_query("SELECT upsert_vss_db($1, $2, $3, $4)")
.bind::<Text, _>(store_id)
.bind::<Text, _>(kv.key)
.bind::<Bytea, _>(kv.value.0)
.bind::<BigInt, _>(kv.version)
.execute(conn)?;
}

Ok(())
})?;

Ok(())
}

fn list_key_versions(
&self,
store_id: &str,
prefix: Option<&str>,
) -> anyhow::Result<Vec<(String, i64)>> {
use super::schema::vss_db;

let mut conn = self.conn()?;

let table = vss_db::table
.filter(vss_db::store_id.eq(store_id))
.select((vss_db::key, vss_db::version));

let res = match prefix {
None => table.load::<(String, i64)>(&mut conn)?,
Some(prefix) => table
.filter(vss_db::key.ilike(format!("{prefix}%")))
.load::<(String, i64)>(&mut conn)?,
};

Ok(res)
}

#[cfg(test)]
fn clear_database(&self) {
use crate::models::schema::vss_db;

let mut conn = self.conn().unwrap();
conn.transaction::<_, anyhow::Error, _>(|conn| {
diesel::delete(vss_db::table).execute(conn)?;
Ok(())
})
.unwrap();
}
}

// RocksDB backend stub: every method panics via `todo!()` until the
// implementation lands (see the module-level status list). Commented-out
// placeholder return values have been removed — `todo!()` already satisfies
// the type checker.
impl VssBackend for Rocksdb {
    fn backend_type(&self) -> BackendType {
        BackendType::Rocksdb
    }

    fn get_item(&self, _store_id: &str, _key: &str) -> anyhow::Result<Option<VssItem>> {
        todo!();
    }

    #[cfg(test)]
    fn put_item(
        &self,
        _store_id: &str,
        _key: &str,
        _value: &[u8],
        _version: i64,
    ) -> anyhow::Result<()> {
        todo!();
    }

    fn put_items(&self, _items: Vec<VssItem>) -> anyhow::Result<()> {
        todo!();
    }

    fn put_items_in_store(&self, _store_id: &str, _items: Vec<KeyValue>) -> anyhow::Result<()> {
        todo!();
    }

    fn list_key_versions(
        &self,
        _store_id: &str,
        _prefix: Option<&str>,
    ) -> anyhow::Result<Vec<(String, i64)>> {
        todo!();
    }

    #[cfg(test)]
    fn clear_database(&self) {
        todo!();
    }
}
Loading
Loading