From 40afa80bb68d4e032721df74298fd7837ee74938 Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Tue, 8 Oct 2024 10:29:26 -0400 Subject: [PATCH 01/13] cleanup #1155 --- .../lifecycle/scripts}/ingest_validation.py | 5 +- dcpy/test/utils/test_collections.py | 53 +++++++++++++++++++ dcpy/utils/collections.py | 5 +- 3 files changed, 57 insertions(+), 6 deletions(-) rename {analysis => dcpy/lifecycle/scripts}/ingest_validation.py (96%) diff --git a/analysis/ingest_validation.py b/dcpy/lifecycle/scripts/ingest_validation.py similarity index 96% rename from analysis/ingest_validation.py rename to dcpy/lifecycle/scripts/ingest_validation.py index 8fa353bf3..3773408b8 100644 --- a/analysis/ingest_validation.py +++ b/dcpy/lifecycle/scripts/ingest_validation.py @@ -16,7 +16,7 @@ def compare_recipes_in_postgres( build_name: str, key_columns: list[str] | None = None, ignore_columns: list[str] | None = None, - local_library_dir: str = recipes.LIBRARY_DEFAULT_PATH, + local_library_dir: Path = recipes.LIBRARY_DEFAULT_PATH, left_type: recipes.DatasetType = recipes.DatasetType.pg_dump, right_type: recipes.DatasetType = recipes.DatasetType.pg_dump, ): @@ -37,14 +37,12 @@ def compare_recipes_in_postgres( left_ds, client, import_as=left_table, - rename_pk=True, local_library_dir=local_library_dir, ) recipes.import_dataset( right_ds, client, import_as=right_table, - rename_pk=True, local_library_dir=local_library_dir, ) if key_columns: @@ -70,7 +68,6 @@ def run_ingest_and_library( library_file_type: str = "pg_dump", ): ingest_dir = ingest_parent_dir / dataset / "special_folder" - print(ingest_dir) ingest.run(dataset, staging_dir=ingest_dir, skip_archival=True) # BEWARE: once you import library, parquet file writing fails diff --git a/dcpy/test/utils/test_collections.py b/dcpy/test/utils/test_collections.py index f50c1473c..a56cfcb55 100644 --- a/dcpy/test/utils/test_collections.py +++ b/dcpy/test/utils/test_collections.py @@ -143,6 +143,59 @@ def test_pretty_print(self): " 1 2 4", ] + def test_max_df_length(self): + assert collections.flatten_and_indent( + self.outer.model_dump(), max_df_length=1 + ) == [ + "a", + " b", + " c", + "d", + " 2 rows. First 1 shown", + " x y", + " 0 1 3", + "e: None", + "f", + " g: h", + " i", + " j", + " k", + " l", + " m", + " n: o", + " p: q", + " r", + " 2 rows. First 1 shown", + " x y", + " 0 1 3", + ] + + def test_max_recursion_depth(self): + assert collections.flatten_and_indent( + self.outer.model_dump(), max_recursion_depth=2 + ) == [ + "a", + " b", + " c", + "d", + " x y", + " 0 1 3", + " 1 2 4", + "e: None", + "f", + " g: h", + " i", + " [Truncated - max depth exceeded]", + " [Truncated - max depth exceeded]", + " m", + " n: o", + " p: q", + " r", + " x y", + " 0 1 3", + " 1 2 4", + ] + def test_to_report(self): collections.indented_report(self.outer.model_dump()) == """a b diff --git a/dcpy/utils/collections.py b/dcpy/utils/collections.py index 95d9e14db..4d5502c6a 100644 --- a/dcpy/utils/collections.py +++ b/dcpy/utils/collections.py @@ -30,6 +30,7 @@ def flatten_and_indent( max_df_length: int = 20, include_line_breaks: bool = False, pretty_print_fields: bool = False, + max_recursion_depth: int = 10, ) -> list[str]: """ Recursively prints a dictionary or list, indenting as the nested structure goes deeper. @@ -65,8 +66,8 @@ def df_to_list(df: DataFrame): return df.to_string().split("\n") def flatten(li: list | dict, level: int) -> list: - if level == 10: - return [] + if level > max_recursion_depth: + return [apply_indent("[Truncated - max depth exceeded]", level)] if isinstance(li, dict): return flatten([Pair(key=k, value=li[k]) for k in li], level) From c371fd738efdcf0c595c4536d56e6b94a9b16fd9 Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Thu, 26 Sep 2024 11:29:01 -0700 Subject: [PATCH 02/13] add dcp_commercialoverlay to ingest --- .../ingest/templates/dcp_commercialoverlay.yml | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml diff --git a/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml b/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml new file mode 100644 index 000000000..9f529c4e2 --- /dev/null +++ b/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml @@ -0,0 +1,13 @@ +id: dcp_commercialoverlay +acl: public-read +target_crs: EPSG:4326 +source: + type: edm_publishing_gis_dataset + name: dcp_commercial_overlays +file_format: + type: shapefile + crs: EPSG:2263 +processing_steps: +- name: rename_columns + args: + map: {"geom": "wkb_geometry"} From b1a886da0726c2909cd93115fe9aefd010bb8988 Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Mon, 30 Sep 2024 13:21:07 -0400 Subject: [PATCH 03/13] move ingest templates to 'dev' folder --- .../ingest/{templates => dev_templates}/bpl_libraries.yml | 0 .../{templates => dev_templates}/dca_operatingbusinesses.yml | 0 .../ingest/{templates => dev_templates}/dcp_addresspoints.yml | 0 .../ingest/{templates => dev_templates}/dcp_atomicpolygons.yml | 0 dcpy/lifecycle/ingest/{templates => dev_templates}/dcp_pad.yml | 0 .../{templates => dev_templates}/dcp_pop_acs2010_demographic.yml | 0 .../{templates => dev_templates}/dcp_pop_acs2010_economic.yml | 0 .../{templates => dev_templates}/dcp_pop_acs2010_housing.yml | 0 .../{templates => dev_templates}/dcp_pop_acs2010_social.yml | 0 dcpy/lifecycle/ingest/{templates => dev_templates}/dcp_sfpsd.yml | 0 dcpy/lifecycle/ingest/{templates => dev_templates}/dob_cofos.yml | 0 .../ingest/{templates => dev_templates}/dob_jobapplications.yml | 0 .../ingest/{templates => dev_templates}/dob_now_applications.yml | 0 .../ingest/{templates => dev_templates}/dob_now_permits.yml | 0 .../ingest/{templates => dev_templates}/dob_permitissuance.yml | 0 .../ingest/{templates => dev_templates}/doe_pepmeetingurls.yml | 0 .../ingest/{templates => dev_templates}/dpr_capitalprojects.yml | 0 .../{templates => dev_templates}/fisa_capitalcommitments.yml | 0 .../ingest/{templates => dev_templates}/fisa_dailybudget.yml | 0 .../ingest/{templates => dev_templates}/nypl_libraries.yml | 0 .../ingest/{templates => dev_templates}/nysdoh_nursinghomes.yml | 0 .../{templates => dev_templates}/nysed_nonpublicenrollment.yml | 0 22 files changed, 0 insertions(+), 0 deletions(-) rename dcpy/lifecycle/ingest/{templates => dev_templates}/bpl_libraries.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dca_operatingbusinesses.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dcp_addresspoints.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dcp_atomicpolygons.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dcp_pad.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dcp_pop_acs2010_demographic.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dcp_pop_acs2010_economic.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dcp_pop_acs2010_housing.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dcp_pop_acs2010_social.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dcp_sfpsd.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dob_cofos.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dob_jobapplications.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dob_now_applications.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dob_now_permits.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dob_permitissuance.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/doe_pepmeetingurls.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/dpr_capitalprojects.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/fisa_capitalcommitments.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/fisa_dailybudget.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/nypl_libraries.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/nysdoh_nursinghomes.yml (100%) rename dcpy/lifecycle/ingest/{templates => dev_templates}/nysed_nonpublicenrollment.yml (100%) diff --git a/dcpy/lifecycle/ingest/templates/bpl_libraries.yml b/dcpy/lifecycle/ingest/dev_templates/bpl_libraries.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/bpl_libraries.yml rename to dcpy/lifecycle/ingest/dev_templates/bpl_libraries.yml diff --git a/dcpy/lifecycle/ingest/templates/dca_operatingbusinesses.yml b/dcpy/lifecycle/ingest/dev_templates/dca_operatingbusinesses.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dca_operatingbusinesses.yml rename to dcpy/lifecycle/ingest/dev_templates/dca_operatingbusinesses.yml diff --git a/dcpy/lifecycle/ingest/templates/dcp_addresspoints.yml b/dcpy/lifecycle/ingest/dev_templates/dcp_addresspoints.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dcp_addresspoints.yml rename to dcpy/lifecycle/ingest/dev_templates/dcp_addresspoints.yml diff --git a/dcpy/lifecycle/ingest/templates/dcp_atomicpolygons.yml b/dcpy/lifecycle/ingest/dev_templates/dcp_atomicpolygons.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dcp_atomicpolygons.yml rename to dcpy/lifecycle/ingest/dev_templates/dcp_atomicpolygons.yml diff --git a/dcpy/lifecycle/ingest/templates/dcp_pad.yml b/dcpy/lifecycle/ingest/dev_templates/dcp_pad.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dcp_pad.yml rename to dcpy/lifecycle/ingest/dev_templates/dcp_pad.yml diff --git a/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_demographic.yml b/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_demographic.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_demographic.yml rename to dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_demographic.yml diff --git a/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_economic.yml b/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_economic.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_economic.yml rename to dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_economic.yml diff --git a/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_housing.yml b/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_housing.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_housing.yml rename to dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_housing.yml diff --git a/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_social.yml b/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_social.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_social.yml rename to dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_social.yml diff --git a/dcpy/lifecycle/ingest/templates/dcp_sfpsd.yml b/dcpy/lifecycle/ingest/dev_templates/dcp_sfpsd.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dcp_sfpsd.yml rename to dcpy/lifecycle/ingest/dev_templates/dcp_sfpsd.yml diff --git a/dcpy/lifecycle/ingest/templates/dob_cofos.yml b/dcpy/lifecycle/ingest/dev_templates/dob_cofos.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dob_cofos.yml rename to dcpy/lifecycle/ingest/dev_templates/dob_cofos.yml diff --git a/dcpy/lifecycle/ingest/templates/dob_jobapplications.yml b/dcpy/lifecycle/ingest/dev_templates/dob_jobapplications.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dob_jobapplications.yml rename to dcpy/lifecycle/ingest/dev_templates/dob_jobapplications.yml diff --git a/dcpy/lifecycle/ingest/templates/dob_now_applications.yml b/dcpy/lifecycle/ingest/dev_templates/dob_now_applications.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dob_now_applications.yml rename to dcpy/lifecycle/ingest/dev_templates/dob_now_applications.yml diff --git a/dcpy/lifecycle/ingest/templates/dob_now_permits.yml b/dcpy/lifecycle/ingest/dev_templates/dob_now_permits.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dob_now_permits.yml rename to dcpy/lifecycle/ingest/dev_templates/dob_now_permits.yml diff --git a/dcpy/lifecycle/ingest/templates/dob_permitissuance.yml b/dcpy/lifecycle/ingest/dev_templates/dob_permitissuance.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dob_permitissuance.yml rename to dcpy/lifecycle/ingest/dev_templates/dob_permitissuance.yml diff --git a/dcpy/lifecycle/ingest/templates/doe_pepmeetingurls.yml b/dcpy/lifecycle/ingest/dev_templates/doe_pepmeetingurls.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/doe_pepmeetingurls.yml rename to dcpy/lifecycle/ingest/dev_templates/doe_pepmeetingurls.yml diff --git a/dcpy/lifecycle/ingest/templates/dpr_capitalprojects.yml b/dcpy/lifecycle/ingest/dev_templates/dpr_capitalprojects.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/dpr_capitalprojects.yml rename to dcpy/lifecycle/ingest/dev_templates/dpr_capitalprojects.yml diff --git a/dcpy/lifecycle/ingest/templates/fisa_capitalcommitments.yml b/dcpy/lifecycle/ingest/dev_templates/fisa_capitalcommitments.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/fisa_capitalcommitments.yml rename to dcpy/lifecycle/ingest/dev_templates/fisa_capitalcommitments.yml diff --git a/dcpy/lifecycle/ingest/templates/fisa_dailybudget.yml b/dcpy/lifecycle/ingest/dev_templates/fisa_dailybudget.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/fisa_dailybudget.yml rename to dcpy/lifecycle/ingest/dev_templates/fisa_dailybudget.yml diff --git a/dcpy/lifecycle/ingest/templates/nypl_libraries.yml b/dcpy/lifecycle/ingest/dev_templates/nypl_libraries.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/nypl_libraries.yml rename to dcpy/lifecycle/ingest/dev_templates/nypl_libraries.yml diff --git a/dcpy/lifecycle/ingest/templates/nysdoh_nursinghomes.yml b/dcpy/lifecycle/ingest/dev_templates/nysdoh_nursinghomes.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/nysdoh_nursinghomes.yml rename to dcpy/lifecycle/ingest/dev_templates/nysdoh_nursinghomes.yml diff --git a/dcpy/lifecycle/ingest/templates/nysed_nonpublicenrollment.yml b/dcpy/lifecycle/ingest/dev_templates/nysed_nonpublicenrollment.yml similarity index 100% rename from dcpy/lifecycle/ingest/templates/nysed_nonpublicenrollment.yml rename to dcpy/lifecycle/ingest/dev_templates/nysed_nonpublicenrollment.yml From ad740fac6ee8092dfab1971b56cb52292a39bad9 Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Mon, 30 Sep 2024 13:50:14 -0400 Subject: [PATCH 04/13] add attributes section to ingest templates --- dcpy/lifecycle/ingest/configure.py | 1 + .../ingest/templates/dcp_commercialoverlay.yml | 8 ++++++++ dcpy/models/lifecycle/ingest.py | 17 ++++++++++++----- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/dcpy/lifecycle/ingest/configure.py b/dcpy/lifecycle/ingest/configure.py index c2a4b81db..28fa0b49d 100644 --- a/dcpy/lifecycle/ingest/configure.py +++ b/dcpy/lifecycle/ingest/configure.py @@ -132,6 +132,7 @@ def get_config( return Config( id=template.id, version=version, + attributes=template.attributes, archival_timestamp=run_details.timestamp, raw_filename=filename, acl=template.acl, diff --git a/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml b/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml index 9f529c4e2..b7ac740c9 100644 --- a/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml +++ b/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml @@ -1,5 +1,13 @@ id: dcp_commercialoverlay acl: public-read + +attributes: + name: DCP NYC Commercial Overlay Districts + description: | + Polygon features representing the within-tax-block limits for commercial overlay districts, + as shown on the DCP zoning maps. Commercial overlay district designations are indicated in the OVERLAY attribute. + url: https://www1.nyc.gov/site/planning/data-maps/open-data/dwn-gis-zoning.page#metadata + target_crs: EPSG:4326 source: type: edm_publishing_gis_dataset diff --git a/dcpy/models/lifecycle/ingest.py b/dcpy/models/lifecycle/ingest.py index 07d4ee97f..8e516893e 100644 --- a/dcpy/models/lifecycle/ingest.py +++ b/dcpy/models/lifecycle/ingest.py @@ -45,20 +45,24 @@ class PreprocessingStep(BaseModel): mode: str | None = None +class DatasetAttributes(BaseModel): + name: str | None = None + description: str | None = None + url: str | None = None + custom: dict | None = None + + class Template(BaseModel, extra="forbid", arbitrary_types_allowed=True): """Definition of a dataset for ingestion/processing/archiving in edm-recipes""" id: str acl: recipes.ValidAclValues - target_crs: str | None = None + attributes: DatasetAttributes | None = None - ## these two fields might merge to "source" or something equivalent at some point - ## for now, they are distinct so that they can be worked on separately - ## when implemented, "None" should not be valid type + target_crs: str | None = None source: Source file_format: file.Format - processing_steps: list[PreprocessingStep] = [] ## this is the original library template, included just for reference while we build out our new templates @@ -72,6 +76,9 @@ class Config(BaseModel, extra="forbid", arbitrary_types_allowed=True): """ id: str = Field(validation_alias=AliasChoices("id", "name")) + + attributes: DatasetAttributes | None = None + version: str archival_timestamp: datetime check_timestamps: list[datetime] = [] From 8f2d9e19a4dd8e284f45ada9b957841fbcab55c4 Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Mon, 30 Sep 2024 15:17:48 -0400 Subject: [PATCH 05/13] use sorted base, rework ingest models --- dcpy/connectors/edm/recipes.py | 10 +-- dcpy/lifecycle/ingest/configure.py | 50 +++++++++----- dcpy/lifecycle/ingest/run.py | 31 ++++++--- .../templates/dcp_commercialoverlay.yml | 23 ++++--- dcpy/models/file.py | 15 +++-- dcpy/models/lifecycle/ingest.py | 67 ++++++++++++------- 6 files changed, 122 insertions(+), 74 deletions(-) diff --git a/dcpy/connectors/edm/recipes.py b/dcpy/connectors/edm/recipes.py index 6873ea7ed..5f7420966 100644 --- a/dcpy/connectors/edm/recipes.py +++ b/dcpy/connectors/edm/recipes.py @@ -73,7 +73,7 @@ def _archive_dataset(config: ingest.Config, file_path: Path, s3_path: str) -> No BUCKET, tmp_dir_path, Path(s3_path), - acl=config.acl, + acl=config.archival.acl, contents_only=True, ) @@ -103,7 +103,7 @@ def archive_dataset(config: ingest.Config, file_path: Path, *, latest: bool = Fa s3_path = s3_folder_path(config.dataset_key) _archive_dataset(config, file_path, s3_path) if latest: - set_latest(config.dataset_key, config.acl) + set_latest(config.dataset_key, config.archival.acl) def update_freshness(ds: DatasetKey, timestamp: datetime) -> datetime: @@ -113,16 +113,16 @@ def update_freshness(ds: DatasetKey, timestamp: datetime) -> datetime: raise TypeError( f"Cannot update freshness of dataset {ds} as it was archived by library, not ingest" ) - config.check_timestamps.append(timestamp) + config.archival.check_timestamps.append(timestamp) config_str = json.dumps(config.model_dump(mode="json")) s3.upload_file_obj( BytesIO(config_str.encode()), BUCKET, path, - config.acl, + config.archival.acl, metadata=s3.get_custom_metadata(BUCKET, path), ) - return config.archival_timestamp + return config.archival.archival_timestamp def get_config(name: str, version="latest") -> library.Config | ingest.Config: diff --git a/dcpy/lifecycle/ingest/configure.py b/dcpy/lifecycle/ingest/configure.py index 28fa0b49d..ad4dfd60d 100644 --- a/dcpy/lifecycle/ingest/configure.py +++ b/dcpy/lifecycle/ingest/configure.py @@ -7,6 +7,8 @@ import yaml from dcpy.models.lifecycle.ingest import ( + ArchivalMetadata, + Ingestion, LocalFileSource, S3Source, ScriptSource, @@ -97,19 +99,25 @@ def get_filename(source: Source, ds_id: str) -> str: def get_config( - dataset_id: str, version: str | None = None, mode: str | None = None + dataset_id: str, + version: str | None = None, + *, + mode: str | None = None, + template_dir: Path = TEMPLATE_DIR, ) -> Config: """Generate config object for dataset and optional version""" run_details = metadata.get_run_details() - template = read_template(dataset_id, version=version) - filename = get_filename(template.source, template.id) - version = version or get_version(template.source, run_details.timestamp) - template = read_template(dataset_id, version=version) - processing_steps = template.processing_steps + template = read_template(dataset_id, version=version, template_dir=template_dir) - if template.target_crs: + filename = get_filename(template.ingestion.source, template.id) + version = version or get_version(template.ingestion.source, run_details.timestamp) + template = read_template(dataset_id, version=version, template_dir=template_dir) + + processing_steps = template.ingestion.processing_steps + + if template.ingestion.target_crs: reprojection = PreprocessingStep( - name="reproject", args={"target_crs": template.target_crs} + name="reproject", args={"target_crs": template.ingestion.target_crs} ) processing_steps = [reprojection] + processing_steps @@ -128,18 +136,28 @@ def get_config( processing_steps = [s for s in processing_steps if s.mode is None or s.mode == mode] - # create config object - return Config( - id=template.id, - version=version, - attributes=template.attributes, + archival = ArchivalMetadata( archival_timestamp=run_details.timestamp, raw_filename=filename, acl=template.acl, - target_crs=template.target_crs, - source=template.source, - file_format=template.file_format, + ) + + ingestion = Ingestion( + target_crs=template.ingestion.target_crs, + source=template.ingestion.source, + file_format=template.ingestion.file_format, processing_mode=mode, processing_steps=processing_steps, + ) + + # create config object + return Config( + id=template.id, + version=version, + crs=ingestion.target_crs, + attributes=template.attributes, + archival=archival, + ingestion=ingestion, + columns=template.columns, run_details=run_details, ) diff --git a/dcpy/lifecycle/ingest/run.py b/dcpy/lifecycle/ingest/run.py index c11c6cf73..2e299c1e0 100644 --- a/dcpy/lifecycle/ingest/run.py +++ b/dcpy/lifecycle/ingest/run.py @@ -1,3 +1,4 @@ +import json import pandas as pd from pathlib import Path import typer @@ -28,11 +29,11 @@ def update_freshness( comparison = recipes.read_df(config.dataset) if new.equals(comparison): original_archival_timestamp = recipes.update_freshness( - config.dataset_key, config.archival_timestamp + config.dataset_key, config.archival.archival_timestamp ) - config.archival_timestamp = original_archival_timestamp + config.archival.archival_timestamp = original_archival_timestamp if latest: - recipes.set_latest(config.dataset_key, config.acl) + recipes.set_latest(config.dataset_key, config.archival.acl) return config else: raise FileExistsError( @@ -51,32 +52,40 @@ def run( output_csv: bool = False, ) -> Config: config = configure.get_config(dataset_id, version=version, mode=mode) - transform.validate_processing_steps(config.id, config.processing_steps) + transform.validate_processing_steps(config.id, config.ingestion.processing_steps) if not staging_dir: - staging_dir = TMP_DIR / dataset_id / config.archival_timestamp.isoformat() + staging_dir = ( + TMP_DIR / dataset_id / config.archival.archival_timestamp.isoformat() + ) staging_dir.mkdir(parents=True) else: staging_dir.mkdir(parents=True, exist_ok=True) # download dataset extract.download_file_from_source( - config.source, config.raw_filename, config.version, staging_dir + config.ingestion.source, + config.archival.raw_filename, + config.version, + staging_dir, ) - file_path = staging_dir / config.raw_filename + file_path = staging_dir / config.archival.raw_filename if not skip_archival: # archive to edm-recipes/raw_datasets - recipes.archive_raw_dataset(config, staging_dir / config.raw_filename) + recipes.archive_raw_dataset(config, file_path) init_parquet = "init.parquet" transform.to_parquet( - config.file_format, file_path, dir=staging_dir, output_filename=init_parquet + config.ingestion.file_format, + file_path, + dir=staging_dir, + output_filename=init_parquet, ) transform.preprocess( config.id, - config.processing_steps, + config.ingestion.processing_steps, staging_dir / init_parquet, staging_dir / config.filename, output_csv=output_csv, @@ -90,6 +99,8 @@ def run( config, staging_dir / config.filename, latest=latest ) + with open(staging_dir / "config.json", "w") as f: + json.dump(config.model_dump(mode="json"), f, indent=4) return config diff --git a/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml b/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml index b7ac740c9..d7c24c32f 100644 --- a/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml +++ b/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml @@ -8,14 +8,15 @@ attributes: as shown on the DCP zoning maps. Commercial overlay district designations are indicated in the OVERLAY attribute. url: https://www1.nyc.gov/site/planning/data-maps/open-data/dwn-gis-zoning.page#metadata -target_crs: EPSG:4326 -source: - type: edm_publishing_gis_dataset - name: dcp_commercial_overlays -file_format: - type: shapefile - crs: EPSG:2263 -processing_steps: -- name: rename_columns - args: - map: {"geom": "wkb_geometry"} +ingestion: + target_crs: EPSG:4326 + source: + type: edm_publishing_gis_dataset + name: dcp_commercial_overlays + file_format: + type: shapefile + crs: EPSG:2263 + processing_steps: + - name: rename_columns + args: + map: {"geom": "wkb_geometry"} diff --git a/dcpy/models/file.py b/dcpy/models/file.py index 4d63fdb6d..b8ee9bd1e 100644 --- a/dcpy/models/file.py +++ b/dcpy/models/file.py @@ -2,10 +2,11 @@ from pydantic import BaseModel from typing import Literal, TypeAlias +from dcpy.models.base import SortedSerializedBase from dcpy.models.geospatial import geometry -class Geometry(BaseModel, extra="forbid"): +class Geometry(SortedSerializedBase, extra="forbid"): """ Represents the geometric configuration for geospatial data. Attributes: @@ -26,7 +27,7 @@ class PointColumns(BaseModel, extra="forbid"): y: str -class Csv(BaseModel, extra="forbid"): +class Csv(SortedSerializedBase, extra="forbid"): type: Literal["csv"] unzipped_filename: str | None = None encoding: str = "utf-8" @@ -36,7 +37,7 @@ class Csv(BaseModel, extra="forbid"): geometry: Geometry | None = None -class Xlsx(BaseModel, extra="forbid"): +class Xlsx(SortedSerializedBase, extra="forbid"): type: Literal["xlsx"] unzipped_filename: str | None = None sheet_name: str @@ -44,14 +45,14 @@ class Xlsx(BaseModel, extra="forbid"): geometry: Geometry | None = None -class Shapefile(BaseModel, extra="forbid"): +class Shapefile(SortedSerializedBase, extra="forbid"): type: Literal["shapefile"] unzipped_filename: str | None = None encoding: str = "utf-8" crs: str -class Geodatabase(BaseModel, extra="forbid"): +class Geodatabase(SortedSerializedBase, extra="forbid"): type: Literal["geodatabase"] unzipped_filename: str | None = None layer: str | None = None @@ -59,7 +60,7 @@ class Geodatabase(BaseModel, extra="forbid"): crs: str -class Json(BaseModel, extra="forbid"): +class Json(SortedSerializedBase, extra="forbid"): type: Literal["json"] json_read_fn: Literal["normalize", "read_json"] json_read_kwargs: dict = {} @@ -67,7 +68,7 @@ class Json(BaseModel, extra="forbid"): geometry: Geometry | None = None -class GeoJson(BaseModel, extra="forbid"): +class GeoJson(SortedSerializedBase, extra="forbid"): type: Literal["geojson"] unzipped_filename: str | None = None encoding: str = "utf-8" diff --git a/dcpy/models/lifecycle/ingest.py b/dcpy/models/lifecycle/ingest.py index 8e516893e..a60b39f10 100644 --- a/dcpy/models/lifecycle/ingest.py +++ b/dcpy/models/lifecycle/ingest.py @@ -8,6 +8,7 @@ from dcpy.models.connectors.edm import recipes, publishing from dcpy.models.connectors import web, socrata from dcpy.models import library, file +from dcpy.models.base import SortedSerializedBase class LocalFileSource(BaseModel, extra="forbid"): @@ -38,62 +39,76 @@ class ScriptSource(BaseModel, extra="forbid"): ) -class PreprocessingStep(BaseModel): +class PreprocessingStep(SortedSerializedBase): name: str args: dict[str, Any] = {} # mode allows for certain preprocessing steps only to be run if specified at runtime mode: str | None = None -class DatasetAttributes(BaseModel): +class DatasetAttributes(SortedSerializedBase): name: str | None = None description: str | None = None url: str | None = None custom: dict | None = None + _head_sort_order = ["name", "description", "url"] -class Template(BaseModel, extra="forbid", arbitrary_types_allowed=True): - """Definition of a dataset for ingestion/processing/archiving in edm-recipes""" - id: str +class ArchivalMetadata(SortedSerializedBase): + archival_timestamp: datetime + check_timestamps: list[datetime] = [] + raw_filename: str acl: recipes.ValidAclValues - attributes: DatasetAttributes | None = None +class Ingestion(SortedSerializedBase): target_crs: str | None = None source: Source file_format: file.Format + processing_mode: str | None = None processing_steps: list[PreprocessingStep] = [] + +class Template(BaseModel, extra="forbid"): + """Definition of a dataset for ingestion/processing/archiving in edm-recipes""" + + id: str + acl: recipes.ValidAclValues + + attributes: DatasetAttributes | None = None + ingestion: Ingestion + ## this is the original library template, included just for reference while we build out our new templates library_dataset: library.DatasetDefinition | None = None -class Config(BaseModel, extra="forbid", arbitrary_types_allowed=True): +class Config(SortedSerializedBase, extra="forbid"): """ Computed Template of ingest dataset Stored in config.json in edm-recipes/raw_datasets and edm-recipes/datasets """ id: str = Field(validation_alias=AliasChoices("id", "name")) - - attributes: DatasetAttributes | None = None - version: str - archival_timestamp: datetime - check_timestamps: list[datetime] = [] - raw_filename: str - acl: recipes.ValidAclValues - - target_crs: str | None = None - - source: Source - file_format: file.Format - processing_mode: str | None = None - processing_steps: list[PreprocessingStep] = [] + crs: str | None = None + attributes: DatasetAttributes | None = None + archival: ArchivalMetadata + ingestion: Ingestion run_details: RunDetails + _head_sort_order = [ + "id", + "version", + "crs", + "attributes", + "archival", + "ingestion", + "columns", + "run_details", + ] + @property def dataset(self) -> recipes.Dataset: return recipes.Dataset( @@ -110,12 +125,14 @@ def filename(self) -> str: @property def raw_dataset_key(self) -> recipes.RawDatasetKey: - return recipes.RawDatasetKey(id=self.id, timestamp=self.archival_timestamp) + return recipes.RawDatasetKey( + id=self.id, timestamp=self.archival.archival_timestamp + ) @property def freshness(self) -> datetime: return ( - self.archival_timestamp - if not self.check_timestamps - else max(self.check_timestamps) + self.archival.archival_timestamp + if not self.archival.check_timestamps + else max(self.archival.check_timestamps) ) From aa6cd11972fdcdc8ccb28ddf7387fd88b8461b53 Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Mon, 30 Sep 2024 15:17:58 -0400 Subject: [PATCH 06/13] adjust tests for previous commit --- dcpy/lifecycle/ingest/run.py | 5 +- .../edm/resources/ingest/config.json | 20 +++--- dcpy/test/connectors/edm/test_recipes.py | 26 ++++---- dcpy/test/lifecycle/ingest/__init__.py | 1 + .../ingest/resources/invalid_jinja.yml | 9 +-- .../resources/templates/bpl_libraries.yml | 22 +++++++ .../templates/dca_operatingbusinesses.yml | 14 +++++ .../resources/templates/dcp_addresspoints.yml | 12 ++++ .../templates/dcp_atomicpolygons.yml | 11 ++++ .../templates/dcp_pop_acs2010_demographic.yml | 24 +++++++ dcpy/test/lifecycle/ingest/test_configure.py | 63 ++++++++++++------- dcpy/test/lifecycle/ingest/test_ingest.py | 4 +- dcpy/test/lifecycle/ingest/test_run.py | 32 +++++++--- 13 files changed, 188 insertions(+), 55 deletions(-) create mode 100644 dcpy/test/lifecycle/ingest/resources/templates/bpl_libraries.yml create mode 100644 dcpy/test/lifecycle/ingest/resources/templates/dca_operatingbusinesses.yml create mode 100644 dcpy/test/lifecycle/ingest/resources/templates/dcp_addresspoints.yml create mode 100644 dcpy/test/lifecycle/ingest/resources/templates/dcp_atomicpolygons.yml create mode 100644 dcpy/test/lifecycle/ingest/resources/templates/dcp_pop_acs2010_demographic.yml diff --git a/dcpy/lifecycle/ingest/run.py b/dcpy/lifecycle/ingest/run.py index 2e299c1e0..eaff9b3af 100644 --- a/dcpy/lifecycle/ingest/run.py +++ b/dcpy/lifecycle/ingest/run.py @@ -50,8 +50,11 @@ def run( latest: bool = False, skip_archival: bool = False, output_csv: bool = False, + template_dir: Path = configure.TEMPLATE_DIR, ) -> Config: - config = configure.get_config(dataset_id, version=version, mode=mode) + config = configure.get_config( + dataset_id, version=version, mode=mode, template_dir=template_dir + ) transform.validate_processing_steps(config.id, config.ingestion.processing_steps) if not staging_dir: diff --git a/dcpy/test/connectors/edm/resources/ingest/config.json b/dcpy/test/connectors/edm/resources/ingest/config.json index 21bdcf479..437855308 100644 --- a/dcpy/test/connectors/edm/resources/ingest/config.json +++ b/dcpy/test/connectors/edm/resources/ingest/config.json @@ -1,15 +1,19 @@ { "name": "test", "version": "ingest", - "archival_timestamp": "2024-09-05T12:04:03.450135-04:00", - "raw_filename": "dummy.csv", - "acl": "public-read", - "source": { - "type": "local_file", - "path": "dummy.csv" + "archival": { + "archival_timestamp": "2024-09-05T12:04:03.450135-04:00", + "raw_filename": "dummy.csv", + "acl": "public-read" }, - "file_format": { - "type": "csv" + "ingestion": { + "source": { + "type": "local_file", + "path": "dummy.csv" + }, + "file_format": { + "type": "csv" + } }, "run_details": { "type": "manual", diff --git a/dcpy/test/connectors/edm/test_recipes.py b/dcpy/test/connectors/edm/test_recipes.py index e322a05e7..918455695 100644 --- a/dcpy/test/connectors/edm/test_recipes.py +++ b/dcpy/test/connectors/edm/test_recipes.py @@ -71,13 +71,17 @@ class TestArchiveDataset: config = ingest.Config( id=dataset, version="dummy", - archival_timestamp=datetime.now(), - acl="private", - raw_filename=raw_file_name, - source=ingest.ScriptSource( - type="script", connector="dummy", function="dummy" - ), # easiest to mock - file_format=file.Csv(type="csv"), # easiest to mock + archival=ingest.ArchivalMetadata( + archival_timestamp=datetime.now(), + acl="private", + raw_filename=raw_file_name, + ), + ingestion=ingest.Ingestion( + source=ingest.ScriptSource( + type="script", connector="dummy", function="dummy" + ), # easiest to mock + file_format=file.Csv(type="csv"), # easiest to mock + ), run_details=metadata.get_run_details(), ) @@ -247,19 +251,19 @@ def get_config(): return config config = get_config() - assert config.check_timestamps == [] - assert config.freshness == config.archival_timestamp + assert config.archival.check_timestamps == [] + assert config.freshness == config.archival.archival_timestamp timestamp = datetime.now() recipes.update_freshness(load_ingest.dataset_key, timestamp) config2 = get_config() - assert config2.check_timestamps == [timestamp] + assert config2.archival.check_timestamps == [timestamp] assert config2.freshness == timestamp timestamp2 = datetime.now() recipes.update_freshness(load_ingest.dataset_key, timestamp2) config3 = get_config() - assert config3.check_timestamps == [timestamp, timestamp2] + assert config3.archival.check_timestamps == [timestamp, timestamp2] assert config3.freshness == timestamp2 diff --git a/dcpy/test/lifecycle/ingest/__init__.py b/dcpy/test/lifecycle/ingest/__init__.py index ac7a3657e..f5387a31c 100644 --- a/dcpy/test/lifecycle/ingest/__init__.py +++ b/dcpy/test/lifecycle/ingest/__init__.py @@ -11,6 +11,7 @@ from dcpy.test.conftest import RECIPES_BUCKET RESOURCES = Path(__file__).parent / "resources" +TEMPLATE_DIR = RESOURCES / "templates" TEST_DATA_DIR = "test_data" TEST_DATASET_NAME = "test_dataset" FAKE_VERSION = "20240101" diff --git a/dcpy/test/lifecycle/ingest/resources/invalid_jinja.yml b/dcpy/test/lifecycle/ingest/resources/invalid_jinja.yml index 1ac2567eb..46c145cef 100644 --- a/dcpy/test/lifecycle/ingest/resources/invalid_jinja.yml +++ b/dcpy/test/lifecycle/ingest/resources/invalid_jinja.yml @@ -1,6 +1,7 @@ id: dcp_atomicpolygons acl: public-read -source: - type: file_download - url: https://s-media.nyc.gov/agencies/dcp/assets/files/zip/{{ dummy_jinja_var }}/bytes/nyap_{{ version }}.zip -file_format: +ingestion: + source: + type: file_download + url: https://s-media.nyc.gov/agencies/dcp/assets/files/zip/{{ dummy_jinja_var }}/bytes/nyap_{{ version }}.zip + file_format: diff --git a/dcpy/test/lifecycle/ingest/resources/templates/bpl_libraries.yml b/dcpy/test/lifecycle/ingest/resources/templates/bpl_libraries.yml new file mode 100644 index 000000000..8773a84d9 --- /dev/null +++ b/dcpy/test/lifecycle/ingest/resources/templates/bpl_libraries.yml @@ -0,0 +1,22 @@ +id: bpl_libraries +acl: public-read + +ingestion: + source: + type: api + endpoint: https://www.bklynlibrary.org/locations/json + format: json + file_format: + type: json + json_read_fn: normalize + json_read_kwargs: { "record_path": ["locations"] } + geometry: + geom_column: data.position + crs: EPSG:4326 + format: + point_xy_str: "y, x" + processing_steps: + - name: clean_column_names + args: + replace: + "data.": "" diff --git a/dcpy/test/lifecycle/ingest/resources/templates/dca_operatingbusinesses.yml b/dcpy/test/lifecycle/ingest/resources/templates/dca_operatingbusinesses.yml new file mode 100644 index 000000000..5cc706991 --- /dev/null +++ b/dcpy/test/lifecycle/ingest/resources/templates/dca_operatingbusinesses.yml @@ -0,0 +1,14 @@ +id: dca_operatingbusinesses +acl: public-read + +ingestion: + source: + type: socrata + org: nyc + uid: w7w3-xahh + format: csv + file_format: + type: csv + geometry: + geom_column: location + crs: EPSG:4326 diff --git a/dcpy/test/lifecycle/ingest/resources/templates/dcp_addresspoints.yml b/dcpy/test/lifecycle/ingest/resources/templates/dcp_addresspoints.yml new file mode 100644 index 000000000..dd854bcce --- /dev/null +++ b/dcpy/test/lifecycle/ingest/resources/templates/dcp_addresspoints.yml @@ -0,0 +1,12 @@ +id: dcp-addresspoints +acl: public-read + +ingestion: + source: + type: edm_publishing_gis_dataset + name: dcp_address_points + file_format: + unzipped_filename: dcp_address_points.shp + type: shapefile + crs: EPSG:2263 + target_crs: EPSG:4326 diff --git a/dcpy/test/lifecycle/ingest/resources/templates/dcp_atomicpolygons.yml b/dcpy/test/lifecycle/ingest/resources/templates/dcp_atomicpolygons.yml new file mode 100644 index 000000000..8ce4c3023 --- /dev/null +++ b/dcpy/test/lifecycle/ingest/resources/templates/dcp_atomicpolygons.yml @@ -0,0 +1,11 @@ +id: dcp_atomicpolygons +acl: public-read + +ingestion: + source: + type: file_download + url: https://s-media.nyc.gov/agencies/dcp/assets/files/zip/data-tools/bytes/nyap_{{ version }}.zip + file_format: + unzipped_filename: nyap_{{ version }}/nyap.shp + type: shapefile + crs: EPSG:2263 diff --git a/dcpy/test/lifecycle/ingest/resources/templates/dcp_pop_acs2010_demographic.yml b/dcpy/test/lifecycle/ingest/resources/templates/dcp_pop_acs2010_demographic.yml new file mode 100644 index 000000000..706afeeae --- /dev/null +++ b/dcpy/test/lifecycle/ingest/resources/templates/dcp_pop_acs2010_demographic.yml @@ -0,0 +1,24 @@ +id: dcp_pop_acs2010_demographic +acl: public-read + +ingestion: + source: + type: local_file + path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx + file_format: + type: xlsx + sheet_name: CCD2023_Dem0610 + dtype: + GeoID: str + processing_steps: + - name: clean_column_names + args: + lower: true + - name: append_prev + mode: append + - name: upsert_column_of_previous_version + args: + key: [geotype, geoid] + insert_behavior: error + missing_key_behavior: error + mode: update_column diff --git a/dcpy/test/lifecycle/ingest/test_configure.py b/dcpy/test/lifecycle/ingest/test_configure.py index d7d0bc6d4..779ce5ceb 100644 --- a/dcpy/test/lifecycle/ingest/test_configure.py +++ b/dcpy/test/lifecycle/ingest/test_configure.py @@ -11,7 +11,7 @@ from dcpy.lifecycle.ingest import configure from dcpy.test.conftest import mock_request_get -from . import RESOURCES, TEST_DATASET_NAME, Sources, SOURCE_FILENAMES +from . import RESOURCES, TEST_DATASET_NAME, Sources, SOURCE_FILENAMES, TEMPLATE_DIR def test_jinja_vars(): @@ -28,18 +28,20 @@ class TestReadTemplate: """ def test_simple(self): - template = configure.read_template("bpl_libraries") - assert isinstance(template.source, web.GenericApiSource) + template = configure.read_template("bpl_libraries", template_dir=TEMPLATE_DIR) + assert isinstance(template.ingestion.source, web.GenericApiSource) assert isinstance( - template.file_format, + template.ingestion.file_format, file.Json, ) def test_jinja(self): - template = configure.read_template("dcp_atomicpolygons", version="test") - assert isinstance(template.source, web.FileDownloadSource) + template = configure.read_template( + "dcp_atomicpolygons", version="test", template_dir=TEMPLATE_DIR + ) + assert isinstance(template.ingestion.source, web.FileDownloadSource) assert isinstance( - template.file_format, + template.ingestion.file_format, file.Shapefile, ) @@ -93,37 +95,52 @@ def test_get_filename_invalid_source(): class TestGetConfig: def test_standard(self): - config = configure.get_config("dca_operatingbusinesses") + config = configure.get_config( + "dca_operatingbusinesses", template_dir=TEMPLATE_DIR + ) # ensure no reprojection step # ensure default 'clean_column_names' step is added - assert len(config.processing_steps) == 1 - assert config.processing_steps[0].name == "clean_column_names" + assert len(config.ingestion.processing_steps) == 1 + assert config.ingestion.processing_steps[0].name == "clean_column_names" def test_clean_column_names_defined(self): - config = configure.get_config("bpl_libraries") + config = configure.get_config("bpl_libraries", template_dir=TEMPLATE_DIR) # ensure no reprojection step # ensure default 'clean_column_names' step is added - assert len(config.processing_steps) == 1 - assert config.processing_steps[0].name == "clean_column_names" - assert config.processing_steps[0].args == {"replace": {"data.": ""}} + assert len(config.ingestion.processing_steps) == 1 + assert config.ingestion.processing_steps[0].name == "clean_column_names" + assert config.ingestion.processing_steps[0].args == {"replace": {"data.": ""}} def test_reproject(self): - config = configure.get_config("dcp_addresspoints", version="24c") - assert len(config.processing_steps) == 2 - assert config.processing_steps[0].name == "reproject" + config = configure.get_config( + "dcp_addresspoints", version="24c", template_dir=TEMPLATE_DIR + ) + assert len(config.ingestion.processing_steps) == 2 + assert config.ingestion.processing_steps[0].name == "reproject" def test_no_mode(self): - standard = configure.get_config("dcp_pop_acs2010_demographic", version="test") - assert standard.processing_steps - assert "append_prev" not in [s.name for s in standard.processing_steps] + standard = configure.get_config( + "dcp_pop_acs2010_demographic", version="test", template_dir=TEMPLATE_DIR + ) + assert standard.ingestion.processing_steps + assert "append_prev" not in [ + s.name for s in standard.ingestion.processing_steps + ] def test_mode(self): append = configure.get_config( - "dcp_pop_acs2010_demographic", version="test", mode="append" + "dcp_pop_acs2010_demographic", + version="test", + mode="append", + template_dir=TEMPLATE_DIR, ) - assert "append_prev" in [s.name for s in append.processing_steps] + assert "append_prev" in [s.name for s in append.ingestion.processing_steps] + def test_invalid_mode(self): with pytest.raises(ValueError): configure.get_config( - "dcp_pop_acs2010_demographic", version="test", mode="fake_mode" + "dcp_pop_acs2010_demographic", + version="test", + mode="fake_mode", + template_dir=TEMPLATE_DIR, ) diff --git a/dcpy/test/lifecycle/ingest/test_ingest.py b/dcpy/test/lifecycle/ingest/test_ingest.py index 79fb298a1..5af6cbc6b 100644 --- a/dcpy/test/lifecycle/ingest/test_ingest.py +++ b/dcpy/test/lifecycle/ingest/test_ingest.py @@ -11,4 +11,6 @@ def test_validate_all_datasets(): with open(file, "r") as f: s = yaml.safe_load(f) template = Template(**s) - transform.validate_processing_steps(template.id, template.processing_steps) + transform.validate_processing_steps( + template.id, template.ingestion.processing_steps + ) diff --git a/dcpy/test/lifecycle/ingest/test_run.py b/dcpy/test/lifecycle/ingest/test_run.py index 4cc146365..fb3115989 100644 --- a/dcpy/test/lifecycle/ingest/test_run.py +++ b/dcpy/test/lifecycle/ingest/test_run.py @@ -9,7 +9,7 @@ from dcpy.lifecycle.ingest.run import run, TMP_DIR from dcpy.test.conftest import mock_request_get -from . import FAKE_VERSION +from . import FAKE_VERSION, TEMPLATE_DIR DATASET = "bpl_libraries" S3_PATH = f"datasets/{DATASET}/{FAKE_VERSION}/{DATASET}.parquet" @@ -19,14 +19,19 @@ @mock.patch("requests.get", side_effect=mock_request_get) def test_run(mock_request_get, create_buckets, create_temp_filesystem): """Mainly an integration test to make sure code runs without error""" - run(dataset_id=DATASET, version=FAKE_VERSION, staging_dir=create_temp_filesystem) + run( + dataset_id=DATASET, + version=FAKE_VERSION, + staging_dir=create_temp_filesystem, + template_dir=TEMPLATE_DIR, + ) assert len(s3.get_subfolders(RECIPES_BUCKET, RAW_FOLDER)) == 1 assert s3.object_exists(RECIPES_BUCKET, S3_PATH) @mock.patch("requests.get", side_effect=mock_request_get) def test_run_default_folder(mock_request_get, create_buckets, create_temp_filesystem): - run(dataset_id=DATASET, version=FAKE_VERSION) + run(dataset_id=DATASET, version=FAKE_VERSION, template_dir=TEMPLATE_DIR) assert s3.object_exists(RECIPES_BUCKET, S3_PATH) assert (TMP_DIR / DATASET).exists() shutil.rmtree(TMP_DIR) @@ -39,25 +44,32 @@ def test_skip_archival(mock_request_get, create_buckets, create_temp_filesystem) version=FAKE_VERSION, staging_dir=create_temp_filesystem, skip_archival=True, + template_dir=TEMPLATE_DIR, ) assert not s3.object_exists(RECIPES_BUCKET, S3_PATH) @mock.patch("requests.get", side_effect=mock_request_get) def test_run_update_freshness(mock_request_get, create_buckets, create_temp_filesystem): - run(dataset_id=DATASET, version=FAKE_VERSION, staging_dir=create_temp_filesystem) + run( + dataset_id=DATASET, + version=FAKE_VERSION, + staging_dir=create_temp_filesystem, + template_dir=TEMPLATE_DIR, + ) config = recipes.get_config(DATASET, FAKE_VERSION) - assert config.check_timestamps == [] + assert config.archival.check_timestamps == [] run( dataset_id=DATASET, version=FAKE_VERSION, staging_dir=create_temp_filesystem, latest=True, + template_dir=TEMPLATE_DIR, ) config2 = recipes.get_config(DATASET, FAKE_VERSION) - assert len(config2.check_timestamps) == 1 + assert len(config2.archival.check_timestamps) == 1 assert config2.freshness > config.freshness latest = recipes.get_config(DATASET) @@ -69,7 +81,12 @@ def test_run_update_freshness_fails_if_data_diff( mock_request_get, create_buckets, create_temp_filesystem ): """Mainly an integration test to make sure code runs without error""" - run(dataset_id=DATASET, version=FAKE_VERSION, staging_dir=create_temp_filesystem) + run( + dataset_id=DATASET, + version=FAKE_VERSION, + staging_dir=create_temp_filesystem, + template_dir=TEMPLATE_DIR, + ) # this time, replace the dataframe with a different one in the middle of the ingest process with mock.patch("dcpy.utils.geospatial.parquet.read_df") as patch_read_df: @@ -82,4 +99,5 @@ def test_run_update_freshness_fails_if_data_diff( dataset_id=DATASET, version=FAKE_VERSION, staging_dir=create_temp_filesystem, + template_dir=TEMPLATE_DIR, ) From 9a2c5bc366d080215f14d37939983e2d25f8f151 Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Mon, 30 Sep 2024 15:42:25 -0400 Subject: [PATCH 07/13] add acs2010 datasets to 'production' --- .../dcp_pop_acs2010_demographic.yml | 52 ----------------- .../dcp_pop_acs2010_economic.yml | 52 ----------------- .../dev_templates/dcp_pop_acs2010_social.yml | 56 ------------------- .../templates/dcp_pop_acs2010_demographic.yml | 28 ++++++++++ .../templates/dcp_pop_acs2010_economic.yml | 28 ++++++++++ .../dcp_pop_acs2010_housing.yml | 48 +++++++++------- .../templates/dcp_pop_acs2010_social.yml | 28 ++++++++++ 7 files changed, 111 insertions(+), 181 deletions(-) delete mode 100644 dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_demographic.yml delete mode 100644 dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_economic.yml delete mode 100644 dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_social.yml create mode 100644 dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_demographic.yml create mode 100644 dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_economic.yml rename dcpy/lifecycle/ingest/{dev_templates => templates}/dcp_pop_acs2010_housing.yml (50%) create mode 100644 dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_social.yml diff --git a/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_demographic.yml b/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_demographic.yml deleted file mode 100644 index fadcc0c24..000000000 --- a/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_demographic.yml +++ /dev/null @@ -1,52 +0,0 @@ -id: dcp_pop_acs2010_demographic -acl: public-read -source: - type: local_file - path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx -file_format: - type: xlsx - sheet_name: CCD2023_Dem0610 - dtype: - GeoID: str -processing_steps: -- name: clean_column_names - args: - lower: true -- name: append_prev - mode: append -- name: upsert_column_of_previous_version - args: - key: [geotype, geoid] - insert_behavior: error - missing_key_behavior: error - mode: update_column - -library_dataset: - name: dcp_pop_acs2010_demographic - version: "" - acl: public-read - source: - script: - name: excel - path: https://nyc3.digitaloceanspaces.com/edm-recipes/inbox/dcp_pop_acs2010/{{ version }}/dcp_pop_acs.xlsx - sheet_name: Dem0610 - geometry: - SRS: null - type: NONE - - destination: - geometry: - SRS: null - type: NONE - fields: [] - sql: null - - info: - description: | - ## 2010 ACS file from Population - This file is produced internally by the Population division. 2010 version is used as a reference dataset - for the latest ACS data, and occasionally is modified so these different subsections are archived as their - own recipe datasets so that they can easily be updated individually - - url: null - dependents: [] diff --git a/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_economic.yml b/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_economic.yml deleted file mode 100644 index fdeb2f159..000000000 --- a/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_economic.yml +++ /dev/null @@ -1,52 +0,0 @@ -id: dcp_pop_acs2010_economic -acl: public-read -source: - type: local_file - path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx -file_format: - type: xlsx - sheet_name: CCD2023_Econ0610_NotInflated - dtype: - GeoID: str -processing_steps: -- name: clean_column_names - args: - lower: true -- name: append_prev - mode: append -- name: upsert_column_of_previous_version - args: - key: [geotype, geoid] - insert_behavior: error - missing_key_behavior: error - mode: update_column - -library_dataset: - name: dcp_pop_acs2010_economic - version: "" - acl: public-read - source: - script: - name: excel - path: https://nyc3.digitaloceanspaces.com/edm-recipes/inbox/dcp_pop_acs2010/{{ version }}/dcp_pop_acs.xlsx - sheet_name: Econ0610 - geometry: - SRS: null - type: NONE - - destination: - geometry: - SRS: null - type: NONE - fields: [] - sql: null - - info: - description: | - ## 2010 ACS file from Population - This file is produced internally by the Population division. 2010 version is used as a reference dataset - for the latest ACS data, and occasionally is modified so these different subsections are archived as their - own recipe datasets so that they can easily be updated individually - - url: null - dependents: [] diff --git a/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_social.yml b/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_social.yml deleted file mode 100644 index 81386e61a..000000000 --- a/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_social.yml +++ /dev/null @@ -1,56 +0,0 @@ -id: dcp_pop_acs2010_social -acl: public-read -source: - type: local_file - path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx -file_format: - type: xlsx - sheet_name: CCD2023_Social0610 - dtype: - GeoID: str -processing_steps: -- name: clean_column_names - args: - lower: true -- name: append_prev - mode: append -- name: upsert_column_of_previous_version - args: - key: [geotype, geoid] - insert_behavior: error - missing_key_behavior: error - mode: update_column - -library_dataset: - name: dcp_pop_acs2010_social - version: "" - acl: public-read - source: - script: - name: excel - path: https://nyc3.digitaloceanspaces.com/edm-recipes/inbox/dcp_pop_acs2010_social/{{ version }}/ACS0610SocialData_for1822Update.xlsx - sheet_name: Social0610_ModFor1822Update - geometry: - SRS: null - type: NONE - - destination: - geometry: - SRS: null - type: NONE - fields: [] - sql: null - - info: - description: | - ## 2010 ACS file from Population - This file is produced internally by the Population division. 2010 version is used as a reference dataset - for the latest ACS data, and occasionally is modified so these different subsections are archived as their - own recipe datasets so that they can easily be updated individually - - In 2024, this file had to be loaded "as_is". This doesn't load the xlsx from pop due to the way "scriptors" currently work, - but rather the csv output by the "scriptor". This was needed because the input sheet has over 2000 columns, which seems - to be a limit for gdal. - - url: null - dependents: [] diff --git a/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_demographic.yml b/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_demographic.yml new file mode 100644 index 000000000..9547713ef --- /dev/null +++ b/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_demographic.yml @@ -0,0 +1,28 @@ +id: dcp_pop_acs2010_demographic +acl: public-read + +attributes: + name: DCP Population 2010 ACS Demographic Data + description: | + This file is produced internally by the Population division. 2010 version is used as a reference dataset + for the latest ACS data, and occasionally is modified so these different subsections are archived as their + own recipe datasets so that they can easily be updated individually + +ingestion: + source: + type: local_file + path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx + file_format: + type: xlsx + sheet_name: CCD2023_Dem0610 + dtype: + GeoID: str + processing_steps: + - name: append_prev + mode: append + - name: upsert_column_of_previous_version + args: + key: [geotype, geoid] + insert_behavior: error + missing_key_behavior: error + mode: update_column diff --git a/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_economic.yml b/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_economic.yml new file mode 100644 index 000000000..775a8db58 --- /dev/null +++ b/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_economic.yml @@ -0,0 +1,28 @@ +id: dcp_pop_acs2010_economic +acl: public-read + +attributes: + name: DCP Population 2010 ACS Economic Data + description: | + This file is produced internally by the Population division. 2010 version is used as a reference dataset + for the latest ACS data, and occasionally is modified so these different subsections are archived as their + own recipe datasets so that they can easily be updated individually + +ingestion: + source: + type: local_file + path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx + file_format: + type: xlsx + sheet_name: CCD2023_Econ0610_NotInflated + dtype: + GeoID: str + processing_steps: + - name: append_prev + mode: append + - name: upsert_column_of_previous_version + args: + key: [geotype, geoid] + insert_behavior: error + missing_key_behavior: error + mode: update_column diff --git a/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_housing.yml b/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_housing.yml similarity index 50% rename from dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_housing.yml rename to dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_housing.yml index d738d4251..881a5bb94 100644 --- a/dcpy/lifecycle/ingest/dev_templates/dcp_pop_acs2010_housing.yml +++ b/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_housing.yml @@ -1,26 +1,32 @@ id: dcp_pop_acs2010_housing acl: public-read -source: - type: s3 - bucket: edm-recipes - key: inbox/dcp/dcp_pop_acs2010_housing/20240624/CorrectedVarsOnly_Housing0610_2020Geog.xlsx -file_format: - type: xlsx - sheet_name: Housing Vars Corrected - dtype: - GeoID: str -processing_steps: -- name: clean_column_names - args: - lower: true -- name: append_prev - mode: append -- name: upsert_column_of_previous_version - args: - key: [geotype, geoid] - insert_behavior: error - missing_key_behavior: error - mode: update_column + +attributes: + name: DCP Population 2010 ACS Housing Data + description: | + This file is produced internally by the Population division. 2010 version is used as a reference dataset + for the latest ACS data, and occasionally is modified so these different subsections are archived as their + own recipe datasets so that they can easily be updated individually + +ingestion: + source: + type: s3 + bucket: edm-recipes + key: inbox/dcp/dcp_pop_acs2010_housing/20240624/CorrectedVarsOnly_Housing0610_2020Geog.xlsx + file_format: + type: xlsx + sheet_name: Housing Vars Corrected + dtype: + GeoID: str + processing_steps: + - name: append_prev + mode: append + - name: upsert_column_of_previous_version + args: + key: [geotype, geoid] + insert_behavior: error + missing_key_behavior: error + mode: update_column library_dataset: name: dcp_pop_acs2010_housing diff --git a/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_social.yml b/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_social.yml new file mode 100644 index 000000000..cf2ce162b --- /dev/null +++ b/dcpy/lifecycle/ingest/templates/dcp_pop_acs2010_social.yml @@ -0,0 +1,28 @@ +id: dcp_pop_acs2010_social +acl: public-read + +attributes: + name: DCP Population 2010 ACS Demographic Data + description: | + This file is produced internally by the Population division. 2010 version is used as a reference dataset + for the latest ACS data, and occasionally is modified so these different subsections are archived as their + own recipe datasets so that they can easily be updated individually + +ingestion: + source: + type: local_file + path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx + file_format: + type: xlsx + sheet_name: CCD2023_Social0610 + dtype: + GeoID: str + processing_steps: + - name: append_prev + mode: append + - name: upsert_column_of_previous_version + args: + key: [geotype, geoid] + insert_behavior: error + missing_key_behavior: error + mode: update_column From da3e437b3d3f17a36b458ac3bcd02064327e8f95 Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Mon, 30 Sep 2024 16:01:29 -0400 Subject: [PATCH 08/13] add columns to ingest templates --- dcpy/lifecycle/ingest/run.py | 1 + .../templates/dcp_commercialoverlay.yml | 10 ++++ dcpy/lifecycle/ingest/transform.py | 35 ++++++++----- dcpy/models/lifecycle/ingest.py | 8 +++ dcpy/test/lifecycle/ingest/test_transform.py | 49 ++++++++++++++++--- 5 files changed, 84 insertions(+), 19 deletions(-) diff --git a/dcpy/lifecycle/ingest/run.py b/dcpy/lifecycle/ingest/run.py index eaff9b3af..c771730ff 100644 --- a/dcpy/lifecycle/ingest/run.py +++ b/dcpy/lifecycle/ingest/run.py @@ -89,6 +89,7 @@ def run( transform.preprocess( config.id, config.ingestion.processing_steps, + config.columns, staging_dir / init_parquet, staging_dir / config.filename, output_csv=output_csv, diff --git a/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml b/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml index d7c24c32f..90a30b599 100644 --- a/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml +++ b/dcpy/lifecycle/ingest/templates/dcp_commercialoverlay.yml @@ -20,3 +20,13 @@ ingestion: - name: rename_columns args: map: {"geom": "wkb_geometry"} + +columns: +- id: overlay + data_type: text +- id: shape_leng + data_type: decimal +- id: shape_area + data_type: decimal +- id: wkb_geometry + data_type: geometry diff --git a/dcpy/lifecycle/ingest/transform.py b/dcpy/lifecycle/ingest/transform.py index fb1e17f0d..c87697dc3 100644 --- a/dcpy/lifecycle/ingest/transform.py +++ b/dcpy/lifecycle/ingest/transform.py @@ -2,11 +2,10 @@ import geopandas as gpd import pandas as pd from pathlib import Path -import shutil from typing import Callable, Literal from dcpy.models import file -from dcpy.models.lifecycle.ingest import PreprocessingStep +from dcpy.models.lifecycle.ingest import PreprocessingStep, Column from dcpy.utils import data, introspect from dcpy.utils.geospatial import transform, parquet as geoparquet from dcpy.utils.logging import logger @@ -276,24 +275,36 @@ def validate_processing_steps( return compiled_steps +def validate_columns(df: pd.DataFrame, columns: list[Column]) -> None: + """ + For now, simply validates that expected columns exists + Does not validate data_type or other data checks + """ + missing_columns = [c.id for c in columns if c.id not in df.columns] + if missing_columns: + raise ValueError( + f"Columns {missing_columns} defined in template but not found in processed dataset.\n Existing columns: {list(df.columns)}" + ) + + def preprocess( dataset_id: str, processing_steps: list[PreprocessingStep], + expected_columns: list[Column], input_path: Path, output_path: Path, output_csv: bool = False, ): """Validates and runs preprocessing steps defined in config object""" - if len(processing_steps) == 0: - shutil.copy(input_path, output_path) - else: - df = geoparquet.read_df(input_path) - compiled_steps = validate_processing_steps(dataset_id, processing_steps) + df = geoparquet.read_df(input_path) + compiled_steps = validate_processing_steps(dataset_id, processing_steps) + + for step in compiled_steps: + df = step(df) - for step in compiled_steps: - df = step(df) + validate_columns(df, expected_columns) - if output_csv: - df.to_csv(output_path.parent / f"{dataset_id}.csv") + if output_csv: + df.to_csv(output_path.parent / f"{dataset_id}.csv") - df.to_parquet(output_path) + df.to_parquet(output_path) diff --git a/dcpy/models/lifecycle/ingest.py b/dcpy/models/lifecycle/ingest.py index a60b39f10..1d8a985c2 100644 --- a/dcpy/models/lifecycle/ingest.py +++ b/dcpy/models/lifecycle/ingest.py @@ -70,6 +70,12 @@ class Ingestion(SortedSerializedBase): processing_steps: list[PreprocessingStep] = [] +class Column(SortedSerializedBase): + id: str + data_type: Literal["text", "integer", "decimal", "geometry", "bool", "datetime"] + description: str | None = None + + class Template(BaseModel, extra="forbid"): """Definition of a dataset for ingestion/processing/archiving in edm-recipes""" @@ -78,6 +84,7 @@ class Template(BaseModel, extra="forbid"): attributes: DatasetAttributes | None = None ingestion: Ingestion + columns: list[Column] = [] ## this is the original library template, included just for reference while we build out our new templates library_dataset: library.DatasetDefinition | None = None @@ -96,6 +103,7 @@ class Config(SortedSerializedBase, extra="forbid"): attributes: DatasetAttributes | None = None archival: ArchivalMetadata ingestion: Ingestion + columns: list[Column] = [] run_details: RunDetails _head_sort_order = [ diff --git a/dcpy/test/lifecycle/ingest/test_transform.py b/dcpy/test/lifecycle/ingest/test_transform.py index f67a55fe3..c1f1cfd25 100644 --- a/dcpy/test/lifecycle/ingest/test_transform.py +++ b/dcpy/test/lifecycle/ingest/test_transform.py @@ -8,7 +8,7 @@ from unittest import TestCase, mock from dcpy.models.file import Format -from dcpy.models.lifecycle.ingest import PreprocessingStep +from dcpy.models.lifecycle.ingest import PreprocessingStep, Column from dcpy.utils import data from dcpy.utils.geospatial import parquet as geoparquet @@ -281,17 +281,22 @@ def test_pd_series_func_str(self): def test_preprocess_no_steps(create_temp_filesystem: Path): - input = create_temp_filesystem / "input.txt" - output = create_temp_filesystem / "output.txt" - input.touch() + input = RESOURCES / TEST_DATA_DIR / "test.parquet" + output = create_temp_filesystem / "output.parquet" + assert ( + not output.exists() + ), "Error in setup of test - output file should not exist yet" - transform.preprocess(TEST_DATASET_NAME, [], input, output) + transform.preprocess(TEST_DATASET_NAME, [], [], input, output) assert output.exists() def test_preprocess(create_temp_filesystem: Path): input = RESOURCES / TEST_DATA_DIR / "test.parquet" output = create_temp_filesystem / "output.parquet" + assert ( + not output.exists() + ), "Error in setup of test - output file should not exist yet" expected = RESOURCES / TEST_DATA_DIR / "output.parquet" steps = [ @@ -301,11 +306,41 @@ def test_preprocess(create_temp_filesystem: Path): ), ] - transform.preprocess(TEST_DATASET_NAME, steps, input, output) + columns = [ + Column(id="borough", data_type="integer"), + Column(id="block", data_type="integer"), + Column(id="lot", data_type="integer"), + Column(id="bbl", data_type="text"), + Column(id="text", data_type="text"), + Column(id="wkt", data_type="geometry"), + ] + + transform.preprocess(TEST_DATASET_NAME, steps, columns, input, output) assert output.exists() output_df = geoparquet.read_df(output) expected_df = geoparquet.read_df(expected) assert output_df.equals(expected_df) - transform.preprocess(TEST_DATASET_NAME, steps, input, output, output_csv=True) + assert not (create_temp_filesystem / f"{TEST_DATASET_NAME}.csv").exists() + transform.preprocess(TEST_DATASET_NAME, steps, [], input, output, output_csv=True) assert (create_temp_filesystem / f"{TEST_DATASET_NAME}.csv").exists() + + +class TestValidateColumns: + df = pd.DataFrame({"a": [2, 3, 1], "b": ["b_1", "b_2", "c_3"]}) + + def test_validate_all_columns(self): + transform.validate_columns( + self.df, + [Column(id="a", data_type="integer"), Column(id="b", data_type="text")], + ) + + def test_validate_partial_columns(self): + transform.validate_columns(self.df, [Column(id="a", data_type="integer")]) + + def test_validate_columns_fails(self): + with pytest.raises( + ValueError, + match="defined in template but not found in processed dataset", + ): + transform.validate_columns(self.df, [Column(id="c", data_type="integer")]) From d6d4e4adf0a7e2d8431cbe86c9bc959dc90f9cbe Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Wed, 2 Oct 2024 18:07:14 +0000 Subject: [PATCH 09/13] fix renaming of geom column --- dcpy/lifecycle/ingest/transform.py | 2 ++ .../ingest/resources/test_data/renamed.parquet | Bin 0 -> 9748 bytes dcpy/test/lifecycle/ingest/test_transform.py | 8 ++++++++ 3 files changed, 10 insertions(+) create mode 100644 dcpy/test/lifecycle/ingest/resources/test_data/renamed.parquet diff --git a/dcpy/lifecycle/ingest/transform.py b/dcpy/lifecycle/ingest/transform.py index c87697dc3..14485e646 100644 --- a/dcpy/lifecycle/ingest/transform.py +++ b/dcpy/lifecycle/ingest/transform.py @@ -96,6 +96,8 @@ def rename_columns( self, df: pd.DataFrame, map: dict[str, str], drop_others=False ) -> pd.DataFrame: renamed = df.copy() + if isinstance(renamed, gpd.GeoDataFrame) and renamed.geometry.name in map: + renamed.rename_geometry(map.pop(renamed.geometry.name), inplace=True) renamed = renamed.rename(columns=map, errors="raise") if drop_others: renamed = renamed[list(map.values())] diff --git a/dcpy/test/lifecycle/ingest/resources/test_data/renamed.parquet b/dcpy/test/lifecycle/ingest/resources/test_data/renamed.parquet new file mode 100644 index 0000000000000000000000000000000000000000..3d5c88c21fadd1aefb6e6ccc7cb20ee83dc51e62 GIT binary patch literal 9748 zcmcgyNsJ?DR<<*{s%xr?ri99=Oel~^dQyd^Ix`mA>1YP2!*+Trhn-&1OEj${wzR{J zt?ckxw1haIK5!T$gn$!*IgP|BE+fsMG?xJh%^9^uFlr&-2oi_Uhy(Baf7t0<3Mwm0 zovLy=-v9q^```QCdr@^P*4W59k;jo}DRMUw`ngc({a82@3dP`W2*0qAP~=f$`)TBU z1Y zw>}T=W%ll^&h9(~nW4x=2tFRduTTh{pGKIa4qJ^oK;yY2AIC2-HWrJ+Z`^)Smv^4r zA&YDJY;J9Aeh}LG>&cl4Tvs(6yEyN@ZOI=-;KwKwbR#X^l{|l64;GiiEeL# z{3OGN-@O-)CBpB<1U^nl#8sf#R^;a|cxe-M5WU`eLjAP4`FS?9_vM{)#WpS1Cu)5s zy!Dsiy>H#w`@470Nx{);mGUA^DRPe}m5kj-@5B-U&oXf#8Gb(+X9X_7Gdz3e{oi1d z49D&p4snOr}4K59Hu zuDPm5*X_IJ?!I_%ZvQ*JF{79FPa9ibZtVTrdwW0m8eZP3n7zsYe+CYq_o@G81auB# z+Wo~(e)s#`AAIofpa1Yj>C}&nmyw-h?{6OcLu;}7()s+q{-l*ucXndR@8uuKll_ues?6VSjn!HYHzVsJ;Y6g8Tm^djAjC z?C%<`O#$WI%d(^odJiO%u-FwUNzW2pf5kFgh z_}C*!JeD1y}7GnvZnb@D*M0 z6_DbLE$g$V(KV@9zOIqDB)?s0kU&>u1;n>22~zW_q=5K#B_UQ{6}6&68M*rB2L?hr!d#ja(lLuEL};PmFoaOh5v+r$`-OxHmwd$`Z+GrJH_ zyMgSrMQ)$l=XXB~ZEu2p+y8$5#jZCrZQvK=2r^h|ex5ZiC=JCM4g|L8ba7+rSqf!q zLrj_%yB|;h$CaM%d!vsI4!oh;-*<=I1G+Rim@xZ?gxVju&OW8?<9B4V%$}U#c87}B zGqrTBPJ9()V=GsWCSyo}%o*FKhBGp3)xwlJPo9=LwDc(O&<)?zqV@U6H|!`YBxBLl ze&ArkNpKn*{p7XIK4}aUXJq=nik^MF6S+g4xyknAA%BzYEE~TOdp2 zLkMHVH%><+K!~EFvAT@Gl?4ictU$}PP+0{SsA>9R3igVs4vmQk6q<`Ag`kjb4gvK8 zn&S@r9_j=|!bcr-qdPQ=i(z%n-F3RJW>53x0((Ol5t)~{e~M<0G?x~rJa}l~I=*5> zHP`lB$8h|pdluz0QRhXlRyqS2 zh6|t*qt~~S-Kgt@-LM(ee2troEpM_5=xg`@V0d#Rop+C2)A4sd1xA6opL`gD7-8ak zjEg@7l@8P}47KZ#7?hw`i(b2Q zj@|tX@b-aNt5sSbQN-K+$EOGQb0}6tk^F^U;xkcyYAZW8nnGXYW~|zZnH5tsWv)3< z>xEGvT@2{)MWOF@jd{_LTY?{aw^X}4k#p8qo);3Zw_RrnW+^S29ljXsFAOxU?8+^+ zhi3|QkJ0m~Ma4``K=wp2;}gx+Cd)nAT;Qa1j4$!2xypIZHBK0poN`~uHT_OKPM!X3bghA|GGFXzxt`Zy%Jq{u(RG_kn7%Qg$ctGQ~I_$t?Al$O*z;jM)Zel28<;+0Ib`|L=I!FjLX zFr%CMLv(A4qglBj-sU`4UQ};$o-g+cw>V#zNs9rTXLXEWxiuRp?bJf$tU(a(>-Vu9 zQwz9zug$SN#TMpmE*n$Y62>McEwq@_ka*;$wBj7bhujowHJ4?;_hXr0yJlsLL*z0g*RWAT}(9?kQ$ZvgJI< z5t5ta{y~odch)96A)kdg<}Boa)#jHmL~}sno2H((ra>N9YHsFf`SL_*)p{z;pPrGk zNN!(>@%3C80dG&wHwVCn+F=*xa$EM0u5uJ(8sGz}XU)^>W*=gDqM8Ip#*lxELgle4 z*haCVWaEF;P!Dp?IcqMr%8(Pnn96DP zG`>pK^i~}5k?1s#TRQAJG0&PTrR_~r2mO=+AG1@HivvbfyDHb| zN=&t@1^HGosZYRWiiws^ayy`#9b1S6@wQNT$knW{?oWsVCN8BB19c1X|LlUj5KE=T zQH-qHfUQo=ZF4iUoB9LRD-99v#OeT^!P-Q%_2pe901IMLp2UeKquqduoi^w|?5o9z ze+JF!38PV*=KvouHZ#?Q)RzUst%8jh#RBRU<82;sx=G_@UaUecjrDYqHr*z7XqkmI-+^4{`=;+O*S(Lv4`gz1z*X#x3KqFathg z3F7XoDrT^rVto`d4esC|>55Q>p)SM60M;-kp&sU%u{Okqns555gIJJK+r&2k%@RD- zu35&?H|#Puu?Evxdnvz6$cYK^l%ejCxV0_FPq_c{x^-obmdk=J*#*4^S_?3yJM`W# z4#s+mokIO*jCP9FOIt7%%txAkKGeS+@L)Q4&$&1Ym~f@ObnMxr!;N08S5D|Fv@QpJ zE}yO84CXlG%raj=MmrV64%J!ZiqA1eX+A@2<+51EN&cu#bEIO-Uaa9GHOOK4s-ISC zU!8x7BdWXXL0uWNx%1i#c|g~kR1adJr?G2y;b4t8zjLtF*Yei7?6+`K3dsKEX_AJ~KVLHczu z58sYb@SGPlQ9Q2ZL@`sqHTXh)j=Ew8n|q+aUef zBK%4bf6`|h7g9tv!LM>A4tOS^Jo0>;gn0q+tUT9fe}C9cpZD|dJ<(1VT|Hl$Y72Kl zdv+IVw^M>njP&enX`%ymmgmA0{P$dO0RPPRqQmu|_dXhvXKQMfvQyf3G^b_-p0D)u zEZsw}F4)s#^hP1mWk8P+=yA2L$9;~k(LT7T75bi52x{&B*+)ZPrg1QD*fT5Oz3T1J z<`(}3Jkz}ss5ijZVxECsXfO0ss1x|K*wfoJ7yJfw*rIxp8mnssa|Zfj{-gfxOlovU zA7_9+F8Jd0`g>klh+bfmCxZ^uKWjbY(*Mm=qZas_)C#2cvA>5JSf8QJq!&hi!1vdf zmvBDl^3ojaBQ{wBJd1X$_ZI;_(!&ONlDdF&sd2|`5nG| zTlwwV$k#Z@yruolE#&u)7M1#K02g(xVNtFz5qy zNSMk2=G|z93u;!U4Rb|sK6uA64tz-FIMf$04&oH&bePq^4tW0*;JJ^p80f*vmuE5L zX)?1)hO7s^U7G`Oy+4>@49dh_VFW&L)_B9&6p7I{h(jiLUtq$#gN=k(Z5HbdR!d-S zD8qb$vvS2g@55<7(Z%72zK=u#HZsBcM=~_!Fy?35-9#SUQIH zqg)?8lE^YkUTTu2iN4LC-78kg+b7?>E*BXC(=PCyC5q!%W;eOB`^tjx|QH56arO7ifm97lnS89F6 zLo%#G3rIKM=dXvhqn Date: Wed, 9 Oct 2024 14:03:05 -0400 Subject: [PATCH 10/13] add default step to convert geoms to multi --- dcpy/lifecycle/ingest/configure.py | 4 ++ dcpy/lifecycle/ingest/transform.py | 12 ++-- dcpy/models/lifecycle/ingest.py | 8 +++ .../resources/templates/dob_now_permits.yml | 10 ++++ dcpy/test/lifecycle/ingest/test_configure.py | 19 +++++-- dcpy/test/lifecycle/ingest/test_run.py | 5 +- dcpy/test/lifecycle/ingest/test_transform.py | 55 +++++++++++++++++-- dcpy/test/utils/test_geospatial.py | 43 +++++++++++++++ dcpy/utils/geospatial/transform.py | 21 +++++++ 9 files changed, 160 insertions(+), 17 deletions(-) create mode 100644 dcpy/test/lifecycle/ingest/resources/templates/dob_now_permits.yml diff --git a/dcpy/lifecycle/ingest/configure.py b/dcpy/lifecycle/ingest/configure.py index ad4dfd60d..64e563d83 100644 --- a/dcpy/lifecycle/ingest/configure.py +++ b/dcpy/lifecycle/ingest/configure.py @@ -129,6 +129,10 @@ def get_config( ) processing_steps.append(clean_column_names) + if "multi" not in processing_step_names and template.has_geom: + multi = PreprocessingStep(name="multi") + processing_steps.append(multi) + if mode: modes = {s.mode for s in processing_steps} if mode not in modes: diff --git a/dcpy/lifecycle/ingest/transform.py b/dcpy/lifecycle/ingest/transform.py index 14485e646..c5bbd9129 100644 --- a/dcpy/lifecycle/ingest/transform.py +++ b/dcpy/lifecycle/ingest/transform.py @@ -181,6 +181,14 @@ def strip_columns( df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x) return df + def multi(self, df: gpd.GeoDataFrame) -> gpd.GeoDataFrame: + multi_gdf = df.copy() + multi_gdf.set_geometry( + gpd.GeoSeries([transform.multi(feature) for feature in multi_gdf.geometry]), + inplace=True, + ) + return multi_gdf + def pd_series_func( self, df: pd.DataFrame, @@ -218,10 +226,6 @@ def pd_series_func( transformed[output_column_name or column_name] = func(**kwargs) # type: ignore return transformed - def no_arg_function(self, df: pd.DataFrame) -> pd.DataFrame: - """Dummy/stub for testing. Can be dropped if we implement actual function with no args other than df""" - return df - def validate_pd_series_func( *, function_name: str, column_name: str = "", **kwargs diff --git a/dcpy/models/lifecycle/ingest.py b/dcpy/models/lifecycle/ingest.py index 1d8a985c2..bc7f928a9 100644 --- a/dcpy/models/lifecycle/ingest.py +++ b/dcpy/models/lifecycle/ingest.py @@ -89,6 +89,14 @@ class Template(BaseModel, extra="forbid"): ## this is the original library template, included just for reference while we build out our new templates library_dataset: library.DatasetDefinition | None = None + @property + def has_geom(self): + match self.ingestion.file_format: + case file.Shapefile() | file.Geodatabase() | file.GeoJson(): + return True + case file.Csv() | file.Xlsx() | file.Json() as format: + return format.geometry is not None + class Config(SortedSerializedBase, extra="forbid"): """ diff --git a/dcpy/test/lifecycle/ingest/resources/templates/dob_now_permits.yml b/dcpy/test/lifecycle/ingest/resources/templates/dob_now_permits.yml new file mode 100644 index 000000000..f50308a2e --- /dev/null +++ b/dcpy/test/lifecycle/ingest/resources/templates/dob_now_permits.yml @@ -0,0 +1,10 @@ +id: dob_now_permits +acl: public-read + +ingestion: + source: + type: s3 + bucket: edm-private + key: dob_now/dob_now_permits/DOB_Now_Permit_Filing_File_{{ version }}.csv + file_format: + type: csv diff --git a/dcpy/test/lifecycle/ingest/test_configure.py b/dcpy/test/lifecycle/ingest/test_configure.py index 779ce5ceb..52bdefd8f 100644 --- a/dcpy/test/lifecycle/ingest/test_configure.py +++ b/dcpy/test/lifecycle/ingest/test_configure.py @@ -94,20 +94,27 @@ def test_get_filename_invalid_source(): class TestGetConfig: + def test_standard_no_geom(self): + config = configure.get_config("dob_now_permits", template_dir=TEMPLATE_DIR) + # ensure no reprojection or multi step (no target_crs, no geom respectively) + # ensure default 'clean_column_names' step is added + assert len(config.ingestion.processing_steps) == 1 + assert config.ingestion.processing_steps[0].name == "clean_column_names" + def test_standard(self): config = configure.get_config( "dca_operatingbusinesses", template_dir=TEMPLATE_DIR ) - # ensure no reprojection step - # ensure default 'clean_column_names' step is added - assert len(config.ingestion.processing_steps) == 1 + # ensure no reprojection + # ensure default 'clean_column_names' and multi steps are added + assert len(config.ingestion.processing_steps) == 2 assert config.ingestion.processing_steps[0].name == "clean_column_names" def test_clean_column_names_defined(self): config = configure.get_config("bpl_libraries", template_dir=TEMPLATE_DIR) # ensure no reprojection step - # ensure default 'clean_column_names' step is added - assert len(config.ingestion.processing_steps) == 1 + # ensure default 'clean_column_names' and 'multi' steps are added + assert len(config.ingestion.processing_steps) == 2 assert config.ingestion.processing_steps[0].name == "clean_column_names" assert config.ingestion.processing_steps[0].args == {"replace": {"data.": ""}} @@ -115,7 +122,7 @@ def test_reproject(self): config = configure.get_config( "dcp_addresspoints", version="24c", template_dir=TEMPLATE_DIR ) - assert len(config.ingestion.processing_steps) == 2 + assert len(config.ingestion.processing_steps) == 3 assert config.ingestion.processing_steps[0].name == "reproject" def test_no_mode(self): diff --git a/dcpy/test/lifecycle/ingest/test_run.py b/dcpy/test/lifecycle/ingest/test_run.py index fb3115989..ed469525e 100644 --- a/dcpy/test/lifecycle/ingest/test_run.py +++ b/dcpy/test/lifecycle/ingest/test_run.py @@ -1,4 +1,4 @@ -import pandas as pd +import geopandas as gpd import pytest from unittest import mock import shutil @@ -59,7 +59,6 @@ def test_run_update_freshness(mock_request_get, create_buckets, create_temp_file ) config = recipes.get_config(DATASET, FAKE_VERSION) assert config.archival.check_timestamps == [] - run( dataset_id=DATASET, version=FAKE_VERSION, @@ -90,7 +89,7 @@ def test_run_update_freshness_fails_if_data_diff( # this time, replace the dataframe with a different one in the middle of the ingest process with mock.patch("dcpy.utils.geospatial.parquet.read_df") as patch_read_df: - patch_read_df.return_value = pd.DataFrame({"a": ["b"]}) + patch_read_df.return_value = gpd.GeoDataFrame({"a": [None]}).set_geometry("a") with pytest.raises( FileExistsError, match=f"Archived dataset 'id='{DATASET}' version='{FAKE_VERSION}'' already exists and has different data.", diff --git a/dcpy/test/lifecycle/ingest/test_transform.py b/dcpy/test/lifecycle/ingest/test_transform.py index 651f363e1..37587662d 100644 --- a/dcpy/test/lifecycle/ingest/test_transform.py +++ b/dcpy/test/lifecycle/ingest/test_transform.py @@ -4,6 +4,7 @@ from pathlib import Path from pydantic import TypeAdapter, BaseModel import pytest +from shapely import Polygon, MultiPolygon import yaml from unittest import TestCase, mock @@ -91,16 +92,24 @@ def test_to_parquet(file: dict, create_temp_filesystem: Path): def test_validate_processing_steps(): steps = [ - PreprocessingStep(name="no_arg_function"), + PreprocessingStep(name="multi"), PreprocessingStep(name="drop_columns", args={"columns": ["col1", "col2"]}), ] compiled_steps = transform.validate_processing_steps("test", steps) assert len(compiled_steps) == 2 - df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6], "col3": [7, 8, 9]}) + df = gpd.GeoDataFrame( + { + "col1": [1, 2, 3], + "col2": [4, 5, 6], + "col3": gpd.GeoSeries([None, None, None]), + } + ).set_geometry("col3") for step in compiled_steps: df = step(df) - expected = pd.DataFrame({"col3": [7, 8, 9]}) + expected = gpd.GeoDataFrame( + {"col3": gpd.GeoSeries([None, None, None])} + ).set_geometry("col3") assert df.equals(expected) @@ -155,7 +164,7 @@ def test_invalid_function(self): class TestPreprocessors(TestCase): proc = transform.Preprocessor(TEST_DATASET_NAME) - gdf: gpd.GeoDataFrame = gpd.read_parquet(RESOURCES / TEST_DATA_DIR / "test.parquet") + gdf = gpd.read_parquet(RESOURCES / TEST_DATA_DIR / "test.parquet") basic_df = pd.DataFrame({"a": [2, 3, 1], "b": ["b_1", "b_2", "c_3"]}) messy_names_df = pd.DataFrame({"Column": [1, 2], "Two_Words": [3, 4]}) dupe_df = pd.DataFrame({"a": [1, 1, 1, 2], "b": [3, 1, 3, 2]}) @@ -287,6 +296,44 @@ def test_rename_geodataframe(self): expected = gpd.read_parquet(RESOURCES / TEST_DATA_DIR / "renamed.parquet") assert transformed.equals(expected) + def test_multi(self): + gdf = gpd.GeoDataFrame( + { + "a": [1, 2, 3], + "wkt": gpd.GeoSeries( + [ + None, + Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]), + MultiPolygon( + [ + Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]), + Polygon([(0, 0), (0, -1), (-1, 0), (0, 0)]), + ] + ), + ] + ), + } + ).set_geometry("wkt") + transformed = self.proc.multi(gdf) + expected = gpd.GeoDataFrame( + { + "a": [1, 2, 3], + "wkt": gpd.GeoSeries( + [ + None, + MultiPolygon([Polygon([(0, 0), (0, 1), (1, 0), (0, 0)])]), + MultiPolygon( + [ + Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]), + Polygon([(0, 0), (0, -1), (-1, 0), (0, 0)]), + ] + ), + ] + ), + } + ) + assert transformed.equals(expected) + def test_preprocess_no_steps(create_temp_filesystem: Path): input = RESOURCES / TEST_DATA_DIR / "test.parquet" diff --git a/dcpy/test/utils/test_geospatial.py b/dcpy/test/utils/test_geospatial.py index dc52892bb..4b442d7f7 100644 --- a/dcpy/test/utils/test_geospatial.py +++ b/dcpy/test/utils/test_geospatial.py @@ -3,6 +3,14 @@ import pandas as pd import geopandas as gpd import shapely +from shapely import ( + Point, + MultiPoint, + LineString, + MultiLineString, + Polygon, + MultiPolygon, +) from tempfile import TemporaryDirectory from dcpy.models.geospatial import geometry @@ -146,3 +154,38 @@ def test_read_parquet(self): gdf = parquet.read_df(RESOURCES_DIR / "geo.parquet") assert isinstance(gdf, gpd.GeoDataFrame) + + +@pytest.mark.parametrize( + "input, expected", + [ + (None, None), + (Point(0, 1), MultiPoint([(0, 1)])), + (MultiPoint([(2, 3), (4, 5)]), MultiPoint([(2, 3), (4, 5)])), + (LineString([(0, 0), (1, 1)]), MultiLineString([[(0, 0), (1, 1)]])), + ( + MultiLineString([[(0, 0), (-1, 1)], [(0, 0), (1, -1)]]), + MultiLineString([[(0, 0), (-1, 1)], [(0, 0), (1, -1)]]), + ), + ( + Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]), + MultiPolygon([Polygon([(0, 0), (0, 1), (1, 0), (0, 0)])]), + ), + ( + MultiPolygon( + [ + Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]), + Polygon([(0, 0), (0, -1), (-1, 0), (0, 0)]), + ] + ), + MultiPolygon( + [ + Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]), + Polygon([(0, 0), (0, -1), (-1, 0), (0, 0)]), + ] + ), + ), + ], +) +def test_multi(input, expected): + assert transform.multi(input) == expected diff --git a/dcpy/utils/geospatial/transform.py b/dcpy/utils/geospatial/transform.py index 9ade33ce3..2606eb28f 100644 --- a/dcpy/utils/geospatial/transform.py +++ b/dcpy/utils/geospatial/transform.py @@ -8,12 +8,33 @@ TextColumn, TimeRemainingColumn, ) +from shapely import ( + Geometry, + Point, + MultiPoint, + LineString, + MultiLineString, + Polygon, + MultiPolygon, +) from dcpy.models import file from dcpy.models.geospatial import geometry as geom from dcpy.utils.logging import logger +def multi(geom: Geometry | None) -> Geometry | None: + match geom: + case Point(): + return MultiPoint([geom]) + case LineString(): + return MultiLineString([geom]) + case Polygon(): + return MultiPolygon([geom]) + case _: + return geom + + def df_to_gdf(df: pd.DataFrame, geometry: file.Geometry) -> gpd.GeoDataFrame: """ Convert a pandas DataFrame to a GeoDataFrame based on the provided geometry information. From c4ba473246afaf4b42b6928f00b87e3871859af8 Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Tue, 15 Oct 2024 11:17:20 -0400 Subject: [PATCH 11/13] REVIEW: remove one remaining BaseModel reference --- dcpy/models/file.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dcpy/models/file.py b/dcpy/models/file.py index b8ee9bd1e..f97977828 100644 --- a/dcpy/models/file.py +++ b/dcpy/models/file.py @@ -1,5 +1,4 @@ from __future__ import annotations -from pydantic import BaseModel from typing import Literal, TypeAlias from dcpy.models.base import SortedSerializedBase @@ -20,7 +19,7 @@ class Geometry(SortedSerializedBase, extra="forbid"): crs: str format: geometry.GeometryFormat | None = None - class PointColumns(BaseModel, extra="forbid"): + class PointColumns(SortedSerializedBase, extra="forbid"): """This class defines longitude and latitude column names.""" x: str From f2ecc055776ca446728796e83b4cba13283ae62b Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Wed, 16 Oct 2024 10:24:05 -0400 Subject: [PATCH 12/13] REVIEW: missing type signature --- dcpy/lifecycle/ingest/transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dcpy/lifecycle/ingest/transform.py b/dcpy/lifecycle/ingest/transform.py index c5bbd9129..f832edbcc 100644 --- a/dcpy/lifecycle/ingest/transform.py +++ b/dcpy/lifecycle/ingest/transform.py @@ -71,7 +71,7 @@ class Preprocessor: def __init__(self, dataset_id: str): self.dataset_id = dataset_id - def reproject(self, df: gpd.GeoDataFrame, target_crs) -> gpd.GeoDataFrame: + def reproject(self, df: gpd.GeoDataFrame, target_crs: str) -> gpd.GeoDataFrame: return transform.reproject_gdf(df, target_crs=target_crs) def sort(self, df: pd.DataFrame, by: list[str], ascending=True) -> pd.DataFrame: From 2b856c38ab835cb43c543f8395ab512ca5a5a6df Mon Sep 17 00:00:00 2001 From: Finn van Krieken Date: Wed, 16 Oct 2024 10:28:48 -0400 Subject: [PATCH 13/13] make attributes field required in ingest config --- dcpy/models/lifecycle/ingest.py | 4 ++-- dcpy/test/connectors/edm/resources/ingest.yml | 2 ++ dcpy/test/connectors/edm/resources/ingest/config.json | 3 +++ dcpy/test/connectors/edm/test_recipes.py | 1 + .../lifecycle/ingest/resources/templates/bpl_libraries.yml | 3 +++ .../ingest/resources/templates/dca_operatingbusinesses.yml | 3 +++ .../ingest/resources/templates/dcp_addresspoints.yml | 5 ++++- .../ingest/resources/templates/dcp_atomicpolygons.yml | 3 +++ .../resources/templates/dcp_pop_acs2010_demographic.yml | 3 +++ .../lifecycle/ingest/resources/templates/dob_now_permits.yml | 3 +++ 10 files changed, 27 insertions(+), 3 deletions(-) diff --git a/dcpy/models/lifecycle/ingest.py b/dcpy/models/lifecycle/ingest.py index bc7f928a9..4c305bfb9 100644 --- a/dcpy/models/lifecycle/ingest.py +++ b/dcpy/models/lifecycle/ingest.py @@ -82,7 +82,7 @@ class Template(BaseModel, extra="forbid"): id: str acl: recipes.ValidAclValues - attributes: DatasetAttributes | None = None + attributes: DatasetAttributes ingestion: Ingestion columns: list[Column] = [] @@ -108,7 +108,7 @@ class Config(SortedSerializedBase, extra="forbid"): version: str crs: str | None = None - attributes: DatasetAttributes | None = None + attributes: DatasetAttributes archival: ArchivalMetadata ingestion: Ingestion columns: list[Column] = [] diff --git a/dcpy/test/connectors/edm/resources/ingest.yml b/dcpy/test/connectors/edm/resources/ingest.yml index 34a94b88a..23fa40c61 100644 --- a/dcpy/test/connectors/edm/resources/ingest.yml +++ b/dcpy/test/connectors/edm/resources/ingest.yml @@ -1,5 +1,7 @@ name: test acl: public-read +attributes: + name: Test source: type: local_file path: dcpy/test/connectors/edm/resources/ingest_output/test.csv diff --git a/dcpy/test/connectors/edm/resources/ingest/config.json b/dcpy/test/connectors/edm/resources/ingest/config.json index 437855308..95aaaea25 100644 --- a/dcpy/test/connectors/edm/resources/ingest/config.json +++ b/dcpy/test/connectors/edm/resources/ingest/config.json @@ -1,6 +1,9 @@ { "name": "test", "version": "ingest", + "attributes": { + "name": "Test" + }, "archival": { "archival_timestamp": "2024-09-05T12:04:03.450135-04:00", "raw_filename": "dummy.csv", diff --git a/dcpy/test/connectors/edm/test_recipes.py b/dcpy/test/connectors/edm/test_recipes.py index 918455695..7322974d7 100644 --- a/dcpy/test/connectors/edm/test_recipes.py +++ b/dcpy/test/connectors/edm/test_recipes.py @@ -71,6 +71,7 @@ class TestArchiveDataset: config = ingest.Config( id=dataset, version="dummy", + attributes=ingest.DatasetAttributes(name=dataset), archival=ingest.ArchivalMetadata( archival_timestamp=datetime.now(), acl="private", diff --git a/dcpy/test/lifecycle/ingest/resources/templates/bpl_libraries.yml b/dcpy/test/lifecycle/ingest/resources/templates/bpl_libraries.yml index 8773a84d9..fab228c33 100644 --- a/dcpy/test/lifecycle/ingest/resources/templates/bpl_libraries.yml +++ b/dcpy/test/lifecycle/ingest/resources/templates/bpl_libraries.yml @@ -1,6 +1,9 @@ id: bpl_libraries acl: public-read +attributes: + name: BPL Libraries + ingestion: source: type: api diff --git a/dcpy/test/lifecycle/ingest/resources/templates/dca_operatingbusinesses.yml b/dcpy/test/lifecycle/ingest/resources/templates/dca_operatingbusinesses.yml index 5cc706991..335052fa7 100644 --- a/dcpy/test/lifecycle/ingest/resources/templates/dca_operatingbusinesses.yml +++ b/dcpy/test/lifecycle/ingest/resources/templates/dca_operatingbusinesses.yml @@ -1,6 +1,9 @@ id: dca_operatingbusinesses acl: public-read +attributes: + name: DCA Operating Businesses + ingestion: source: type: socrata diff --git a/dcpy/test/lifecycle/ingest/resources/templates/dcp_addresspoints.yml b/dcpy/test/lifecycle/ingest/resources/templates/dcp_addresspoints.yml index dd854bcce..8e7d8380f 100644 --- a/dcpy/test/lifecycle/ingest/resources/templates/dcp_addresspoints.yml +++ b/dcpy/test/lifecycle/ingest/resources/templates/dcp_addresspoints.yml @@ -1,6 +1,9 @@ -id: dcp-addresspoints +id: dcp_addresspoints acl: public-read +attributes: + name: DCP Address Points + ingestion: source: type: edm_publishing_gis_dataset diff --git a/dcpy/test/lifecycle/ingest/resources/templates/dcp_atomicpolygons.yml b/dcpy/test/lifecycle/ingest/resources/templates/dcp_atomicpolygons.yml index 8ce4c3023..00ce6b6ef 100644 --- a/dcpy/test/lifecycle/ingest/resources/templates/dcp_atomicpolygons.yml +++ b/dcpy/test/lifecycle/ingest/resources/templates/dcp_atomicpolygons.yml @@ -1,6 +1,9 @@ id: dcp_atomicpolygons acl: public-read +attributes: + name: DCP Atomic Polygons + ingestion: source: type: file_download diff --git a/dcpy/test/lifecycle/ingest/resources/templates/dcp_pop_acs2010_demographic.yml b/dcpy/test/lifecycle/ingest/resources/templates/dcp_pop_acs2010_demographic.yml index 706afeeae..30a76a6bd 100644 --- a/dcpy/test/lifecycle/ingest/resources/templates/dcp_pop_acs2010_demographic.yml +++ b/dcpy/test/lifecycle/ingest/resources/templates/dcp_pop_acs2010_demographic.yml @@ -1,6 +1,9 @@ id: dcp_pop_acs2010_demographic acl: public-read +attributes: + name: DCP Population 2010 ACS Demographic Data + ingestion: source: type: local_file diff --git a/dcpy/test/lifecycle/ingest/resources/templates/dob_now_permits.yml b/dcpy/test/lifecycle/ingest/resources/templates/dob_now_permits.yml index f50308a2e..c35cff89b 100644 --- a/dcpy/test/lifecycle/ingest/resources/templates/dob_now_permits.yml +++ b/dcpy/test/lifecycle/ingest/resources/templates/dob_now_permits.yml @@ -1,6 +1,9 @@ id: dob_now_permits acl: public-read +attributes: + name: DOB Now Job Permits + ingestion: source: type: s3