Skip to content

Commit 9519ad2

Browse files
author
Bret Wortman
committed
Fixes to cleanup
1 parent cf60a93 commit 9519ad2

File tree

2 files changed

+237
-6
lines changed

2 files changed

+237
-6
lines changed

curator/actions/deepfreeze/cleanup.py

Lines changed: 200 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,18 @@ class Cleanup:
2626
2727
When objects are restored from Glacier, they're temporarily available in Standard tier
2828
for a specified duration. After that duration expires, they revert to Glacier storage.
29-
This action detects when thawed repositories have expired, unmounts them, and removes
30-
any indices that were only backed up to those repositories.
29+
This action:
30+
1. Detects thawed repositories that have passed their expires_at timestamp and marks them as expired
31+
2. Unmounts expired repositories and resets them to frozen state
32+
3. Deletes indices whose snapshots are only in expired repositories
33+
4. Cleans up old thaw requests based on status and retention settings
34+
5. Cleans up orphaned thawed ILM policies
3135
3236
:param client: A client connection object
3337
:type client: Elasticsearch
3438
3539
:methods:
36-
do_action: Perform the cleanup operation (unmount repos and delete indices).
40+
do_action: Perform the cleanup operation (detect expired repos, unmount, delete indices).
3741
do_dry_run: Perform a dry-run of the cleanup operation.
3842
do_singleton_action: Entry point for singleton CLI execution.
3943
"""
@@ -146,6 +150,133 @@ def _get_indices_to_delete(self, repos_to_cleanup: list) -> list[str]:
146150
self.loggit.info("Found %d indices to delete", len(indices_to_delete))
147151
return indices_to_delete
148152

153+
def _detect_and_mark_expired_repos(self) -> int:
154+
"""
155+
Detect repositories whose S3 restore has expired and mark them as expired.
156+
157+
Checks repositories in two ways:
158+
1. Thawed repos with expires_at timestamp that has passed
159+
2. Mounted repos (regardless of state) by checking S3 restore status directly
160+
161+
:return: Count of repositories marked as expired
162+
:rtype: int
163+
"""
164+
self.loggit.debug("Detecting expired repositories")
165+
166+
from curator.actions.deepfreeze.constants import THAW_STATE_THAWED
167+
all_repos = get_matching_repos(self.client, self.settings.repo_name_prefix)
168+
169+
# Get thawed repos for timestamp-based checking
170+
thawed_repos = [repo for repo in all_repos if repo.thaw_state == THAW_STATE_THAWED]
171+
172+
# Get mounted repos for S3-based checking (may overlap with thawed_repos)
173+
mounted_repos = [repo for repo in all_repos if repo.is_mounted]
174+
175+
self.loggit.debug(
176+
"Found %d thawed repositories and %d mounted repositories to check",
177+
len(thawed_repos),
178+
len(mounted_repos)
179+
)
180+
181+
now = datetime.now(timezone.utc)
182+
expired_count = 0
183+
checked_repos = set() # Track repos we've already processed
184+
185+
# METHOD 1: Check thawed repos with expires_at timestamp
186+
for repo in thawed_repos:
187+
if repo.name in checked_repos:
188+
continue
189+
190+
if repo.expires_at:
191+
expires_at = repo.expires_at
192+
if expires_at.tzinfo is None:
193+
expires_at = expires_at.replace(tzinfo=timezone.utc)
194+
195+
if now >= expires_at:
196+
self.loggit.info(
197+
"Repository %s has expired based on timestamp (expired at %s)",
198+
repo.name,
199+
expires_at.isoformat()
200+
)
201+
repo.mark_expired()
202+
try:
203+
repo.persist(self.client)
204+
self.loggit.info("Marked repository %s as expired", repo.name)
205+
expired_count += 1
206+
checked_repos.add(repo.name)
207+
except Exception as e:
208+
self.loggit.error(
209+
"Failed to mark repository %s as expired: %s",
210+
repo.name,
211+
e
212+
)
213+
else:
214+
checked_repos.add(repo.name)
215+
else:
216+
self.loggit.warning(
217+
"Repository %s is in thawed state but has no expires_at timestamp",
218+
repo.name
219+
)
220+
221+
# METHOD 2: Check mounted repos by querying S3 restore status
222+
self.loggit.debug("Checking S3 restore status for mounted repositories")
223+
for repo in mounted_repos:
224+
if repo.name in checked_repos:
225+
continue
226+
227+
try:
228+
# Check actual S3 restore status
229+
self.loggit.debug(
230+
"Checking S3 restore status for repository %s (bucket: %s, path: %s)",
231+
repo.name,
232+
repo.bucket,
233+
repo.base_path
234+
)
235+
236+
status = check_restore_status(self.s3, repo.bucket, repo.base_path)
237+
238+
# If all objects are back in Glacier (not restored), mark as expired
239+
if status["not_restored"] > 0 and status["restored"] == 0 and status["in_progress"] == 0:
240+
self.loggit.info(
241+
"Repository %s has expired based on S3 status: %d/%d objects not restored",
242+
repo.name,
243+
status["not_restored"],
244+
status["total"]
245+
)
246+
repo.mark_expired()
247+
try:
248+
repo.persist(self.client)
249+
self.loggit.info("Marked repository %s as expired", repo.name)
250+
expired_count += 1
251+
checked_repos.add(repo.name)
252+
except Exception as e:
253+
self.loggit.error(
254+
"Failed to mark repository %s as expired: %s",
255+
repo.name,
256+
e
257+
)
258+
elif status["restored"] > 0 or status["in_progress"] > 0:
259+
self.loggit.debug(
260+
"Repository %s still has restored objects: %d restored, %d in progress",
261+
repo.name,
262+
status["restored"],
263+
status["in_progress"]
264+
)
265+
checked_repos.add(repo.name)
266+
267+
except Exception as e:
268+
self.loggit.error(
269+
"Failed to check S3 restore status for repository %s: %s",
270+
repo.name,
271+
e
272+
)
273+
continue
274+
275+
if expired_count > 0:
276+
self.loggit.info("Marked %d repositories as expired", expired_count)
277+
278+
return expired_count
279+
149280
def _cleanup_old_thaw_requests(self) -> tuple[list[str], list[str]]:
150281
"""
151282
Clean up old thaw requests based on status and age.
@@ -276,6 +407,15 @@ def do_action(self) -> None:
276407
"""
277408
self.loggit.debug("Checking for expired thawed repositories")
278409

410+
# First, detect and mark any thawed repositories that have passed their expiration time
411+
self.loggit.info("Detecting expired thawed repositories based on expires_at timestamp")
412+
try:
413+
newly_expired = self._detect_and_mark_expired_repos()
414+
if newly_expired > 0:
415+
self.loggit.info("Detected and marked %d newly expired repositories", newly_expired)
416+
except Exception as e:
417+
self.loggit.error("Error detecting expired repositories: %s", e)
418+
279419
# Get all repositories and filter for expired ones
280420
from curator.actions.deepfreeze.constants import THAW_STATE_EXPIRED
281421
all_repos = get_matching_repos(self.client, self.settings.repo_name_prefix)
@@ -500,6 +640,55 @@ def do_dry_run(self) -> None:
500640
"""
501641
self.loggit.info("DRY-RUN MODE. No changes will be made.")
502642

643+
# First, show which thawed repositories would be detected as expired
644+
self.loggit.info("DRY-RUN: Checking for thawed repositories that have passed expiration time")
645+
from curator.actions.deepfreeze.constants import THAW_STATE_THAWED
646+
all_repos = get_matching_repos(self.client, self.settings.repo_name_prefix)
647+
thawed_repos = [repo for repo in all_repos if repo.thaw_state == THAW_STATE_THAWED]
648+
649+
if thawed_repos:
650+
now = datetime.now(timezone.utc)
651+
would_expire = []
652+
653+
for repo in thawed_repos:
654+
if repo.expires_at:
655+
expires_at = repo.expires_at
656+
if expires_at.tzinfo is None:
657+
expires_at = expires_at.replace(tzinfo=timezone.utc)
658+
659+
if now >= expires_at:
660+
time_expired = now - expires_at
661+
would_expire.append((repo.name, expires_at, time_expired))
662+
else:
663+
time_remaining = expires_at - now
664+
self.loggit.debug(
665+
"DRY-RUN: Repository %s not yet expired (expires in %s)",
666+
repo.name,
667+
time_remaining
668+
)
669+
else:
670+
self.loggit.warning(
671+
"DRY-RUN: Repository %s is thawed but has no expires_at timestamp",
672+
repo.name
673+
)
674+
675+
if would_expire:
676+
self.loggit.info(
677+
"DRY-RUN: Would mark %d repositories as expired:",
678+
len(would_expire)
679+
)
680+
for name, expired_at, time_ago in would_expire:
681+
self.loggit.info(
682+
"DRY-RUN: - %s (expired %s ago at %s)",
683+
name,
684+
time_ago,
685+
expired_at.isoformat()
686+
)
687+
else:
688+
self.loggit.info("DRY-RUN: No thawed repositories have passed expiration time")
689+
else:
690+
self.loggit.info("DRY-RUN: No thawed repositories found to check")
691+
503692
# Get all repositories and filter for expired ones
504693
from curator.actions.deepfreeze.constants import THAW_STATE_EXPIRED
505694
all_repos = get_matching_repos(self.client, self.settings.repo_name_prefix)
@@ -624,11 +813,17 @@ def do_dry_run(self) -> None:
624813
except Exception as e:
625814
self.loggit.error("DRY-RUN: Error checking thaw requests: %s", e)
626815

627-
def do_singleton_action(self) -> None:
816+
def do_singleton_action(self, dry_run: bool = False) -> None:
628817
"""
629818
Entry point for singleton CLI execution.
630819
820+
:param dry_run: If True, perform a dry-run without making changes
821+
:type dry_run: bool
822+
631823
:return: None
632824
:rtype: None
633825
"""
634-
self.do_action()
826+
if dry_run:
827+
self.do_dry_run()
828+
else:
829+
self.do_action()

curator/actions/deepfreeze/refreeze.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ def _delete_thawed_ilm_policy(self, repo_name: str) -> bool:
197197
198198
Policy name format: {repo_name}-thawed (e.g., deepfreeze-000010-thawed)
199199
200+
Before deleting the policy, removes it from any indices still using it to avoid
201+
"policy in use" errors.
202+
200203
:param repo_name: The repository name
201204
:type repo_name: str
202205
@@ -209,7 +212,40 @@ def _delete_thawed_ilm_policy(self, repo_name: str) -> bool:
209212
# Check if policy exists first
210213
self.client.ilm.get_lifecycle(name=policy_name)
211214

212-
# Policy exists, delete it
215+
# Before deleting, remove the policy from any indices still using it
216+
self.loggit.debug("Checking for indices using ILM policy %s", policy_name)
217+
try:
218+
# Get all indices using this policy
219+
ilm_explain = self.client.ilm.explain_lifecycle(index="*")
220+
indices_using_policy = [
221+
idx for idx, info in ilm_explain.get("indices", {}).items()
222+
if info.get("policy") == policy_name
223+
]
224+
225+
if indices_using_policy:
226+
self.loggit.info(
227+
"Found %d indices still using policy %s, removing policy from them",
228+
len(indices_using_policy),
229+
policy_name
230+
)
231+
for idx in indices_using_policy:
232+
try:
233+
self.loggit.debug("Removing ILM policy from index %s", idx)
234+
self.client.ilm.remove_policy(index=idx)
235+
except Exception as idx_err:
236+
self.loggit.warning(
237+
"Failed to remove ILM policy from index %s: %s",
238+
idx,
239+
idx_err
240+
)
241+
except Exception as check_err:
242+
self.loggit.warning(
243+
"Failed to check for indices using policy %s: %s",
244+
policy_name,
245+
check_err
246+
)
247+
248+
# Policy exists and indices have been cleaned up, delete it
213249
self.loggit.info("Deleting thawed ILM policy %s", policy_name)
214250
self.client.ilm.delete_lifecycle(name=policy_name)
215251
self.loggit.debug("Successfully deleted ILM policy %s", policy_name)

0 commit comments

Comments
 (0)