From b95dcc695ba1ba4eb94c55bd101bc702c7eb9ab3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20W=C3=B6rpel?= Date: Fri, 3 Mar 2023 01:04:35 +0100 Subject: [PATCH 1/5] Add load-catalog cli command --- alephclient/cli.py | 59 ++++++++++++++++++- alephclient/load_catalog.py | 81 ++++++++++++++++++++++++++ alephclient/tests/test_load_catalog.py | 21 +++++++ 3 files changed, 158 insertions(+), 3 deletions(-) create mode 100644 alephclient/load_catalog.py create mode 100644 alephclient/tests/test_load_catalog.py diff --git a/alephclient/cli.py b/alephclient/cli.py index 1d373c8..4aa5ed3 100644 --- a/alephclient/cli.py +++ b/alephclient/cli.py @@ -1,13 +1,14 @@ import json -import click import logging -from pathlib import Path + +import click from alephclient import settings from alephclient.api import AlephAPI -from alephclient.errors import AlephException from alephclient.crawldir import crawl_dir +from alephclient.errors import AlephException from alephclient.fetchdir import fetch_collection, fetch_entity +from alephclient.load_catalog import load_catalog log = logging.getLogger(__name__) @@ -296,6 +297,58 @@ def read_json_stream(stream): raise click.Abort() +@cli.command("load-catalog") +@click.argument("url") +@click.option( + "-c", + "--chunksize", + default=1000, + type=click.INT, + help="chunk size when sending to API", +) +@click.option( + "--force", is_flag=True, default=False, help="continue after server errors" +) +@click.option( + "--unsafe", is_flag=True, default=False, help="disable server-side validation" +) +@click.option("--frequency", help="Add frequency label to collections") +@click.option("--exclude", help="Exclude dataset(s)", multiple=True) +@click.option("--include", help="Include dataset(s)", multiple=True) +@click.pass_context +def _load_catalog( + ctx, + url, + chunksize=1000, + force=False, + unsafe=False, + frequency=None, + exclude=[], + include=[], +): + """Import a catalog from a given url""" + api = ctx.obj["api"] + try: + for collection_id, loader in load_catalog( + api, + url, + exclude_datasets=exclude, + include_datasets=include, + frequency=frequency, + ): + api.write_entities( + collection_id, + loader, + chunk_size=chunksize, + unsafe=unsafe, + force=force, + ) + except AlephException as exc: + raise click.ClickException(exc.message) + except BrokenPipeError: + raise click.Abort() + + @cli.command("stream-entities") @click.option("-o", "--outfile", type=click.File("w"), default="-") # noqa @click.option("-s", "--schema", multiple=True, default=[]) # noqa diff --git a/alephclient/load_catalog.py b/alephclient/load_catalog.py new file mode 100644 index 0000000..f4c3b21 --- /dev/null +++ b/alephclient/load_catalog.py @@ -0,0 +1,81 @@ +import json +import logging +from typing import Any, Dict, Generator, List, Optional, Tuple + +import requests +from banal import ensure_list + +from alephclient.api import AlephAPI + +log = logging.getLogger(__name__) + +EntityData = Dict[str, Any] +Loader = Tuple[str, Generator[EntityData, None, None]] + +MIME_TYPE = "application/json+ftm" + + +def stream_resource(url: str, foreign_id: str) -> Generator[EntityData, None, None]: + res = requests.get(url, stream=True) + if not res.ok: + raise requests.HTTPError(res.status_code) + + for ix, data in enumerate(res.iter_lines()): + if ix and ix % 1000 == 0: + log.info("[%s] Bulk load entities: %s...", foreign_id, ix) + yield json.loads(data) + + +def load_catalog( + api: AlephAPI, + url: str, + exclude_datasets: Optional[List[str]] = [], + include_datasets: Optional[List[str]] = [], + frequency: Optional[str] = None, +) -> Generator[Loader, None, None]: + res = requests.get(url) + if not res.ok: + raise requests.HTTPError(res.status_code) + + catalog = res.json() + for dataset in catalog["datasets"]: + foreign_id = dataset["name"] + + if "type" in dataset and dataset["type"] != "collection": + continue + if exclude_datasets and foreign_id in exclude_datasets: + continue + if include_datasets and foreign_id not in include_datasets: + continue + + aleph_collection = api.get_collection_by_foreign_id(foreign_id) + data = { + "label": dataset["title"], + "summary": ( + dataset.get("description", "") + "\n\n" + dataset.get("summary", "") + ).strip(), + "publisher": dataset.get("publisher", {}).get("name"), + "publisher_url": dataset.get("publisher", {}).get("url"), + "countries": ensure_list(dataset.get("publisher", {}).get("country")), + "data_url": dataset.get("data", {}).get("url"), + "category": dataset.get("category", "other"), + } + if "frequency" in dataset or frequency is not None: + data["frequency"] = dataset.get("frequency", frequency) + + if aleph_collection is not None: + log.info("[%s] Creating collection ..." % foreign_id) + aleph_collection = api.update_collection( + aleph_collection["collection_id"], data + ) + else: + log.info("[%s] Updating collection metadata ..." % foreign_id) + aleph_collection = api.create_collection( + {**data, **{"foreign_id": dataset["name"]}} + ) + + for resource in ensure_list(dataset.get("resources")): + if resource["mime_type"] == MIME_TYPE: + yield aleph_collection["collection_id"], stream_resource( + resource["url"], foreign_id + ) diff --git a/alephclient/tests/test_load_catalog.py b/alephclient/tests/test_load_catalog.py new file mode 100644 index 0000000..bfbe47f --- /dev/null +++ b/alephclient/tests/test_load_catalog.py @@ -0,0 +1,21 @@ +from alephclient.api import AlephAPI +from alephclient.load_catalog import load_catalog + + +class TestLoadCatalog: + fake_url = "http://aleph.test/api/2/" + catalog_url = "https://data.opensanctions.org/datasets/latest/index.json" + + def setup_method(self, mocker): + self.api = AlephAPI(host=self.fake_url, api_key="fake_key") + + def test_load_catalog(self, mocker): + mocker.patch.object(self.api, "_request") + for collection_id, loader in load_catalog(self.api, self.catalog_url): + self.api._request.assert_called_with( + "GET", "{}collections/{}".format(self.fake_url, collection_id) + ) + for data in loader: + assert isinstance(data, dict) + break + break From 3b5d3d63ed98dca14c25444b70b5a6379b1723e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20W=C3=B6rpel?= Date: Fri, 3 Mar 2023 02:03:25 +0100 Subject: [PATCH 2/5] erf --- alephclient/load_catalog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alephclient/load_catalog.py b/alephclient/load_catalog.py index f4c3b21..7f9e5d5 100644 --- a/alephclient/load_catalog.py +++ b/alephclient/load_catalog.py @@ -41,7 +41,7 @@ def load_catalog( for dataset in catalog["datasets"]: foreign_id = dataset["name"] - if "type" in dataset and dataset["type"] != "collection": + if "type" in dataset and dataset["type"] == "collection": continue if exclude_datasets and foreign_id in exclude_datasets: continue From 03c4d54371b6ecb36c43a6f27d59b58405e43ca5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20W=C3=B6rpel?= Date: Fri, 3 Mar 2023 08:34:43 +0100 Subject: [PATCH 3/5] Fail better --- alephclient/cli.py | 3 +++ alephclient/load_catalog.py | 15 ++++++++------- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/alephclient/cli.py b/alephclient/cli.py index 4aa5ed3..1204c50 100644 --- a/alephclient/cli.py +++ b/alephclient/cli.py @@ -2,6 +2,7 @@ import logging import click +from requests import HTTPError from alephclient import settings from alephclient.api import AlephAPI @@ -345,6 +346,8 @@ def _load_catalog( ) except AlephException as exc: raise click.ClickException(exc.message) + except HTTPError as exc: + raise click.ClickException(str(exc)) except BrokenPipeError: raise click.Abort() diff --git a/alephclient/load_catalog.py b/alephclient/load_catalog.py index 7f9e5d5..c4e0f1e 100644 --- a/alephclient/load_catalog.py +++ b/alephclient/load_catalog.py @@ -18,7 +18,8 @@ def stream_resource(url: str, foreign_id: str) -> Generator[EntityData, None, None]: res = requests.get(url, stream=True) if not res.ok: - raise requests.HTTPError(res.status_code) + log.error(f"[{foreign_id}] {res.status_code}") + return for ix, data in enumerate(res.iter_lines()): if ix and ix % 1000 == 0: @@ -35,7 +36,7 @@ def load_catalog( ) -> Generator[Loader, None, None]: res = requests.get(url) if not res.ok: - raise requests.HTTPError(res.status_code) + raise requests.HTTPError(f"Fetch catalog failed: {res.status_code}") catalog = res.json() for dataset in catalog["datasets"]: @@ -64,18 +65,18 @@ def load_catalog( data["frequency"] = dataset.get("frequency", frequency) if aleph_collection is not None: - log.info("[%s] Creating collection ..." % foreign_id) + log.info("[%s] Updating collection metadata ..." % foreign_id) aleph_collection = api.update_collection( aleph_collection["collection_id"], data ) else: - log.info("[%s] Updating collection metadata ..." % foreign_id) + log.info("[%s] Creating collection ..." % foreign_id) aleph_collection = api.create_collection( {**data, **{"foreign_id": dataset["name"]}} ) for resource in ensure_list(dataset.get("resources")): if resource["mime_type"] == MIME_TYPE: - yield aleph_collection["collection_id"], stream_resource( - resource["url"], foreign_id - ) + loader = stream_resource(resource["url"], foreign_id) + if loader is not None: + yield aleph_collection["collection_id"], loader From da01db11a811d19c4569610b55f051ac7f70f65e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20W=C3=B6rpel?= Date: Thu, 23 Mar 2023 17:19:27 +0100 Subject: [PATCH 4/5] Skip category for existing aleph_collections --- alephclient/load_catalog.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/alephclient/load_catalog.py b/alephclient/load_catalog.py index c4e0f1e..39fa4a0 100644 --- a/alephclient/load_catalog.py +++ b/alephclient/load_catalog.py @@ -66,6 +66,9 @@ def load_catalog( if aleph_collection is not None: log.info("[%s] Updating collection metadata ..." % foreign_id) + data.pop( + "category", None + ) # don't overwrite existing (probably user changed) category aleph_collection = api.update_collection( aleph_collection["collection_id"], data ) From cea93401d51fb03e4f1ae1fac795c214cf9f99fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20W=C3=B6rpel?= Date: Tue, 2 May 2023 05:37:28 +0200 Subject: [PATCH 5/5] Ensure str in description/summary, exclude collections --- alephclient/load_catalog.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/alephclient/load_catalog.py b/alephclient/load_catalog.py index 39fa4a0..5047f56 100644 --- a/alephclient/load_catalog.py +++ b/alephclient/load_catalog.py @@ -44,6 +44,8 @@ def load_catalog( if "type" in dataset and dataset["type"] == "collection": continue + if dataset.get("children") or dataset.get("datasets"): + continue if exclude_datasets and foreign_id in exclude_datasets: continue if include_datasets and foreign_id not in include_datasets: @@ -53,7 +55,9 @@ def load_catalog( data = { "label": dataset["title"], "summary": ( - dataset.get("description", "") + "\n\n" + dataset.get("summary", "") + dataset.get("description", "") + or "" + "\n\n" + dataset.get("summary", "") # noqa + or "" # noqa ).strip(), "publisher": dataset.get("publisher", {}).get("name"), "publisher_url": dataset.get("publisher", {}).get("url"),