Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delete_object #8

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions migrations/2023-12-06-020636_deletes/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE vss_db
DROP COLUMN deleted;
68 changes: 68 additions & 0 deletions migrations/2023-12-06-020636_deletes/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
ALTER TABLE vss_db
ADD COLUMN deleted BOOLEAN NOT NULL DEFAULT FALSE;

-- modify upsert_vss_db to set deleted to false
CREATE OR REPLACE FUNCTION upsert_vss_db(
p_store_id TEXT,
p_key TEXT,
p_value bytea,
p_version BIGINT
) RETURNS VOID AS
$$
BEGIN

WITH new_values (store_id, key, value, version) AS (VALUES (p_store_id, p_key, p_value, p_version))
INSERT
INTO vss_db
(store_id, key, value, version)
SELECT new_values.store_id,
new_values.key,
new_values.value,
new_values.version
FROM new_values
LEFT JOIN vss_db AS existing
ON new_values.store_id = existing.store_id
AND new_values.key = existing.key
WHERE CASE
WHEN new_values.version >= 4294967295 THEN new_values.version >= COALESCE(existing.version, -1)
ELSE new_values.version > COALESCE(existing.version, -1)
END
ON CONFLICT (store_id, key)
DO UPDATE SET value = excluded.value,
version = excluded.version,
deleted = false;

END;
$$ LANGUAGE plpgsql;

-- modified upsert_vss_db but to delete
CREATE OR REPLACE FUNCTION delete_item(
p_store_id TEXT,
p_key TEXT,
p_version BIGINT
) RETURNS VOID AS
$$
BEGIN

WITH new_values (store_id, key, version) AS (VALUES (p_store_id, p_key, p_version))
INSERT
INTO vss_db
(store_id, key, version)
SELECT new_values.store_id,
new_values.key,
new_values.version
FROM new_values
LEFT JOIN vss_db AS existing
ON new_values.store_id = existing.store_id
AND new_values.key = existing.key
WHERE CASE
WHEN new_values.version >= 4294967295 THEN new_values.version >= COALESCE(existing.version, -1)
ELSE new_values.version > COALESCE(existing.version, -1)
END
ON CONFLICT (store_id, key)
DO UPDATE SET value = NULL,
version = excluded.version,
deleted = true;

END;
$$ LANGUAGE plpgsql;
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ async fn main() -> anyhow::Result<()> {
.route("/v2/putObjects", put(put_objects))
.route("/listKeyVersions", post(list_key_versions))
.route("/v2/listKeyVersions", post(list_key_versions))
.route("/deleteObject", post(delete_object))
.route("/v2/deleteObject", post(delete_object))
.route("/migration", get(migration::migration))
.fallback(fallback)
.layer(
Expand Down
65 changes: 65 additions & 0 deletions src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct VssItem {

created_date: chrono::NaiveDateTime,
updated_date: chrono::NaiveDateTime,

pub deleted: bool,
}

impl VssItem {
Expand Down Expand Up @@ -68,6 +70,21 @@ impl VssItem {
Ok(())
}

pub fn delete_item(
conn: &mut PgConnection,
store_id: &str,
key: &str,
version: i64,
) -> anyhow::Result<()> {
sql_query("SELECT delete_item($1, $2, $3)")
.bind::<Text, _>(store_id)
.bind::<Text, _>(key)
.bind::<BigInt, _>(version)
.execute(conn)?;

Ok(())
}

pub fn list_key_versions(
conn: &mut PgConnection,
store_id: &str,
Expand Down Expand Up @@ -209,6 +226,54 @@ mod test {
clear_database(&state);
}

#[tokio::test]
async fn test_delete() {
let state = init_state();
clear_database(&state);

let store_id = "test_store_id";
let key = "test";
let value = [1, 2, 3];
let version = 0;

let mut conn = state.db_pool.get().unwrap();
VssItem::put_item(&mut conn, store_id, key, &value, version).unwrap();

let versions = VssItem::list_key_versions(&mut conn, store_id, None).unwrap();

assert_eq!(versions.len(), 1);
assert_eq!(versions[0].0, key);
assert_eq!(versions[0].1, version);

// delete item
let new_version = version + 1;
VssItem::delete_item(&mut conn, store_id, key, new_version).unwrap();
let item = VssItem::get_item(&mut conn, store_id, key)
.unwrap()
.unwrap();
assert!(item.value.is_none());
assert!(item.deleted);

// bring item back with higher version

let final_value = [4, 5, 6];
let final_version = new_version + 1;

VssItem::put_item(&mut conn, store_id, key, &final_value, final_version).unwrap();

let item = VssItem::get_item(&mut conn, store_id, key)
.unwrap()
.unwrap();

assert_eq!(item.store_id, store_id);
assert_eq!(item.key, key);
assert_eq!(item.value.unwrap(), final_value);
assert_eq!(item.version, final_version);
assert!(!item.deleted);

clear_database(&state);
}

#[tokio::test]
async fn test_list_key_versions() {
let state = init_state();
Expand Down
1 change: 1 addition & 0 deletions src/models/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ diesel::table! {
version -> Int8,
created_date -> Timestamp,
updated_date -> Timestamp,
deleted -> Bool,
}
}
40 changes: 40 additions & 0 deletions src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,46 @@ pub async fn list_key_versions(
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteObjectRequest {
pub store_id: Option<String>,
pub key: String,
pub version: i64,
}

async fn delete_object_impl(req: DeleteObjectRequest, state: &State) -> anyhow::Result<()> {
let store_id = req.store_id.expect("must have");

let mut conn = state.db_pool.get()?;

VssItem::delete_item(&mut conn, &store_id, &req.key, req.version)?;

Ok(())
}

pub async fn delete_object(
origin: Option<TypedHeader<Origin>>,
auth: Option<TypedHeader<Authorization<Bearer>>>,
Extension(state): Extension<State>,
Json(mut payload): Json<DeleteObjectRequest>,
) -> Result<Json<()>, (StatusCode, String)> {
if !state.self_hosted {
validate_cors(origin)?;
}

let store_id = auth
.map(|TypedHeader(token)| verify_token(token.token(), &state))
.transpose()?
.flatten();

ensure_store_id!(payload, store_id);

match delete_object_impl(payload, &state).await {
Ok(res) => Ok(Json(res)),
Err(e) => Err(handle_anyhow_error("delete_object", e)),
}
}

pub async fn health_check() -> Result<Json<()>, (StatusCode, String)> {
Ok(Json(()))
}
Expand Down
Loading