Skip to content

Commit

Permalink
Support 202409 (#100)
Browse files Browse the repository at this point in the history
Make client more robust when dealing with errors. #95

Co-authored-by: 20C <[email protected]>
  • Loading branch information
vegu and 20c-ed authored Oct 18, 2024
1 parent a3d61b3 commit 590404e
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 31 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@


## Unreleased
### Fixed
- handle API data errors more gracefully, allowing broken objects to be skipped and retried later (#95)


## 2.2.0
Expand Down
4 changes: 3 additions & 1 deletion CHANGELOG.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Unreleased:
added: []
fixed: []
fixed:
- handle API data errors more gracefully, allowing broken objects to be skipped and retried later (#95)
changed: []
deprecated: []
removed: []
Expand Down
44 changes: 33 additions & 11 deletions src/peeringdb/_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
"""

import logging
import sys
from datetime import datetime
from typing import List, Union

from peeringdb import get_backend
from peeringdb import config, get_backend
from peeringdb._sync import extract_relations, set_many_relations, set_single_relations
from peeringdb.fetch import Fetcher
from peeringdb.private import private_data_has_been_fetched
from peeringdb.util import group_fields
from peeringdb.util import group_fields, log_error


class Updater:
Expand All @@ -28,6 +29,7 @@ def __init__(self, fetcher: Fetcher):
self.resources = {}
self.backend = get_backend()
self.fetcher = fetcher
self.config = config.load_config()

def copy_object(self, new):
"""
Expand Down Expand Up @@ -160,14 +162,23 @@ def _handle_initial_sync(self, entries: list, res):
objs = []
retry = []
for row in entries:
obj, ret = self.create_obj(row, res)
if ret:
retry.append(row)
elif obj: # not retry and obj
try:
obj, _ = self.create_obj(row, res)
objs.append(obj)
for row in retry:
obj, _ = self.create_obj(row, res)
objs.append(obj)
except self.backend.object_missing_error(self.backend.get_concrete(res)):
try:
obj, _ = self.create_obj(row, res)
self.backend.save(obj)
except Exception as e:
self._log.info(
f"Error creating {res.tag} with id {row.get('id', 'Unknown')}: {e}"
)
log_error(self.config, res.tag, row.get("id", "Unknown"), str(e))
except Exception as e:
self._log.info(
f"Error updating {res.tag} with id {row.get('id', 'Unknown')}: {e}"
)
log_error(self.config, res.tag, row.get("id", "Unknown"), str(e))

self.backend.get_concrete(res).objects.bulk_create(objs)

Expand All @@ -186,8 +197,19 @@ def _handle_incremental_sync(self, entries: list, res):
obj, _ = self.create_obj(row, res)
self.copy_object(obj)
except self.backend.object_missing_error(self.backend.get_concrete(res)):
obj, _ = self.create_obj(row, res)
self.backend.save(obj)
try:
obj, _ = self.create_obj(row, res)
self.backend.save(obj)
except Exception as e:
self._log.info(
f"Error creating {res.tag} with id {row.get('id', 'Unknown')}: {e}"
)
log_error(self.config, res.tag, row.get("id", "Unknown"), str(e))
except Exception as e:
self._log.info(
f"Error updating {res.tag} with id {row.get('id', 'Unknown')}: {e}"
)
log_error(self.config, res.tag, row.get("id", "Unknown"), str(e))

def update_all(
self,
Expand Down
37 changes: 36 additions & 1 deletion src/peeringdb/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
from peeringdb import resource, util
from peeringdb.client import Client
from peeringdb.output._dict import dump_python_dict
from peeringdb.util import load_failed_entries, save_failed_entries
from peeringdb.whois import WhoisFormat

_log = logging.getLogger(__name__)


def _handler(func):
"""Decorate a command handler"""
Expand Down Expand Up @@ -265,7 +268,39 @@ def handle(config, verbose, quiet, init, since, **kwargs):
print()
kwargs["fetch_private"] = False

client.updater.update_all(rs, since, fetch_private=kwargs["fetch_private"])
failed_entries = load_failed_entries(config)
if failed_entries:
_log.info("Retrying previously failed entries...")
Sync.retry_failed_entries(client, failed_entries)
try:
client.updater.update_all(rs, since, fetch_private=kwargs["fetch_private"])
except Exception as e:
_log.info(f"Error during sync : {e}")

def retry_failed_entries(client, failed_entries):
    """Retry entries that failed in previous sync runs.

    Entries that sync successfully are removed from *failed_entries*,
    and the remaining (still failing) entries are persisted back to the
    failed-entries file.

    Args:
        client (peeringdb.Client): The PeeringDB client instance.
        failed_entries (list): dicts describing previously failed objects,
            each with "resource_tag" and "pk" keys.
    """
    succeeded = []
    for entry in failed_entries:
        tag = entry["resource_tag"]
        pk = entry["pk"]
        _log.info(f"Retrying {tag}-{pk}...")
        try:
            client.updater.update_one(resource.get_resource(tag), pk)
        except Exception as e:
            # keep the entry around so it is retried on the next run
            _log.info(f"Error retrying {tag}-{pk}: {e}")
        else:
            succeeded.append(entry)
            _log.info(f"Successfully retried {tag}-{pk}")

    for entry in succeeded:
        failed_entries.remove(entry)

    save_failed_entries(client.config, failed_entries)


class DropTables:
Expand Down
4 changes: 4 additions & 0 deletions src/peeringdb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class SyncSchema(_schema.Schema):
api_key = _schema.Str(
"api_key", blank=True, default=os.environ.get("PDB_SYNC_API_KEY", "")
)
failed_entries = _schema.Str(
"failed_entries",
default=os.environ.get("FAILED_ENTRIES_FILE", "failed_entries.json"),
)

class OrmSchema(_schema.Schema):
class OrmDbSchema(_schema.Schema):
Expand Down
4 changes: 2 additions & 2 deletions src/peeringdb/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def load(
and os.path.getmtime(cache_file) > (time.time() - 15 * 60)
):
self._log.info(f"[{resource}] Fetching from local cache")
self._log.debug(f"[{resource}] {cache_file}")
self._log.info(f"[{resource}] {cache_file}")
with open(cache_file) as f:
self.resources[resource] = json.load(f)["data"]
self.local_cache_used = True
Expand All @@ -125,7 +125,7 @@ def load(
elif not since and self.cache_url and not fetch_private:
cache_url = f"{self.cache_url}/{resource}-0.json"
self._log.info(f"[{resource}] Fetching from remote cache")
self._log.debug(f"[{resource}] {cache_url}")
self._log.info(f"[{resource}] {cache_url}")

resp = requests.get(cache_url, timeout=self.timeout)

Expand Down
85 changes: 69 additions & 16 deletions src/peeringdb/util.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,65 @@
import json
import logging
import os
import re

from django.core import serializers

from peeringdb import resource


def load_failed_entries(config):
    """
    Load the list of failed entries from the failed entries file.

    Args:
        config (dict): loaded peeringdb config; the file path is read
            from config["sync"]["failed_entries"].

    Returns:
        list: the previously failed entries, or an empty list when the
        file is missing, empty, or contains invalid JSON (a corrupt
        state file should not break the sync).
    """
    failed_entries_file = config["sync"].get("failed_entries")

    try:
        with open(failed_entries_file) as f:
            data = f.read()
    except FileNotFoundError:
        # no failures recorded yet
        return []

    if not data:
        return []

    try:
        return json.loads(data)
    except json.JSONDecodeError:
        # fix: previously a corrupt file raised and aborted the sync;
        # start over with an empty retry list instead
        return []


def save_failed_entries(config, entries):
    """
    Persist the list of failed entries to the failed entries file.

    Args:
        config (dict): loaded peeringdb config; the file path is read
            from config["sync"]["failed_entries"].
        entries (list): failed entries to write (overwrites the file).
    """
    failed_entries_file = config["sync"].get("failed_entries")

    with open(failed_entries_file, "w") as fh:
        fh.write(json.dumps(entries, indent=4))


def log_error(config, resource_tag, pk, error_message):
    """
    Log a sync error and record the failed entry in the failed entries file.

    Args:
        config (dict): loaded peeringdb config (locates the failed entries file)
        resource_tag (str): the resource tag
        pk (int): the primary key of the failed entry
        error_message (str): the error message
    """
    logging.error(f"Error syncing {resource_tag}-{pk}: {error_message}")

    entries = load_failed_entries(config)
    entry = {"resource_tag": resource_tag, "pk": pk, "error": error_message}
    # only append (and rewrite the file) for entries not already recorded
    if entry not in entries:
        entries.append(entry)
        save_failed_entries(config, entries)


def split_ref(string):
"""splits a string into (tag, id)"""
re_tag = re.compile(r"^(?P<tag>[a-zA-Z]+)[\s-]*(?P<pk>\d+)$")
Expand All @@ -32,22 +85,6 @@ def pretty_speed(value):
return value


def prompt(msg, default=None):
"Prompt for input"
if default is not None:
msg = f"{msg} ({repr(default)})"
msg = f"{msg}: "
try:
s = input(msg)
except KeyboardInterrupt:
exit(1)
except EOFError:
s = ""
if not s:
s = default
return s


def group_fields(B, concrete):
"""Partition a concrete's fields into groups based on type"""
groups = ("scalars", "single_refs", "many_refs")
Expand Down Expand Up @@ -117,3 +154,19 @@ def get_log_level(level_str):
"NOTSET": logging.NOTSET,
}
return levels.get(level_str.strip().upper())


def prompt(msg, default=None):
    "Prompt for input"
    # show the default (if any) as part of the prompt text
    if default is not None:
        msg = f"{msg} ({repr(default)})"
    try:
        answer = input(f"{msg}: ")
    except KeyboardInterrupt:
        # treat ^C as an abort of the whole command
        exit(1)
    except EOFError:
        answer = ""
    # empty response falls back to the default
    return answer or default
1 change: 1 addition & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def test_schema_migration():
"timeout": 5,
"only": [],
"strip_tz": 1,
"failed_entries": "failed_entries.json",
},
"orm": {
"backend": "django_peeringdb",
Expand Down
Loading

0 comments on commit 590404e

Please sign in to comment.