Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable multi_asset subsetting #2773

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 27 additions & 29 deletions devtools/materialize_asset.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
#! /usr/bin/env python
"""Materialize one asset & its upstream deps in-process so you can debug."""
"""Materialize one asset & its upstream deps in-process so you can debug.

If you are using the VSCode Debugger, you'll need to specify the asset_id
in the launch.json file:

{
...,
"args": ["{YOUR_ASSET_ID}}"]
...,
}
"""

import argparse
import importlib.resources

from dagster import AssetSelection, Definitions, define_asset_job
from dagster import materialize

from pudl import etl
from pudl.settings import EtlSettings
from pudl.etl import default_assets, default_resources, load_dataset_settings_from_file


def _parse():
Expand All @@ -19,33 +27,23 @@ def _parse():
def main(asset_id):
"""Entry point.

Defines dagster context like in etl/__init__.py - needs to be kept in sync.
Materialize one asset & its upstream deps in-process so you can debug.

Then creates a job with asset selection.
Args:
asset_id: Name of asset you want to materialize.
"""
etl_fast_settings = EtlSettings.from_yaml(
importlib.resources.files("pudl.package_data.settings") / "etl_fast.yml"
).datasets

# TODO (daz/zach): maybe there's a way to do this directly with dagster cli?
defs = Definitions(
assets=etl.default_assets,
resources=etl.default_resources,
jobs=[
define_asset_job(
name="materialize_one",
selection=AssetSelection.keys(asset_id).upstream(),
config={
"resources": {
"dataset_settings": {
"config": etl_fast_settings.dict(),
},
},
},
),
],
materialize(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdangerx @zschira is there a reason why y'all didn't use materialize() when you initially created this script?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope! This is better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though I think this whole script could probably be replaced with dagster asset materialize, like:

$ dagster asset materialize -m pudl.etl --select "raw_eia860__fgd_equipment"

Which works on this branch but fails with the This AssetsDefinition does not support subsetting. error on dev.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True but I'm not sure you can feed the CLI command run config like you can with the materialize() function which is helpful to running subsets of years:

Usage: dagster asset materialize [OPTIONS]

  Execute a run to materialize a selection of assets

Options:
  -a, --attribute TEXT          Attribute that is either a 1) repository or job or 2) a function that returns a
                                repository or job
  --package-name TEXT           Specify Python package where repository or job function lives
  -m, --module-name TEXT        Specify module where dagster definitions reside as top-level symbols/variables and
                                load the module as a code location in the current python environment.
  -f, --python-file PATH        Specify python file where dagster definitions reside as top-level symbols/variables
                                and load the file as a code location in the current python environment.
  -d, --working-directory TEXT  Specify working directory to use when loading the repository or job
  --select TEXT                 Asset selection to target  [required]
  --partition TEXT              Asset partition to target
  -h, --help                    Show this message and exit.

Also, can we use the vs code debugger with the CLI command?

default_assets,
selection=f"*{asset_id}",
resources=default_resources,
run_config={
"resources": {
"dataset_settings": {
"config": load_dataset_settings_from_file("etl_fast")
}
}
},
)
defs.get_job_def("materialize_one").execute_in_process()


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion environments/conda-linux-64.lock.yml
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ dependencies:
- arrow=1.3.0=pyhd8ed1ab_0
- async-timeout=4.0.3=pyhd8ed1ab_0
- aws-c-s3=0.3.24=h7630044_0
- botocore=1.32.1=pyhd8ed1ab_0
- botocore=1.32.2=pyhd8ed1ab_0
- branca=0.7.0=pyhd8ed1ab_1
- croniter=2.0.1=pyhd8ed1ab_0
- cryptography=41.0.5=py311h63ff55d_0
Expand Down
24 changes: 12 additions & 12 deletions environments/conda-lock.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5316,18 +5316,18 @@ package:
category: main
optional: false
- name: botocore
version: 1.32.1
version: 1.32.2
manager: conda
platform: linux-64
dependencies:
jmespath: ">=0.7.1,<2.0.0"
python: ">=3.7"
python-dateutil: ">=2.1,<3.0.0"
urllib3: ">=1.25.4,<1.27"
url: https://conda.anaconda.org/conda-forge/noarch/botocore-1.32.1-pyhd8ed1ab_0.conda
url: https://conda.anaconda.org/conda-forge/noarch/botocore-1.32.2-pyhd8ed1ab_0.conda
hash:
md5: ed6c51f21b00b73f27d754083a03734f
sha256: 1bbfa7c5b2b0016779805c664bfba9958f5ce5916f57c1a65d173f15d4dc7471
md5: 303d0f8f09c41c07b18b9a1112cec29b
sha256: 621ee76f9d1e741039513e94ef3e4d3442f76098d863f50474ec60d823ef11ae
category: main
optional: false
- name: branca
Expand Down Expand Up @@ -13213,18 +13213,18 @@ package:
category: main
optional: false
- name: botocore
version: 1.32.1
version: 1.32.2
manager: conda
platform: osx-64
dependencies:
python: ">=3.7"
python-dateutil: ">=2.1,<3.0.0"
jmespath: ">=0.7.1,<2.0.0"
urllib3: ">=1.25.4,<1.27"
url: https://conda.anaconda.org/conda-forge/noarch/botocore-1.32.1-pyhd8ed1ab_0.conda
url: https://conda.anaconda.org/conda-forge/noarch/botocore-1.32.2-pyhd8ed1ab_0.conda
hash:
md5: ed6c51f21b00b73f27d754083a03734f
sha256: 1bbfa7c5b2b0016779805c664bfba9958f5ce5916f57c1a65d173f15d4dc7471
md5: 303d0f8f09c41c07b18b9a1112cec29b
sha256: 621ee76f9d1e741039513e94ef3e4d3442f76098d863f50474ec60d823ef11ae
category: main
optional: false
- name: branca
Expand Down Expand Up @@ -21026,18 +21026,18 @@ package:
category: main
optional: false
- name: botocore
version: 1.32.1
version: 1.32.2
manager: conda
platform: osx-arm64
dependencies:
python: ">=3.7"
python-dateutil: ">=2.1,<3.0.0"
jmespath: ">=0.7.1,<2.0.0"
urllib3: ">=1.25.4,<1.27"
url: https://conda.anaconda.org/conda-forge/noarch/botocore-1.32.1-pyhd8ed1ab_0.conda
url: https://conda.anaconda.org/conda-forge/noarch/botocore-1.32.2-pyhd8ed1ab_0.conda
hash:
md5: ed6c51f21b00b73f27d754083a03734f
sha256: 1bbfa7c5b2b0016779805c664bfba9958f5ce5916f57c1a65d173f15d4dc7471
md5: 303d0f8f09c41c07b18b9a1112cec29b
sha256: 621ee76f9d1e741039513e94ef3e4d3442f76098d863f50474ec60d823ef11ae
category: main
optional: false
- name: branca
Expand Down
2 changes: 1 addition & 1 deletion environments/conda-osx-64.lock.yml
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ dependencies:
- arrow=1.3.0=pyhd8ed1ab_0
- async-timeout=4.0.3=pyhd8ed1ab_0
- aws-crt-cpp=0.24.7=ha2eb20f_1
- botocore=1.32.1=pyhd8ed1ab_0
- botocore=1.32.2=pyhd8ed1ab_0
- branca=0.7.0=pyhd8ed1ab_1
- croniter=2.0.1=pyhd8ed1ab_0
- cryptography=41.0.5=py311hd51016d_0
Expand Down
2 changes: 1 addition & 1 deletion environments/conda-osx-arm64.lock.yml
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ dependencies:
- arrow=1.3.0=pyhd8ed1ab_0
- async-timeout=4.0.3=pyhd8ed1ab_0
- aws-crt-cpp=0.24.7=h2da6921_1
- botocore=1.32.1=pyhd8ed1ab_0
- botocore=1.32.2=pyhd8ed1ab_0
- branca=0.7.0=pyhd8ed1ab_1
- croniter=2.0.1=pyhd8ed1ab_0
- cryptography=41.0.5=py311h71175c2_0
Expand Down
15 changes: 5 additions & 10 deletions src/pudl/etl/glue_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,14 @@
logger = pudl.logging_helpers.get_logger(__name__)


# TODO (bendnorman): Currently loading all glue tables. Could potentially allow users
# to load subsets of the glue tables, see: https://docs.dagster.io/concepts/assets/multi-assets#subsetting-multi-assets
# Could split out different types of glue tables into different assets. For example the cross walk table could be a separate asset
# that way dagster doesn't think all glue tables depend on generators_entity_eia, boilers_entity_eia.


@multi_asset(
outs={
table_name: AssetOut(io_manager_key="pudl_sqlite_io_manager")
table_name: AssetOut(io_manager_key="pudl_sqlite_io_manager", is_required=False)
for table_name in Package.get_etl_group_tables("glue")
# do not load epacamd_eia glue assets bc they are stand-alone assets below.
if "epacamd_eia" not in table_name
},
can_subset=True,
required_resource_keys={"datastore", "dataset_settings"},
)
def create_glue_tables(context):
Expand All @@ -46,9 +41,9 @@ def create_glue_tables(context):
# Ensure they are sorted so they match up with the asset outs
glue_dfs = dict(sorted(glue_dfs.items()))

return (
Output(output_name=table_name, value=df) for table_name, df in glue_dfs.items()
)
for table_name, df in glue_dfs.items():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking - would a generator expression work here instead?

return (
    Output(output_name=table_name, value=df)
    for table_name, df in glue_dfs.items()
    if table_name in context.selected_output_names
)

? I guess that's not really much better, is it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. I am just working off of the dagster docs examples. IDK if dagster cares if its a return or yield statement.

if table_name in context.selected_output_names:
yield Output(output_name=table_name, value=df)


#####################
Expand Down
13 changes: 8 additions & 5 deletions src/pudl/extract/eia860.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def get_dtypes(page, **partition):

# TODO (bendnorman): Figure out type hint for context keyword and mutli_asset return
@multi_asset(
outs={table_name: AssetOut() for table_name in sorted(raw_table_names)},
outs={
table_name: AssetOut(is_required=False)
for table_name in sorted(raw_table_names)
},
can_subset=True,
required_resource_keys={"datastore", "dataset_settings"},
)
def extract_eia860(context, eia860_raw_dfs):
Expand Down Expand Up @@ -121,7 +125,6 @@ def extract_eia860(context, eia860_raw_dfs):
}
eia860_raw_dfs = dict(sorted(eia860_raw_dfs.items()))

return (
Output(output_name=table_name, value=df)
for table_name, df in eia860_raw_dfs.items()
)
for table_name, df in eia860_raw_dfs.items():
if table_name in context.selected_output_names:
yield Output(output_name=table_name, value=df)
20 changes: 12 additions & 8 deletions src/pudl/extract/eia923.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def get_dtypes(page, **partition):


# TODO (bendnorman): Add this information to the metadata
eia_raw_table_names = (
raw_table_names = (
"raw_eia923__boiler_fuel",
"raw_eia923__fuel_receipts_costs",
"raw_eia923__generation_fuel",
Expand All @@ -109,7 +109,7 @@ def get_dtypes(page, **partition):
# from being extracted currently. When we update to a new DOI this problem will
# probably fix itself. See comments on this issue:
# https://github.com/catalyst-cooperative/pudl/issues/2448
# "raw_emissions_control_eia923",
# "raw_eia923__emissions_control",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we've moved on to a new DOI by now if we want to turn this back on.

)


Expand All @@ -118,7 +118,11 @@ def get_dtypes(page, **partition):

# TODO (bendnorman): Figure out type hint for context keyword and mutli_asset return
@multi_asset(
outs={table_name: AssetOut() for table_name in sorted(eia_raw_table_names)},
outs={
table_name: AssetOut(is_required=False)
for table_name in sorted(raw_table_names)
},
can_subset=True,
required_resource_keys={"datastore", "dataset_settings"},
)
def extract_eia923(context, eia923_raw_dfs):
Expand All @@ -137,12 +141,12 @@ def extract_eia923(context, eia923_raw_dfs):

eia923_raw_dfs = dict(sorted(eia923_raw_dfs.items()))

return (
Output(output_name=table_name, value=df)
for table_name, df in eia923_raw_dfs.items()
for table_name, df in eia923_raw_dfs.items():
# There's an issue with the EIA-923 archive for 2018 which prevents this table
# from being extracted currently. When we update to a new DOI this problem will
# probably fix itself. See comments on this issue:
# https://github.com/catalyst-cooperative/pudl/issues/2448
if table_name != "raw_eia923__emissions_control"
)
if (table_name in context.selected_output_names) and (
table_name != "raw_eia923__emissions_control"
):
yield Output(output_name=table_name, value=df)