Merge pull request #44 from astronomy-commons/delucchi/get_pixels

Run black formatting and remove `get_pixels`

delucchi-cmu authored Oct 25, 2023
2 parents 4748fc2 + ffb69a1, commit 02756b7

Showing 14 changed files with 108 additions and 86 deletions.
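The change repeated throughout this diff replaces the `get_pixels()` accessor, which returned a pandas DataFrame with `Norder` and `Npix` columns, with `get_healpix_pixels()`, which returns a list of `HealpixPixel` objects. A minimal sketch of the migration pattern, where `catalog` stands for any loaded lsdb catalog:

# Before: iterate rows of a pixel DataFrame.
for _, row in catalog.hc_structure.get_pixels().iterrows():
    partition = catalog.get_partition(row["Norder"], row["Npix"])

# After: iterate HealpixPixel objects directly.
for pixel in catalog.hc_structure.get_healpix_pixels():
    partition = catalog.get_partition(pixel.order, pixel.pixel)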
17 changes: 10 additions & 7 deletions cloud_tests/conftest.py
@@ -27,15 +27,15 @@ def pytest_generate_tests(metafunc):
# This is called for every test. Only get/set command line arguments
# if the argument is specified in the list of test "fixturenames".
option_value = metafunc.config.option.cloud
if 'cloud' in metafunc.fixturenames and option_value is not None:
if "cloud" in metafunc.fixturenames and option_value is not None:
metafunc.parametrize("cloud", [option_value])


@pytest.fixture
def example_cloud_path(cloud):
if cloud == "abfs":
return "abfs:///hipscat/pytests/"

else:
raise NotImplementedError("Cloud format not implemented for lsdb tests!")

@@ -44,8 +44,8 @@ def example_cloud_path(cloud):
def example_cloud_storage_options(cloud):
if cloud == "abfs":
storage_options = {
"account_key" : os.environ.get("ABFS_LINCCDATA_ACCOUNT_KEY"),
"account_name" : os.environ.get("ABFS_LINCCDATA_ACCOUNT_NAME")
"account_key": os.environ.get("ABFS_LINCCDATA_ACCOUNT_KEY"),
"account_name": os.environ.get("ABFS_LINCCDATA_ACCOUNT_NAME"),
}
return storage_options

@@ -83,6 +83,7 @@ def small_sky_hipscat_catalog_cloud(small_sky_dir_cloud, example_cloud_storage_o
small_sky_dir_cloud, storage_options=example_cloud_storage_options
)


@pytest.fixture
def small_sky_catalog_cloud(small_sky_dir_cloud, example_cloud_storage_options):
return lsdb.read_hipscat(small_sky_dir_cloud, storage_options=example_cloud_storage_options)
@@ -95,7 +96,9 @@ def small_sky_xmatch_catalog_cloud(small_sky_xmatch_dir_cloud, example_cloud_sto

@pytest.fixture
def small_sky_order1_hipscat_catalog_cloud(small_sky_order1_dir_cloud, example_cloud_storage_options):
return hc.catalog.Catalog.read_from_hipscat(small_sky_order1_dir_cloud, storage_options=example_cloud_storage_options)
return hc.catalog.Catalog.read_from_hipscat(
small_sky_order1_dir_cloud, storage_options=example_cloud_storage_options
)


@pytest.fixture
@@ -107,14 +110,14 @@ def small_sky_order1_catalog_cloud(small_sky_order1_dir_cloud, example_cloud_sto
def xmatch_correct_cloud(small_sky_xmatch_dir_cloud, example_cloud_storage_options):
pathway = os.path.join(small_sky_xmatch_dir_cloud, XMATCH_CORRECT_FILE)
return file_io.load_csv_to_pandas(pathway, storage_options=example_cloud_storage_options)
#return pd.read_csv(os.path.join(small_sky_xmatch_dir_cloud, XMATCH_CORRECT_FILE), storage_options=example_cloud_storage_options)
# return pd.read_csv(os.path.join(small_sky_xmatch_dir_cloud, XMATCH_CORRECT_FILE), storage_options=example_cloud_storage_options)


@pytest.fixture
def xmatch_correct_005_cloud(small_sky_xmatch_dir_cloud, example_cloud_storage_options):
pathway = os.path.join(small_sky_xmatch_dir_cloud, XMATCH_CORRECT_005_FILE)
return file_io.load_csv_to_pandas(pathway, storage_options=example_cloud_storage_options)
#return pd.read_csv(os.path.join(small_sky_xmatch_dir, XMATCH_CORRECT_005_FILE))
# return pd.read_csv(os.path.join(small_sky_xmatch_dir, XMATCH_CORRECT_005_FILE))


@pytest.fixture
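The `pytest_generate_tests` hook above reads `metafunc.config.option.cloud`, which presumes a `--cloud` command-line option registered elsewhere in the test setup. A minimal sketch of such a hook, assuming a default provider for illustration:

def pytest_addoption(parser):
    # Exposes --cloud so pytest_generate_tests can read metafunc.config.option.cloud.
    # The "abfs" default is an assumption, not part of this diff.
    parser.addoption("--cloud", action="store", default="abfs", help="cloud provider to test against")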
27 changes: 15 additions & 12 deletions cloud_tests/copy_data_to_fs.py
@@ -4,10 +4,12 @@


def copy_tree_fs_to_fs(
fs1_source: str, fs2_destination: str,
storage_options1: dict = None, storage_options2: dict = None,
verbose=False
):
fs1_source: str,
fs2_destination: str,
storage_options1: dict = None,
storage_options2: dict = None,
verbose=False,
):
"""Recursive Copies directory from one filesystem to the other.
Args:
@@ -22,7 +24,7 @@ def copy_tree_fs_to_fs(
copy_dir(source_fs, source_fp, destination_fs, desintation_fp, verbose=verbose)


def copy_dir(source_fs, source_fp, destination_fs, desintation_fp, verbose=False, chunksize=1024*1024):
def copy_dir(source_fs, source_fp, destination_fs, desintation_fp, verbose=False, chunksize=1024 * 1024):
"""Recursive method to copy directories and their contents.
Args:
@@ -56,17 +58,18 @@ def copy_dir(source_fs, source_fp, destination_fs, desintation_fp, verbose=False

dirs = [x for x in dir_contents if x["type"] == "directory"]
for _dir in dirs:
copy_dir(source_fs, _dir["name"], destination_fs, destination_folder, chunksize=chunksize, verbose=verbose)
copy_dir(
source_fs, _dir["name"], destination_fs, destination_folder, chunksize=chunksize, verbose=verbose
)


if __name__ == "__main__":

source_pw = f"{os.getcwd()}/../tests/data"
target_pw = "new_protocol:///path/to/pytest/lsdb"

target_so = {
"valid_storage_option_param1" : os.environ.get("NEW_PROTOCOL_PARAM1"),
"valid_storage_option_param1" : os.environ.get("NEW_PROTOCOL_PARAM2"),
"valid_storage_option_param1": os.environ.get("NEW_PROTOCOL_PARAM1"),
"valid_storage_option_param1": os.environ.get("NEW_PROTOCOL_PARAM2"),
}
copy_tree_fs_to_fs(
source_pw, target_pw, {}, target_so, verbose=True
)
copy_tree_fs_to_fs(source_pw, target_pw, {}, target_so, verbose=True)
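`copy_dir` streams each file in fixed-size chunks (1 MiB after this change) instead of loading whole files into memory. A self-contained sketch of that chunked-copy idiom on fsspec filesystems; the paths are illustrative:

import fsspec

def copy_file(source_fs, source_path, destination_fs, destination_path, chunksize=1024 * 1024):
    # Stream the file chunk by chunk so memory use stays bounded.
    with source_fs.open(source_path, "rb") as source:
        with destination_fs.open(destination_path, "wb") as destination:
            while chunk := source.read(chunksize):
                destination.write(chunk)

local_fs = fsspec.filesystem("file")
# copy_file(local_fs, "tests/data/example.parquet", local_fs, "/tmp/example.parquet")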
10 changes: 6 additions & 4 deletions cloud_tests/lsdb/catalog/test_catalog.py
@@ -12,13 +12,15 @@ def test_catalog_html_repr_equals_ddf_html_repr(small_sky_order1_catalog_cloud):


def test_catalog_compute_equals_ddf_compute(small_sky_order1_catalog_cloud):
pd.testing.assert_frame_equal(small_sky_order1_catalog_cloud.compute(), small_sky_order1_catalog_cloud._ddf.compute())
pd.testing.assert_frame_equal(
small_sky_order1_catalog_cloud.compute(), small_sky_order1_catalog_cloud._ddf.compute()
)


def test_get_catalog_partition_gets_correct_partition(small_sky_order1_catalog_cloud):
for _, row in small_sky_order1_catalog_cloud.hc_structure.get_pixels().iterrows():
hp_order = row["Norder"]
hp_pixel = row["Npix"]
for pixel in small_sky_order1_catalog_cloud.hc_structure.get_healpix_pixels():
hp_order = pixel.order
hp_pixel = pixel.pixel
partition = small_sky_order1_catalog_cloud.get_partition(hp_order, hp_pixel)
pixel = HealpixPixel(order=hp_order, pixel=hp_pixel)
partition_index = small_sky_order1_catalog_cloud._ddf_pixel_map[pixel]
6 changes: 3 additions & 3 deletions cloud_tests/lsdb/catalog/test_cone_search.py
@@ -6,13 +6,13 @@ def test_cone_search_filters_correct_points(small_sky_order1_catalog_cloud):
ra = 0
dec = -80
radius = 20
center_coord = SkyCoord(ra, dec, unit='deg')
center_coord = SkyCoord(ra, dec, unit="deg")
cone_search_catalog = small_sky_order1_catalog_cloud.cone_search(ra, dec, radius).compute()
print(len(cone_search_catalog))
for _, row in small_sky_order1_catalog_cloud.compute().iterrows():
row_ra = row[small_sky_order1_catalog_cloud.hc_structure.catalog_info.ra_column]
row_dec = row[small_sky_order1_catalog_cloud.hc_structure.catalog_info.dec_column]
sep = SkyCoord(row_ra, row_dec, unit='deg').separation(center_coord)
sep = SkyCoord(row_ra, row_dec, unit="deg").separation(center_coord)
if sep.degree <= radius:
assert len(cone_search_catalog.loc[cone_search_catalog["id"] == row["id"]]) == 1
else:
@@ -25,7 +25,7 @@ def test_cone_search_filters_partitions(small_sky_order1_catalog_cloud):
radius = 20
hc_conesearch = small_sky_order1_catalog_cloud.hc_structure.filter_by_cone(ra, dec, radius)
consearch_catalog = small_sky_order1_catalog_cloud.cone_search(ra, dec, radius)
assert len(hc_conesearch.get_healpix_pixels()) == len(consearch_catalog.hc_structure.get_pixels())
assert len(hc_conesearch.get_healpix_pixels()) == len(consearch_catalog.get_healpix_pixels())
assert len(hc_conesearch.get_healpix_pixels()) == consearch_catalog._ddf.npartitions
print(hc_conesearch.get_healpix_pixels())
for pixel in hc_conesearch.get_healpix_pixels():
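Both tests decide cone membership with astropy angular separations. A standalone sketch of that predicate, with illustrative coordinates:

from astropy.coordinates import SkyCoord

ra, dec, radius = 0, -80, 20  # cone center and radius, in degrees
center_coord = SkyCoord(ra, dec, unit="deg")
point = SkyCoord(1.5, -78.0, unit="deg")

# A point lies inside the cone when its angular separation from the
# center does not exceed the radius.
print(point.separation(center_coord).degree <= radius)  # True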
12 changes: 9 additions & 3 deletions cloud_tests/lsdb/catalog/test_crossmatch.py
@@ -14,7 +14,9 @@ def test_kdtree_crossmatch(small_sky_catalog_cloud, small_sky_xmatch_catalog_clo
assert xmatch_row["_DIST"].values == pytest.approx(correct_row["dist"])


def test_kdtree_crossmatch_thresh(small_sky_catalog_cloud, small_sky_xmatch_catalog_cloud, xmatch_correct_005_cloud):
def test_kdtree_crossmatch_thresh(
small_sky_catalog_cloud, small_sky_xmatch_catalog_cloud, xmatch_correct_005_cloud
):
xmatched = small_sky_catalog_cloud.crossmatch(small_sky_xmatch_catalog_cloud, d_thresh=0.005).compute()
assert len(xmatched) == len(xmatch_correct_005_cloud)
for _, correct_row in xmatch_correct_005_cloud.iterrows():
@@ -27,7 +29,9 @@ def test_kdtree_crossmatch_thresh(small_sky_catalog_cloud, small_sky_xmatch_cata
def test_kdtree_crossmatch_multiple_neighbors(
small_sky_catalog_cloud, small_sky_xmatch_catalog_cloud, xmatch_correct_3n_2t_no_margin_cloud
):
xmatched = small_sky_catalog_cloud.crossmatch(small_sky_xmatch_catalog_cloud, n_neighbors=3, d_thresh=2).compute()
xmatched = small_sky_catalog_cloud.crossmatch(
small_sky_xmatch_catalog_cloud, n_neighbors=3, d_thresh=2
).compute()
assert len(xmatched) == len(xmatch_correct_3n_2t_no_margin_cloud)
for _, correct_row in xmatch_correct_3n_2t_no_margin_cloud.iterrows():
assert correct_row["ss_id"] in xmatched["id_small_sky"].values
@@ -39,7 +43,9 @@ def test_kdtree_crossmatch_multiple_neighbors(
assert xmatch_row["_DIST"].values == pytest.approx(correct_row["dist"])


def test_custom_crossmatch_algorithm(small_sky_catalog_cloud, small_sky_xmatch_catalog_cloud, xmatch_mock_cloud):
def test_custom_crossmatch_algorithm(
small_sky_catalog_cloud, small_sky_xmatch_catalog_cloud, xmatch_mock_cloud
):
xmatched = small_sky_catalog_cloud.crossmatch(
small_sky_xmatch_catalog_cloud, algorithm=MockCrossmatchAlgorithm, mock_results=xmatch_mock_cloud
).compute()
52 changes: 29 additions & 23 deletions cloud_tests/lsdb/loaders/hipscat/test_read_hipscat.py
@@ -5,21 +5,21 @@
import lsdb


def test_read_hipscat(small_sky_order1_dir_cloud, small_sky_order1_hipscat_catalog_cloud, example_cloud_storage_options):
def test_read_hipscat(
small_sky_order1_dir_cloud, small_sky_order1_hipscat_catalog_cloud, example_cloud_storage_options
):
catalog = lsdb.read_hipscat(small_sky_order1_dir_cloud, storage_options=example_cloud_storage_options)
assert isinstance(catalog, lsdb.Catalog)
assert catalog.hc_structure.catalog_base_dir == small_sky_order1_hipscat_catalog_cloud.catalog_base_dir
pd.testing.assert_frame_equal(
catalog.hc_structure.get_pixels(), small_sky_order1_hipscat_catalog_cloud.get_pixels()
)
assert catalog.get_healpix_pixels() == small_sky_order1_hipscat_catalog_cloud.get_healpix_pixels()


def test_pixels_in_map_equal_catalog_pixels(small_sky_order1_dir_cloud, small_sky_order1_hipscat_catalog_cloud, example_cloud_storage_options):
def test_pixels_in_map_equal_catalog_pixels(
small_sky_order1_dir_cloud, small_sky_order1_hipscat_catalog_cloud, example_cloud_storage_options
):
catalog = lsdb.read_hipscat(small_sky_order1_dir_cloud, storage_options=example_cloud_storage_options)
for _, row in small_sky_order1_hipscat_catalog_cloud.get_pixels().iterrows():
hp_order = row["Norder"]
hp_pixel = row["Npix"]
catalog.get_partition(hp_order, hp_pixel)
for healpix_pixel in small_sky_order1_hipscat_catalog_cloud.get_healpix_pixels():
catalog.get_partition(healpix_pixel.order, healpix_pixel.pixel)


def test_wrong_pixel_raises_value_error(small_sky_order1_dir_cloud, example_cloud_storage_options):
@@ -28,11 +28,13 @@ def test_wrong_pixel_raises_value_error(small_sky_order1_dir_cloud, example_clou
catalog.get_partition(-1, -1)


def test_parquet_data_in_partitions_match_files(small_sky_order1_dir_cloud, small_sky_order1_hipscat_catalog_cloud, example_cloud_storage_options):
def test_parquet_data_in_partitions_match_files(
small_sky_order1_dir_cloud, small_sky_order1_hipscat_catalog_cloud, example_cloud_storage_options
):
catalog = lsdb.read_hipscat(small_sky_order1_dir_cloud, storage_options=example_cloud_storage_options)
for _, row in small_sky_order1_hipscat_catalog_cloud.get_pixels().iterrows():
hp_order = row["Norder"]
hp_pixel = row["Npix"]
for healpix_pixel in small_sky_order1_hipscat_catalog_cloud.get_healpix_pixels():
hp_order = healpix_pixel.order
hp_pixel = healpix_pixel.pixel
partition = catalog.get_partition(hp_order, hp_pixel)
partition_df = partition.compute()
parquet_path = hc.io.paths.pixel_catalog_file(
Expand All @@ -42,22 +44,26 @@ def test_parquet_data_in_partitions_match_files(small_sky_order1_dir_cloud, smal
pd.testing.assert_frame_equal(partition_df, loaded_df)


def test_read_hipscat_specify_catalog_type(small_sky_catalog_cloud, small_sky_dir_cloud, example_cloud_storage_options):
catalog = lsdb.read_hipscat(small_sky_dir_cloud, catalog_type=lsdb.Catalog, storage_options=example_cloud_storage_options)
def test_read_hipscat_specify_catalog_type(
small_sky_catalog_cloud, small_sky_dir_cloud, example_cloud_storage_options
):
catalog = lsdb.read_hipscat(
small_sky_dir_cloud, catalog_type=lsdb.Catalog, storage_options=example_cloud_storage_options
)
assert isinstance(catalog, lsdb.Catalog)
pd.testing.assert_frame_equal(catalog.compute(), small_sky_catalog_cloud.compute())
pd.testing.assert_frame_equal(
catalog.hc_structure.get_pixels(), small_sky_catalog_cloud.hc_structure.get_pixels()
)
assert catalog.hc_structure.get_healpix_pixels() == small_sky_catalog_cloud.get_healpix_pixels()
assert catalog.hc_structure.catalog_info == small_sky_catalog_cloud.hc_structure.catalog_info


def test_read_hipscat_no_parquet_metadata(small_sky_catalog_cloud, small_sky_no_metadata_dir_cloud, example_cloud_storage_options):
catalog = lsdb.read_hipscat(small_sky_no_metadata_dir_cloud, storage_options=example_cloud_storage_options)
pd.testing.assert_frame_equal(catalog.compute(), small_sky_catalog_cloud.compute())
pd.testing.assert_frame_equal(
catalog.hc_structure.get_pixels(), small_sky_catalog_cloud.hc_structure.get_pixels()
def test_read_hipscat_no_parquet_metadata(
small_sky_catalog_cloud, small_sky_no_metadata_dir_cloud, example_cloud_storage_options
):
catalog = lsdb.read_hipscat(
small_sky_no_metadata_dir_cloud, storage_options=example_cloud_storage_options
)
pd.testing.assert_frame_equal(catalog.compute(), small_sky_catalog_cloud.compute())
assert catalog.hc_structure.get_healpix_pixels() == small_sky_catalog_cloud.get_healpix_pixels()
assert catalog.hc_structure.catalog_info == small_sky_catalog_cloud.hc_structure.catalog_info


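Since `get_healpix_pixels()` returns a plain list of `HealpixPixel` objects, the former `assert_frame_equal` checks on `get_pixels()` dataframes reduce to the list equality asserted above. A small sketch, assuming `HealpixPixel` is importable from `hipscat.pixel_math`:

from hipscat.pixel_math import HealpixPixel

# HealpixPixel compares by value, so two lists built from the same
# (order, pixel) pairs are equal.
assert [HealpixPixel(order=1, pixel=44)] == [HealpixPixel(order=1, pixel=44)]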
36 changes: 18 additions & 18 deletions src/lsdb/loaders/dataframe/dataframe_catalog_loader.py
@@ -27,25 +27,25 @@ class DataframeCatalogLoader:
DEFAULT_THRESHOLD = 100_000

def __init__(
self,
df: pd.DataFrame,
lowest_order: int = 0,
highest_order: int = 5,
partition_size: float | None = None,
threshold: int | None = None,
**kwargs,
self,
dataframe: pd.DataFrame,
lowest_order: int = 0,
highest_order: int = 5,
partition_size: float | None = None,
threshold: int | None = None,
**kwargs,
) -> None:
"""Initializes a DataframeCatalogLoader
Args:
df (pd.Dataframe): Catalog Pandas Dataframe
dataframe (pd.Dataframe): Catalog Pandas Dataframe
lowest_order (int): The lowest partition order
highest_order (int): The highest partition order
partition_size (float): The desired partition size, in megabytes
threshold (int): The maximum number of data points per pixel
**kwargs: Arguments to pass to the creation of the catalog info
"""
self.df = df
self.dataframe = dataframe
self.lowest_order = lowest_order
self.highest_order = highest_order
self.threshold = self._calculate_threshold(partition_size, threshold)
@@ -66,11 +66,11 @@ def _calculate_threshold(self, partition_size: float | None = None, threshold: i
raise ValueError("Specify only one: threshold or partition_size")
if threshold is None:
if partition_size is not None:
df_size_bytes = self.df.memory_usage().sum()
df_size_bytes = self.dataframe.memory_usage().sum()
# Round the number of partitions to the next integer, otherwise the
# number of pixels per partition may exceed the threshold
num_partitions = math.ceil(df_size_bytes / (partition_size * (1 << 20)))
threshold = len(self.df.index) // num_partitions
threshold = len(self.dataframe.index) // num_partitions
else:
threshold = DataframeCatalogLoader.DEFAULT_THRESHOLD
return threshold
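The rename from `self.df` to `self.dataframe` leaves the sizing logic untouched: when `partition_size` is given, the loader converts the dataframe's in-memory footprint into a per-pixel row threshold. A worked example with assumed numbers:

import math

df_size_bytes = 512 * (1 << 20)  # assume a 512 MiB in-memory dataframe
partition_size = 100             # requested partition size, in megabytes
num_rows = 1_000_000             # assume one million rows

num_partitions = math.ceil(df_size_bytes / (partition_size * (1 << 20)))  # ceil(5.12) = 6
threshold = num_rows // num_partitions                                    # 166_666 rows per partition
print(num_partitions, threshold)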
@@ -107,11 +110,11 @@ def load_catalog(self) -> Catalog:
def _set_hipscat_index(self):
"""Generates the hipscat indices for each data point and assigns
the hipscat index column as the Dataframe index."""
self.df[HIPSCAT_ID_COLUMN] = compute_hipscat_id(
ra_values=self.df[self.catalog_info.ra_column],
dec_values=self.df[self.catalog_info.dec_column],
self.dataframe[HIPSCAT_ID_COLUMN] = compute_hipscat_id(
ra_values=self.dataframe[self.catalog_info.ra_column],
dec_values=self.dataframe[self.catalog_info.dec_column],
)
self.df.set_index(HIPSCAT_ID_COLUMN, inplace=True)
self.dataframe.set_index(HIPSCAT_ID_COLUMN, inplace=True)

def _compute_pixel_map(self) -> Dict[HealpixPixel, HealpixInfo]:
"""Compute object histogram and generate the mapping between
@@ -123,7 +123,7 @@ def _compute_pixel_map(self) -> Dict[HealpixPixel, HealpixInfo]:
of objects in the HEALPix pixel, the second is the list of pixels
"""
raw_histogram = generate_histogram(
self.df,
self.dataframe,
highest_order=self.highest_order,
ra_column=self.catalog_info.ra_column,
dec_column=self.catalog_info.dec_column,
@@ -161,7 +161,7 @@ def _generate_dask_df_and_map(
ddf_pixel_map[hp_pixel] = hp_pixel_index

# Generate Dask Dataframe with original schema
schema = pd.DataFrame(columns=self.df.columns).astype(self.df.dtypes)
schema = pd.DataFrame(columns=self.dataframe.columns).astype(self.dataframe.dtypes)
ddf = self._generate_dask_dataframe(pixel_dfs, schema)

return ddf, ddf_pixel_map
@@ -199,4 +199,4 @@ def _get_dataframe_for_healpix(self, pixels: List[int]) -> pd.DataFrame:
"""
left_bound = healpix_to_hipscat_id(self.highest_order, pixels[0])
right_bound = healpix_to_hipscat_id(self.highest_order, pixels[-1] + 1)
return self.df.loc[(self.df.index >= left_bound) & (self.df.index < right_bound)]
return self.dataframe.loc[(self.dataframe.index >= left_bound) & (self.dataframe.index < right_bound)]
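Note the half-open interval: because `right_bound` is computed from `pixels[-1] + 1`, the filter `index < right_bound` keeps every hipscat id of the last requested pixel while excluding the first id of the pixel that follows. The same idiom on a plain integer index, with illustrative bounds:

import pandas as pd

frame = pd.DataFrame({"value": range(10)}, index=range(10))
left_bound, right_bound = 3, 7  # right bound is the first index past the range
selected = frame.loc[(frame.index >= left_bound) & (frame.index < right_bound)]
print(selected.index.tolist())  # [3, 4, 5, 6]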
8 changes: 5 additions & 3 deletions src/lsdb/loaders/dataframe/from_dataframe.py
@@ -7,7 +7,7 @@


def from_dataframe(
df: pd.DataFrame,
dataframe: pd.DataFrame,
lowest_order: int = 0,
highest_order: int = 5,
partition_size: float | None = None,
@@ -17,7 +17,7 @@ def from_dataframe(
"""Load a catalog from a Pandas Dataframe in CSV format.
Args:
df (pd.Dataframe): The catalog Pandas Dataframe
dataframe (pd.Dataframe): The catalog Pandas Dataframe
lowest_order (int): The lowest partition order
highest_order (int): The highest partition order
partition_size (float): The desired partition size, in megabytes
@@ -27,5 +27,7 @@ def from_dataframe(
Returns:
Catalog object loaded from the given parameters
"""
loader = DataframeCatalogLoader(df, lowest_order, highest_order, partition_size, threshold, **kwargs)
loader = DataframeCatalogLoader(
dataframe, lowest_order, highest_order, partition_size, threshold, **kwargs
)
return loader.load_catalog()
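A usage sketch of the renamed entry point; it assumes the catalog info defaults to `ra`/`dec` column names, and the sample values are illustrative:

import pandas as pd

import lsdb

dataframe = pd.DataFrame(
    {
        "id": [700, 701, 702],
        "ra": [282.5, 283.5, 310.5],
        "dec": [-58.5, -61.5, -27.5],
    }
)

catalog = lsdb.from_dataframe(dataframe, lowest_order=0, highest_order=5, threshold=2)
print(catalog.get_healpix_pixels())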