From b085f0129271dee2b6b660ed0441d4aeb0b205e0 Mon Sep 17 00:00:00 2001
From: Michael Terry
Date: Mon, 1 Jul 2024 13:27:31 -0400
Subject: [PATCH] feat: add ndjson reading methods

Specifically:
- list_ndjson_in_dir: scans for files & their resource types
- read_ndjson: returns line-by-line results from a file
- read_ndjson_from_dir: returns line-by-line results from a folder

This hides all the I/O error handling & odd format challenges
(like .jsonl vs .ndjson or handling stray newlines).
---
 .pre-commit-config.yaml          |   2 +-
 cumulus_fhir_support/__init__.py |   3 +-
 cumulus_fhir_support/ndjson.py   | 237 +++++++++++++++++++++++++++++
 pyproject.toml                   |   3 +-
 tests/test_ndjson.py             | 253 +++++++++++++++++++++++++++++++
 5 files changed, 495 insertions(+), 3 deletions(-)
 create mode 100644 cumulus_fhir_support/ndjson.py
 create mode 100644 tests/test_ndjson.py

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index f3d1e69..2ddb1ff 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,7 +1,7 @@
 repos:
   - repo: https://github.com/psf/black
     #this version is synced with the black mentioned in .github/workflows/ci.yml
-    rev: 23.10.0
+    rev: 24.4.2
     hooks:
       - id: black
         entry: bash -c 'black "$@"; git add -u' --
diff --git a/cumulus_fhir_support/__init__.py b/cumulus_fhir_support/__init__.py
index adde653..9ee67f4 100644
--- a/cumulus_fhir_support/__init__.py
+++ b/cumulus_fhir_support/__init__.py
@@ -1,5 +1,6 @@
 """FHIR support code for the Cumulus project"""
 
-__version__ = "1.1.0"
+__version__ = "1.2.0"
 
+from .ndjson import list_ndjson_in_dir, read_ndjson, read_ndjson_from_dir
 from .schemas import pyarrow_schema_from_rows
diff --git a/cumulus_fhir_support/ndjson.py b/cumulus_fhir_support/ndjson.py
new file mode 100644
index 0000000..5c2ea22
--- /dev/null
+++ b/cumulus_fhir_support/ndjson.py
@@ -0,0 +1,237 @@
+"""
+Find and read multi-line JSON files.
+
+FHIR NDJSON files can come in many different filename pattern flavors.
+And file parsing can have its own oddities.
+These utility functions allow the Cumulus family of tools to all handle
+being given "a folder of NDJSON input" the same way.
+
+** Error handling
+
+In general, these functions log and ignore errors.
+This library is optimized for parsing large amounts of externally-provided NDJSON,
+where aborting on a single error you may not have control over rarely makes sense.
+
+** fsspec
+
+This module has optional support for file access via fsspec.
+It is not a required dependency, but will be used if provided.
+
+** File format
+
+There are two multi-line JSON specs at the time of writing:
+- https://github.com/ndjson/ndjson-spec (unmaintained but more popular?)
+  - See https://github.com/ndjson/ndjson-spec/issues/35 for more details
+  - But notably, ndjson.org was allowed to expire and is now a gambling site
+- https://jsonlines.org/ (maintained)
+
+The only real differences are:
+- different file extensions (.ndjson vs .jsonl)
+- NDJSON allows parsers to ignore empty lines
+
+This module splits the difference by allowing empty lines and looking for both extensions.
+
+The FHIR world in general seems to prefer NDJSON for multi-line JSON,
+so we use that terminology by default too.
+But we also look for JSON Lines filenames, in case that catches on more in the future.
+"""
+
+import json
+import logging
+import os
+import pathlib
+from typing import TYPE_CHECKING, Any, Iterable, Optional, Union
+
+if TYPE_CHECKING:
+    import fsspec
+
+PathType = Union[str, pathlib.Path]
+ResourceType = Union[str, Iterable[str], None]
+
+logger = logging.getLogger(__name__)
+
+
+def list_ndjson_in_dir(
+    path: PathType,
+    resource: ResourceType = None,
+    *,
+    fsspec_fs: Optional["fsspec.AbstractFileSystem"] = None,
+) -> dict[str, Optional[str]]:
+    """
+    Returns info about the NDJSON files in the target folder, filtered by the given resources.
+
+    - This will not recurse into sub-folders.
+    - I/O and JSON errors will be logged, not raised.
+    - Will return an empty dict if the path does not exist.
+    - Passing None as the resource filter (the default) will return all NDJSON files found.
+    - Returned filenames will be full paths.
+    - The order of filenames will be consistent across calls.
+
+    Examples:
+    list_ndjson_in_dir("/") -> {
+        "/random.ndjson": None,
+        "/con1.ndjson": "Condition",
+        "/pat1.ndjson": "Patient",
+    }
+    list_ndjson_in_dir("/", "Patient") -> {"/pat1.ndjson": "Patient"}
+    list_ndjson_in_dir("/", ["Condition", "Patient"]) -> {
+        "/con1.ndjson": "Condition",
+        "/pat1.ndjson": "Patient",
+    }
+
+    :param path: the folder to examine
+    :param resource: the type of FHIR resource(s) for which to return files
+    :param fsspec_fs: optional fsspec FileSystem to use for I/O
+    :return: a dict of {path: resourceType} for all child files of the appropriate type(s)
+    """
+    path = str(path)
+    if fsspec_fs:
+        if not fsspec_fs.exists(path):
+            return {}
+        children = fsspec_fs.ls(path, detail=False)
+    else:
+        if not os.path.exists(path):
+            return {}
+        children = [f"{path}/{child}" for child in os.listdir(path)]
+
+    # Coalesce resource to None or a set of strings
+    if isinstance(resource, str):
+        resource = {resource}
+    elif resource is not None:
+        resource = set(resource)
+
+    # Now grab filenames for all target resource types
+    results = {}
+    for child in sorted(children):  # sort for reproducibility
+        results.update(_get_resource_type(child, resource, fsspec_fs=fsspec_fs))
+    return results
+
+
+def _get_resource_type(
+    path: str,
+    target_resources: Optional[set[str]],
+    fsspec_fs: Optional["fsspec.AbstractFileSystem"] = None,
+) -> dict[str, Optional[str]]:
+    """
+    Returns path & resource type if the file appears to be for the given resources.
+
+    ** Digression into the wide world of FHIR NDJSON filenames **
+
+    Here are the filenames that the "official" tools use:
+    - bulk-data-client creates files like "1.Condition.ndjson"
+    - cumulus-etl creates files like "Condition.001.ndjson"
+
+    The servers themselves are no help / give no guidance, if you're doing raw bulk exports:
+    - Cerner provides download files like "11ef-34be-b3c0dc02-87c5-6a5c9bab18ec"
+    - Epic provides download files like "eNZwsy9bU7LX8nBB.RJXkpA3"
+
+    Plus sometimes you don't have full control of the filenames.
+    We've seen IT groups provide bulk files like "Condition20240614-112115.ndjson"
+    on a shared drive that you don't control.
+
+    But even if you do control the filename, it might be convenient to rename the NDJSON files
+    with some extra context like "vital-signs.ndjson" or whatever the case may be.
+
+    Because of that wide variety, we'll avoid assuming anything about the filename.
+    Instead, we'll just sniff the first line of every file to see whether it matches the target
+    resource.
+
+    We do at least require a .ndjson or .jsonl file extension, which the bulk servers don't
+    provide, but it seems reasonable to require one rather than sniff big binary files.
+
+    :param path: the file to examine
+    :param target_resources: the type of FHIR resource(s) to accept for this file
+    :param fsspec_fs: optional fsspec FileSystem to use for I/O
+    :return: a tiny dict of {path: resourceType} if the file is valid else {}
+    """
+    # Must look like a multi-line JSON file
+    if pathlib.Path(path).suffix not in {".jsonl", ".ndjson"}:
+        return {}
+
+    # Must be a regular file
+    isfile_func = fsspec_fs.isfile if fsspec_fs else os.path.isfile
+    if not isfile_func(path):
+        return {}
+
+    try:
+        # Read just the first line.
+        # We cannot assume that "resourceType" is the first field,
+        # so we must parse the whole first line.
+        # See https://www.hl7.org/fhir/R4/json.html#resources
+        open_func = fsspec_fs.open if fsspec_fs else open
+        with open_func(path, "r", encoding="utf8") as f:
+            if not (line := f.readline()):
+                return {}
+            parsed = json.loads(line)
+    except Exception as exc:  # pylint: disable=broad-except
+        logger.warning("Could not read from '%s': %s", path, str(exc))
+        return {}
+
+    resource_type = parsed.get("resourceType") if isinstance(parsed, dict) else None
+
+    if target_resources is None or resource_type in target_resources:
+        return {path: resource_type}
+
+    # Didn't match our target resource types, just pretend it doesn't exist
+    return {}
+
+
+def read_ndjson(
+    path: PathType,
+    *,
+    fsspec_fs: Optional["fsspec.AbstractFileSystem"] = None,
+) -> Iterable[Any]:
+    """
+    Generator that yields lines of NDJSON from the target file.
+
+    - Empty lines will be ignored.
+    - I/O and JSON errors will be logged, not raised.
+    - Will return an empty result if the path does not exist or is not readable.
+    - The lines of JSON are not required to be dictionaries.
+
+    :param path: the file to read
+    :param fsspec_fs: optional fsspec FileSystem to use for I/O
+    :return: an iterable of parsed JSON results, line by line
+    """
+    path = str(path)
+    open_func = fsspec_fs.open if fsspec_fs else open
+    try:
+        with open_func(path, "r", encoding="utf8") as f:
+            for line_num, line in enumerate(f):
+                if not line.rstrip("\r\n"):
+                    # ignore empty lines (shouldn't normally happen,
+                    # but maybe the file has an extra trailing newline
+                    # or some other oddity - let's be graceful)
+                    continue
+                try:
+                    yield json.loads(line)
+                except json.JSONDecodeError as exc:
+                    logger.warning("Could not decode '%s:%d': %s", path, line_num + 1, str(exc))
+    except Exception as exc:
+        logger.error("Could not read from '%s': %s", path, str(exc))
+
+
+def read_ndjson_from_dir(
+    path: PathType,
+    resource: ResourceType = None,
+    *,
+    fsspec_fs: Optional["fsspec.AbstractFileSystem"] = None,
+) -> Iterable[Any]:
+    """
+    Generator that yields lines of NDJSON from the target folder.
+
+    - This will not recurse into sub-folders.
+    - Empty files and lines will be ignored.
+    - I/O and JSON errors will be logged, not raised.
+    - Will return an empty result if the path does not exist or is not readable.
+    - Passing None as the resource filter (the default) will return all NDJSON files found.
+    - The lines of JSON are not required to be dictionaries.
+    - The order of results will be consistent across calls.
+
+    :param path: the folder to scan
+    :param resource: the type of FHIR resource(s) for which to return files
+    :param fsspec_fs: optional fsspec FileSystem to use for I/O
+    :return: an iterable of parsed JSON results, line by line
+    """
+    for filename in list_ndjson_in_dir(path, resource, fsspec_fs=fsspec_fs):
+        yield from read_ndjson(filename, fsspec_fs=fsspec_fs)
diff --git a/pyproject.toml b/pyproject.toml
index 01ffc36..71941fd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,9 +41,10 @@ line-length = 100
 
 [project.optional-dependencies]
 tests = [
+    "ddt",
     "pytest",
 ]
 dev = [
-    "black >= 23, < 24",
+    "black >= 24, < 25",
     "pre-commit",
 ]
\ No newline at end of file
diff --git a/tests/test_ndjson.py b/tests/test_ndjson.py
new file mode 100644
index 0000000..8534209
--- /dev/null
+++ b/tests/test_ndjson.py
@@ -0,0 +1,253 @@
+"""Tests for ndjson.py"""
+
+import io
+import json
+import os
+import tempfile
+import unittest
+from collections.abc import Generator
+from unittest import mock
+
+import ddt
+
+import cumulus_fhir_support as support
+
+
+@ddt.ddt
+class NdjsonTests(unittest.TestCase):
+    """Test case for ndjson discovery and parsing"""
+
+    # *******************
+    # ** read_ndjson() **
+    # *******************
+
+    def test_read_happy_path(self):
+        with tempfile.NamedTemporaryFile() as file:
+            with open(file.name, "w") as f:
+                f.write('{"id": "2"}\n{"id": "1"}')
+            with self.assertNoLogs():
+                rows = support.read_ndjson(file.name)
+                self.assertIsInstance(rows, Generator)
+                self.assertEqual([{"id": "2"}, {"id": "1"}], list(rows))
+
+    def test_read_empty_lines_are_ignored(self):
+        with tempfile.NamedTemporaryFile() as file:
+            with open(file.name, "wb") as f:
+                f.write(b'\r\n\n\n{"id": "1"}\r\n\n\r\n{"id": "2"}\n\n\r\n')
+            with self.assertNoLogs():
+                self.assertEqual([{"id": "1"}, {"id": "2"}], list(support.read_ndjson(file.name)))
+
+    def test_read_open_error(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            with self.assertLogs("cumulus_fhir_support", level="ERROR") as cm:
+                self.assertEqual([], list(support.read_ndjson(f"{tmpdir}/not-here")))
+        self.assertEqual(1, len(cm.output))
+        self.assertTrue(
+            cm.output[0].startswith("ERROR:cumulus_fhir_support.ndjson:Could not read from"),
+            cm.output[0],
+        )
+
+    def test_read_decode_error(self):
+        with tempfile.NamedTemporaryFile() as file:
+            with open(file.name, "w") as f:
+                f.write('{"id": "1"}\n{"id": "2" hello}\n{"id": "3"}')
+            with self.assertLogs("cumulus_fhir_support", level="WARNING") as cm:
+                self.assertEqual(
+                    [
+                        {"id": "1"},
+                        {"id": "3"},
+                    ],
+                    list(support.read_ndjson(file.name)),
+                )
+        self.assertEqual(1, len(cm.output))
+        self.assertTrue(
+            cm.output[0].startswith("WARNING:cumulus_fhir_support.ndjson:Could not decode"),
+            cm.output[0],
+        )
+
+    def test_read_non_dict_is_fine(self):
+        with tempfile.NamedTemporaryFile() as file:
+            with open(file.name, "w") as f:
+                f.write('1\n["2"]\n"3"')
+            with self.assertNoLogs():
+                rows = support.read_ndjson(file.name)
+                self.assertIsInstance(rows, Generator)
+                self.assertEqual([1, ["2"], "3"], list(rows))
+
+    # **************************
+    # ** list_ndjson_in_dir() **
+    # **************************
+
+    @staticmethod
+    def fill_dir(tmpdir: str, files: dict[str, list[dict]]):
+        for basename, content in files.items():
+            with open(f"{tmpdir}/{basename}", "w", encoding="utf8") as f:
+                for row in content:
+                    json.dump(row, f)
+                    f.write("\n")
+
+    def test_list_any_ndjson_is_default(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            self.fill_dir(
+                tmpdir,
+                {
+                    "README.txt": [{"id": "ignored"}],
+                    "file1.ndjson": [{"id": "file1", "resourceType": "Patient"}],
+                    "file2.ndjson": [{"id": "file2"}],
+                },
+            )
+            with self.assertNoLogs():
+                files = support.list_ndjson_in_dir(tmpdir)
+            self.assertEqual(
+                {
+                    f"{tmpdir}/file1.ndjson": "Patient",
+                    f"{tmpdir}/file2.ndjson": None,
+                },
+                files,
+            )
+
+    def test_list_supports_multiple_formats(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            self.fill_dir(
+                tmpdir,
+                {
+                    "file1.ndjson": [{"id": "file1"}],
+                    "file2.jsonl": [{"id": "file2"}],
+                    "file3.txt": [{"id": "file3"}],
+                },
+            )
+            with self.assertNoLogs():
+                files = support.list_ndjson_in_dir(tmpdir)
+            self.assertEqual(
+                ["file1.ndjson", "file2.jsonl"],
+                [os.path.basename(p) for p in files],
+            )
+
+    @ddt.data(
+        # target resources, expected answer
+        (None, {"pat1", "pat2", "con", "obs", "none", "non-dict"}),
+        ([], []),
+        ("Patient", {"pat1", "pat2"}),
+        ({"Condition", "Observation"}, {"con", "obs"}),
+        (iter(["Condition", None]), {"con", "none", "non-dict"}),
+    )
+    @ddt.unpack
+    def test_list_resource_filter(self, target_resources, expected_names):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            self.fill_dir(
+                tmpdir,
+                {
+                    "pat1.ndjson": [{"resourceType": "Patient"}],
+                    "pat2.ndjson": [{"resourceType": "Patient"}],
+                    "con.ndjson": [{"resourceType": "Condition"}],
+                    "obs.ndjson": [{"resourceType": "Observation"}],
+                    "none.ndjson": [{"id": "no-resource-type"}],
+                    "non-dict.ndjson": [5, 6],
+                    "empty.ndjson": [],
+                    ".ndjson": [{"id": "ignored"}],
+                    "README.txt": [{"id": "ignored"}],
+                },
+            )
+            os.mkdir(f"{tmpdir}/nope")
+            os.mkdir(f"{tmpdir}/nope.ndjson")
+
+            expected_types = {
+                "pat1": "Patient",
+                "pat2": "Patient",
+                "con": "Condition",
+                "obs": "Observation",
+            }
+
+            # Multiple resources
+            with self.assertNoLogs():
+                files = support.list_ndjson_in_dir(tmpdir, target_resources)
+            self.assertIsInstance(files, dict)
+            self.assertEqual(list(files.keys()), sorted(files.keys()))  # verify it's sorted
+
+            expected_files = {
+                f"{tmpdir}/{name}.ndjson": expected_types.get(name)
+                for name in sorted(expected_names)
+            }
+            self.assertEqual(expected_files, files)
+
+    def test_list_handles_missing_folder(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            with self.assertNoLogs():
+                files = support.list_ndjson_in_dir(f"{tmpdir}/nope")
+            self.assertEqual({}, files)
+
+    def test_list_decode_error(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            with open(f"{tmpdir}/decode-error.ndjson", "w") as f:
+                f.write("hello")
+            with open(f"{tmpdir}/decode-success.ndjson", "w") as f:
+                f.write('{"resourceType": "Patient"}')
+            with self.assertLogs("cumulus_fhir_support", level="WARNING") as cm:
+                files = support.list_ndjson_in_dir(tmpdir)
+            self.assertEqual(
+                {
+                    f"{tmpdir}/decode-success.ndjson": "Patient",
+                },
+                files,
+            )
+        self.assertEqual(1, len(cm.output))
+        self.assertTrue(
+            cm.output[0].startswith("WARNING:cumulus_fhir_support.ndjson:Could not read from"),
+            cm.output[0],
+        )
+
+    # ****************************
+    # ** read_ndjson_from_dir() **
+    # ****************************
+
+    def test_read_dir_happy_path(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            self.fill_dir(
+                tmpdir,
+                {
+                    "pat.ndjson": [{"resourceType": "Patient", "id": "P1"}],
+                    "con.ndjson": [
+                        {"resourceType": "Condition", "id": "C1"},
+                        {"resourceType": "Condition", "id": "C2"},
+                    ],
+                    "obs.ndjson": [{"resourceType": "Observation", "id": "O1"}],
+                    "empty.ndjson": [],
+                },
+            )
+
+            with self.assertNoLogs():
+                rows = support.read_ndjson_from_dir(tmpdir, ["Condition", "Patient"])
+                self.assertIsInstance(rows, Generator)
+                self.assertEqual(["C1", "C2", "P1"], [x["id"] for x in rows])
+
+    # *******************
+    # ** Miscellaneous **
+    # *******************
+
+    def test_fsspec_support(self):
+        fake_files = ["folder/1.ndjson", "folder/nope"]
+        fake_folders = ["folder/dir.ndjson"]
+        all_fakes = fake_files + fake_folders
+
+        def fake_ls(folder, detail):
+            self.assertEqual(folder, "folder")
+            self.assertFalse(detail)
+            return all_fakes
+
+        def fake_open(filename, mode, encoding):
+            self.assertEqual(filename, "folder/1.ndjson")
+            self.assertEqual(mode, "r")
+            self.assertEqual(encoding, "utf8")
+            return io.StringIO(
+                '{"id": "P2", "resourceType": "Patient"}\n'
+                '{"id": "P1", "resourceType": "Patient"}\n'
+            )
+
+        mock_fs = mock.Mock()
+        mock_fs.exists = lambda x: x == "folder" or x in all_fakes
+        mock_fs.isfile = lambda x: x in fake_files
+        mock_fs.ls = fake_ls
+        mock_fs.open = fake_open
+
+        with self.assertNoLogs():
+            rows = support.read_ndjson_from_dir("folder", "Patient", fsspec_fs=mock_fs)
+            self.assertEqual(["P2", "P1"], [x["id"] for x in rows])
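
Example usage of the new public API (a minimal sketch, not part of the patch; the folder path
and resource names below are hypothetical):

    import cumulus_fhir_support as support

    # List candidate NDJSON files and their sniffed resource types,
    # e.g. {"/tmp/export/pat1.ndjson": "Patient", "/tmp/export/con1.ndjson": "Condition"}
    files = support.list_ndjson_in_dir("/tmp/export", ["Patient", "Condition"])
    for path, resource_type in files.items():
        print(path, resource_type)

    # Stream parsed rows from every matching file in the folder.
    # I/O and JSON errors are logged and skipped rather than raised.
    for row in support.read_ndjson_from_dir("/tmp/export", "Patient"):
        print(row["id"])

With fsspec installed, the same calls can target remote storage (illustrative; assumes the
s3fs package provides the "s3" protocol):

    import fsspec

    s3 = fsspec.filesystem("s3")
    for row in support.read_ndjson_from_dir("my-bucket/export", "Patient", fsspec_fs=s3):
        print(row["id"])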