Skip to content

Commit

Permalink
feat: add ndjson reading methods
Browse files Browse the repository at this point in the history
Specifically:
- list_ndjson_in_dir: scans for files & their resource types
- read_ndjson: returns line by line results from a file
- read_ndjson_from_dir: returns line by line results from a folder

This hides all the I/O error handling & odd format challenges
(like .jsonl vs .ndjson or handling new lines).
  • Loading branch information
mikix committed Jul 1, 2024
1 parent 51fd987 commit b085f01
Show file tree
Hide file tree
Showing 5 changed files with 495 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
repos:
- repo: https://github.com/psf/black
#this version is synced with the black mentioned in .github/workflows/ci.yml
rev: 23.10.0
rev: 24.4.2
hooks:
- id: black
entry: bash -c 'black "$@"; git add -u' --
3 changes: 2 additions & 1 deletion cumulus_fhir_support/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""FHIR support code for the Cumulus project"""

__version__ = "1.1.0"
__version__ = "1.2.0"

from .ndjson import list_ndjson_in_dir, read_ndjson, read_ndjson_from_dir
from .schemas import pyarrow_schema_from_rows
237 changes: 237 additions & 0 deletions cumulus_fhir_support/ndjson.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
"""
Find and read multi-line JSON files.
FHIR NDJSON files can come in many different filename pattern flavors.
And file parsing can have its own oddities.
These utility functions allow the Cumulus family of tools to all handle
being given "a folder of NDJSON input" the same way.
** Error handling
In general, these functions log and ignore errors.
This library is optimized for parsing large amounts of externally-provided NDJSON,
where aborting on a single error you may not have control over rarely makes sense.
** fsspec
This module has optional support for file access via fsspec.
It is not a required dependency, but will be used if provided.
** File format
There are two multi-line JSON specs at the time of writing:
- https://github.com/ndjson/ndjson-spec (unmaintained but more popular?)
- See https://github.com/ndjson/ndjson-spec/issues/35 for more details
- But notably, ndjson.org was allowed to expire and is now a gambling site
- https://jsonlines.org/ (maintained)
The only real differences are:
- different file extensions (.ndjson vs .jsonl)
- NDJSON allows parsers to ignore empty lines
This module splits the difference by allowing empty lines and looking for both extensions.
The FHIR world in general seems to prefer NDJSON for multi-line JSON,
so we use that terminology by default too.
But we also look for JSON Lines filenames, in case that catches on more in the future.
"""

import json
import logging
import os
import pathlib
from typing import TYPE_CHECKING, Any, Iterable, Optional, Union

if TYPE_CHECKING:
import fsspec

PathType = Union[str, pathlib.Path]
ResourceType = Union[str, Iterable[str], None]

logger = logging.getLogger(__name__)


def list_ndjson_in_dir(
path: PathType,
resource: ResourceType = None,
*,
fsspec_fs: Optional["fsspec.AbstractFileSystem"] = None,
) -> dict[str, Optional[str]]:
"""
Returns file info in the target folder that are NDJSON files for the given resources.
- This will not recurse into sub-folders.
- I/O and JSON errors will be logged, not raised.
- Will return an empty dict if the path does not exist.
- Passing None as the resource filter (the default) will return all NDJSON files found.
- Returned filenames will be full paths.
- The order of filenames will be consistent across calls.
Examples:
list_ndjson_in_dir("/") -> {
"/random.ndjson": None,
"/con1.ndjson": "Condition",
"/pat1.ndjson": "Patient",
}
list_ndjson_in_dir("/", "Patient") -> {"/pat1.ndjson": "Patient"}
list_ndjson_in_dir("/", ["Condition", "Patient"]) -> {
"/con1.ndjson": "Condition",
"/pat1.ndjson": "Patient",
}
:param path: the folder to examine
:param resource: the type of FHIR resource(s) for which to return files
:param fsspec_fs: optional fsspec FileSystem to use for I/O
:return: a dict of {path: resourceType} for all child files of the appropriate type(s)
"""
path = str(path)
if fsspec_fs:
if not fsspec_fs.exists(path):
return {}
children = fsspec_fs.ls(path, detail=False)
else:
if not os.path.exists(path):
return {}
children = [f"{path}/{child}" for child in os.listdir(path)]

# Coalesce resource to None or a set of strings
if isinstance(resource, str):
resource = {resource}
elif resource is not None:
resource = set(resource)

# Now grab filenames for all target resource types
results = {}
for child in sorted(children): # sort for reproducibility
results.update(_get_resource_type(child, resource, fsspec_fs=fsspec_fs))
return results


def _get_resource_type(
path: str,
target_resources: Optional[set[str]],
fsspec_fs: Optional["fsspec.AbstractFileSystem"] = None,
) -> dict[str, Optional[str]]:
"""
Returns path & resource type if the file appears to be for the given resources.
** Digression into the wide world of FHIR NDJSON filenames **
Here's the filenames that the "official" tools use:
- bulk-data-client creates files like "1.Condition.ndjson"
- cumulus-etl creates files like "Condition.001.ndjson"
The servers themselves aren't any help / give no guidance, if you were doing raw bulk:
- Cerner provides download files like "11ef-34be-b3c0dc02-87c5-6a5c9bab18ec"
- Epic provides download files like "eNZwsy9bU7LX8nBB.RJXkpA3"
Plus sometimes you don't have full control of the filenames.
We've seen IT groups provide bulk files like "Condition20240614-112115.ndjson"
on a shared drive that you don't control.
But even if you do control the filename, it might be convenient to rename the NDJSON files
with some extra context like "vital-signs.ndjson" or whatever the case may be.
Because of that wide variety, we'll avoid assuming anything about the filename.
Instead, we'll just sniff the first line of every file to examine whether it matches the target
resource.
We will look for a .ndjson file ending at least, which the bulk servers don't provide,
but it seems reasonable to require to avoid reading the first line of big binary files.
:param path: the file to examine
:param target_resources: the type of FHIR resource(s) to accept for this file
:param fsspec_fs: optional fsspec FileSystem to use for I/O
:return: a tiny dict of {path: resourceType} if the file is valid else {}
"""
# Must look like a multi-line JSON file
if pathlib.Path(path).suffix not in {".jsonl", ".ndjson"}:
return {}

# Must be a regular file
isfile_func = fsspec_fs.isfile if fsspec_fs else os.path.isfile
if not isfile_func(path):
return {}

try:
# Read just the first line.
# We cannot assume that "resourceType" is the first field,
# so we must parse the whole first line.
# See https://www.hl7.org/fhir/R4/json.html#resources
open_func = fsspec_fs.open if fsspec_fs else open
with open_func(path, "r", encoding="utf8") as f:
if not (line := f.readline()):
return {}
parsed = json.loads(line)
except Exception as exc: # pylint: disable=broad-except
logger.warning("Could not read from '%s': %s", path, str(exc))
return {}

resource_type = parsed.get("resourceType") if isinstance(parsed, dict) else None

if target_resources is None or resource_type in target_resources:
return {path: resource_type}

# Didn't match our target resource types, just pretend it doesn't exist
return {}


def read_ndjson(
path: PathType,
*,
fsspec_fs: Optional["fsspec.AbstractFileSystem"] = None,
) -> Iterable[Any]:
"""
Generator that yields lines of NDJSON from the target file.
- Empty lines will be ignored.
- I/O and JSON errors will be logged, not raised.
- Will return an empty result if the path does not exist or is not readable.
- The lines of JSON are not required to be dictionaries.
:param path: the file to read
:param fsspec_fs: optional fsspec FileSystem to use for I/O
:return: an iterable of parsed JSON results, line by line
"""
path = str(path)
open_func = fsspec_fs.open if fsspec_fs else open
try:
with open_func(path, "r", encoding="utf8") as f:
for line_num, line in enumerate(f):
if not line.rstrip("\r\n"):
# ignore empty lines (shouldn't normally happen,
# but maybe the file has an extra trailing new line
# or some other oddity - let's be graceful)
continue
try:
yield json.loads(line)
except json.JSONDecodeError as exc:
logger.warning("Could not decode '%s:%d': %s", path, line_num + 1, str(exc))
except Exception as exc:
logger.error("Could not read from '%s': %s", path, str(exc))


def read_ndjson_from_dir(
path: PathType,
resource: ResourceType = None,
*,
fsspec_fs: Optional["fsspec.AbstractFileSystem"] = None,
) -> Iterable[Any]:
"""
Generator that yields lines of NDJSON from the target folder.
- This will not recurse into sub-folders.
- Empty files and lines will be ignored.
- I/O and JSON errors will be logged, not raised.
- Will return an empty result if the path does not exist or is not readable.
- Passing None as the resource filter (the default) will return all NDJSON files found.
- The lines of JSON are not required to be dictionaries.
- The order of results will be consistent across calls.
:param path: the folder to scan
:param resource: the type of FHIR resource(s) for which to return files
:param fsspec_fs: optional fsspec FileSystem to use for I/O
:return: an iterable of parsed JSON results, line by line
"""
for filename in list_ndjson_in_dir(path, resource, fsspec_fs=fsspec_fs):
yield from read_ndjson(filename, fsspec_fs=fsspec_fs)
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ line-length = 100

[project.optional-dependencies]
tests = [
"ddt",
"pytest",
]
dev = [
"black >= 23, < 24",
"black >= 24, < 25",
"pre-commit",
]
Loading

0 comments on commit b085f01

Please sign in to comment.