Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expand ListingSchemaProvider to support register and deregister table #3150

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
63 changes: 43 additions & 20 deletions crates/core/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use dashmap::DashMap;
Expand All @@ -13,9 +13,9 @@ use futures::TryStreamExt;
use object_store::ObjectStore;

use crate::errors::DeltaResult;
use crate::open_table_with_storage_options;
use crate::storage::*;
use crate::table::builder::ensure_table_uri;
use crate::DeltaTableBuilder;

const DELTA_LOG_FOLDER: &str = "_delta_log";

Expand All @@ -36,7 +36,7 @@ pub struct ListingSchemaProvider {
/// Underlying object store
store: Arc<dyn ObjectStore>,
/// A map of table names to a fully quilfied storage location
tables: DashMap<String, String>,
tables: Mutex<DashMap<String, Arc<dyn TableProvider>>>,
/// Options used to create underlying object stores
storage_options: StorageOptions,
}
Expand All @@ -54,7 +54,7 @@ impl ListingSchemaProvider {
Ok(Self {
authority: uri.to_string(),
store,
tables: DashMap::new(),
tables: Mutex::new(DashMap::new()),
storage_options,
})
}
Expand All @@ -73,6 +73,7 @@ impl ListingSchemaProvider {
parent = p;
}
}

for table in tables.into_iter() {
let table_name = normalize_table_name(table)?;
let table_path = table
Expand All @@ -81,7 +82,14 @@ impl ListingSchemaProvider {
.to_string();
if !self.table_exist(&table_name) {
let table_url = format!("{}/{}", self.authority, table_path);
self.tables.insert(table_name.to_string(), table_url);
let Ok(delta_table) = DeltaTableBuilder::from_uri(table_url)
.with_storage_options(self.storage_options.0.clone())
.load()
.await
else {
continue;
};
let _ = self.register_table(table_name, Arc::new(delta_table));
}
}
Ok(())
Expand All @@ -108,39 +116,54 @@ impl SchemaProvider for ListingSchemaProvider {
}

fn table_names(&self) -> Vec<String> {
self.tables.iter().map(|t| t.key().clone()).collect()
self.tables
.lock()
.expect("Can't lock tables")
.iter()
.map(|t| t.key().clone())
.collect()
}

async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let Some(location) = self.tables.get(name).map(|t| t.clone()) else {
let Some(provider) = self
.tables
.lock()
.expect("Can't lock tables")
.get(name)
.map(|t| t.clone())
else {
return Ok(None);
};
let provider =
open_table_with_storage_options(location, self.storage_options.0.clone()).await?;
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
Ok(Some(provider))
}

fn register_table(
&self,
_name: String,
_table: Arc<dyn TableProvider>,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support registering tables".to_owned(),
))
self.tables
.lock()
.expect("Can't lock tables")
.insert(name, table.clone());
Ok(Some(table))
}

fn deregister_table(
&self,
_name: &str,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support deregistering tables".to_owned(),
))
if let Some(table) = self.tables.lock().expect("Can't lock tables").remove(name) {
return Ok(Some(table.1));
}
Ok(None)
}

fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
self.tables
.lock()
.expect("Can't lock tables")
.contains_key(name)
}
}

Expand Down
Loading