Parallel workers not used on Postgres #3673

Open
twitu opened this issue Jan 9, 2025 · 2 comments
twitu commented Jan 9, 2025

I have found these related issues/pull requests

I could not find any issues in the sqlx repo related to parallel workers not being launched. However, this issue seems somewhat related to brianc/node-postgres#3344. That issue shows logs very similar to the ones I'm seeing and mentions that a streaming query does not use parallel workers for some reason.

Description

A query run through SQLx does not take advantage of the parallel workers chosen by the query plan. It takes roughly twice as long to execute the query as other tools that do use parallel workers.

Reproduction steps

use eyre::Result;
use sqlx::{postgres::PgPoolOptions, Executor, Row};
use tracing;

#[tokio::main]
async fn main() -> Result<()> {
    // Connect to database
    let url = <TODO>;

    let pool = PgPoolOptions::new()
        .max_connections(5)
        .after_connect(|conn, _| {
            Box::pin(async move {
                tracing::info!("New connection established");
                // Enable statement logging
                conn.execute("SET log_statement = 'all'").await?;
                conn.execute("LOAD 'auto_explain'").await?;
                conn.execute("SET auto_explain.log_min_duration = '0ms'")
                    .await?;
                conn.execute("SET auto_explain.log_analyze = 'on'").await?;
                conn.execute("SET auto_explain.log_buffers = 'on'").await?;
                conn.execute("SET auto_explain.log_timing = 'on'").await?;

                let settings = conn.fetch_all("SHOW ALL").await?;
                tracing::info!("PostgreSQL settings: {:?}", settings);
                Ok(())
            })
        })
        .connect(url)
        .await
        .map_err(|e| eyre::eyre!("Failed to create database pool: {}", e))?;

    // Create a wide table with many columns
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS wide_table (
            id SERIAL PRIMARY KEY,
            col1 TEXT, col2 TEXT, col3 TEXT, col4 TEXT, col5 TEXT,
            col6 TEXT, col7 TEXT, col8 TEXT, col9 TEXT, col10 TEXT,
            value INTEGER
        )"#,
    )
    .execute(&pool)
    .await?;

    // Insert dummy data (1M rows to force parallel scan)
    sqlx::query(
        r#"
        INSERT INTO wide_table (col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, value)
        SELECT 
            md5(random()::text), md5(random()::text), md5(random()::text),
            md5(random()::text), md5(random()::text), md5(random()::text),
            md5(random()::text), md5(random()::text), md5(random()::text),
            md5(random()::text), floor(random() * 1000000)::int
        FROM generate_series(1, 1000000)"#,
    )
    .execute(&pool)
    .await?;

    // Query that should trigger parallel scan
    println!("Executing query with sqlx...");
    let start = std::time::Instant::now();
    let result = sqlx::query(
        r#"
        SELECT sum(value) 
        FROM wide_table 
        WHERE value > 50000000"#,
    )
    .fetch_optional(&pool)
    .await?;

    println!("Query took: {:?}", start.elapsed());
    println!("Result: {}", result.is_some());

    // Compare with direct psql
    println!("\nTo compare, run this in psql:");
    println!(
        r#"
    EXPLAIN (ANALYZE, BUFFERS)
    SELECT sum(value) FROM wide_table WHERE value > 50000000;
    "#
    );

    Ok(())
}

DB logs from running the query with SQLx using the script above

2025-01-09 10:42:13.363 GMT [24484] LOG:  execute sqlx_s_2: 
	        SELECT sum(value) 
	        FROM wide_table 
	        WHERE value > 50000000
2025-01-09 10:42:13.612 GMT [24484] LOG:  duration: 249.180 ms  plan:
	Query Text: 
	        SELECT sum(value) 
	        FROM wide_table 
	        WHERE value > 50000000
	Finalize Aggregate  (cost=54440.44..54440.45 rows=1 width=8) (actual time=249.176..249.177 rows=1 loops=1)
	  Buffers: shared hit=887 read=46733 dirtied=46748 written=46733
	  ->  Gather  (cost=54440.22..54440.43 rows=2 width=8) (actual time=249.174..249.174 rows=1 loops=1)
	        Workers Planned: 2
	        Workers Launched: 0
	        Buffers: shared hit=887 read=46733 dirtied=46748 written=46733
	        ->  Partial Aggregate  (cost=53440.22..53440.23 rows=1 width=8) (actual time=249.173..249.174 rows=1 loops=1)
	              Buffers: shared hit=887 read=46733 dirtied=46748 written=46733
	              ->  Parallel Seq Scan on wide_table  (cost=0.00..53076.46 rows=145505 width=4) (actual time=249.171..249.171 rows=0 loops=1)
	                    Filter: (value > 50000000)
	                    Rows Removed by Filter: 1000000
	                    Buffers: shared hit=887 read=46733 dirtied=46748 written=46733

DB logs from running the query with psql (Workers Launched: 2)

	Query Text: LOAD 'auto_explain';
	SET auto_explain.log_min_duration = '50ms';
	SET auto_explain.log_analyze = 'on';
	SET auto_explain.log_buffers = 'on';
	SET auto_explain.log_timing = 'on';
	EXPLAIN (ANALYZE, BUFFERS) SELECT sum(value) FROM wide_table WHERE value > 50000000
	
	Finalize Aggregate  (cost=53828.59..53828.60 rows=1 width=8) (actual time=231.420..232.170 rows=1 loops=1)
	  Buffers: shared hit=732 read=46888
	  ->  Gather  (cost=53828.38..53828.59 rows=2 width=8) (actual time=231.352..232.164 rows=3 loops=1)
	        Workers Planned: 2
	        Workers Launched: 2
	        Buffers: shared hit=732 read=46888
	        ->  Partial Aggregate  (cost=52828.38..52828.39 rows=1 width=8) (actual time=227.663..227.663 rows=1 loops=3)
	              Buffers: shared hit=732 read=46888
	              ->  Parallel Seq Scan on wide_table  (cost=0.00..52828.27 rows=42 width=4) (actual time=227.659..227.660 rows=0 loops=3)
	                    Filter: (value > 50000000)
	                    Rows Removed by Filter: 333333
	                    Buffers: shared hit=732 read=46888

SQLx version

"0.8.2"

Enabled SQLx features

"runtime-tokio-native-tls", "postgres", "time"

Database server and version

{ "version": "PostgreSQL 14.5 on aarch64-apple-darwin21.3.0, compiled by clang version 11.1.0, 64-bit" }

Operating system

MacOS 15.11

Rust version

rustc 1.82.0 (f6e511eec 2024-10-15) (built from a source tarball)

twitu added the bug label Jan 9, 2025
abonander (Collaborator) commented

https://www.postgresql.org/docs/current/when-can-parallel-query-be-used.html

Even when a parallel query plan is generated for a particular query, there are several circumstances under which it will be impossible to execute that plan in parallel at execution time. If this occurs, the leader will execute the portion of the plan below the Gather node entirely by itself, almost as if the Gather node were not present. This will happen if any of the following conditions are met:

[...]
The client sends an Execute message with a non-zero fetch count. See the discussion of the extended query protocol. Since libpq currently provides no way to send such a message, this can only occur when using a client that does not rely on libpq. If this is a frequent occurrence, it may be a good idea to set max_parallel_workers_per_gather to zero in sessions where it is likely, so as to avoid generating query plans that may be suboptimal when run serially.
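
As an aside, the last suggestion in that quote is straightforward to try. A minimal sketch, assuming the after_connect hook from the reproduction above (note this trades parallelism away rather than restoring it):

// Sketch: apply the docs' suggestion inside the repro's after_connect hook.
// This does not make workers launch; it only stops the planner from choosing
// parallel plans that would end up running serially anyway.
conn.execute("SET max_parallel_workers_per_gather = 0").await?;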

This is an extremely unfortunate coincidence.

We currently send an Execute with a limit of 1 for fetch_optional and fetch_one to ensure the server does not waste resources sending additional rows if the user forgot to ensure that the query only returns one row. If the query returns additional rows, the server will be forced to send them all, and the driver would have to read through and discard them before it would be able to issue another command.

The only way to escape this requires closing the connection or calling pg_cancel_backend() from another connection.

We do send a fetch limit of zero for .fetch() but this also means that the server will keep sending rows even if the stream is dropped. I had plans to use non-zero fetch limits in the future to allow queries to be cancelled from the client side, since the driver could request rows in batches.
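
A sketch of a possible client-side workaround that follows from this (assuming the pool and table from the reproduction above; not something sqlx itself prescribes): issue the query through .fetch(), which sends the zero fetch limit, and take just the first row yourself.

use futures::TryStreamExt;

// Workaround sketch: .fetch() sends Execute with a fetch limit of 0, so the
// planned parallel workers can actually launch; we then read only the first
// row ourselves. Caveat per the above: if the query could return many rows,
// the server keeps sending them even after the stream is dropped.
let mut rows = sqlx::query("SELECT sum(value) FROM wide_table WHERE value > 50000000")
    .fetch(&pool);
let first = rows.try_next().await?;
println!("Result: {}", first.is_some());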

This must be what node-postgres does, which is why it has this problem with streaming queries but we don't.

I would personally consider this a Postgres bug, although I can sort of understand why they punted on this. Having to enforce a limit would require additional synchronization between the workers. Though it makes me wonder how it handles LIMIT at the query level. The query planner must take that into account somehow, but the limit from the Execute message must not trigger a re-plan.

For aggregate queries like SELECT sum(...), however, I would expect this to be trivial to handle. Without windowing, your repro query will only ever produce at most one row, so the limit on the Execute message shouldn't matter. That's likely just a special case that hasn't been covered.

The fix is theoretically trivial, as we could just always send a zero for the limit on Execute:

limit: limit.into(),

This wouldn't likely be an issue for .fetch_one() and .fetch_optional() as long as the user is careful to ensure the query doesn't return a lot of additional rows. For completeness, we already warn about this in the documentation: https://docs.rs/sqlx/latest/sqlx/query/struct.Query.html#note-for-best-performance-ensure-the-query-returns-at-most-one-row
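
To make "careful" concrete (my own sketch rather than anything from the docs; it reuses the repro's table), a query that could match many rows can be bounded in the SQL itself, so the server produces at most one row even if the Execute message carries a fetch limit of 0:

// Sketch: expressing the limit in SQL keeps fetch_optional() cheap regardless
// of the fetch count sent on the Execute message.
let row = sqlx::query("SELECT id FROM wide_table ORDER BY id LIMIT 1")
    .fetch_optional(&pool)
    .await?;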

For .fetch(), however, we'd probably want to make this behavior toggleable, to ensure that users who need interruptible queries can have them.

Ultimately, it'd be nice if Postgres just handled this correctly, but someone would have to put in that work.


twitu commented Jan 10, 2025

Thank you for sharing your thoughts on this. Indeed, this is a subtle implementation detail.

So my understanding is that with limit = 0, the db will send all the selected rows. Even if fetch_one drops the stream after reading one row, the db will keep producing rows, which of course means wasted computation for a single-row fetch. However, setting limit = 1 means that the executor does not use parallel workers.

It is tricky: fetch_one kind of implies that the executor will only do the work needed to fetch one row, but I also want the single-row fetch to be as fast as possible. Personally, I am more inclined towards having the limit set to 0 for fetch_one and fetch_optional, especially given the docs warning.
