Finalize/clean up ingest models, add additional preprocessing #1166

Merged: 13 commits, merged Oct 16, 2024
10 changes: 5 additions & 5 deletions dcpy/connectors/edm/recipes.py
@@ -73,7 +73,7 @@ def _archive_dataset(config: ingest.Config, file_path: Path, s3_path: str) -> None:
         BUCKET,
         tmp_dir_path,
         Path(s3_path),
-        acl=config.acl,
+        acl=config.archival.acl,
         contents_only=True,
     )

@@ -103,7 +103,7 @@ def archive_dataset(config: ingest.Config, file_path: Path, *, latest: bool = False):
     s3_path = s3_folder_path(config.dataset_key)
     _archive_dataset(config, file_path, s3_path)
     if latest:
-        set_latest(config.dataset_key, config.acl)
+        set_latest(config.dataset_key, config.archival.acl)


 def update_freshness(ds: DatasetKey, timestamp: datetime) -> datetime:
@@ -113,16 +113,16 @@ def update_freshness(ds: DatasetKey, timestamp: datetime) -> datetime:
         raise TypeError(
             f"Cannot update freshness of dataset {ds} as it was archived by library, not ingest"
         )
-    config.check_timestamps.append(timestamp)
+    config.archival.check_timestamps.append(timestamp)
     config_str = json.dumps(config.model_dump(mode="json"))
     s3.upload_file_obj(
         BytesIO(config_str.encode()),
         BUCKET,
         path,
-        config.acl,
+        config.archival.acl,
         metadata=s3.get_custom_metadata(BUCKET, path),
     )
-    return config.archival_timestamp
+    return config.archival.archival_timestamp


 def get_config(name: str, version="latest") -> library.Config | ingest.Config:
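These accessor changes (config.acl becoming config.archival.acl, config.archival_timestamp becoming config.archival.archival_timestamp) follow from the new nested ingest models. A minimal sketch of what they might look like, inferred from this diff rather than copied from dcpy.models.lifecycle.ingest:

```python
# Sketch only: field names inferred from this PR's diff, not the actual
# definitions in dcpy.models.lifecycle.ingest.
from datetime import datetime
from typing import Any

from pydantic import BaseModel


class ArchivalMetadata(BaseModel):
    archival_timestamp: datetime
    raw_filename: str
    acl: str  # e.g. "public-read"
    check_timestamps: list[datetime] = []  # appended to by update_freshness


class Ingestion(BaseModel):
    target_crs: str | None = None
    source: Any  # the real model is a union of typed sources (LocalFileSource, S3Source, ...)
    file_format: Any
    processing_mode: str | None = None
    processing_steps: list[Any] = []


class Config(BaseModel):
    id: str
    version: str
    archival: ArchivalMetadata  # was flat: config.acl is now config.archival.acl
    ingestion: Ingestion  # was flat: config.processing_steps is now config.ingestion.processing_steps
```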
53 changes: 38 additions & 15 deletions dcpy/lifecycle/ingest/configure.py
@@ -7,6 +7,8 @@
 import yaml

 from dcpy.models.lifecycle.ingest import (
+    ArchivalMetadata,
+    Ingestion,
     LocalFileSource,
     S3Source,
     ScriptSource,
@@ -97,19 +99,25 @@ def get_filename(source: Source, ds_id: str) -> str:


 def get_config(
-    dataset_id: str, version: str | None = None, mode: str | None = None
+    dataset_id: str,
+    version: str | None = None,
+    *,
+    mode: str | None = None,
+    template_dir: Path = TEMPLATE_DIR,
 ) -> Config:
     """Generate config object for dataset and optional version"""
     run_details = metadata.get_run_details()
-    template = read_template(dataset_id, version=version)
-    filename = get_filename(template.source, template.id)
-    version = version or get_version(template.source, run_details.timestamp)
-    template = read_template(dataset_id, version=version)
-    processing_steps = template.processing_steps
+    template = read_template(dataset_id, version=version, template_dir=template_dir)
+
+    filename = get_filename(template.ingestion.source, template.id)
+    version = version or get_version(template.ingestion.source, run_details.timestamp)
+    template = read_template(dataset_id, version=version, template_dir=template_dir)
+
+    processing_steps = template.ingestion.processing_steps

-    if template.target_crs:
+    if template.ingestion.target_crs:
         reprojection = PreprocessingStep(
-            name="reproject", args={"target_crs": template.target_crs}
+            name="reproject", args={"target_crs": template.ingestion.target_crs}
         )
         processing_steps = [reprojection] + processing_steps
@@ -121,24 +129,39 @@
         )
         processing_steps.append(clean_column_names)

+    if "multi" not in processing_step_names and template.has_geom:
+        multi = PreprocessingStep(name="multi")
+        processing_steps.append(multi)
+
     if mode:
         modes = {s.mode for s in processing_steps}
         if mode not in modes:
             raise ValueError(f"mode '{mode}' is not present in template '{dataset_id}'")

     processing_steps = [s for s in processing_steps if s.mode is None or s.mode == mode]

-    # create config object
-    return Config(
-        id=template.id,
-        version=version,
+    archival = ArchivalMetadata(
         archival_timestamp=run_details.timestamp,
         raw_filename=filename,
         acl=template.acl,
-        target_crs=template.target_crs,
-        source=template.source,
-        file_format=template.file_format,
+    )
+
+    ingestion = Ingestion(
+        target_crs=template.ingestion.target_crs,
+        source=template.ingestion.source,
+        file_format=template.ingestion.file_format,
         processing_mode=mode,
         processing_steps=processing_steps,
+    )
+
+    # create config object
+    return Config(
+        id=template.id,
+        version=version,
+        crs=ingestion.target_crs,
+        attributes=template.attributes,
+        archival=archival,
+        ingestion=ingestion,
         columns=template.columns,
         run_details=run_details,
     )
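Under the new signature, mode and template_dir are keyword-only arguments. A hypothetical call (the dataset id and directory below are illustrative choices, not values mandated by this PR):

```python
from pathlib import Path

# Hypothetical usage; get_config falls back to TEMPLATE_DIR when no
# template_dir is passed, and resolves the version from the source when omitted.
config = get_config(
    "dcp_commercialoverlay",
    mode=None,
    template_dir=Path("dcpy/lifecycle/ingest/templates"),
)
assert config.ingestion.target_crs == "EPSG:4326"  # set by this PR's template
```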
37 changes: 26 additions & 11 deletions dcpy/lifecycle/ingest/run.py
@@ -1,3 +1,4 @@
+import json
 import pandas as pd
 from pathlib import Path
 import typer
@@ -28,11 +29,11 @@ def update_freshness(
     comparison = recipes.read_df(config.dataset)
     if new.equals(comparison):
         original_archival_timestamp = recipes.update_freshness(
-            config.dataset_key, config.archival_timestamp
+            config.dataset_key, config.archival.archival_timestamp
         )
-        config.archival_timestamp = original_archival_timestamp
+        config.archival.archival_timestamp = original_archival_timestamp
         if latest:
-            recipes.set_latest(config.dataset_key, config.acl)
+            recipes.set_latest(config.dataset_key, config.archival.acl)
         return config
     else:
         raise FileExistsError(
@@ -49,34 +50,46 @@ def run(
     latest: bool = False,
     skip_archival: bool = False,
     output_csv: bool = False,
+    template_dir: Path = configure.TEMPLATE_DIR,
 ) -> Config:
-    config = configure.get_config(dataset_id, version=version, mode=mode)
-    transform.validate_processing_steps(config.id, config.processing_steps)
+    config = configure.get_config(
+        dataset_id, version=version, mode=mode, template_dir=template_dir
+    )
+    transform.validate_processing_steps(config.id, config.ingestion.processing_steps)
Review comment from @damonmcc (Member, Oct 15, 2024) on the validate_processing_steps call:

    is this redundant since transform.validate_processing_steps is called during transform.preprocess?
    and if we do wanna validate early, would it make sense to do it during configure.get_config or maybe read_template?

Reply (Member):

    talked about it and we'll keep it here. it's nicer to assemble these pieces at a high level rather than have ingest modules importing each other
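For context, a hedged sketch of the kind of check a step validator might perform; the real transform.validate_processing_steps is not shown in this diff, so this is illustrative only:

```python
# Illustrative only; not the actual implementation in dcpy's transform module.
def validate_steps_sketch(dataset_id: str, steps: list) -> None:
    # Step names that appear in this PR's templates and configure.py.
    known = {
        "reproject",
        "rename_columns",
        "clean_column_names",
        "multi",
        "append_prev",
        "upsert_column_of_previous_version",
    }
    unknown = [s.name for s in steps if s.name not in known]
    if unknown:
        raise ValueError(f"'{dataset_id}' has unknown processing steps: {unknown}")
```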


     if not staging_dir:
-        staging_dir = TMP_DIR / dataset_id / config.archival_timestamp.isoformat()
+        staging_dir = (
+            TMP_DIR / dataset_id / config.archival.archival_timestamp.isoformat()
+        )
         staging_dir.mkdir(parents=True)
     else:
         staging_dir.mkdir(parents=True, exist_ok=True)

     # download dataset
     extract.download_file_from_source(
-        config.source, config.raw_filename, config.version, staging_dir
+        config.ingestion.source,
+        config.archival.raw_filename,
+        config.version,
+        staging_dir,
     )
-    file_path = staging_dir / config.raw_filename
+    file_path = staging_dir / config.archival.raw_filename

     if not skip_archival:
         # archive to edm-recipes/raw_datasets
-        recipes.archive_raw_dataset(config, staging_dir / config.raw_filename)
+        recipes.archive_raw_dataset(config, file_path)

     init_parquet = "init.parquet"
     transform.to_parquet(
-        config.file_format, file_path, dir=staging_dir, output_filename=init_parquet
+        config.ingestion.file_format,
+        file_path,
+        dir=staging_dir,
+        output_filename=init_parquet,
     )

     transform.preprocess(
         config.id,
-        config.processing_steps,
+        config.ingestion.processing_steps,
         config.columns,
         staging_dir / init_parquet,
         staging_dir / config.filename,
         output_csv=output_csv,
@@ -90,6 +103,8 @@ def run(
         config, staging_dir / config.filename, latest=latest
     )

+    with open(staging_dir / "config.json", "w") as f:
+        json.dump(config.model_dump(mode="json"), f, indent=4)
     return config
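run() now threads template_dir through to configure.get_config and persists the resolved config as config.json next to the staged data. A hedged example of a local dry run (flag values are illustrative):

```python
# Hypothetical local invocation; skip_archival avoids any writes to S3.
config = run(
    "dcp_commercialoverlay",
    version=None,  # resolved from the source when omitted
    skip_archival=True,
    output_csv=True,
)
# The staging directory now holds the raw download, init.parquet,
# the processed output, and the serialized config.json.
```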
32 changes: 32 additions & 0 deletions dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml
@@ -0,0 +1,32 @@
id: dcp_commercialoverlay
acl: public-read

attributes:
  name: DCP NYC Commercial Overlay Districts
  description: |
    Polygon features representing the within-tax-block limits for commercial overlay districts,
    as shown on the DCP zoning maps. Commercial overlay district designations are indicated in the OVERLAY attribute.
  url: https://www1.nyc.gov/site/planning/data-maps/open-data/dwn-gis-zoning.page#metadata

ingestion:
  target_crs: EPSG:4326
  source:
    type: edm_publishing_gis_dataset
    name: dcp_commercial_overlays
  file_format:
    type: shapefile
    crs: EPSG:2263
  processing_steps:
    - name: rename_columns
      args:
        map: {"geom": "wkb_geometry"}

columns:
  - id: overlay
    data_type: text
  - id: shape_leng
    data_type: decimal
  - id: shape_area
    data_type: decimal
  - id: wkb_geometry
    data_type: geometry
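As a worked example of the configure.py logic above, this is roughly the step list get_config would assemble for this template: reproject is prepended because ingestion.target_crs is set, the template's own rename_columns follows, and clean_column_names (plus multi, since the dataset has a geometry column) are appended. The args of the appended steps are assumptions:

```python
# Illustrative result for dcp_commercialoverlay; appended-step args are assumed.
steps = [
    PreprocessingStep(name="reproject", args={"target_crs": "EPSG:4326"}),
    PreprocessingStep(name="rename_columns", args={"map": {"geom": "wkb_geometry"}}),
    PreprocessingStep(name="clean_column_names"),  # appended by get_config
    PreprocessingStep(name="multi"),  # appended because the dataset has geometry
]
```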
72 changes: 24 additions & 48 deletions dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_demographic.yml
@@ -1,52 +1,28 @@
 id: dcp_pop_acs2010_demographic
 acl: public-read
-source:
-  type: local_file
-  path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx
-file_format:
-  type: xlsx
-  sheet_name: CCD2023_Dem0610
-  dtype:
-    GeoID: str
-processing_steps:
-  - name: clean_column_names
-    args:
-      lower: true
-  - name: append_prev
-    mode: append
-  - name: upsert_column_of_previous_version
-    args:
-      key: [geotype, geoid]
-      insert_behavior: error
-      missing_key_behavior: error
-    mode: update_column
-
-library_dataset:
-  name: dcp_pop_acs2010_demographic
-  version: ""
-  acl: public-read
-  source:
-    script:
-      name: excel
-      path: https://nyc3.digitaloceanspaces.com/edm-recipes/inbox/dcp_pop_acs2010/{{ version }}/dcp_pop_acs.xlsx
-      sheet_name: Dem0610
-    geometry:
-      SRS: null
-      type: NONE
-
-  destination:
-    geometry:
-      SRS: null
-      type: NONE
-    fields: []
-    sql: null
-
-  info:
-    description: |
-      ## 2010 ACS file from Population
-      This file is produced internally by the Population division. 2010 version is used as a reference dataset
-      for the latest ACS data, and occasionally is modified so these different subsections are archived as their
-      own recipe datasets so that they can easily be updated individually
-
-    url: null
-    dependents: []
+
+attributes:
+  name: DCP Population 2010 ACS Demographic Data
+  description: |
+    This file is produced internally by the Population division. 2010 version is used as a reference dataset
+    for the latest ACS data, and occasionally is modified so these different subsections are archived as their
+    own recipe datasets so that they can easily be updated individually
+
+ingestion:
+  source:
+    type: local_file
+    path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx
+  file_format:
+    type: xlsx
+    sheet_name: CCD2023_Dem0610
+    dtype:
+      GeoID: str
+  processing_steps:
+    - name: append_prev
+      mode: append
+    - name: upsert_column_of_previous_version
+      args:
+        key: [geotype, geoid]
+        insert_behavior: error
+        missing_key_behavior: error
+      mode: update_column
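The append_prev and upsert_column_of_previous_version steps above are mode-gated: get_config keeps a step only if its mode is unset or matches the requested mode. A small sketch of that filter using this template's steps (the PreprocessingStep shape is assumed):

```python
# Assumed shape: PreprocessingStep exposes .name and .mode, as used in configure.get_config.
steps = [
    PreprocessingStep(name="append_prev", mode="append"),
    PreprocessingStep(name="upsert_column_of_previous_version", mode="update_column"),
]
mode = "append"
selected = [s for s in steps if s.mode is None or s.mode == mode]
# selected -> only append_prev; with mode="update_column", only the upsert step runs
```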
72 changes: 24 additions & 48 deletions dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_economic.yml
@@ -1,52 +1,28 @@
 id: dcp_pop_acs2010_economic
 acl: public-read
-source:
-  type: local_file
-  path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx
-file_format:
-  type: xlsx
-  sheet_name: CCD2023_Econ0610_NotInflated
-  dtype:
-    GeoID: str
-processing_steps:
-  - name: clean_column_names
-    args:
-      lower: true
-  - name: append_prev
-    mode: append
-  - name: upsert_column_of_previous_version
-    args:
-      key: [geotype, geoid]
-      insert_behavior: error
-      missing_key_behavior: error
-    mode: update_column
-
-library_dataset:
-  name: dcp_pop_acs2010_economic
-  version: ""
-  acl: public-read
-  source:
-    script:
-      name: excel
-      path: https://nyc3.digitaloceanspaces.com/edm-recipes/inbox/dcp_pop_acs2010/{{ version }}/dcp_pop_acs.xlsx
-      sheet_name: Econ0610
-    geometry:
-      SRS: null
-      type: NONE
-
-  destination:
-    geometry:
-      SRS: null
-      type: NONE
-    fields: []
-    sql: null
-
-  info:
-    description: |
-      ## 2010 ACS file from Population
-      This file is produced internally by the Population division. 2010 version is used as a reference dataset
-      for the latest ACS data, and occasionally is modified so these different subsections are archived as their
-      own recipe datasets so that they can easily be updated individually
-
-    url: null
-    dependents: []
+
+attributes:
+  name: DCP Population 2010 ACS Economic Data
+  description: |
+    This file is produced internally by the Population division. 2010 version is used as a reference dataset
+    for the latest ACS data, and occasionally is modified so these different subsections are archived as their
+    own recipe datasets so that they can easily be updated individually
+
+ingestion:
+  source:
+    type: local_file
+    path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx
+  file_format:
+    type: xlsx
+    sheet_name: CCD2023_Econ0610_NotInflated
+    dtype:
+      GeoID: str
+  processing_steps:
+    - name: append_prev
+      mode: append
+    - name: upsert_column_of_previous_version
+      args:
+        key: [geotype, geoid]
+        insert_behavior: error
+        missing_key_behavior: error
+      mode: update_column