Finalize/clean up ingest models, add additional preprocessing #1166

Merged · 13 commits · Oct 16, 2024
5 changes: 4 additions & 1 deletion dcpy/lifecycle/ingest/run.py
@@ -50,8 +50,11 @@ def run(
     latest: bool = False,
     skip_archival: bool = False,
     output_csv: bool = False,
+    template_dir: Path = configure.TEMPLATE_DIR,
 ) -> Config:
-    config = configure.get_config(dataset_id, version=version, mode=mode)
+    config = configure.get_config(
+        dataset_id, version=version, mode=mode, template_dir=template_dir
+    )
     transform.validate_processing_steps(config.id, config.ingestion.processing_steps)
@damonmcc (Member) commented on Oct 15, 2024:
Is this redundant, since transform.validate_processing_steps is called during transform.preprocess?

And if we do want to validate early, would it make sense to do it during configure.get_config or maybe read_template?

A member replied:
Talked about it and we'll keep it here. It's nicer to assemble these pieces at a high level rather than have ingest modules importing each other.
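For readers following the thread: a rough sketch of the kind of check transform.validate_processing_steps performs, assuming it verifies that each step names a registered preprocessing function (the actual implementation isn't shown in this diff):

```python
# Step names drawn from the templates added in this PR; the real registry is
# whatever preprocessing functions dcpy's transform module actually exposes.
KNOWN_STEPS = {
    "clean_column_names",
    "reproject",
    "append_prev",
    "upsert_column_of_previous_version",
}

def validate_processing_steps(dataset_id: str, steps: list) -> None:
    # Fail fast, before any data is downloaded or transformed.
    unknown = [s.name for s in steps if s.name not in KNOWN_STEPS]
    if unknown:
        raise ValueError(f"{dataset_id}: unknown processing steps {unknown}")
```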


     if not staging_dir:
20 changes: 12 additions & 8 deletions dcpy/test/connectors/edm/resources/ingest/config.json
@@ -1,15 +1,19 @@
 {
   "name": "test",
   "version": "ingest",
-  "archival_timestamp": "2024-09-05T12:04:03.450135-04:00",
-  "raw_filename": "dummy.csv",
-  "acl": "public-read",
-  "source": {
-    "type": "local_file",
-    "path": "dummy.csv"
+  "archival": {
+    "archival_timestamp": "2024-09-05T12:04:03.450135-04:00",
+    "raw_filename": "dummy.csv",
+    "acl": "public-read"
   },
+  "ingestion": {
+    "source": {
+      "type": "local_file",
+      "path": "dummy.csv"
+    },
+    "file_format": {
+      "type": "csv"
+    }
   },
   "run_details": {
     "type": "manual",
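To make the restructuring concrete: the previously flat config is split into archival and ingestion sub-objects. A rough sketch of what the corresponding models might look like, inferred from the tests in this PR (field types and the freshness rule here are assumptions, not dcpy's exact definitions):

```python
from datetime import datetime
from pydantic import BaseModel, Field

class ArchivalMetadata(BaseModel):
    archival_timestamp: datetime
    raw_filename: str
    acl: str
    check_timestamps: list[datetime] = Field(default_factory=list)

class Ingestion(BaseModel):
    source: dict       # e.g. {"type": "local_file", "path": "dummy.csv"}
    file_format: dict  # e.g. {"type": "csv"}
    processing_steps: list[dict] = Field(default_factory=list)

class Config(BaseModel):
    id: str
    version: str
    archival: ArchivalMetadata
    ingestion: Ingestion
    run_details: dict

    @property
    def freshness(self) -> datetime:
        # The tests below assert that freshness is the latest check timestamp,
        # falling back to the original archival timestamp.
        checks = self.archival.check_timestamps
        return max(checks) if checks else self.archival.archival_timestamp
```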
26 changes: 15 additions & 11 deletions dcpy/test/connectors/edm/test_recipes.py
@@ -71,13 +71,17 @@ class TestArchiveDataset:
         config = ingest.Config(
             id=dataset,
             version="dummy",
-            archival_timestamp=datetime.now(),
-            acl="private",
-            raw_filename=raw_file_name,
-            source=ingest.ScriptSource(
-                type="script", connector="dummy", function="dummy"
-            ),  # easiest to mock
-            file_format=file.Csv(type="csv"),  # easiest to mock
+            archival=ingest.ArchivalMetadata(
+                archival_timestamp=datetime.now(),
+                acl="private",
+                raw_filename=raw_file_name,
+            ),
+            ingestion=ingest.Ingestion(
+                source=ingest.ScriptSource(
+                    type="script", connector="dummy", function="dummy"
+                ),  # easiest to mock
+                file_format=file.Csv(type="csv"),  # easiest to mock
+            ),
             run_details=metadata.get_run_details(),
         )

Expand Down Expand Up @@ -247,19 +251,19 @@ def get_config():
return config

config = get_config()
assert config.check_timestamps == []
assert config.freshness == config.archival_timestamp
assert config.archival.check_timestamps == []
assert config.freshness == config.archival.archival_timestamp

timestamp = datetime.now()
recipes.update_freshness(load_ingest.dataset_key, timestamp)
config2 = get_config()
assert config2.check_timestamps == [timestamp]
assert config2.archival.check_timestamps == [timestamp]
assert config2.freshness == timestamp

timestamp2 = datetime.now()
recipes.update_freshness(load_ingest.dataset_key, timestamp2)
config3 = get_config()
assert config3.check_timestamps == [timestamp, timestamp2]
assert config3.archival.check_timestamps == [timestamp, timestamp2]
assert config3.freshness == timestamp2


1 change: 1 addition & 0 deletions dcpy/test/lifecycle/ingest/__init__.py
@sf-dcp (Contributor) commented on Oct 15, 2024:
I haven't seen this file. Nice!

@@ -11,6 +11,7 @@
 from dcpy.test.conftest import RECIPES_BUCKET

 RESOURCES = Path(__file__).parent / "resources"
+TEMPLATE_DIR = RESOURCES / "templates"
 TEST_DATA_DIR = "test_data"
 TEST_DATASET_NAME = "test_dataset"
 FAKE_VERSION = "20240101"
9 changes: 5 additions & 4 deletions dcpy/test/lifecycle/ingest/resources/invalid_jinja.yml
@@ -1,6 +1,7 @@
 id: dcp_atomicpolygons
 acl: public-read
-source:
-  type: file_download
-  url: https://s-media.nyc.gov/agencies/dcp/assets/files/zip/{{ dummy_jinja_var }}/bytes/nyap_{{ version }}.zip
-file_format:
+ingestion:
+  source:
+    type: file_download
+    url: https://s-media.nyc.gov/agencies/dcp/assets/files/zip/{{ dummy_jinja_var }}/bytes/nyap_{{ version }}.zip
+  file_format:
22 changes: 22 additions & 0 deletions dcpy/test/lifecycle/ingest/resources/templates/bpl_libraries.yml
@@ -0,0 +1,22 @@
id: bpl_libraries
acl: public-read

ingestion:
  source:
    type: api
    endpoint: https://www.bklynlibrary.org/locations/json
    format: json
  file_format:
    type: json
    json_read_fn: normalize
    json_read_kwargs: { "record_path": ["locations"] }
    geometry:
      geom_column: data.position
      crs: EPSG:4326
      format:
        point_xy_str: "y, x"
  processing_steps:
  - name: clean_column_names
    args:
      replace:
        "data.": ""
14 changes: 14 additions & 0 deletions dcpy/test/lifecycle/ingest/resources/templates/dca_operatingbusinesses.yml
@@ -0,0 +1,14 @@
id: dca_operatingbusinesses
acl: public-read

ingestion:
  source:
    type: socrata
    org: nyc
    uid: w7w3-xahh
    format: csv
  file_format:
    type: csv
    geometry:
      geom_column: location
      crs: EPSG:4326
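A socrata source like the one above conventionally resolves to Socrata's generic export endpoint; a hedged sketch of the URL construction (the org-to-domain mapping and helper name are assumptions, not dcpy's connector API):

```python
# Hypothetical mapping; dcpy's actual socrata connector may differ.
SOCRATA_DOMAINS = {"nyc": "data.cityofnewyork.us"}

def socrata_export_url(org: str, uid: str, format: str = "csv") -> str:
    # Socrata exposes full-table exports at /api/views/{uid}/rows.{format}
    return f"https://{SOCRATA_DOMAINS[org]}/api/views/{uid}/rows.{format}?accessType=DOWNLOAD"
```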
12 changes: 12 additions & 0 deletions dcpy/test/lifecycle/ingest/resources/templates/dcp_addresspoints.yml
@@ -0,0 +1,12 @@
id: dcp_addresspoints
acl: public-read

ingestion:
  source:
    type: edm_publishing_gis_dataset
    name: dcp_address_points
  file_format:
    unzipped_filename: dcp_address_points.shp
    type: shapefile
    crs: EPSG:2263
  target_crs: EPSG:4326
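The crs/target_crs pair above is what triggers the reproject step asserted in test_reproject further down. A minimal sketch of what such a step could do, assuming geopandas (the actual dcpy implementation is not shown in this diff):

```python
import geopandas as gpd

# Read the shapefile, pin the source CRS declared in the template,
# then reproject to the target CRS.
gdf = gpd.read_file("dcp_address_points.shp")
gdf = gdf.set_crs("EPSG:2263", allow_override=True).to_crs("EPSG:4326")
```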
11 changes: 11 additions & 0 deletions dcpy/test/lifecycle/ingest/resources/templates/dcp_atomicpolygons.yml
@@ -0,0 +1,11 @@
id: dcp_atomicpolygons
acl: public-read

ingestion:
  source:
    type: file_download
    url: https://s-media.nyc.gov/agencies/dcp/assets/files/zip/data-tools/bytes/nyap_{{ version }}.zip
  file_format:
    unzipped_filename: nyap_{{ version }}/nyap.shp
    type: shapefile
    crs: EPSG:2263
24 changes: 24 additions & 0 deletions dcpy/test/lifecycle/ingest/resources/templates/dcp_pop_acs2010_demographic.yml
@@ -0,0 +1,24 @@
id: dcp_pop_acs2010_demographic
acl: public-read

ingestion:
  source:
    type: local_file
    path: .library/upload/CCD2023_ACS0610Data_for1822Update.xlsx
  file_format:
    type: xlsx
    sheet_name: CCD2023_Dem0610
    dtype:
      GeoID: str
  processing_steps:
  - name: clean_column_names
    args:
      lower: true
  - name: append_prev
    mode: append
  - name: upsert_column_of_previous_version
    args:
      key: [geotype, geoid]
      insert_behavior: error
      missing_key_behavior: error
    mode: update_column
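The mode keys on the last two steps gate them behind an explicit mode argument, as exercised by test_no_mode, test_mode, and test_invalid_mode below. A hypothetical sketch of that filtering (resolve_steps is an illustrative name, not dcpy's):

```python
def resolve_steps(steps: list[dict], mode: str | None = None) -> list[dict]:
    """Keep un-gated steps always; include mode-gated steps only when selected."""
    known_modes = {s["mode"] for s in steps if "mode" in s}
    if mode is not None and mode not in known_modes:
        raise ValueError(f"unknown mode {mode!r}; expected one of {sorted(known_modes)}")
    return [s for s in steps if s.get("mode") in (None, mode)]
```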
63 changes: 40 additions & 23 deletions dcpy/test/lifecycle/ingest/test_configure.py
@@ -11,7 +11,7 @@
 from dcpy.lifecycle.ingest import configure

 from dcpy.test.conftest import mock_request_get
-from . import RESOURCES, TEST_DATASET_NAME, Sources, SOURCE_FILENAMES
+from . import RESOURCES, TEST_DATASET_NAME, Sources, SOURCE_FILENAMES, TEMPLATE_DIR


def test_jinja_vars():
@@ -28,18 +28,20 @@ class TestReadTemplate:
     """

     def test_simple(self):
-        template = configure.read_template("bpl_libraries")
-        assert isinstance(template.source, web.GenericApiSource)
+        template = configure.read_template("bpl_libraries", template_dir=TEMPLATE_DIR)
+        assert isinstance(template.ingestion.source, web.GenericApiSource)
         assert isinstance(
-            template.file_format,
+            template.ingestion.file_format,
             file.Json,
         )

     def test_jinja(self):
-        template = configure.read_template("dcp_atomicpolygons", version="test")
-        assert isinstance(template.source, web.FileDownloadSource)
+        template = configure.read_template(
+            "dcp_atomicpolygons", version="test", template_dir=TEMPLATE_DIR
+        )
+        assert isinstance(template.ingestion.source, web.FileDownloadSource)
         assert isinstance(
-            template.file_format,
+            template.ingestion.file_format,
             file.Shapefile,
         )

@@ -93,37 +95,52 @@ def test_get_filename_invalid_source():

 class TestGetConfig:
     def test_standard(self):
-        config = configure.get_config("dca_operatingbusinesses")
+        config = configure.get_config(
+            "dca_operatingbusinesses", template_dir=TEMPLATE_DIR
+        )
         # ensure no reprojection step
         # ensure default 'clean_column_names' step is added
-        assert len(config.processing_steps) == 1
-        assert config.processing_steps[0].name == "clean_column_names"
+        assert len(config.ingestion.processing_steps) == 1
+        assert config.ingestion.processing_steps[0].name == "clean_column_names"

     def test_clean_column_names_defined(self):
-        config = configure.get_config("bpl_libraries")
+        config = configure.get_config("bpl_libraries", template_dir=TEMPLATE_DIR)
         # ensure no reprojection step
         # ensure default 'clean_column_names' step is added
-        assert len(config.processing_steps) == 1
-        assert config.processing_steps[0].name == "clean_column_names"
-        assert config.processing_steps[0].args == {"replace": {"data.": ""}}
+        assert len(config.ingestion.processing_steps) == 1
+        assert config.ingestion.processing_steps[0].name == "clean_column_names"
+        assert config.ingestion.processing_steps[0].args == {"replace": {"data.": ""}}

     def test_reproject(self):
-        config = configure.get_config("dcp_addresspoints", version="24c")
-        assert len(config.processing_steps) == 2
-        assert config.processing_steps[0].name == "reproject"
+        config = configure.get_config(
+            "dcp_addresspoints", version="24c", template_dir=TEMPLATE_DIR
+        )
+        assert len(config.ingestion.processing_steps) == 2
+        assert config.ingestion.processing_steps[0].name == "reproject"

     def test_no_mode(self):
-        standard = configure.get_config("dcp_pop_acs2010_demographic", version="test")
-        assert standard.processing_steps
-        assert "append_prev" not in [s.name for s in standard.processing_steps]
+        standard = configure.get_config(
+            "dcp_pop_acs2010_demographic", version="test", template_dir=TEMPLATE_DIR
+        )
+        assert standard.ingestion.processing_steps
+        assert "append_prev" not in [
+            s.name for s in standard.ingestion.processing_steps
+        ]

     def test_mode(self):
         append = configure.get_config(
-            "dcp_pop_acs2010_demographic", version="test", mode="append"
+            "dcp_pop_acs2010_demographic",
+            version="test",
+            mode="append",
+            template_dir=TEMPLATE_DIR,
         )
-        assert "append_prev" in [s.name for s in append.processing_steps]
+        assert "append_prev" in [s.name for s in append.ingestion.processing_steps]

     def test_invalid_mode(self):
         with pytest.raises(ValueError):
             configure.get_config(
-                "dcp_pop_acs2010_demographic", version="test", mode="fake_mode"
+                "dcp_pop_acs2010_demographic",
+                version="test",
+                mode="fake_mode",
+                template_dir=TEMPLATE_DIR,
             )
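Taken together, these tests pin down how configure.get_config assembles processing steps: a reproject step is prepended when the template's file CRS differs from its target CRS, and a default clean_column_names step is included when the template doesn't define one. A hypothetical sketch of that assembly (assemble_steps is an illustrative name):

```python
def assemble_steps(
    template_steps: list[dict],
    source_crs: str | None = None,
    target_crs: str | None = None,
) -> list[dict]:
    steps = list(template_steps)
    # Prepend a reproject step only when the template asks for a different CRS.
    if source_crs and target_crs and source_crs != target_crs:
        steps.insert(0, {"name": "reproject", "args": {"target_crs": target_crs}})
    # Guarantee a default clean_column_names step unless the template defines one.
    if not any(s["name"] == "clean_column_names" for s in steps):
        steps.append({"name": "clean_column_names"})
    return steps
```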
4 changes: 3 additions & 1 deletion dcpy/test/lifecycle/ingest/test_ingest.py
@@ -11,4 +11,6 @@ def test_validate_all_datasets():
     with open(file, "r") as f:
         s = yaml.safe_load(f)
         template = Template(**s)
-        transform.validate_processing_steps(template.id, template.processing_steps)
+        transform.validate_processing_steps(
+            template.id, template.ingestion.processing_steps
+        )
32 changes: 25 additions & 7 deletions dcpy/test/lifecycle/ingest/test_run.py
@@ -9,7 +9,7 @@
 from dcpy.lifecycle.ingest.run import run, TMP_DIR

 from dcpy.test.conftest import mock_request_get
-from . import FAKE_VERSION
+from . import FAKE_VERSION, TEMPLATE_DIR

 DATASET = "bpl_libraries"
 S3_PATH = f"datasets/{DATASET}/{FAKE_VERSION}/{DATASET}.parquet"
@@ -19,14 +19,19 @@
 @mock.patch("requests.get", side_effect=mock_request_get)
 def test_run(mock_request_get, create_buckets, create_temp_filesystem):
     """Mainly an integration test to make sure code runs without error"""
-    run(dataset_id=DATASET, version=FAKE_VERSION, staging_dir=create_temp_filesystem)
+    run(
+        dataset_id=DATASET,
+        version=FAKE_VERSION,
+        staging_dir=create_temp_filesystem,
+        template_dir=TEMPLATE_DIR,
+    )
     assert len(s3.get_subfolders(RECIPES_BUCKET, RAW_FOLDER)) == 1
     assert s3.object_exists(RECIPES_BUCKET, S3_PATH)


 @mock.patch("requests.get", side_effect=mock_request_get)
 def test_run_default_folder(mock_request_get, create_buckets, create_temp_filesystem):
-    run(dataset_id=DATASET, version=FAKE_VERSION)
+    run(dataset_id=DATASET, version=FAKE_VERSION, template_dir=TEMPLATE_DIR)
     assert s3.object_exists(RECIPES_BUCKET, S3_PATH)
     assert (TMP_DIR / DATASET).exists()
     shutil.rmtree(TMP_DIR)
Expand All @@ -39,25 +44,32 @@ def test_skip_archival(mock_request_get, create_buckets, create_temp_filesystem)
version=FAKE_VERSION,
staging_dir=create_temp_filesystem,
skip_archival=True,
template_dir=TEMPLATE_DIR,
)
assert not s3.object_exists(RECIPES_BUCKET, S3_PATH)


@mock.patch("requests.get", side_effect=mock_request_get)
def test_run_update_freshness(mock_request_get, create_buckets, create_temp_filesystem):
run(dataset_id=DATASET, version=FAKE_VERSION, staging_dir=create_temp_filesystem)
run(
dataset_id=DATASET,
version=FAKE_VERSION,
staging_dir=create_temp_filesystem,
template_dir=TEMPLATE_DIR,
)
config = recipes.get_config(DATASET, FAKE_VERSION)
assert config.check_timestamps == []
assert config.archival.check_timestamps == []

run(
dataset_id=DATASET,
version=FAKE_VERSION,
staging_dir=create_temp_filesystem,
latest=True,
template_dir=TEMPLATE_DIR,
)
config2 = recipes.get_config(DATASET, FAKE_VERSION)

assert len(config2.check_timestamps) == 1
assert len(config2.archival.check_timestamps) == 1
assert config2.freshness > config.freshness

latest = recipes.get_config(DATASET)
@@ -69,7 +81,12 @@ def test_run_update_freshness_fails_if_data_diff(
     mock_request_get, create_buckets, create_temp_filesystem
 ):
     """Mainly an integration test to make sure code runs without error"""
-    run(dataset_id=DATASET, version=FAKE_VERSION, staging_dir=create_temp_filesystem)
+    run(
+        dataset_id=DATASET,
+        version=FAKE_VERSION,
+        staging_dir=create_temp_filesystem,
+        template_dir=TEMPLATE_DIR,
+    )

     # this time, replace the dataframe with a different one in the middle of the ingest process
     with mock.patch("dcpy.utils.geospatial.parquet.read_df") as patch_read_df:
@@ -82,4 +99,5 @@
             dataset_id=DATASET,
             version=FAKE_VERSION,
             staging_dir=create_temp_filesystem,
+            template_dir=TEMPLATE_DIR,
         )