Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ cargo run --example dataframe

## Single Process

- [`advanced_udaf.rs`](examples/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
- [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
- [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
- [`examples/udf/advanced_udaf.rs`](examples/udf/advanced_udaf.rs): Define and invoke a more complicated User Defined Aggregate Function (UDAF)
- [`examples/udf/advanced_udf.rs`](examples/udf/advanced_udf.rs): Define and invoke a more complicated User Defined Scalar Function (UDF)
- [`examples/udf/advanced_udwf.rs`](examples/udf/advanced_udwf.rs): Define and invoke a more complicated User Defined Window Function (UDWF)
- [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files
- [`async_udf.rs`](examples/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF)
- [`examples/udf/async_udf.rs`](examples/udf/async_udf.rs): Define and invoke an asynchronous User Defined Scalar Function (UDF)
- [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control)
- [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
Expand Down Expand Up @@ -83,9 +83,10 @@ cargo run --example dataframe
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP
- [`regexp.rs`](examples/regexp.rs): Examples of using regular expression functions
- [`remote_catalog.rs`](examples/regexp.rs): Examples of interfacing with a remote catalog (e.g. over a network)
- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
- [`simple_udfw.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
- [`examples/udf/simple_udaf.rs`](examples/udf/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
- [`examples/udf/simple_udf.rs`](examples/udf/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
- [`examples/udf/simple_udtf.rs`](examples/udf/simple_udtf.rs): Define and invoke a User Defined Table Function (UDTF)
- [`examples/udf/simple_udfw.rs`](examples/udf/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
- [`sql_analysis.rs`](examples/sql_analysis.rs): Analyse SQL queries with DataFusion structures
- [`sql_frontend.rs`](examples/sql_frontend.rs): Create LogicalPlans (only) from sql strings
- [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,9 @@ fn create_context() -> Result<SessionContext> {
Ok(ctx)
}

#[tokio::main]
async fn main() -> Result<()> {
/// In this example we register `GeoMeanUdaf` and `SimplifiedGeoMeanUdaf`
/// as user defined aggregate functions and invoke them via the DataFrame API and SQL
pub async fn advanced_udaf() -> Result<()> {
let ctx = create_context()?;

let geo_mean_udf = AggregateUDF::from(GeoMeanUdaf::new());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,35 @@ fn maybe_pow_in_place(base: f64, exp_array: ArrayRef) -> Result<ArrayRef> {
}
}

/// create local execution context with an in-memory table:
///
/// ```text
/// +-----+-----+
/// | a | b |
/// +-----+-----+
/// | 2.1 | 1.0 |
/// | 3.1 | 2.0 |
/// | 4.1 | 3.0 |
/// | 5.1 | 4.0 |
/// +-----+-----+
/// ```
fn create_context() -> Result<SessionContext> {
// define data.
let a: ArrayRef = Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1]));
let b: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;

// declare a new context. In Spark API, this corresponds to a new SparkSession
let ctx = SessionContext::new();

// declare a table in memory. In Spark API, this corresponds to createDataFrame(...).
ctx.register_batch("t", batch)?;
Ok(ctx)
}

/// In this example we register `PowUdf` as a user defined function
/// and invoke it via the DataFrame API and SQL
#[tokio::main]
async fn main() -> Result<()> {
pub async fn advanced_udf() -> Result<()> {
let ctx = create_context()?;

// create the UDF
Expand Down Expand Up @@ -295,29 +320,3 @@ async fn main() -> Result<()> {

Ok(())
}

/// create local execution context with an in-memory table:
///
/// ```text
/// +-----+-----+
/// | a | b |
/// +-----+-----+
/// | 2.1 | 1.0 |
/// | 3.1 | 2.0 |
/// | 4.1 | 3.0 |
/// | 5.1 | 4.0 |
/// +-----+-----+
/// ```
fn create_context() -> Result<SessionContext> {
// define data.
let a: ArrayRef = Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1]));
let b: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;

// declare a new context. In Spark API, this corresponds to a new SparkSession
let ctx = SessionContext::new();

// declare a table in memory. In Spark API, this corresponds to createDataFrame(...).
ctx.register_batch("t", batch)?;
Ok(ctx)
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,9 @@ async fn create_context() -> Result<SessionContext> {
Ok(ctx)
}

#[tokio::main]
async fn main() -> Result<()> {
/// In this example we register `SmoothItUdf` as user defined window function
/// and invoke it via the DataFrame API and SQL
pub async fn advanced_udwf() -> Result<()> {
let ctx = create_context().await?;
let smooth_it = WindowUDF::from(SmoothItUdf::new());
ctx.register_udwf(smooth_it.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ use datafusion::prelude::{SessionConfig, SessionContext};
use std::any::Any;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
/// In this example we register `AskLLM` as an asynchronous user defined function
/// and invoke it via the DataFrame API and SQL
pub async fn async_udf() -> Result<()> {
// Use a hard coded parallelism level of 4 so the explain plan
// is consistent across machines.
let config = SessionConfig::new().with_target_partitions(4);
Expand Down
133 changes: 133 additions & 0 deletions datafusion-examples/examples/udf/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! # User-Defined Functions Examples
//!
//! These examples demonstrate user-defined functions in DataFusion.
//!
//! Each subcommand runs a corresponding example:
//! - `adv_udaf` — user defined aggregate function example
//! - `adv_udf` — user defined scalar function example
//! - `adv_udwf` — user defined window function example
//! - `async_udf` — asynchronous user defined function example
//! - `udaf` — simple user defined aggregate function example
//! - `udf` — simple user defined scalar function example
//! - `udtf` — simple user defined table function example
//! - `udwf` — simple user defined window function example

mod advanced_udaf;
mod advanced_udf;
mod advanced_udwf;
mod async_udf;
mod simple_udaf;
mod simple_udf;
mod simple_udtf;
mod simple_udwf;

use std::str::FromStr;

use datafusion::error::{DataFusionError, Result};

enum ExampleKind {
AdvUdaf,
AdvUdf,
AdvUdwf,
AsyncUdf,
Udf,
Udaf,
Udwf,
Udtf,
}

impl AsRef<str> for ExampleKind {
fn as_ref(&self) -> &str {
match self {
Self::AdvUdaf => "adv_udaf",
Self::AdvUdf => "adv_udf",
Self::AdvUdwf => "adv_udwf",
Self::AsyncUdf => "async_udf",
Self::Udf => "udf",
Self::Udaf => "udaf",
Self::Udwf => "udwt",
Self::Udtf => "udtf",
}
}
}

impl FromStr for ExampleKind {
type Err = DataFusionError;

fn from_str(s: &str) -> Result<Self> {
match s {
"adv_udaf" => Ok(Self::AdvUdaf),
"adv_udf" => Ok(Self::AdvUdf),
"adv_udwf" => Ok(Self::AdvUdwf),
"async_udf" => Ok(Self::AsyncUdf),
"udaf" => Ok(Self::Udaf),
"udf" => Ok(Self::Udf),
"udtf" => Ok(Self::Udtf),
"udwf" => Ok(Self::Udwf),
_ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
}
}
}

impl ExampleKind {
const ALL: [Self; 8] = [
Self::AdvUdaf,
Self::AdvUdf,
Self::AdvUdwf,
Self::AsyncUdf,
Self::Udaf,
Self::Udf,
Self::Udtf,
Self::Udwf,
];

const EXAMPLE_NAME: &str = "udf";

fn variants() -> Vec<&'static str> {
Self::ALL.iter().map(|x| x.as_ref()).collect()
}
}

#[tokio::main]
async fn main() -> Result<()> {
let usage = format!(
"Usage: cargo run --example {} -- [{}]",
ExampleKind::EXAMPLE_NAME,
ExampleKind::variants().join("|")
);

let arg = std::env::args().nth(1).ok_or_else(|| {
eprintln!("{usage}");
DataFusionError::Execution("Missing argument".to_string())
})?;

match arg.parse::<ExampleKind>()? {
ExampleKind::AdvUdaf => advanced_udaf::advanced_udaf().await?,
ExampleKind::AdvUdf => advanced_udf::advanced_udf().await?,
ExampleKind::AdvUdwf => advanced_udwf::advanced_udwf().await?,
ExampleKind::AsyncUdf => async_udf::async_udf().await?,
ExampleKind::Udaf => simple_udaf::simple_udaf().await?,
ExampleKind::Udf => simple_udf::simple_udf().await?,
ExampleKind::Udtf => simple_udtf::simple_udtf().await?,
ExampleKind::Udwf => simple_udwf::simple_udwf().await?,
}

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ impl Accumulator for GeometricMean {
}
}

#[tokio::main]
async fn main() -> Result<()> {
/// In this example we register `GeometricMean`
/// as user defined aggregate function and invoke it via the DataFrame API and SQL
pub async fn simple_udaf() -> Result<()> {
let ctx = create_context()?;

// here is where we define the UDAF. We also declare its signature:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ fn create_context() -> Result<SessionContext> {
}

/// In this example we will declare a single-type, single return type UDF that exponentiates f64, a^b
#[tokio::main]
async fn main() -> Result<()> {
pub async fn simple_udf() -> Result<()> {
let ctx = create_context()?;

// First, declare the actual implementation of the calculation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ use std::sync::Arc;
// 3. Register the function using [`SessionContext::register_udtf`]

/// This example demonstrates how to register a TableFunction
#[tokio::main]
async fn main() -> Result<()> {
pub async fn simple_udtf() -> Result<()> {
// create local execution context
let ctx = SessionContext::new();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ async fn create_context() -> Result<SessionContext> {
}

/// In this example we will declare a user defined window function that computes a moving average and then run it using SQL
#[tokio::main]
async fn main() -> Result<()> {
pub async fn simple_udwf() -> Result<()> {
let ctx = create_context().await?;

// here is where we define the UDWF. We also declare its signature:
Expand Down