Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions uc-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ tracing = "0.1"
url = "2.5"
chrono = { version = "0.4", features = ["serde"] }

[features]
default = []
test-utils = []

[dev-dependencies]
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4.5", features = ["derive", "env"] }
280 changes: 280 additions & 0 deletions uc-client/src/commits_client/in_memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
//! In-memory implementation of [`UCCommitsClient`] for testing.
use std::collections::HashMap;
use std::sync::RwLock;

use crate::error::{Error, Result};
use crate::models::commits::{Commit, CommitRequest, CommitsRequest, CommitsResponse};

use super::UCCommitsClient;

// ============================================================================
// TableData
// ============================================================================

/// In-memory representation of a UC-managed Delta table's commit state.
struct TableData {
/// The highest version that has been ratified (committed) to this table.
max_ratified_version: i64,
Comment on lines +17 to +18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my knowledge: do we use the word "ratified" as a specific state in design docs for catalog-based commits?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not just design docs, but also the table feature protocol specification, too. https://github.com/delta-io/delta/blob/master/protocol_rfcs/catalog-managed.md#terminology-commits

Writers propose commits to the catalog, which can either reject the commit, or approve (aka "ratify") it.

/// Commits that have been registered with UC but not yet published.
catalog_commits: Vec<Commit>,
}

impl TableData {
const MAX_UNPUBLISHED_COMMITS: u16 = 50;

/// Creates a new `TableData` representing a UC Delta table that has just been created.
/// The table starts with no commits and version 0.
fn new_post_table_create() -> Self {
Self {
max_ratified_version: 0,
catalog_commits: vec![],
}
}

/// Returns commits within the requested version range.
fn get_commits(&self, request: CommitsRequest) -> Result<CommitsResponse> {
let start = request.start_version.unwrap_or(0);
let end = request.end_version.unwrap_or(i64::MAX);

Ok(CommitsResponse {
commits: Some(
self.catalog_commits
.iter()
.filter(|commit| start <= commit.version && commit.version <= end)
.cloned()
.collect(),
),
latest_table_version: self.max_ratified_version,
})
}

/// Registers a new commit. Returns an error if the version is not the expected next version
/// or if the number of unpublished commits exceeds the maximum.
fn commit(&mut self, request: CommitRequest) -> Result<()> {
let Some(commit) = request.commit_info else {
return Err(Error::UnsupportedOperation(
"commit_info is required".to_string(),
));
};

let expected_version = self.max_ratified_version + 1;

if commit.version != expected_version {
return Err(Error::UnsupportedOperation(format!(
"Expected commit version {} but got {}",
expected_version, commit.version
)));
}

if self.catalog_commits.len() as u16 >= Self::MAX_UNPUBLISHED_COMMITS {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the u16 instead of usize?

return Err(Error::MaxUnpublishedCommitsExceeded(
Self::MAX_UNPUBLISHED_COMMITS,
));
}

if let Some(v) = request.latest_backfilled_version {
self.cleanup_published_commits(v);
}

self.catalog_commits.push(commit);
self.max_ratified_version = expected_version;

Ok(())
}

/// Removes commits that have been published (backfilled) to the Delta log.
fn cleanup_published_commits(&mut self, max_published_version: i64) {
self.catalog_commits
.retain(|commit| max_published_version < commit.version);
}
}

// ============================================================================
// InMemoryCommitsClient
// ============================================================================

/// An in-memory implementation of [`UCCommitsClient`] for testing.
pub struct InMemoryCommitsClient {
// table id -> table data
tables: RwLock<HashMap<String, TableData>>,
}

impl InMemoryCommitsClient {
pub fn new() -> Self {
Self {
tables: RwLock::new(HashMap::new()),
}
}

pub fn create_table(&self, table_id: impl Into<String>) -> Result<()> {
let mut tables = self.tables.write().unwrap();
match tables.entry(table_id.into()) {
std::collections::hash_map::Entry::Vacant(e) => {
Copy link
Collaborator

@OussamaSaoudi OussamaSaoudi Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use std::collections::hash_map; above?

Suggested change
std::collections::hash_map::Entry::Vacant(e) => {
hash_map::Entry::Vacant(e) => {

e.insert(TableData::new_post_table_create());
Ok(())
}
std::collections::hash_map::Entry::Occupied(e) => Err(Error::UnsupportedOperation(
format!("Table {} already exists", e.key()),
)),
}
}
}

impl Default for InMemoryCommitsClient {
fn default() -> Self {
Self::new()
}
}

impl UCCommitsClient for InMemoryCommitsClient {
async fn get_commits(&self, request: CommitsRequest) -> Result<CommitsResponse> {
let tables = self.tables.read().unwrap();
Copy link
Collaborator

@OussamaSaoudi OussamaSaoudi Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect external folks to use this test? We should probably make this an error. Maybe a Fatal variant?

NVM, seems this is test-only.

let table = tables
.get(&request.table_id)
.ok_or_else(|| Error::TableNotFound(request.table_id.clone()))?;
table.get_commits(request)
}

async fn commit(&self, request: CommitRequest) -> Result<()> {
let mut tables = self.tables.write().unwrap();
let table = tables
.get_mut(&request.table_id)
.ok_or_else(|| Error::TableNotFound(request.table_id.clone()))?;
table.commit(request)
}
}

// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
use super::*;

const TABLE_ID: &str = "test-table-id";
const TABLE_URI: &str = "s3://bucket/table";

fn make_commit(version: i64) -> Commit {
Commit::new(
version,
version * 1000,
format!("{:020}.json", version),
100,
version * 1000,
)
}

fn commit_request(version: i64, latest_backfilled_version: Option<i64>) -> CommitRequest {
CommitRequest::new(
TABLE_ID,
TABLE_URI,
make_commit(version),
latest_backfilled_version,
)
}

fn get_commits_request() -> CommitsRequest {
CommitsRequest::new(TABLE_ID, TABLE_URI)
}

fn extract_commit_versions(commits: &[Commit]) -> Vec<i64> {
commits.iter().map(|c| c.version).collect()
}

#[tokio::test]
async fn test_commit_and_get_commits() {
let client = InMemoryCommitsClient::new();

// Create table
client.create_table(TABLE_ID).unwrap();

// Insert 10 commits (versions 1-10)
for v in 1..=10 {
client.commit(commit_request(v, None)).await.unwrap();
}

// Get commits (versions 3-8)
let commits_request = get_commits_request()
.with_start_version(3)
.with_end_version(8);
let response = client.get_commits(commits_request).await.unwrap();
let commits = response.commits.unwrap();
assert_eq!(commits.len(), 6);
assert_eq!(extract_commit_versions(&commits), vec![3, 4, 5, 6, 7, 8]);
assert_eq!(response.latest_table_version, 10);

// Insert commit 11 with latest_backfilled_version = 5
// This should cleanup commits 1-5 (retain versions >= 6)
client.commit(commit_request(11, Some(5))).await.unwrap();

// Get commits again - should return versions 6-11
let response = client.get_commits(get_commits_request()).await.unwrap();
let commits = response.commits.unwrap();
assert_eq!(extract_commit_versions(&commits), vec![6, 7, 8, 9, 10, 11]);
assert_eq!(response.latest_table_version, 11);
}

#[test]
fn test_create_table_duplicate_throws() {
let client = InMemoryCommitsClient::new();
client.create_table(TABLE_ID).unwrap();
assert!(matches!(
client.create_table(TABLE_ID),
Err(Error::UnsupportedOperation(_))
));
}

#[tokio::test]
async fn test_get_commits_table_not_found() {
assert!(matches!(
InMemoryCommitsClient::new()
.get_commits(get_commits_request())
.await,
Err(Error::TableNotFound(_))
));
}

#[tokio::test]
async fn test_commit_table_not_found() {
assert!(matches!(
InMemoryCommitsClient::new()
.commit(commit_request(1, None))
.await,
Err(Error::TableNotFound(_))
));
}

#[tokio::test]
async fn test_commit_wrong_version() {
let client = InMemoryCommitsClient::new();
client.create_table(TABLE_ID).unwrap();
assert!(matches!(
client.commit(commit_request(5, None)).await,
Err(Error::UnsupportedOperation(_))
));
}

#[tokio::test]
async fn test_get_commits_empty_table() {
let client = InMemoryCommitsClient::new();
client.create_table(TABLE_ID).unwrap();
let response = client.get_commits(get_commits_request()).await.unwrap();
assert!(response.commits.unwrap().is_empty());
assert_eq!(response.latest_table_version, 0);
}

#[tokio::test]
async fn test_commit_max_unpublished_commits_exceeded() {
let client = InMemoryCommitsClient::new();
client.create_table(TABLE_ID).unwrap();
for v in 1..=50 {
client.commit(commit_request(v, None)).await.unwrap();
}
assert!(matches!(
client.commit(commit_request(51, None)).await,
Err(Error::MaxUnpublishedCommitsExceeded(_))
));
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename from commits_client.rs to mod.rs. Glad GitHub captures this succinctly!

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ use crate::error::Result;
use crate::http::{build_http_client, execute_with_retry, handle_response};
use crate::models::commits::{CommitRequest, CommitsRequest, CommitsResponse};

#[cfg(any(test, feature = "test-utils"))]
mod in_memory;

#[cfg(any(test, feature = "test-utils"))]
pub use in_memory::InMemoryCommitsClient;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The InMemory UC-Commits-Client is only visible when (a) inside tests, and (b) using the test-utils feature flag.


/// Trait for UC commits API operations.
///
/// Implementations of this trait are responsible for performing any necessary retries on transient
Expand Down
3 changes: 3 additions & 0 deletions uc-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pub enum Error {

#[error("Max retries exceeded")]
MaxRetriesExceeded,

#[error("Max unpublished commits exceeded (max: {0})")]
MaxUnpublishedCommitsExceeded(u16),
}

pub type Result<T> = std::result::Result<T, Error>;
3 changes: 3 additions & 0 deletions uc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub use commits_client::{UCCommitsClient, UCCommitsRestClient};
pub use config::{ClientConfig, ClientConfigBuilder};
pub use error::{Error, Result};

#[cfg(any(test, feature = "test-utils"))]
pub use commits_client::InMemoryCommitsClient;

#[doc(hidden)]
pub mod prelude {
pub use crate::client::UCClient;
Expand Down
Loading