-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrecovery.py
87 lines (72 loc) · 3.49 KB
/
recovery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import os
import shutil
from google.cloud import storage
from google.oauth2 import service_account
import docker
from settings import Settings, parse_arguments
from storage import download_from_bucket, extract_tar
from concurrent.futures import ThreadPoolExecutor
import storage
def verify_or_create_directory(recovery_path: str) -> bool:
"""Verify if the recovery directory exists, create it if not, or ask for confirmation to delete contents."""
if not os.path.exists(recovery_path):
os.makedirs(recovery_path)
print(f"Created directory: {recovery_path}")
else:
if os.listdir(recovery_path):
# If the directory is not empty, ask for confirmation to delete its contents
confirm = input(f"The directory {recovery_path} is not empty. Do you want to delete its contents and recover from the snapshot? (y/n): ")
if confirm.lower() == 'y':
for filename in os.listdir(recovery_path):
file_path = os.path.join(recovery_path, filename)
try:
if os.path.isdir(file_path):
shutil.rmtree(file_path)
else:
os.remove(file_path)
except Exception as e:
print(f"Error removing {file_path}: {e}")
print(f"Deleted all contents of {recovery_path}.")
else:
print(f"Skipping recovery for {recovery_path}.")
return False
else:
print(f"The directory {recovery_path} is empty. Proceeding with recovery.")
return True
def recover_snapshot(storage_client: storage.Client, settings: Settings, container_config: dict[str, str]) -> None:
"""Download the snapshot tar file and extract it to the recovery path."""
tar_name = container_config['tar_name']
bucket_path = container_config['bucket_path']
recovery_path = container_config['recovery_path']
# Step 1: Verify or create the recovery directory and delete contents if necessary
if not verify_or_create_directory(recovery_path):
return # Skip this container's recovery if directory isn't ready
# Step 2: Download the snapshot tar file from Google Cloud Storage
tar_path = f"./{tar_name}"
download_from_bucket(storage_client, settings.bucket_name, bucket_path, tar_path)
# Step 3: Extract the tarball to the recovery path
extract_tar(tar_path, recovery_path)
# Step 4: Clean up the tar file after recovery
os.remove(tar_path)
print(f"Cleaned up tar file: {tar_path}")
def main() -> None:
"""Main function to parse arguments, initialize clients, and recover snapshots in parallel."""
args = parse_arguments()
settings = Settings(config_path=args.config)
credentials = service_account.Credentials.from_service_account_file(
settings.google_credentials_json
)
print(settings)
storage_client = storage.Client(credentials=credentials)
docker_client = docker.DockerClient(base_url=settings.docker_host)
containers_config = settings.docker_containers
# Recover snapshot for each configured container
with ThreadPoolExecutor() as executor:
futures = []
for container_config in containers_config:
futures.append(executor.submit(recover_snapshot, storage_client, settings, container_config))
for future in futures:
future.result()
print("All snapshots recovered.")
if __name__ == "__main__":
main()