Ingest - tweaks following group demo #1202

Merged: 5 commits, Oct 16, 2024
Changes from 4 commits
@@ -2,37 +2,55 @@
from dcpy.utils import s3
from dcpy.models.product import metadata as product_metadata
from dcpy.models.product.dataset import metadata_v2 as dataset_metadata
from dcpy.models.lifecycle import ingest as ingest_models

import json
from tempfile import TemporaryDirectory
from pathlib import Path

RECIPES_BUCKET = "edm-recipes"
Contributor:
Curious about your rationale for adding a new bucket vs. keeping all our schemas colocated. Just thinking that if we keep adding schemas, I'm not sure they'll be 1-1 with buckets.

Contributor Author:
Yeah, I agree with that; on one level it just made sense to have them in the same place as all the objects that refer to them. But I'm with you - this should really just live in one place.

Contributor Author:

And maybe we move it somewhere more "neutral" eventually, but that place can be publishing for now.

Contributor Author:

fixed in new commit

PUBLISHING_BUCKET = "edm-publishing"
DO_SCHEMA_FOLDER = "data-engineering-devops/schemas/"

schemas = [
{
"name": "org_metadata.schema.json",
"bucket": PUBLISHING_BUCKET,
"folder": "product/",
"schema": product_metadata.OrgMetadataFile.model_json_schema(),
},
{
"name": "product_metadata.schema.json",
"bucket": PUBLISHING_BUCKET,
"folder": "product/",
"schema": product_metadata.ProductMetadataFile.model_json_schema(),
},
{
"name": "dataset_metadata.schema.json",
"bucket": PUBLISHING_BUCKET,
"folder": "product/dataset/",
"schema": dataset_metadata.Metadata.model_json_schema(),
},
{
"name": "template.json",
"bucket": RECIPES_BUCKET,
"folder": "ingest/",
"schema": ingest_models.Template.model_json_schema(),
},
{
"name": "config.json",
"bucket": RECIPES_BUCKET,
"folder": "ingest/",
"schema": ingest_models.Config.model_json_schema(),
},
]

for schema in schemas:
with TemporaryDirectory() as _dir:
p = Path(_dir) / schema["name"]
open(p, "w").write(json.dumps(schema["schema"]))
s3.upload_file(
bucket="edm-publishing",
bucket=schema["bucket"],
path=p,
key=DO_SCHEMA_FOLDER + schema["folder"] + schema["name"],
acl="public-read",
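Side note on the upload loop above: because each schema object is written with a public-read ACL, the uploaded JSON Schemas should be reachable at their S3 URLs. A minimal sketch, assuming the default virtual-hosted S3 URL format and the buckets/folders defined in this script (schema_url is not an existing dcpy helper):

def schema_url(bucket: str, folder: str, name: str) -> str:
    # DO_SCHEMA_FOLDER ("data-engineering-devops/schemas/") + per-schema folder + file name
    return f"https://{bucket}.s3.amazonaws.com/data-engineering-devops/schemas/{folder}{name}"

# e.g. the ingest template schema uploaded to the recipes bucket
print(schema_url("edm-recipes", "ingest/", "template.json"))
# https://edm-recipes.s3.amazonaws.com/data-engineering-devops/schemas/ingest/template.json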
78 changes: 46 additions & 32 deletions dcpy/lifecycle/ingest/configure.py
@@ -13,7 +13,7 @@
S3Source,
ScriptSource,
Source,
PreprocessingStep,
ProcessingStep,
Template,
Config,
)
@@ -98,6 +98,40 @@ def get_filename(source: Source, ds_id: str) -> str:
)


def determine_processing_steps(
steps: list[ProcessingStep],
*,
target_crs: str | None,
has_geom: bool,
mode: str | None,
) -> list[ProcessingStep]:
# TODO default steps like this should probably be configuration
step_names = {p.name for p in steps}

if target_crs and "reproject" not in step_names:
reprojection = ProcessingStep(name="reproject", args={"target_crs": target_crs})
steps = [reprojection] + steps

if "clean_column_names" not in step_names:
clean_column_names = ProcessingStep(
name="clean_column_names", args={"replace": {" ": "_"}, "lower": True}
)
steps.append(clean_column_names)

if has_geom and "multi" not in step_names:
multi = ProcessingStep(name="multi")
steps.append(multi)

if mode:
modes = {s.mode for s in steps}
if mode not in modes:
raise ValueError(f"mode '{mode}' is not present in template")

steps = [s for s in steps if s.mode is None or s.mode == mode]

return steps


def get_config(
dataset_id: str,
version: str | None = None,
@@ -113,37 +147,11 @@ def get_config(
version = version or get_version(template.ingestion.source, run_details.timestamp)
template = read_template(dataset_id, version=version, template_dir=template_dir)

processing_steps = template.ingestion.processing_steps

if template.ingestion.target_crs:
reprojection = PreprocessingStep(
name="reproject", args={"target_crs": template.ingestion.target_crs}
)
processing_steps = [reprojection] + processing_steps

# TODO default steps like this should probably be configuration
processing_step_names = {p.name for p in processing_steps}
if "clean_column_names" not in processing_step_names:
clean_column_names = PreprocessingStep(
name="clean_column_names", args={"replace": {" ": "_"}, "lower": True}
)
processing_steps.append(clean_column_names)

if "multi" not in processing_step_names and template.has_geom:
multi = PreprocessingStep(name="multi")
processing_steps.append(multi)

if mode:
modes = {s.mode for s in processing_steps}
if mode not in modes:
raise ValueError(f"mode '{mode}' is not present in template '{dataset_id}'")

processing_steps = [s for s in processing_steps if s.mode is None or s.mode == mode]

archival = ArchivalMetadata(
archival_timestamp=run_details.timestamp,
raw_filename=filename,
acl=template.acl,
processing_steps = determine_processing_steps(
template.ingestion.processing_steps,
target_crs=template.ingestion.target_crs,
has_geom=template.has_geom,
mode=mode,
)

ingestion = Ingestion(
Expand All @@ -154,6 +162,12 @@ def get_config(
processing_steps=processing_steps,
)

archival = ArchivalMetadata(
archival_timestamp=run_details.timestamp,
raw_filename=filename,
acl=template.acl,
)

# create config object
return Config(
id=template.id,
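For reference, a rough sketch of how the new determine_processing_steps helper fills in the default steps, based only on the logic shown above (the CRS value and the empty step list are illustrative):

from dcpy.models.lifecycle.ingest import ProcessingStep
from dcpy.lifecycle.ingest.configure import determine_processing_steps

# a template with no explicit processing steps, a target CRS, and a geometry column
steps = determine_processing_steps(
    [],
    target_crs="EPSG:2263",
    has_geom=True,
    mode=None,
)
print([s.name for s in steps])
# expected: ['reproject', 'clean_column_names', 'multi']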
10 changes: 1 addition & 9 deletions dcpy/lifecycle/ingest/extract.py
@@ -1,8 +1,5 @@
import importlib
from pandas import DataFrame
from pathlib import Path
import shutil
from typing import Callable

from dcpy.models.lifecycle.ingest import (
LocalFileSource,
@@ -13,7 +10,6 @@
from dcpy.models.connectors import socrata, web as web_models
from dcpy.models.connectors.edm.publishing import GisDataset
from dcpy.utils import s3
from dcpy.utils.logging import logger
from dcpy.connectors.edm import publishing
from dcpy.connectors.socrata import extract as extract_socrata
from dcpy.connectors import web
Expand All @@ -36,11 +32,7 @@ def download_file_from_source(
case S3Source():
s3.download_file(source.bucket, source.key, path)
case ScriptSource():
module = importlib.import_module(f"dcpy.connectors.{source.connector}")
extract: Callable = getattr(module, source.function)
logger.info(f"Running custom ingestion script {source.function}.py")
df: DataFrame = extract()
df.to_parquet(path)
raise NotImplementedError("Custom scripts not yet supported in ingest.")

## request-based methods
case web_models.FileDownloadSource():
2 changes: 1 addition & 1 deletion dcpy/lifecycle/ingest/run.py
@@ -86,7 +86,7 @@ def run(
output_filename=init_parquet,
)

transform.preprocess(
transform.process(
config.id,
config.ingestion.processing_steps,
config.columns,
109 changes: 0 additions & 109 deletions dcpy/lifecycle/ingest/scripts/doe_pepmeetingurls.py

This file was deleted.

24 changes: 12 additions & 12 deletions dcpy/lifecycle/ingest/transform.py
@@ -5,7 +5,7 @@
from typing import Callable, Literal

from dcpy.models import file
from dcpy.models.lifecycle.ingest import PreprocessingStep, Column
from dcpy.models.lifecycle.ingest import ProcessingStep, Column
from dcpy.utils import data, introspect
from dcpy.utils.geospatial import transform, parquet as geoparquet
from dcpy.utils.logging import logger
@@ -62,10 +62,10 @@ def to_parquet(
)


class Preprocessor:
class ProcessingFunctions:
"""
This class is very much a first pass at something that would support the validate/run_processing_steps functions
This should/will be iterated on when implementing actual preprocessing steps for chosen templates
This should/will be iterated on when implementing actual processing steps for chosen templates
"""

def __init__(self, dataset_id: str):
@@ -242,23 +242,23 @@ def validate_pd_series_func(


def validate_processing_steps(
dataset_id: str, processing_steps: list[PreprocessingStep]
dataset_id: str, processing_steps: list[ProcessingStep]
) -> list[Callable]:
"""
Given config of ingest dataset, violates that defined preprocessing steps
Given config of ingest dataset, validates that defined processing steps
exist and that appropriate arguments are supplied. Raises error detailing
violations if any are found

Returns list of callables, which expect a dataframe and return a dataframe
"""
violations: dict[str, str | dict[str, str]] = {}
compiled_steps: list[Callable] = []
preprocessor = Preprocessor(dataset_id)
processor = ProcessingFunctions(dataset_id)
for step in processing_steps:
if step.name not in preprocessor.__dir__():
if step.name not in processor.__dir__():
violations[step.name] = "Function not found"
else:
func = getattr(preprocessor, step.name)
func = getattr(processor, step.name)

# assume that function takes args "self, df"
kw_error = introspect.validate_kwargs(
@@ -276,7 +276,7 @@ compiled_steps.append(partial(func, **step.args))
compiled_steps.append(partial(func, **step.args))

if violations:
raise Exception(f"Invalid preprocessing steps:\n{violations}")
raise Exception(f"Invalid processing steps:\n{violations}")

return compiled_steps

Expand All @@ -293,15 +293,15 @@ def validate_columns(df: pd.DataFrame, columns: list[Column]) -> None:
)


def preprocess(
def process(
dataset_id: str,
processing_steps: list[PreprocessingStep],
processing_steps: list[ProcessingStep],
expected_columns: list[Column],
input_path: Path,
output_path: Path,
output_csv: bool = False,
):
"""Validates and runs preprocessing steps defined in config object"""
"""Validates and runs processing steps defined in config object"""
df = geoparquet.read_df(input_path)
compiled_steps = validate_processing_steps(dataset_id, processing_steps)

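To make the renamed pieces concrete, here is a small sketch of the pattern validate_processing_steps relies on: each ProcessingStep names a method on ProcessingFunctions, and the step's args are bound with functools.partial so the compiled step only needs a dataframe. The class below is a hypothetical stand-in, not the real ProcessingFunctions; only the clean_column_names args mirror the defaults added in configure.py:

from functools import partial
import pandas as pd

class ProcessingFunctionsSketch:
    # illustrative stand-in; real methods take (self, df, **step.args) and return a DataFrame
    def clean_column_names(self, df: pd.DataFrame, replace: dict | None = None, lower: bool = False) -> pd.DataFrame:
        columns = list(df.columns)
        for old, new in (replace or {}).items():
            columns = [c.replace(old, new) for c in columns]
        if lower:
            columns = [c.lower() for c in columns]
        df.columns = columns
        return df

processor = ProcessingFunctionsSketch()
step_args = {"replace": {" ": "_"}, "lower": True}  # same defaults as in configure.py
compiled = partial(processor.clean_column_names, **step_args)  # as done in validate_processing_steps

df = pd.DataFrame({"Borough Name": ["Brooklyn"]})
print(compiled(df).columns.tolist())
# ['borough_name']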