Skip to content

Commit

Permalink
Merge pull request #12 from astronomy-commons/sean/add-catalog-loading
Browse files Browse the repository at this point in the history
Add Catalog class and Catalog Loading
  • Loading branch information
smcguire-cmu authored Mar 28, 2023
2 parents 7f72881 + ca73e96 commit f16acbd
Show file tree
Hide file tree
Showing 32 changed files with 486 additions and 39 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ repos:
files: ^(src|tests)/
args:
[

"--ignore-missing-imports", # Ignore imports without type hints

]

# Make sure Sphinx can build the documentation without issues.
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ classifiers = [
]
dynamic = ["version"]
dependencies = [
"dask",
"hipscat",
"pyarrow",
"deprecated",
"ipykernel", # Support for Jupyter notebooks
]
Expand Down
3 changes: 2 additions & 1 deletion src/lsdb/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .example_module import *
from .catalog import Catalog
from .loaders import read_hipscat
1 change: 1 addition & 0 deletions src/lsdb/catalog/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .catalog import Catalog
67 changes: 67 additions & 0 deletions src/lsdb/catalog/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Dict

import dask.dataframe as dd
import hipscat as hc

from lsdb.core.healpix.healpix_pixel import HealpixPixel

# Type of the lookup table passed to Catalog as `ddf_pixel_map`: each HealpixPixel
# maps to an int, the index of the partition of the Dask DataFrame for that pixel.
DaskDFPixelMap = Dict[HealpixPixel, int]


# pylint: disable=R0903, W0212
class Catalog:
    """LSDB Catalog DataFrame to perform analysis of sky catalogs and efficient
    spatial operations.

    Attributes:
        hc_structure: `hipscat.Catalog` object representing the structure
            and metadata of the HiPSCat catalog
    """

    def __init__(
        self,
        ddf: dd.DataFrame,
        ddf_pixel_map: DaskDFPixelMap,
        hc_structure: hc.catalog.Catalog,
    ):
        """Initialise a Catalog object.

        Not to be used to load a catalog directly; use a loader function such as
        `lsdb.read_hipscat` instead.

        Args:
            ddf: Dask DataFrame with the source data of the catalog
            ddf_pixel_map: Dictionary mapping HEALPix order and pixel to partition index of ddf
            hc_structure: `hipscat.Catalog` object with hipscat metadata of the catalog
        """
        self._ddf = ddf
        self._ddf_pixel_map = ddf_pixel_map
        self.hc_structure = hc_structure

    def __repr__(self):
        # Delegate to the wrapped Dask DataFrame so the catalog displays like one
        return self._ddf.__repr__()

    def _repr_html_(self):
        # Rich (notebook) display is likewise delegated to the Dask DataFrame
        return self._ddf._repr_html_()

    def compute(self):
        """Compute dask distributed dataframe to pandas dataframe

        Returns:
            The result of calling `compute()` on the underlying Dask DataFrame
        """
        return self._ddf.compute()

    def get_partition(self, order: int, pixel: int) -> dd.DataFrame:
        """Get the dask partition for a given HEALPix pixel

        Args:
            order: Order of HEALPix pixel
            pixel: HEALPix pixel number in NESTED ordering scheme

        Returns:
            Dask Dataframe with a single partition with data at that pixel

        Raises:
            ValueError: if no data exists for the specified pixel. Constructing the
                `HealpixPixel` may also raise `ValueError` for an order that is too high.
        """
        hp_pixel = HealpixPixel(order, pixel)
        if hp_pixel not in self._ddf_pixel_map:
            raise ValueError(f"Pixel at order {order} pixel {pixel} not in Catalog")
        partition_index = self._ddf_pixel_map[hp_pixel]
        return self._ddf.partitions[partition_index]
File renamed without changes.
File renamed without changes.
44 changes: 44 additions & 0 deletions src/lsdb/core/healpix/healpix_pixel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from __future__ import annotations

import hipscat.pixel_math.hipscat_id

# Largest HEALPix order a HealpixPixel may be created with; the value is taken from
# hipscat's HIPSCAT_ID_HEALPIX_ORDER constant.
MAXIMUM_ORDER = hipscat.pixel_math.hipscat_id.HIPSCAT_ID_HEALPIX_ORDER


class HealpixPixel:
    """Identifies one HEALPix pixel by its order and its pixel number (NESTED ordering scheme)

    see https://lambda.gsfc.nasa.gov/toolbox/pixelcoords.html for more information
    """

    def __init__(self, order: int, pixel: int) -> None:
        """Create a HEALPix pixel

        Args:
            order: HEALPix order
            pixel: HEALPix pixel number in NESTED ordering scheme

        Raises:
            ValueError: if `order` is greater than `MAXIMUM_ORDER`
        """
        if order > MAXIMUM_ORDER:
            raise ValueError(f"HEALPix order cannot be greater than {MAXIMUM_ORDER}")
        self.order, self.pixel = order, pixel

    def _key(self) -> tuple[int, int]:
        """The (order, pixel) pair that equality and hashing are based on"""
        return (self.order, self.pixel)

    def __eq__(self, other: object) -> bool:
        """Two pixels are equal when both their order and pixel number match"""
        if isinstance(other, HealpixPixel):
            return other._key() == self._key()
        # Anything that is not a HealpixPixel never compares equal
        return False

    def __hash__(self) -> int:
        """Hash on (order, pixel) so that equal pixels land on the same entry in dicts
        and sets"""
        return hash(self._key())

    def __str__(self) -> str:
        return f"Order: {self.order}, Pixel: {self.pixel}"

    def __repr__(self):
        # The repr is deliberately the same text as the str form
        return str(self)
23 changes: 0 additions & 23 deletions src/lsdb/example_module.py

This file was deleted.

1 change: 1 addition & 0 deletions src/lsdb/io/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .parquet_io import *
12 changes: 12 additions & 0 deletions src/lsdb/io/parquet_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import pandas as pd
from hipscat.io import FilePointer
from pyarrow import Schema
import pyarrow.parquet as pq


def read_parquet_schema(file_pointer: FilePointer) -> Schema:
    """Read the schema of a parquet file.

    Args:
        file_pointer: location of the parquet file whose schema is wanted

    Returns:
        The schema returned by `pyarrow.parquet.read_schema` for that file
    """
    schema = pq.read_schema(file_pointer)
    return schema


def read_parquet_file_to_pandas(file_pointer: FilePointer, **kwargs) -> pd.DataFrame:
    """Load a parquet file into a pandas DataFrame.

    Args:
        file_pointer: location of the parquet file to read
        **kwargs: forwarded unchanged to `pandas.read_parquet`

    Returns:
        The DataFrame produced by `pandas.read_parquet`
    """
    dataframe = pd.read_parquet(file_pointer, **kwargs)
    return dataframe
1 change: 1 addition & 0 deletions src/lsdb/loaders/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .hipscat import read_hipscat
1 change: 1 addition & 0 deletions src/lsdb/loaders/hipscat/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .read_hipscat import read_hipscat
95 changes: 95 additions & 0 deletions src/lsdb/loaders/hipscat/hipscat_catalog_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import dask.dataframe as dd
import hipscat as hc
import pyarrow

from lsdb import io
from lsdb.catalog.catalog import Catalog, DaskDFPixelMap
from lsdb.core.healpix.healpix_pixel import MAXIMUM_ORDER, HealpixPixel
from lsdb.loaders.hipscat.hipscat_loading_config import HipscatLoadingConfig


# pylint: disable=R0903
class HipscatCatalogLoader:
    """Builds an lsdb `Catalog` from a catalog stored in the HiPSCat format"""

    def __init__(self, path: str, config: HipscatLoadingConfig) -> None:
        """Set up a loader for the HiPSCat catalog rooted at `path`

        Args:
            path: path to the root of the HiPSCat catalog
            config: options to configure how the catalog is loaded
        """
        self.path = path
        self.base_catalog_dir = hc.io.get_file_pointer_from_path(path)
        self.config = config

    def load_catalog(self) -> Catalog:
        """Load the catalog this loader was created for

        Returns:
            Catalog object with data from the source given at loader initialization
        """
        hc_structure = self.load_hipscat_catalog()
        ddf, ddf_pixel_map = self._load_dask_df_and_map(hc_structure)
        return Catalog(ddf, ddf_pixel_map, hc_structure)

    def load_hipscat_catalog(self) -> hc.catalog.Catalog:
        """Create the `hipscat` library catalog object for the catalog at `self.path`"""
        return hc.catalog.Catalog(catalog_path=self.path)

    def _load_dask_df_and_map(
        self, catalog: hc.catalog.Catalog
    ) -> tuple[dd.DataFrame, DaskDFPixelMap]:
        """Build the Dask DataFrame and the dict from HEALPix pixel to partition index"""
        pixels = self._get_ordered_pixel_list(catalog)
        file_paths = self._get_paths_from_pixels(catalog, pixels)
        # Paths are generated in pixel order, so a pixel's position in the list is also
        # the index of the partition that holds its data
        partition_index_of = dict(zip(pixels, range(len(pixels))))
        return self._load_df_from_paths(catalog, file_paths), partition_index_of

    def _get_ordered_pixel_list(
        self, catalog: hc.catalog.Catalog
    ) -> list[HealpixPixel]:
        """List the catalog's pixels, sorted by their pixel number at the highest order"""

        def row_to_pixel(row) -> HealpixPixel:
            return HealpixPixel(
                row[hc.catalog.PartitionInfo.METADATA_ORDER_COLUMN_NAME],
                row[hc.catalog.PartitionInfo.METADATA_PIXEL_COLUMN_NAME],
            )

        def pixel_number_at_maximum_order(healpix_pixel: HealpixPixel):
            # Scale the pixel number by 4 for every order below MAXIMUM_ORDER
            return (4 ** (MAXIMUM_ORDER - healpix_pixel.order)) * healpix_pixel.pixel

        found_pixels = [row_to_pixel(row) for _, row in catalog.get_pixels().iterrows()]
        found_pixels.sort(key=pixel_number_at_maximum_order)
        return found_pixels

    def _get_paths_from_pixels(
        self, catalog: hc.catalog.Catalog, ordered_pixels: list[HealpixPixel]
    ) -> list[hc.io.FilePointer]:
        """Get the catalog file path of every pixel, in the order the pixels are given"""
        file_paths = []
        for healpix_pixel in ordered_pixels:
            file_paths.append(
                hc.io.paths.pixel_catalog_file(
                    catalog_base_dir=catalog.catalog_base_dir,
                    pixel_order=healpix_pixel.order,
                    pixel_number=healpix_pixel.pixel,
                )
            )
        return file_paths

    def _load_df_from_paths(
        self, catalog: hc.catalog.Catalog, paths: list[hc.io.FilePointer]
    ) -> dd.DataFrame:
        """Create a Dask DataFrame that maps the parquet reader over `paths`"""
        schema = self._load_parquet_metadata_schema(catalog, paths)
        # An empty pandas frame built from the schema is passed to dask as `meta`
        meta = schema.empty_table().to_pandas()
        return dd.from_map(io.read_parquet_file_to_pandas, paths, meta=meta)

    def _load_parquet_metadata_schema(
        self, catalog: hc.catalog.Catalog, paths: list[hc.io.FilePointer]
    ) -> pyarrow.Schema:
        """Read the parquet schema from the catalog's metadata file, or from the first
        path in `paths` when that metadata file does not exist"""
        metadata_pointer = hc.io.paths.get_parquet_metadata_pointer(
            catalog.catalog_base_dir
        )
        schema_source = (
            metadata_pointer
            if hc.io.file_io.does_file_or_directory_exist(metadata_pointer)
            else paths[0]
        )
        return io.read_parquet_schema(schema_source)
8 changes: 8 additions & 0 deletions src/lsdb/loaders/hipscat/hipscat_loading_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from dataclasses import dataclass


@dataclass
class HipscatLoadingConfig:
    """Configuration for loading a HiPSCat catalog in lsdb

    No options are defined yet, so the dataclass currently has no fields.
    """
33 changes: 33 additions & 0 deletions src/lsdb/loaders/hipscat/read_hipscat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import dataclasses

from lsdb import Catalog
from lsdb.loaders.hipscat.hipscat_catalog_loader import HipscatCatalogLoader
from lsdb.loaders.hipscat.hipscat_loading_config import HipscatLoadingConfig


def read_hipscat(
    path: str,
) -> Catalog:
    """Load a catalog from a HiPSCat formatted catalog.

    Args:
        path: The path that locates the root of the HiPSCat catalog

    Returns:
        Catalog object with the data of the HiPSCat catalog found at `path`
    """
    # Snapshot the call arguments before any other local variable exists, so only this
    # function's parameters can be picked up as configuration values.
    kwd_args = dict(locals())

    # Every HipscatLoadingConfig field is filled from the argument of the same name.
    # The config currently has no fields, so nothing is copied; the mechanism is kept so
    # loading options can be added later as keyword arguments of this function.
    config_args = {
        field.name: kwd_args[field.name]
        for field in dataclasses.fields(HipscatLoadingConfig)
    }
    config = HipscatLoadingConfig(**config_args)

    loader = HipscatCatalogLoader(path, config)

    return loader.load_catalog()
1 change: 1 addition & 0 deletions tests/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ disable=raw-checker-failed,
redefined-outer-name,
protected-access,
missing-module-docstring,
unnecessary-pass,

# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
Expand Down
46 changes: 46 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os

import hipscat as hc
import pytest

import lsdb

# Directory names the fixtures below use to build the paths of the test data.
DATA_DIR_NAME = "data"
SMALL_SKY_DIR_NAME = "small_sky"
SMALL_SKY_ORDER1_DIR_NAME = "small_sky_order1"
# Directory that contains this file.
TEST_DIR = os.path.dirname(__file__)


@pytest.fixture
def test_data_dir():
    """Path of the test data directory, located inside the directory of this file"""
    data_dir = os.path.join(TEST_DIR, DATA_DIR_NAME)
    return data_dir


@pytest.fixture
def small_sky_dir(test_data_dir):
    """Path of the `small_sky` catalog inside the test data directory"""
    catalog_dir = os.path.join(test_data_dir, SMALL_SKY_DIR_NAME)
    return catalog_dir


@pytest.fixture
def small_sky_order1_dir(test_data_dir):
    """Path of the `small_sky_order1` catalog inside the test data directory"""
    catalog_dir = os.path.join(test_data_dir, SMALL_SKY_ORDER1_DIR_NAME)
    return catalog_dir


@pytest.fixture
def small_sky_hipscat_catalog(small_sky_dir):
    """`hipscat` catalog object created from the `small_sky` test catalog directory"""
    hipscat_catalog = hc.catalog.Catalog(small_sky_dir)
    return hipscat_catalog


@pytest.fixture
def small_sky_catalog(small_sky_dir):
    """lsdb catalog read with `lsdb.read_hipscat` from the `small_sky` test catalog"""
    catalog = lsdb.read_hipscat(small_sky_dir)
    return catalog


@pytest.fixture
def small_sky_order1_hipscat_catalog(small_sky_order1_dir):
    """`hipscat` catalog object created from the `small_sky_order1` test catalog directory"""
    hipscat_catalog = hc.catalog.Catalog(small_sky_order1_dir)
    return hipscat_catalog


@pytest.fixture
def small_sky_order1_catalog(small_sky_order1_dir):
    """lsdb catalog read with `lsdb.read_hipscat` from the `small_sky_order1` test catalog"""
    catalog = lsdb.read_hipscat(small_sky_order1_dir)
    return catalog
Binary file not shown.
11 changes: 11 additions & 0 deletions tests/data/small_sky/catalog_info.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"catalog_name": "small_sky",
"version": "0.0.1",
"generation_date": "2022.12.20",
"epoch": "J2000",
"ra_kw": "ra",
"dec_kw": "dec",
"id_kw": "id",
"total_objects": 131,
"pixel_threshold": 1000000
}
2 changes: 2 additions & 0 deletions tests/data/small_sky/partition_info.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Norder,Dir,Npix,num_rows
0,0,11,131
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit f16acbd

Please sign in to comment.