Commit 8858670

add default step to convert geoms to multi

fvankrieken committed Oct 10, 2024
1 parent c26d849 commit 8858670

Showing 9 changed files with 160 additions and 17 deletions.
4 changes: 4 additions & 0 deletions dcpy/lifecycle/ingest/configure.py
@@ -129,6 +129,10 @@ def get_config(
)
processing_steps.append(clean_column_names)

if "multi" not in processing_step_names and template.has_geom:
multi = PreprocessingStep(name="multi")
processing_steps.append(multi)

if mode:
modes = {s.mode for s in processing_steps}
if mode not in modes:
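Net effect on configuration, as exercised by the updated tests below: a template with a geometry and no explicitly declared 'multi' step now gets two default steps, while a non-spatial template gets only the column-name cleanup. A sketch using the fixtures from test_configure.py in this commit (TEMPLATE_DIR as defined there):

    config = configure.get_config("dca_operatingbusinesses", template_dir=TEMPLATE_DIR)
    assert [s.name for s in config.ingestion.processing_steps] == ["clean_column_names", "multi"]

    config = configure.get_config("dob_now_permits", template_dir=TEMPLATE_DIR)
    assert [s.name for s in config.ingestion.processing_steps] == ["clean_column_names"]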
12 changes: 8 additions & 4 deletions dcpy/lifecycle/ingest/transform.py
@@ -181,6 +181,14 @@ def strip_columns(
df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
return df

def multi(self, df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
multi_gdf = df.copy()
multi_gdf.set_geometry(
gpd.GeoSeries([transform.multi(feature) for feature in multi_gdf.geometry]),
inplace=True,
)
return multi_gdf

def pd_series_func(
self,
df: pd.DataFrame,
@@ -218,10 +226,6 @@ def pd_series_func(
transformed[output_column_name or column_name] = func(**kwargs) # type: ignore
return transformed

def no_arg_function(self, df: pd.DataFrame) -> pd.DataFrame:
"""Dummy/stub for testing. Can be dropped if we implement actual function with no args other than df"""
return df


def validate_pd_series_func(
*, function_name: str, column_name: str = "", **kwargs
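The new 'multi' preprocessor converts every geometry in the frame's geometry column and returns a copy. A minimal usage sketch — the dataset id and column name here are hypothetical; the behavior mirrors TestPreprocessors.test_multi below:

    import geopandas as gpd
    from shapely import MultiPolygon, Polygon

    from dcpy.lifecycle.ingest import transform

    proc = transform.Preprocessor("my_dataset")  # hypothetical dataset id
    gdf = gpd.GeoDataFrame(
        {"geom": gpd.GeoSeries([Polygon([(0, 0), (0, 1), (1, 0), (0, 0)])])}
    ).set_geometry("geom")
    out = proc.multi(gdf)
    assert isinstance(out.geometry.iloc[0], MultiPolygon)  # Polygon wrapped as MultiPolygon

Nulls and already-multi geometries pass through unchanged.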
8 changes: 8 additions & 0 deletions dcpy/models/lifecycle/ingest.py
@@ -89,6 +89,14 @@ class Template(BaseModel, extra="forbid"):
## this is the original library template, included just for reference while we build out our new templates
library_dataset: library.DatasetDefinition | None = None

@property
def has_geom(self):
match self.ingestion.file_format:
case file.Shapefile() | file.Geodatabase() | file.GeoJson():
return True
case file.Csv() | file.Xlsx() | file.Json() as format:
return format.geometry is not None


class Config(SortedSerializedBase, extra="forbid"):
"""
10 changes: 10 additions & 0 deletions dcpy/test/lifecycle/ingest/resources/templates/dob_now_permits.yml
@@ -0,0 +1,10 @@
id: dob_now_permits
acl: public-read

ingestion:
source:
type: s3
bucket: edm-private
key: dob_now/dob_now_permits/DOB_Now_Permit_Filing_File_{{ version }}.csv
file_format:
type: csv
19 changes: 13 additions & 6 deletions dcpy/test/lifecycle/ingest/test_configure.py
@@ -94,28 +94,35 @@ def test_get_filename_invalid_source():


class TestGetConfig:
def test_standard_no_geom(self):
config = configure.get_config("dob_now_permits", template_dir=TEMPLATE_DIR)
# ensure no reprojection or multi step (no target_crs, no geom respectively)
# ensure default 'clean_column_names' step is added
assert len(config.ingestion.processing_steps) == 1
assert config.ingestion.processing_steps[0].name == "clean_column_names"

def test_standard(self):
config = configure.get_config(
"dca_operatingbusinesses", template_dir=TEMPLATE_DIR
)
# ensure no reprojection step
# ensure default 'clean_column_names' step is added
assert len(config.ingestion.processing_steps) == 1
# ensure no reprojection
# ensure default 'clean_column_names' and multi steps are added
assert len(config.ingestion.processing_steps) == 2
assert config.ingestion.processing_steps[0].name == "clean_column_names"

def test_clean_column_names_defined(self):
config = configure.get_config("bpl_libraries", template_dir=TEMPLATE_DIR)
# ensure no reprojection step
# ensure default 'clean_column_names' step is added
assert len(config.ingestion.processing_steps) == 1
# ensure default 'clean_column_names' and 'multi' steps are added
assert len(config.ingestion.processing_steps) == 2
assert config.ingestion.processing_steps[0].name == "clean_column_names"
assert config.ingestion.processing_steps[0].args == {"replace": {"data.": ""}}

def test_reproject(self):
config = configure.get_config(
"dcp_addresspoints", version="24c", template_dir=TEMPLATE_DIR
)
assert len(config.ingestion.processing_steps) == 2
assert len(config.ingestion.processing_steps) == 3
assert config.ingestion.processing_steps[0].name == "reproject"

def test_no_mode(self):
5 changes: 2 additions & 3 deletions dcpy/test/lifecycle/ingest/test_run.py
@@ -1,4 +1,4 @@
import pandas as pd
import geopandas as gpd
import pytest
from unittest import mock
import shutil
@@ -59,7 +59,6 @@ def test_run_update_freshness(mock_request_get, create_buckets, create_temp_file
)
config = recipes.get_config(DATASET, FAKE_VERSION)
assert config.archival.check_timestamps == []

run(
dataset_id=DATASET,
version=FAKE_VERSION,
@@ -90,7 +89,7 @@ def test_run_update_freshness_fails_if_data_diff(

# this time, replace the dataframe with a different one in the middle of the ingest process
with mock.patch("dcpy.utils.geospatial.parquet.read_df") as patch_read_df:
patch_read_df.return_value = pd.DataFrame({"a": ["b"]})
patch_read_df.return_value = gpd.GeoDataFrame({"a": [None]}).set_geometry("a")
with pytest.raises(
FileExistsError,
match=f"Archived dataset 'id='{DATASET}' version='{FAKE_VERSION}'' already exists and has different data.",
55 changes: 51 additions & 4 deletions dcpy/test/lifecycle/ingest/test_transform.py
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from pydantic import TypeAdapter, BaseModel
import pytest
from shapely import Polygon, MultiPolygon
import yaml
from unittest import TestCase, mock

@@ -91,16 +92,24 @@ def test_to_parquet(file: dict, create_temp_filesystem: Path):

def test_validate_processing_steps():
steps = [
PreprocessingStep(name="no_arg_function"),
PreprocessingStep(name="multi"),
PreprocessingStep(name="drop_columns", args={"columns": ["col1", "col2"]}),
]
compiled_steps = transform.validate_processing_steps("test", steps)
assert len(compiled_steps) == 2

df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6], "col3": [7, 8, 9]})
df = gpd.GeoDataFrame(
{
"col1": [1, 2, 3],
"col2": [4, 5, 6],
"col3": gpd.GeoSeries([None, None, None]),
}
).set_geometry("col3")
for step in compiled_steps:
df = step(df)
expected = pd.DataFrame({"col3": [7, 8, 9]})
expected = gpd.GeoDataFrame(
{"col3": gpd.GeoSeries([None, None, None])}
).set_geometry("col3")
assert df.equals(expected)


@@ -155,7 +164,7 @@ def test_invalid_function(self):

class TestPreprocessors(TestCase):
proc = transform.Preprocessor(TEST_DATASET_NAME)
gdf: gpd.GeoDataFrame = gpd.read_parquet(RESOURCES / TEST_DATA_DIR / "test.parquet")
gdf = gpd.read_parquet(RESOURCES / TEST_DATA_DIR / "test.parquet")
basic_df = pd.DataFrame({"a": [2, 3, 1], "b": ["b_1", "b_2", "c_3"]})
messy_names_df = pd.DataFrame({"Column": [1, 2], "Two_Words": [3, 4]})
dupe_df = pd.DataFrame({"a": [1, 1, 1, 2], "b": [3, 1, 3, 2]})
@@ -287,6 +296,44 @@ def test_rename_geodataframe(self):
expected = gpd.read_parquet(RESOURCES / TEST_DATA_DIR / "renamed.parquet")
assert transformed.equals(expected)

def test_multi(self):
gdf = gpd.GeoDataFrame(
{
"a": [1, 2, 3],
"wkt": gpd.GeoSeries(
[
None,
Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]),
MultiPolygon(
[
Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]),
Polygon([(0, 0), (0, -1), (-1, 0), (0, 0)]),
]
),
]
),
}
).set_geometry("wkt")
transformed = self.proc.multi(gdf)
expected = gpd.GeoDataFrame(
{
"a": [1, 2, 3],
"wkt": gpd.GeoSeries(
[
None,
MultiPolygon([Polygon([(0, 0), (0, 1), (1, 0), (0, 0)])]),
MultiPolygon(
[
Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]),
Polygon([(0, 0), (0, -1), (-1, 0), (0, 0)]),
]
),
]
),
}
)
assert transformed.equals(expected)


def test_preprocess_no_steps(create_temp_filesystem: Path):
input = RESOURCES / TEST_DATA_DIR / "test.parquet"
43 changes: 43 additions & 0 deletions dcpy/test/utils/test_geospatial.py
@@ -3,6 +3,14 @@
import pandas as pd
import geopandas as gpd
import shapely
from shapely import (
Point,
MultiPoint,
LineString,
MultiLineString,
Polygon,
MultiPolygon,
)
from tempfile import TemporaryDirectory

from dcpy.models.geospatial import geometry
@@ -146,3 +154,38 @@ def test_read_parquet(self):

gdf = parquet.read_df(RESOURCES_DIR / "geo.parquet")
assert isinstance(gdf, gpd.GeoDataFrame)


@pytest.mark.parametrize(
"input, expected",
[
(None, None),
(Point(0, 1), MultiPoint([(0, 1)])),
(MultiPoint([(2, 3), (4, 5)]), MultiPoint([(2, 3), (4, 5)])),
(LineString([(0, 0), (1, 1)]), MultiLineString([[(0, 0), (1, 1)]])),
(
MultiLineString([[(0, 0), (-1, 1)], [(0, 0), (1, -1)]]),
MultiLineString([[(0, 0), (-1, 1)], [(0, 0), (1, -1)]]),
),
(
Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]),
MultiPolygon([Polygon([(0, 0), (0, 1), (1, 0), (0, 0)])]),
),
(
MultiPolygon(
[
Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]),
Polygon([(0, 0), (0, -1), (-1, 0), (0, 0)]),
]
),
MultiPolygon(
[
Polygon([(0, 0), (0, 1), (1, 0), (0, 0)]),
Polygon([(0, 0), (0, -1), (-1, 0), (0, 0)]),
]
),
),
],
)
def test_multi(input, expected):
assert transform.multi(input) == expected
21 changes: 21 additions & 0 deletions dcpy/utils/geospatial/transform.py
@@ -8,12 +8,33 @@
TextColumn,
TimeRemainingColumn,
)
from shapely import (
Geometry,
Point,
MultiPoint,
LineString,
MultiLineString,
Polygon,
MultiPolygon,
)

from dcpy.models import file
from dcpy.models.geospatial import geometry as geom
from dcpy.utils.logging import logger


def multi(geom: Geometry | None) -> Geometry | None:
match geom:
case Point():
return MultiPoint([geom])
case LineString():
return MultiLineString([geom])
case Polygon():
return MultiPolygon([geom])
case _:
return geom


def df_to_gdf(df: pd.DataFrame, geometry: file.Geometry) -> gpd.GeoDataFrame:
"""
Convert a pandas DataFrame to a GeoDataFrame based on the provided geometry information.
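For reference, the helper wraps single-part geometries in their multi-part counterparts and returns anything else — including None and already-multi geometries — unchanged, per the parametrized test above:

    from shapely import MultiPoint, Point

    from dcpy.utils.geospatial import transform

    assert transform.multi(Point(0, 1)) == MultiPoint([(0, 1)])
    assert transform.multi(MultiPoint([(2, 3), (4, 5)])) == MultiPoint([(2, 3), (4, 5)])
    assert transform.multi(None) is None

Other types, such as GeometryCollection, fall through the wildcard case untouched.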
