
Create pudl.duckdb from parquet files #3741


Draft: wants to merge 17 commits into main.
Changes from 14 commits.
11 changes: 11 additions & 0 deletions docker/gcp_pudl_etl.sh
@@ -123,6 +123,13 @@ function distribute_parquet() {
    fi
}

function create_and_distribute_duckdb() {
    echo "Creating DuckDB file from parquet files and copying it to GCS"
    GCS_PATH="gs://superset.catalyst.coop"
    parquet_to_duckdb "$PUDL_OUTPUT/parquet" "$PUDL_OUTPUT/pudl.duckdb" && \
        gsutil -q -m -u "$GCP_BILLING_PROJECT" cp "$PUDL_OUTPUT/pudl.duckdb" "$GCS_PATH"
}

function copy_outputs_to_distribution_bucket() {
    # Only attempt to update outputs if we have a real value of BUILD_REF
    # This avoids accidentally blowing away the whole bucket if it's not set.
@@ -282,6 +289,9 @@ if [[ $ETL_SUCCESS == 0 ]]; then
    # Distribute Parquet outputs to a private bucket
    distribute_parquet 2>&1 | tee -a "$LOGFILE"
    DISTRIBUTE_PARQUET_SUCCESS=${PIPESTATUS[0]}
    # Create duckdb file from parquet files
    create_duckdb 2>&1 | tee -a "$LOGFILE"
Review comment (Member):
create_duckdb() or create_and_distribute_duckdb()?

(I have been loving the ShellCheck linter for VSCode)

    CREATE_DUCKDB_SUCCESS=${PIPESTATUS[0]}
    # Remove some cruft from the builds that we don't want to distribute
    clean_up_outputs_for_distribution 2>&1 | tee -a "$LOGFILE"
    CLEAN_UP_OUTPUTS_SUCCESS=${PIPESTATUS[0]}
@@ -313,6 +323,7 @@ if [[ $ETL_SUCCESS == 0 && \
      $UPDATE_STABLE_SUCCESS == 0 && \
      $DATASETTE_SUCCESS == 0 && \
      $DISTRIBUTE_PARQUET_SUCCESS == 0 && \
      $CREATE_DUCKDB_SUCCESS == 0 && \
      $CLEAN_UP_OUTPUTS_SUCCESS == 0 && \
      $DISTRIBUTION_BUCKET_SUCCESS == 0 && \
      $GCS_TEMPORARY_HOLD_SUCCESS == 0 && \
3 changes: 2 additions & 1 deletion environments/conda-linux-64.lock.yml (generated file; diff not rendered)

55 changes: 50 additions & 5 deletions environments/conda-lock.yml (generated file; diff not rendered)

3 changes: 2 additions & 1 deletion environments/conda-osx-64.lock.yml (generated file; diff not rendered)

3 changes: 2 additions & 1 deletion environments/conda-osx-arm64.lock.yml (generated file; diff not rendered)

@@ -0,0 +1,68 @@
"""Adjust some code table pk types

Revision ID: 7a16ce1fe774
Revises: 49d2f4f7d7b7
Create Date: 2024-07-25 19:09:50.613978

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '7a16ce1fe774'
down_revision = '49d2f4f7d7b7'
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('core_eia__codes_sector_consolidated', schema=None) as batch_op:
        batch_op.alter_column('code',
            existing_type=sa.TEXT(),
            type_=sa.Integer(),
            existing_nullable=False,
            autoincrement=False)

    with op.batch_alter_table('core_eia__codes_steam_plant_types', schema=None) as batch_op:
        batch_op.alter_column('code',
            existing_type=sa.TEXT(),
            type_=sa.Integer(),
            existing_nullable=False,
            autoincrement=False)

    with op.batch_alter_table('core_eia__codes_wind_quality_class', schema=None) as batch_op:
        batch_op.alter_column('code',
            existing_type=sa.TEXT(),
            type_=sa.Integer(),
            existing_nullable=False,
            autoincrement=False)

    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('core_eia__codes_wind_quality_class', schema=None) as batch_op:
        batch_op.alter_column('code',
            existing_type=sa.Integer(),
            type_=sa.TEXT(),
            existing_nullable=False,
            autoincrement=False)

    with op.batch_alter_table('core_eia__codes_steam_plant_types', schema=None) as batch_op:
        batch_op.alter_column('code',
            existing_type=sa.Integer(),
            type_=sa.TEXT(),
            existing_nullable=False,
            autoincrement=False)

    with op.batch_alter_table('core_eia__codes_sector_consolidated', schema=None) as batch_op:
        batch_op.alter_column('code',
            existing_type=sa.Integer(),
            type_=sa.TEXT(),
            existing_nullable=False,
            autoincrement=False)

    # ### end Alembic commands ###
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -28,6 +28,7 @@ dependencies = [
    "dask-expr", # Required for dask[dataframe]
    "datasette>=0.64",
    "doc8>=1.1",
    "duckdb-engine>=0.12.1",
    "email-validator>=1.0.3", # pydantic[email]
    "frictionless>=5,<6",
    "fsspec>=2024",
@@ -129,6 +130,7 @@ keywords = [
[project.scripts]
ferc_to_sqlite = "pudl.ferc_to_sqlite.cli:main"
metadata_to_rst = "pudl.convert.metadata_to_rst:metadata_to_rst"
parquet_to_duckdb = "pudl.convert.parquet_to_duckdb:parquet_to_duckdb"
pudl_check_fks = "pudl.etl.check_foreign_keys:pudl_check_fks"
pudl_datastore = "pudl.workspace.datastore:pudl_datastore"
pudl_etl = "pudl.etl.cli:pudl_etl"
95 changes: 95 additions & 0 deletions src/pudl/convert/parquet_to_duckdb.py
@@ -0,0 +1,95 @@
#! /usr/bin/env python
"""Script that creates a DuckDB database from a collection of PUDL Parquet files."""

import logging
from pathlib import Path

import click
import duckdb
import sqlalchemy as sa

from pudl.metadata import PUDL_PACKAGE
from pudl.metadata.classes import Package

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@click.command(
name="parquet_to_duckdb",
context_settings={"help_option_names": ["-h", "--help"]},
)
@click.argument("parquet_dir", type=click.Path(exists=True, resolve_path=True))
Review comment (Member):
I think you can require that it be a directory here too.
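
Something like the following would enforce the directory requirement with Click's built-in Path flags (a sketch, not part of the current diff):

@click.argument(
    "parquet_dir",
    type=click.Path(exists=True, resolve_path=True, file_okay=False, dir_okay=True),
)

With file_okay=False, Click rejects a path that points at a regular file before the command body ever runs.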

@click.argument(
"duckdb_path", type=click.Path(resolve_path=True, writable=True, allow_dash=False)
Review comment (Member):
What's the expected / desired behavior if the pudl.duckdb file already exists?

Reply (Member):
Oh I see below. I think you can require that it not already exist in this check too?

)
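
If the "must not already exist" check should also live at the CLI layer, a small callback on the argument could handle it, since click.Path itself has no flag for that (a sketch with a hypothetical _require_missing helper, not part of this PR):

def _require_missing(ctx, param, value):
    # Hypothetical helper: refuse to proceed if anything already exists at the path.
    if Path(value).exists():
        raise click.BadParameter(f"{value} already exists; please provide a new filename.")
    return value


@click.argument(
    "duckdb_path",
    type=click.Path(resolve_path=True, writable=True, allow_dash=False),
    callback=_require_missing,
)
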
@click.option(
    "--no-load",
    is_flag=True,
    show_default=True,
    default=False,
    help="Only create metadata, don't load data.",
)
@click.option(
    "--check-fks",
    is_flag=True,
    show_default=True,
    default=False,
    help="If true, enable foreign keys in the database. Currently, "
    "the parquet load process freezes up when foreign keys are enabled.",
)
def parquet_to_duckdb(
    parquet_dir: str, duckdb_path: str, no_load: bool, check_fks: bool
):
"""Convert a directory of Parquet files to a DuckDB database.

Args:
parquet_dir: Path to a directory of parquet files.
duckdb_path: Path to the new DuckDB database file (should not exist).
no_load: Only create metadata, don't load data.
check_fks: If true, enable foreign keys in the database. Currently,
the parquet load process freezes up when foreign keys are enabled.
Review comment (Member) on lines +51 to +52:
I dunno if you checked which tables were grinding to a halt, but I'm curious what happens if you don't load any of the hourly tables, which have 10s to 100s of millions of rows -- especially core_epacems__hourly_emissions which is nearly 1 billion rows and definitely has several implied FK relationships.
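
If it helps with that experiment, the load loop below could be pointed at a filtered table list instead of metadata.sorted_tables (a throwaway sketch, not proposed for this PR; the "hourly" substring test is just a heuristic for the large tables named above):

# Hypothetical experiment: skip the very large hourly tables (e.g.
# core_epacems__hourly_emissions), then re-run with --check-fks to see
# whether the foreign-key load still freezes.
tables_to_load = [
    table for table in metadata.sorted_tables if "hourly" not in table.name
]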


    Example:
        parquet_to_duckdb /path/to/parquet/directory duckdb.db
    """
    parquet_dir = Path(parquet_dir)
    duckdb_path = Path(duckdb_path)

    # Check if DuckDB file already exists
    if duckdb_path.exists():
        click.echo(
            f"Error: DuckDB file '{duckdb_path}' already exists. Please provide a new filename."
        )
        return

    # Create the DuckDB schema from the PUDL package
    resource_ids = (r.name for r in PUDL_PACKAGE.resources if len(r.name) <= 63)
    package = Package.from_resource_ids(resource_ids)

    metadata = package.to_sql(dialect="duckdb", check_foreign_keys=check_fks)
    engine = sa.create_engine(f"duckdb:///{duckdb_path}")
    metadata.create_all(engine)

    if not no_load:
        with duckdb.connect(database=str(duckdb_path)) as duckdb_conn:
Review comment (Member):
I messed around with this loop over the weekend to explicitly add the row and table comments to the schema, but later realized that:

  • there was a bug in Resource.to_sql() in that it didn't set the comment field.
  • the first and last columns that show up when you SELECT comment FROM duckdb_columns() aren't our columns -- they're duckdb internal stuff... and they have no comments. So it looked like there were no comments on the columns, but there actually were... so my explicit adding was unnecessary.

Thus, the only change that stuck around was switching to using a context manager.

Reply (Member):
What a journey - at least we learned something 😅 , thanks for doing the digging!
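
For anyone who wants to verify the comments are really there, they can be pulled from duckdb_columns() while filtering out DuckDB's internal bookkeeping columns (a small sketch; assumes a recent DuckDB, and the table name is just one of the tables touched in this PR):

import duckdb

con = duckdb.connect("pudl.duckdb")
# List user-defined columns and their comments for one table, skipping
# the internal columns that have no comments.
print(
    con.sql(
        "SELECT column_name, comment FROM duckdb_columns() "
        "WHERE NOT internal AND table_name = 'core_eia__codes_sector_consolidated'"
    )
)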

            duckdb_cursor = duckdb_conn.cursor()
            # Load data into the DuckDB database from parquet files, if requested:
            # Iterate through the tables in order of foreign key dependency
            for table in metadata.sorted_tables:
                parquet_file_path = parquet_dir / f"{table.name}.parquet"
                if parquet_file_path.exists():
                    logger.info(
                        f"Loading parquet file: {parquet_file_path} into {duckdb_path}"
                    )
                    sql_command = f"""
                        COPY {table.name} FROM '{parquet_file_path}' (FORMAT PARQUET);
                    """
Review comment (Member) on lines +86 to +88:
Even without the foreign keys turned on, I got an out-of-memory error while it was attempting to load the EPA CEMS parquet (5GB on disk). Do we need to do some kind of chunking? Maybe by row-groups?
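
One possible shape for that chunking, reading the Parquet file one record batch at a time instead of a single COPY (a sketch only; the batch size and the _batch view name are arbitrary, and it has not been benchmarked against the 5 GB CEMS file):

import pyarrow as pa
import pyarrow.parquet as pq

parquet_file = pq.ParquetFile(parquet_file_path)
for batch in parquet_file.iter_batches(batch_size=1_000_000):
    # Expose the Arrow batch to DuckDB as a temporary view and append it,
    # so only one batch needs to be materialized in memory at a time.
    duckdb_cursor.register("_batch", pa.Table.from_batches([batch]))
    duckdb_cursor.execute(f"INSERT INTO {table.name} SELECT * FROM _batch")
    duckdb_cursor.unregister("_batch")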

                    duckdb_cursor.execute(sql_command)
                else:
                    raise FileNotFoundError(f"Parquet file not found for: {table.name}")


if __name__ == "__main__":
    parquet_to_duckdb()
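
For a quick local sanity check of the new console script, a schema-only run avoids the long data load entirely (the flags are the ones defined above; the output path is just an example):

parquet_to_duckdb --no-load "$PUDL_OUTPUT/parquet" /tmp/pudl_schema.duckdb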