genesis_devtools/backup/local.py (127 additions, 0 deletions)
@@ -19,9 +19,11 @@
import typing as tp
import shutil
import multiprocessing as mp
import subprocess

from genesis_devtools.backup import base
from genesis_devtools.backup import qcow
from genesis_devtools.backup import zfs as zfs_backup
from genesis_devtools import logger as logger_base
from genesis_devtools.infra.libvirt import libvirt
from genesis_devtools import utils
@@ -282,3 +284,128 @@ def cleanup(self) -> None:

def restore(self, backup_path: str) -> None:
raise NotImplementedError()


class LocalZFSBackuper(zfs_backup.AbstractZfsBackuper):

def __init__(
self,
backup_dir: str,
snapshot_name: str = "backup",
logger: logger_base.AbstractLogger | None = None,
) -> None:
super().__init__(snapshot_name=snapshot_name, logger=logger)
self._backup_dir = backup_dir

def _save_volume_to_backup(
self,
volume: str,
backup_path: str,
encryption: base.EncryptionCreds | None = None,
) -> None:
disk_name = os.path.basename(volume) + ".raw"
target_path = os.path.join(backup_path, disk_name)

with open(target_path, "wb") as f:
subprocess.run(
[
"sudo",
"zfs",
"send",
Member:
Maybe for the future:

By default zfs send generates a stream with maximum compatibility with pretty old ZFS versions, i.e. if compression was on, it will decompress the data. And vice versa: on zfs recv it will compress the data again.

Compression is a pretty interesting case, because it's easy to write highly compressible data that gets inflated to many times its size during backup! (I saw an example with x1600: 2GB of compressed junk from a vim tmp file gave 2TB of in-flight data.) We should think about that too.

Good flags to use: -Lec:

  • -L - use "large" ZFS blocks (i.e. support the latest ZFS features)
  • -e - use embedded blocks (files under ~100 bytes may be written into the block pointer itself)
  • -c - don't decompress, send blocks as-is

One last useful flag is -w/--raw: if native ZFS encryption is used, it will send the encrypted data as-is, so keys are not needed at all.

I think we don't need to change anything now, but at least see this comment for future reference @akremenetsky
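
As a rough sketch, the suggested flags would slot into the existing command like this (an illustration only, not part of this PR):

    # Sketch: zfs send with the reviewer's suggested flags; -w/--raw would
    # replace -c when native ZFS encryption is in use.
    subprocess.run(
        [
            "sudo",
            "zfs",
            "send",
            "-L",  # large blocks (latest ZFS feature support)
            "-e",  # embedded blocks for tiny files
            "-c",  # send compressed blocks as-is, don't decompress
            f"{volume}@{self._snapshot_name}",
        ],
        check=True,
        stdout=f,
    )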

Member:

Oh, and an additional note: in the future we may support incremental backups too; you can pass -I pool/dataset@parent_snap_name for that.
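
For reference, an incremental send along those lines might look like this sketch (parent_snapshot is a hypothetical variable holding the previous snapshot's name):

    # Sketch: incremental send; -I streams all intermediate snapshots
    # between the parent and the current one.
    subprocess.run(
        [
            "sudo",
            "zfs",
            "send",
            "-I",
            f"{volume}@{parent_snapshot}",  # hypothetical parent snapshot
            f"{volume}@{self._snapshot_name}",
        ],
        check=True,
        stdout=f,
    )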

Member:

Ah, and to be safe from decompression inflation, we could check the zvols' compressratio. If it's above x100, something's pretty nasty.
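
A possible pre-flight check in that spirit (a sketch; the x100 threshold is the reviewer's rule of thumb):

    # Sketch: read the zvol's compressratio property (reported like "1.85x")
    # and refuse to stream if it looks pathological.
    ratio_out = subprocess.run(
        ["sudo", "zfs", "get", "-H", "-o", "value", "compressratio", volume],
        check=True,
        capture_output=True,
        text=True,
    ).stdout.strip()
    if float(ratio_out.rstrip("x")) > 100:
        raise RuntimeError(f"Suspicious compressratio {ratio_out} on {volume}")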

f"{volume}@{self._snapshot_name}",
],
check=True,
stdout=f,
)

if encryption:
utils.encrypt_file(target_path, encryption.key, encryption.iv)
Member:

Again, for the future:

I propose thinking about chaining the different steps, maybe via pipes as a start, so we won't need as much temporary space and time.
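
A rough sketch of such a chain for the local case, reusing utils.ReaderEncryptorIO from the S3 path (an illustration only; it assumes ReaderEncryptorIO supports the read() calls made by shutil.copyfileobj, and that encryption is set):

    # Sketch: stream zfs send through the encryptor straight to the target
    # file, avoiding the temporary plaintext file entirely.
    proc = subprocess.Popen(
        ["sudo", "zfs", "send", f"{volume}@{self._snapshot_name}"],
        stdout=subprocess.PIPE,
    )
    assert proc.stdout is not None
    reader = utils.ReaderEncryptorIO(proc.stdout, encryption.key, encryption.iv)
    with open(target_path, "wb") as f:
        shutil.copyfileobj(reader, f)
    proc.stdout.close()
    proc.wait()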

os.remove(target_path)
self._logger.info(f"Encryption of {target_path} done")

def backup_domain_spec(
self,
domain_spec: str,
domain_backup_path: str,
domain_filename: str = "domain.xml",
encryption: base.EncryptionCreds | None = None,
) -> None:
os.makedirs(domain_backup_path, exist_ok=True)

domain_path = os.path.join(domain_backup_path, domain_filename)
with open(domain_path, "w") as f:
f.write(domain_spec)

if encryption:
utils.encrypt_file(domain_path, encryption.key, encryption.iv)
os.remove(domain_path)
self._logger.info(f"Encryption of {domain_backup_path} done")

def backup_domain_disks(
self,
volumes: tp.Collection[str],
domain_backup_path: str,
encryption: base.EncryptionCreds | None = None,
) -> None:
os.makedirs(domain_backup_path, exist_ok=True)
for volume in volumes:
self._save_volume_to_backup(volume, domain_backup_path, encryption)

def backup(
self,
domains: tp.Collection[str],
compress: bool = False,
encryption: base.EncryptionCreds | None = None,
**kwargs: tp.Any,
) -> None:
backup_path = utils.backup_path(self._backup_dir)
os.makedirs(backup_path, exist_ok=True)

self.backup_domains(backup_path, list(domains), encryption)

if not compress:
Member:

Compression is effective only before encryption; encrypted data is nearly incompressible. Maybe it's a bit off-topic for this PR, but FYI.

Some security folks say you should not compress data before encryption at all, but that's not a practical approach.

I see the best approach as: compress first, encrypt at the end.
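
In this module's terms the proposed ordering would be roughly (a sketch, assuming the current utils.compress_dir and utils.encrypt_file signatures):

    # Sketch: compress the backup directory first, then encrypt the
    # resulting archive.
    utils.compress_dir(backup_path, compress_directory)
    if encryption:
        utils.encrypt_file(compressed_backup_path, encryption.key, encryption.iv)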

return

self._logger.info(f"Compressing {backup_path}")
compressed_backup_path = (
f"{backup_path}{qcow.AbstractQcowBackuper.COMPRESS_SUFFIX}"
)
compress_directory = os.path.dirname(backup_path)
try:
utils.compress_dir(backup_path, compress_directory)
except Exception:
self._logger.error(f"Compression of {backup_path} failed")
if os.path.exists(compressed_backup_path):
os.remove(compressed_backup_path)
return

self._logger.info(f"Compression of {backup_path} done")
shutil.rmtree(backup_path)

def rotate(self, limit: int = 5) -> None:
if limit == 0:
return

all_backups = [
os.path.join(self._backup_dir, f)
for f in os.listdir(self._backup_dir)
if qcow.AbstractQcowBackuper._backup_dir_pattern.match(f)
]

all_backups.sort(key=lambda x: os.path.getctime(x))

if len(all_backups) > limit:
backups_to_remove = all_backups[:-limit]
for backup in backups_to_remove:
if os.path.isdir(backup):
shutil.rmtree(backup)
elif os.path.isfile(backup):
os.remove(backup)

self._logger.info(f"The backup {backup} was rotated")

def cleanup(self) -> None:
raise NotImplementedError()

def restore(self, **kwargs: tp.Any) -> None:
raise NotImplementedError()
genesis_devtools/backup/s3.py (107 additions, 0 deletions)
@@ -17,12 +17,14 @@

import io
import os
import subprocess
import typing as tp

import boto3

from genesis_devtools.backup import base
from genesis_devtools.backup import qcow
from genesis_devtools.backup import zfs as zfs_backup
from genesis_devtools import logger as logger_base
from genesis_devtools import utils

@@ -137,3 +139,108 @@ def cleanup(self) -> None:

def restore(self, backup_path: str) -> None:
raise NotImplementedError()


class S3ZFSBackuper(zfs_backup.AbstractZfsBackuper):

def __init__(
self,
endpoint_url: str,
access_key: str,
secret_key: str,
host: str,
bucket_name: str,
snapshot_name: str = "backup",
logger: logger_base.AbstractLogger | None = None,
) -> None:
super().__init__(snapshot_name=snapshot_name, logger=logger)
self._access_key = access_key
self._secret_key = secret_key
self._host = host
self._bucket_name = bucket_name
self._endpoint_url = endpoint_url

def _upload_stream(
self,
stream: tp.IO,
s3_path: str,
encryption: base.EncryptionCreds | None = None,
) -> None:
s3_client = boto3.client(
"s3",
endpoint_url=self._endpoint_url,
aws_access_key_id=self._access_key,
aws_secret_access_key=self._secret_key,
)

if encryption:
stream = utils.ReaderEncryptorIO(
stream, encryption.key, encryption.iv
)
s3_path += self.ENCRYPTED_SUFFIX
s3_client.upload_fileobj(stream, self._bucket_name, s3_path)

def backup_domain_spec(
self,
domain_spec: str,
domain_backup_path: str,
domain_filename: str = "domain.xml",
encryption: base.EncryptionCreds | None = None,
) -> None:
self._upload_stream(
io.BytesIO(domain_spec.encode("utf-8")),
os.path.join(domain_backup_path, domain_filename),
encryption,
)

def backup_domain_disks(
self,
volumes: tp.Collection[str],
domain_backup_path: str,
encryption: base.EncryptionCreds | None = None,
) -> None:
for volume in volumes:
disk_name = os.path.basename(volume) + ".raw"
s3_path = os.path.join(domain_backup_path, disk_name)

proc = subprocess.Popen(
[
"sudo",
"zfs",
"send",
f"{volume}@{self._snapshot_name}",
],
stdout=subprocess.PIPE,
)

assert proc.stdout is not None
try:
self._upload_stream(proc.stdout, s3_path, encryption)
finally:
proc.stdout.close()
proc.wait()
if proc.returncode != 0:
raise subprocess.CalledProcessError(
proc.returncode, proc.args
)

def backup(
self,
domains: tp.Collection[str],
compress: bool = False,
encryption: base.EncryptionCreds | None = None,
**kwargs: tp.Any,
) -> None:
backup_path = self._host + "/" + utils.backup_path("")
self.backup_domains(backup_path, list(domains), encryption)

def rotate(self, limit: int = 5) -> None:
# Nothing to do for rotation right now.
# It will be implemented later.
pass

def cleanup(self) -> None:
raise NotImplementedError()

def restore(self, **kwargs: tp.Any) -> None:
raise NotImplementedError()
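
For orientation, a hypothetical usage sketch of the two new backupers (paths, credentials, and domain names are made up):

    # Sketch: local ZFS backup with compression and rotation.
    local = LocalZFSBackuper(backup_dir="/var/backups/genesis")
    local.backup(domains=["vm-01"], compress=True)
    local.rotate(limit=5)

    # Sketch: streaming ZFS backup to S3-compatible storage.
    s3 = S3ZFSBackuper(
        endpoint_url="https://s3.example.com",
        access_key="ACCESS",
        secret_key="SECRET",
        host="host-01",
        bucket_name="backups",
    )
    s3.backup(domains=["vm-01"])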