
Merge branch 'main' into warehouse_rbac
TCeason authored Jan 16, 2025
2 parents 02cd206 + 6850110 commit c4935fc
Showing 13 changed files with 303 additions and 42 deletions.
8 changes: 8 additions & 0 deletions .github/actions/pack_binaries/action.yml
@@ -23,6 +23,14 @@ runs:
         category: ${{ inputs.category }}
         path: distro/bin
         artifacts: metactl,meta,query,query.debug
+    - name: Download BendSQL
+      shell: bash
+      env:
+        GH_TOKEN: ${{ github.token }}
+      run: |
+        version=$(gh release list --repo databendlabs/bendsql | head -n 1 | awk '{print $1}')
+        curl -sSLfo /tmp/bendsql.tar.gz https://github.com/databendlabs/bendsql/releases/download/${version}/bendsql-${version}-${{ inputs.target }}.tar.gz
+        tar -xzvf /tmp/bendsql.tar.gz -C distro/bin
     - name: Pack Binaries
       id: pack_binaries
       shell: bash
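The new step resolves the newest BendSQL release tag with the gh CLI (the first whitespace-separated field of the first row printed by gh release list) and unpacks the matching tarball for the build target into distro/bin. A rough Rust sketch of the same tag-resolution logic, assuming only that the gh CLI is installed and authenticated; the helper name is hypothetical:

use std::process::Command;

// Hypothetical helper mirroring the shell pipeline above: list BendSQL
// releases (newest first) and take the first field of the first row,
// which for BendSQL releases is the tag.
fn latest_bendsql_tag() -> Option<String> {
    let out = Command::new("gh")
        .args(["release", "list", "--repo", "databendlabs/bendsql"])
        .output()
        .ok()?;
    String::from_utf8(out.stdout)
        .ok()?
        .lines()
        .next()?
        .split_whitespace()
        .next()
        .map(str::to_owned)
}

fn main() {
    match latest_bendsql_tag() {
        Some(tag) => println!("latest bendsql release: {tag}"),
        None => eprintln!("could not determine the latest release"),
    }
}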
8 changes: 3 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -369,10 +369,10 @@ num-derive = "0.3.3"
 num-traits = "0.2.19"
 num_cpus = "1.13.1"
 object = "0.36.5"
-object_store_opendal = { version = "0.49.0" }
+object_store_opendal = { version = "0.49.0", package = "object_store_opendal", git = "https://github.com/apache/opendal", rev = "78b6a9f" }
 once_cell = "1.15.0"
 openai_api_rust = "0.1"
-opendal = { version = "0.51.1", features = [
+opendal = { version = "0.51.1", package = "opendal", git = "https://github.com/apache/opendal", rev = "78b6a9f", features = [
     "layers-fastrace",
     "layers-prometheus-client",
     "layers-async-backtrace",
@@ -639,7 +639,7 @@ deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "3038c145" }
 ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
 openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
 openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.7" }
-orc-rust = { git = "https://github.com/datafusion-contrib/orc-rust", rev = "dfb1ede" }
+orc-rust = { git = "https://github.com/youngsofun/orc-rust", rev = "6c5ac57" }
 recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" }
 sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
 tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }
12 changes: 7 additions & 5 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -94,11 +94,13 @@ impl PipelineBuilder {
         }
 
         // unload spill metas
-        self.main_pipeline
-            .set_on_finished(always_callback(move |_info: &ExecutionInfo| {
-                self.ctx.unload_spill_meta();
-                Ok(())
-            }));
+        if !self.ctx.mark_unload_callbacked() {
+            self.main_pipeline
+                .set_on_finished(always_callback(move |_info: &ExecutionInfo| {
+                    self.ctx.unload_spill_meta();
+                    Ok(())
+                }));
+        }
 
         Ok(PipelineBuildResult {
             main_pipeline: self.main_pipeline,
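Pipeline building can run more than once for a single query, so the spill-meta unload callback is now registered only by the first build that gets there. A minimal, self-contained sketch of the register-once pattern, with stand-in types in place of the real pipeline and query context:

use std::sync::atomic::{AtomicBool, Ordering};

// Stand-ins for the real pipeline and query context types.
#[derive(Default)]
struct Pipeline {
    on_finished: Vec<Box<dyn Fn()>>,
}

#[derive(Default)]
struct Ctx {
    unload_callbacked: AtomicBool,
}

impl Ctx {
    // Returns the previous flag value: false for the first caller only.
    fn mark_unload_callbacked(&self) -> bool {
        self.unload_callbacked.fetch_or(true, Ordering::SeqCst)
    }
}

fn build(ctx: &Ctx, pipeline: &mut Pipeline) {
    // Register the unload callback only on the first build for this query.
    if !ctx.mark_unload_callbacked() {
        pipeline.on_finished.push(Box::new(|| println!("unload spill meta")));
    }
}

fn main() {
    let ctx = Ctx::default();
    let mut pipeline = Pipeline::default();
    build(&ctx, &mut pipeline);
    build(&ctx, &mut pipeline); // second build is a no-op for the callback
    assert_eq!(pipeline.on_finished.len(), 1);
}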
9 changes: 9 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -322,6 +322,9 @@ impl QueryContext {
 
     pub fn update_init_query_id(&self, id: String) {
         self.shared.spilled_files.write().clear();
+        self.shared
+            .unload_callbacked
+            .store(false, Ordering::Release);
         self.shared.cluster_spill_progress.write().clear();
         *self.shared.init_query_id.write() = id;
     }
@@ -471,6 +474,12 @@ impl QueryContext {
         Ok(table)
     }
 
+    pub fn mark_unload_callbacked(&self) -> bool {
+        self.shared
+            .unload_callbacked
+            .fetch_or(true, Ordering::SeqCst)
+    }
+
     pub fn unload_spill_meta(&self) {
         const SPILL_META_SUFFIX: &str = ".list";
         let r = self.shared.spilled_files.read();
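fetch_or(true, ..) atomically sets the flag and returns the value it held before, so exactly one caller ever observes false; update_init_query_id re-arms the guard with store(false, ..) when the same context is reused for the next query. The standard-library semantics in isolation:

use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let flag = AtomicBool::new(false);
    // First caller: the flag was false, so this caller wins.
    assert!(!flag.fetch_or(true, Ordering::SeqCst));
    // Every later caller observes true and backs off.
    assert!(flag.fetch_or(true, Ordering::SeqCst));
    // Resetting re-arms the guard, as update_init_query_id does.
    flag.store(false, Ordering::Release);
    assert!(!flag.fetch_or(true, Ordering::SeqCst));
}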
2 changes: 2 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
@@ -150,6 +150,7 @@ pub struct QueryContextShared {
     pub(in crate::sessions) cluster_spill_progress: Arc<RwLock<HashMap<String, SpillProgress>>>,
     pub(in crate::sessions) spilled_files:
         Arc<RwLock<HashMap<crate::spillers::Location, crate::spillers::Layout>>>,
+    pub(in crate::sessions) unload_callbacked: AtomicBool,
 }
 
 impl QueryContextShared {
@@ -209,6 +210,7 @@
 
             cluster_spill_progress: Default::default(),
             spilled_files: Default::default(),
+            unload_callbacked: AtomicBool::new(false),
             warehouse_cache: Arc::new(RwLock::new(None)),
         }))
     }
13 changes: 5 additions & 8 deletions src/query/sql/src/planner/binder/copy_into_table.rs
@@ -182,15 +182,12 @@ impl Binder {
             files: stmt.files.clone(),
             pattern,
         };
-        let required_values_schema: DataSchemaRef = Arc::new(
-            match &stmt.dst_columns {
-                Some(cols) => self.schema_project(&table.schema(), cols)?,
-                None => self.schema_project(&table.schema(), &[])?,
-            }
-            .into(),
-        );
+        let stage_schema = match &stmt.dst_columns {
+            Some(cols) => self.schema_project(&table.schema(), cols)?,
+            None => self.schema_project(&table.schema(), &[])?,
+        };
 
-        let stage_schema = infer_table_schema(&required_values_schema)?;
+        let required_values_schema: DataSchemaRef = Arc::new(stage_schema.clone().into());
 
         let default_values = if stage_info.file_format_params.need_field_default() {
             Some(
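The rewrite derives the projected table schema (stage_schema) once and then builds the value schema from it, instead of building a DataSchema first and re-inferring a TableSchema from it with infer_table_schema. A simplified sketch of the reordered flow, using toy schema types; the real schema_project also validates column names, and error handling is elided here:

#[derive(Clone, Debug, PartialEq)]
struct TableSchema {
    columns: Vec<String>,
}

#[derive(Clone, Debug, PartialEq)]
struct DataSchema {
    columns: Vec<String>,
}

impl From<TableSchema> for DataSchema {
    fn from(t: TableSchema) -> Self {
        DataSchema { columns: t.columns }
    }
}

// Project a table schema down to the requested columns;
// an empty list means "keep every column".
fn schema_project(schema: &TableSchema, cols: &[String]) -> TableSchema {
    if cols.is_empty() {
        return schema.clone();
    }
    TableSchema {
        columns: cols
            .iter()
            .filter(|c| schema.columns.contains(*c))
            .cloned()
            .collect(),
    }
}

fn main() {
    let table = TableSchema { columns: vec!["a".into(), "b".into(), "c".into()] };
    // COPY INTO t (a, c): project once, then derive the value schema.
    let stage_schema = schema_project(&table, &["a".into(), "c".into()]);
    let required_values_schema: DataSchema = stage_schema.clone().into();
    assert_eq!(required_values_schema.columns, vec!["a".to_string(), "c".to_string()]);
}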
25 changes: 24 additions & 1 deletion src/query/sql/src/planner/expression_parser.rs
@@ -395,7 +395,30 @@ pub fn parse_cluster_keys(
     table_meta: Arc<dyn Table>,
     ast_exprs: Vec<AExpr>,
 ) -> Result<Vec<Expr>> {
-    let exprs = parse_ast_exprs(ctx, table_meta, ast_exprs)?;
+    let schema = table_meta.schema();
+    let (mut bind_context, metadata) = bind_table(table_meta)?;
+    let settings = ctx.get_settings();
+    let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
+    let mut type_checker = TypeChecker::try_create(
+        &mut bind_context,
+        ctx,
+        &name_resolution_ctx,
+        metadata,
+        &[],
+        false,
+    )?;
+
+    let exprs: Vec<Expr> = ast_exprs
+        .iter()
+        .map(|ast| {
+            let (scalar, _) = *type_checker.resolve(ast)?;
+            let expr = scalar
+                .as_expr()?
+                .project_column_ref(|col| schema.index_of(&col.column_name).unwrap());
+            Ok(expr)
+        })
+        .collect::<Result<_>>()?;
+
     let mut res = Vec::with_capacity(exprs.len());
     for expr in exprs {
         let inner_type = expr.data_type().remove_nullable();
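parse_cluster_keys now resolves each cluster-key expression through the binder's TypeChecker and then rewrites every named column reference into its positional index in the table schema via project_column_ref, so the resulting expressions can be evaluated directly against data blocks. A toy illustration of the name-to-index step; the helper is hypothetical:

// Hypothetical helper mirroring what
// project_column_ref(|col| schema.index_of(&col.column_name)) does above.
fn index_of(schema: &[&str], name: &str) -> Result<usize, String> {
    schema
        .iter()
        .position(|f| *f == name)
        .ok_or_else(|| format!("unknown column: {name}"))
}

fn main() -> Result<(), String> {
    let schema = ["id", "ts", "region"];
    // A cluster key over `region` resolves to column index 2.
    assert_eq!(index_of(&schema, "region")?, 2);
    Ok(())
}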
