Skip to content

Commit

Permalink
Attach obfuscated backtraces to public logs in case of unexpected error
Browse files Browse the repository at this point in the history
  • Loading branch information
ximion committed Nov 2, 2024
1 parent 58bf956 commit f438f4f
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/laniakea/archive/pkgimport.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@
ArchiveVersionMemory,
ArchiveRepoSuiteSettings,
)
from laniakea.utils import safe_strip, safe_rename, split_strip, hardlink_or_copy
from laniakea.utils import (
safe_strip,
safe_rename,
split_strip,
hardlink_or_copy,
format_encrypted_traceback,
)
from laniakea.logging import log, archive_log
from laniakea.msgstream import EventEmitter
from laniakea.localconfig import LocalConfig, LintianConfig
Expand Down Expand Up @@ -1331,7 +1337,8 @@ def _import_trusted_changes(
os.path.join(spkg_queue_dir, '{}_{}.changes'.format(spkg.name, split_epoch(spkg.version)[1])),
)
except Exception as e:
raise UploadError('Failed to import source package: {}'.format(str(e)))
tb_s = format_encrypted_traceback(e)
raise UploadError('Failed to import source package: {}\nDiagnostic Code: {}'.format(str(e), tb_s))

# import binary packages
for file in files.values():
Expand All @@ -1341,7 +1348,8 @@ def _import_trusted_changes(
try:
pi.import_binary(os.path.join(changes.directory, file.fname), file.component)
except Exception as e:
raise UploadError('Failed to import binary package: {}'.format(str(e)))
tb_s = format_encrypted_traceback(e)
raise UploadError('Failed to import binary package: {}\nDiagnostic Code: {}'.format(str(e), tb_s))

# looks like the package was accepted - spread the news!
ev_data = build_event_data_for_accepted_upload(rss, spkg, changes, is_new, uploader)
Expand Down
6 changes: 6 additions & 0 deletions src/laniakea/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
hardlink_or_copy,
check_filename_safe,
)
from laniakea.utils.traceback import (
decrypt_traceback_string,
format_encrypted_traceback,
)

__all__ = [
'arch_matches',
Expand Down Expand Up @@ -64,4 +68,6 @@
'safe_rename',
'hardlink_or_copy',
'set_process_title',
'format_encrypted_traceback',
'decrypt_traceback_string',
]
91 changes: 91 additions & 0 deletions src/laniakea/utils/traceback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2023-2024 Matthias Klumpp <[email protected]>
#
# SPDX-License-Identifier: LGPL-3.0+


import os
import base64
import hashlib
import platform
import traceback

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, modes, algorithms

from laniakea.logging import log


def create_obfuscation_key():
"""Generate a 256-bit AES key from the machine ID."""

try:
with open('/etc/machine-id', 'r') as f:
machine_id = f.read().strip()
except FileNotFoundError:
log.warning('Machine ID not found, using a fallback value.')
machine_id = platform.node()

key_seed = 'laniakea-' + machine_id
return hashlib.sha256(key_seed.encode()).digest()


def compact_traceback(exc):
"""Generate a more compacted traceback string."""

tb_list = traceback.format_exception(type(exc), exc, exc.__traceback__)
compacted_tb = ''.join(line for line in tb_list if not line.startswith("Traceback (most recent call last):"))
return compacted_tb.strip()


def format_encrypted_traceback(exc, *, key: bytes | None = None):
"""Format a traceback string and lightly encrypt it for public logging.
:param exc: The exception to format
"""

# generate a compacted traceback string
tb_str = compact_traceback(exc)

if not key:
key = create_obfuscation_key()

# pad the data to align with AES block size
padder = padding.PKCS7(128).padder()
padded_data = padder.update(tb_str.encode('utf-8')) + padder.finalize()

# encrypt with AES-CBC
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
encrypted_tb = cipher.encryptor().update(padded_data) + cipher.encryptor().finalize()

# concatenate IV and encrypted data for encoding
encrypted_payload = iv + encrypted_tb

# encode as base85
return base64.b85encode(encrypted_payload).decode('utf-8')


def decrypt_traceback_string(encoded_traceback: str, *, key: bytes | None = None):
"""Decrypt an encrypted traceback string with the encryption key of the current machine."""

if not key:
key = create_obfuscation_key()

# decode from base85
encrypted_payload = base64.b85decode(encoded_traceback)

# extract the IV and encrypted data
iv = encrypted_payload[:16]
encrypted_tb = encrypted_payload[16:]

# decrypt using AES-CBC
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
padded_data = cipher.decryptor().update(encrypted_tb) + cipher.decryptor().finalize()

# remove padding and return the original traceback
unpadder = padding.PKCS7(128).unpadder()
tb_str = unpadder.update(padded_data) + unpadder.finalize()

return tb_str.decode('utf-8')
12 changes: 12 additions & 0 deletions src/lkadmin/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from laniakea import LkModule
from laniakea.db import session_scope, session_factory
from laniakea.utils import decrypt_traceback_string

from .utils import input_str, print_header, print_error_exit

Expand Down Expand Up @@ -94,3 +95,14 @@ def shell():
# pylint: disable=possibly-unused-variable
with session_scope() as session:
bpython.embed(locals_=locals())


@core.command()
@click.argument('tb_string', nargs=1)
def decode_traceback(tb_string):
"""Decode a traceback string that was generated on this machine."""

try:
print(decrypt_traceback_string(tb_string))
except Exception as exc:
print_error_exit(f'Error decoding traceback: {exc}')
21 changes: 21 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,24 @@ def test_renameat2():

assert apple_path.exists()
assert orange_path.exists()


def test_traceback_decrypt():
from laniakea.utils import decrypt_traceback_string, format_encrypted_traceback
from laniakea.utils.traceback import compact_traceback

try:
# Code that raises an exception
1 / 0
except Exception as e:
orig_tb = compact_traceback(e)
encrypted = format_encrypted_traceback(e)
decrypted = decrypt_traceback_string(encrypted)
assert "ZeroDivisionError" in decrypted
assert orig_tb == decrypted
assert encrypted != decrypted

# test decryption with wrong key
key = b"wrongkey"
with pytest.raises(Exception):
decrypt_traceback_string(encrypted, key=key)

0 comments on commit f438f4f

Please sign in to comment.