
Support parallel iteration of rows from query or speed up arrow batching. #189

@theelderbeever

Description


In order to use something like rayon and iterate over a column in parallel, the data currently has to be loaded into a Vec<RecordBatch> with .query_arrow. This turns out to be extremely slow. It would be great to be able to use par_iter to process the data as it is read.
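To illustrate the shape this could take, here is a std-only sketch (no duckdb or rayon involved; the channel producer is a hypothetical stand-in for a streaming query_arrow, and the worker threads stand in for par_iter) where batches of hex strings are summed as they arrive, instead of after a full collect:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Sums hex strings from "batches" as they are produced, rather than
// collecting every batch up front. The producer stands in for a streaming
// query; the workers stand in for parallel consumers.
fn streamed_hex_sum() -> u128 {
    let (tx, rx) = mpsc::channel::<Vec<String>>();

    // Producer: emits each batch as soon as it is "read".
    let producer = thread::spawn(move || {
        for chunk in 0..4u128 {
            let batch: Vec<String> =
                (0..5u128).map(|i| format!("{:x}", chunk * 5 + i)).collect();
            tx.send(batch).unwrap();
        }
        // tx is dropped here, closing the channel.
    });

    // Workers: pull batches off the shared channel and sum concurrently.
    let rx = Arc::new(Mutex::new(rx));
    let workers: Vec<_> = (0..2)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut acc: u128 = 0;
                loop {
                    match rx.lock().unwrap().recv() {
                        Ok(batch) => {
                            for hex in batch {
                                acc += u128::from_str_radix(&hex, 16).unwrap();
                            }
                        }
                        Err(_) => break, // channel closed: no more batches
                    }
                }
                acc
            })
        })
        .collect();

    producer.join().unwrap();
    workers.into_iter().map(|w| w.join().unwrap()).sum()
}

fn main() {
    // 20 values (0..19) in hex; their sum is 190.
    println!("{}", streamed_hex_sum());
}
```

The actual fix in duckdb-rs would presumably be a RecordBatch iterator that is Send, so that rayon's par_bridge could consume it directly; the sketch above only shows the streaming-aggregation shape, not the proposed API.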

The small example below reads a column from a parquet file of 100M rows of hexadecimal strings and then performs a big sum. The rough timing results are shown below, but the takeaway is that roughly 5 of the 5.5 seconds the process takes (on my computer) is spent loading the arrow batches. This is actually twice as slow as duckdb.sql(...).arrow() in Python. Additionally, reading the data into a polars dataframe is faster still. Examples shown below.

Parquet File Example (screenshot omitted)

use duckdb::arrow::array::StringArray;
use duckdb::Connection;
use ibig::ibig;
use ibig::IBig;
use rayon::prelude::*;
use std::time::Instant;

fn main() {
    let conn = Connection::open_in_memory().unwrap();
    conn.execute("PRAGMA threads = 8", []).unwrap();
    let t0 = Instant::now();
    let file = std::env::args().nth(1).unwrap();
    let mut stmt = conn
        .prepare(&format!("SELECT hex FROM read_parquet('{}')", file))
        .unwrap();
    let t1 = Instant::now();
    // Materialize every record batch up front; this is where the time goes.
    let batches: Vec<_> = stmt.query_arrow([]).unwrap().collect();
    let t2 = Instant::now();
    // Parallel big-integer sum over the materialized batches.
    let sum = batches
        .par_iter()
        .fold(
            || ibig!(0),
            |acc, batch| {
                let hex_column = batch
                    .column(
                        batch
                            .schema()
                            .index_of("hex")
                            .expect("The dataset should have a 'hex' column"),
                    )
                    .as_any()
                    .downcast_ref::<StringArray>();
                match hex_column {
                    Some(values) => {
                        let value = values.iter().fold(ibig!(0), |acc, row| {
                            acc + IBig::from_str_with_radix_prefix(row.unwrap()).unwrap()
                        });
                        value + acc
                    }
                    None => acc,
                }
            },
        )
        .reduce(|| ibig!(0), |acc, v| acc + v);
    let tf = Instant::now();

    println!(
        "Read: {:?}\tConvert: {:?}\tCompute: {:?}",
        t1 - t0,
        t2 - t1,
        tf - t2
    );
    println!("{:?}: {:?}", tf - t0, sum)
}

Results:

Read: 1.972417ms        Convert: 4.914973125s   Compute: 0.645441583s
5.562387125s: 199979000000000000000000000000

Now for polars:

use ibig::{ibig, IBig};

use polars::prelude::*;
use rayon::prelude::*;
use std::fs::File;
use std::time::Instant;

fn read_dataframe() -> PolarsResult<DataFrame> {
    let file = std::env::args().nth(1).unwrap();
    let r = File::open(file).unwrap();
    let reader = ParquetReader::new(r);
    reader.finish()
}

fn main() {
    let t1 = Instant::now();
    let df = read_dataframe().unwrap();
    let s = df.column("hex").unwrap().utf8().unwrap().rechunk();
    let v: Vec<Option<&str>> = s.into_iter().collect();
    let sum: IBig = v
        .into_par_iter()
        .fold(
            || ibig!(0),
            |acc, x| acc + IBig::from_str_with_radix_prefix(x.unwrap()).unwrap(),
        )
        .reduce(|| ibig!(0), |acc, x| acc + x);
    let finish = Instant::now();
    println!("{:?}: {:?}", finish - t1, sum);
}

Results:

3.110326833s: 199979000000000000000000000000
