
Commit 47a4142

Transform KubernetesComputeResourcesPatch "Push" statuses into "Pull" statuses (#107)
* update k8s compute patch major lib
* add more exception types
* refactor
* correct version
* remove debug log
* refactor
* lint
* fix UT
* fixes
* change return type for get_status
* lint
* bump lightkube
* static checks
* converge dry_run
* lint
1 parent 3c757ea commit 47a4142

File tree

3 files changed: +240 -25 lines changed


lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py

Lines changed: 192 additions & 14 deletions
@@ -4,7 +4,7 @@
 """# KubernetesComputeResourcesPatch Library.
 
 This library is designed to enable developers to more simply patch the Kubernetes compute resource
-limits and requests created by Juju during the deployment of a sidecar charm.
+limits and requests created by Juju during the deployment of a charm.
 
 When initialised, this library binds a handler to the parent charm's `config-changed` event.
 The config-changed event is used because it is guaranteed to fire on startup, on upgrade and on
@@ -76,19 +76,32 @@ def _resource_spec_from_config(self) -> ResourceRequirements:
         return ResourceRequirements(limits=spec, requests=spec)
 ```
 
+If you wish to pull the state of the resources patch operation and set the charm unit status based on that patch result,
+you can achieve that using the `get_status()` function.
+```python
+class SomeCharm(CharmBase):
+    def __init__(self, *args):
+        # ...
+        self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status)
+        # ...
+    def _on_collect_unit_status(self, event: CollectStatusEvent):
+        event.add_status(self.resources_patch.get_status())
+```
 
 Additionally, you may wish to use mocks in your charm's unit testing to ensure that the library
 does not try to make any API calls, or open any files during testing that are unlikely to be
 present, and could break your tests. The easiest way to do this is during your test `setUp`:
 
 ```python
 # ...
+from ops import ActiveStatus
 
 @patch.multiple(
     "charm.KubernetesComputeResourcesPatch",
     _namespace="test-namespace",
     _is_patched=lambda *a, **kw: True,
     is_ready=lambda *a, **kw: True,
+    get_status=lambda _: ActiveStatus(),
 )
 @patch("lightkube.core.client.GenericSyncClient")
 def setUp(self, *unused):
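
For readers who want to try the mocking recipe above outside the docstring fragment, here is a self-contained sketch. It is our illustration, not part of the commit: the `charm` module and `MyCharm` are hypothetical stand-ins for the charm under test.

```python
import unittest
from unittest.mock import patch

from ops import ActiveStatus
from ops.testing import Harness

from charm import MyCharm  # hypothetical charm that uses KubernetesComputeResourcesPatch


class TestCharmWithResourcePatch(unittest.TestCase):
    # Patching during setUp covers the hooks fired by begin_with_initial_hooks(),
    # where the library would otherwise try to reach the Kubernetes API.
    @patch.multiple(
        "charm.KubernetesComputeResourcesPatch",
        _namespace="test-namespace",
        _is_patched=lambda *a, **kw: True,
        is_ready=lambda *a, **kw: True,
        get_status=lambda _: ActiveStatus(),
    )
    @patch("lightkube.core.client.GenericSyncClient")
    def setUp(self, *unused):
        self.harness = Harness(MyCharm)
        self.addCleanup(self.harness.cleanup)
        self.harness.begin_with_initial_hooks()  # fires config-changed, which triggers the patch
```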
@@ -105,8 +118,9 @@ def setUp(self, *unused):
 import logging
 from decimal import Decimal
 from math import ceil, floor
-from typing import Callable, Dict, List, Optional, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 
+import tenacity
 from lightkube import ApiError, Client  # pyright: ignore
 from lightkube.core import exceptions
 from lightkube.models.apps_v1 import StatefulSetSpec
@@ -120,8 +134,10 @@ def setUp(self, *unused):
 from lightkube.resources.core_v1 import Pod
 from lightkube.types import PatchType
 from lightkube.utils.quantity import equals_canonically, parse_quantity
+from ops import ActiveStatus, BlockedStatus, WaitingStatus
 from ops.charm import CharmBase
 from ops.framework import BoundEvent, EventBase, EventSource, Object, ObjectEvents
+from ops.model import StatusBase
 
 logger = logging.getLogger(__name__)
 
@@ -133,14 +149,16 @@ def setUp(self, *unused):
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 7
+LIBPATCH = 8
 
 
 _Decimal = Union[Decimal, float, str, int]  # types that are potentially convertible to Decimal
 
 
 def adjust_resource_requirements(
-    limits: Optional[dict], requests: Optional[dict], adhere_to_requests: bool = True
+    limits: Optional[Dict[Any, Any]],
+    requests: Optional[Dict[Any, Any]],
+    adhere_to_requests: bool = True,
 ) -> ResourceRequirements:
     """Adjust resource limits so that `limits` and `requests` are consistent with each other.
 
@@ -289,6 +307,18 @@ def sanitize_resource_spec_dict(spec: Optional[dict]) -> Optional[dict]:
     return d
 
 
+def _retry_on_condition(exception):
+    """Retry if the exception is an ApiError with a status code != 403.
+
+    Returns: a boolean value to indicate whether to retry or not.
+    """
+    if isinstance(exception, ApiError) and str(exception.status.code) != "403":
+        return True
+    if isinstance(exception, exceptions.ConfigError) or isinstance(exception, ValueError):
+        return True
+    return False
+
+
 class K8sResourcePatchFailedEvent(EventBase):
     """Emitted when patching fails."""
 
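The new `_retry_on_condition` predicate is consumed further down via `tenacity.retry_if_exception` (see the `PATCH_RETRY_*` class attributes and the `_patch` retry loop below). For readers unfamiliar with predicate-based retrying in tenacity, here is a standalone sketch of the same pattern; `should_retry`, `TransientError`, and `flaky_call` are invented names for illustration, not part of the commit.

```python
import tenacity


class TransientError(Exception):
    """Stands in for a recoverable failure, e.g. a transient API error."""


def should_retry(exception: BaseException) -> bool:
    # Same idea as _retry_on_condition: retry recoverable errors, give up
    # immediately on permanent ones (like a 403 from the API server).
    return isinstance(exception, TransientError)


attempts = {"count": 0}


def flaky_call() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TransientError("not ready yet")
    return "ok"


# Same shape as the retry loop added to _patch: bounded by total delay,
# fixed wait between attempts, re-raising the last exception on failure.
for attempt in tenacity.Retrying(
    retry=tenacity.retry_if_exception(should_retry),
    stop=tenacity.stop_after_delay(20),
    wait=tenacity.wait_fixed(5),
    reraise=True,
):
    with attempt:
        result = flaky_call()

print(result)  # "ok" on the third attempt
```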
@@ -385,27 +415,132 @@ def get_actual(self, pod_name: str) -> Optional[ResourceRequirements]:
         )
         return podspec.resources
 
+    def is_failed(
+        self, resource_reqs_func: Callable[[], ResourceRequirements]
+    ) -> Tuple[bool, str]:
+        """Returns a tuple indicating whether a patch operation has failed along with a failure message.
+
+        Implementation is based on dry running the patch operation to catch if there would be failures (e.g: Wrong spec and Auth errors).
+        """
+        try:
+            resource_reqs = resource_reqs_func()
+            limits = resource_reqs.limits
+            requests = resource_reqs.requests
+        except ValueError as e:
+            msg = f"Failed obtaining resource limit spec: {e}"
+            logger.error(msg)
+            return True, msg
+
+        # Dry run does not catch negative values for resource requests and limits.
+        if not is_valid_spec(limits) or not is_valid_spec(requests):
+            msg = f"Invalid resource requirements specs: {limits}, {requests}"
+            logger.error(msg)
+            return True, msg
+
+        resource_reqs = ResourceRequirements(
+            limits=sanitize_resource_spec_dict(limits),  # type: ignore[arg-type]
+            requests=sanitize_resource_spec_dict(requests),  # type: ignore[arg-type]
+        )
+
+        try:
+            self.apply(resource_reqs, dry_run=True)
+        except ApiError as e:
+            if e.status.code == 403:
+                msg = f"Kubernetes resources patch failed: `juju trust` this application. {e}"
+            else:
+                msg = f"Kubernetes resources patch failed: {e}"
+            return True, msg
+        except ValueError as e:
+            msg = f"Kubernetes resources patch failed: {e}"
+            return True, msg
+
+        return False, ""
+
+    def is_in_progress(self) -> bool:
+        """Returns a boolean to indicate whether a patch operation is in progress.
+
+        Implementation follows a similar approach to `kubectl rollout status statefulset` to track the progress of a rollout.
+        Reference: https://github.com/kubernetes/kubectl/blob/kubernetes-1.31.0/pkg/polymorphichelpers/rollout_status.go
+        """
+        try:
+            sts = self.client.get(
+                StatefulSet, name=self.statefulset_name, namespace=self.namespace
+            )
+        except (ValueError, ApiError) as e:
+            # Assumption: if there was a persistent issue, it'd have been caught in `is_failed`
+            # Wait until next run to try again.
+            logger.error(f"Failed to fetch statefulset from K8s api: {e}")
+            return False
+
+        if sts.status is None or sts.spec is None:
+            logger.debug("status/spec are not yet available")
+            return False
+        if sts.status.observedGeneration == 0 or (
+            sts.metadata
+            and sts.status.observedGeneration
+            and sts.metadata.generation
+            and sts.metadata.generation > sts.status.observedGeneration
+        ):
+            logger.debug("waiting for statefulset spec update to be observed...")
+            return True
+        if (
+            sts.spec.replicas is not None
+            and sts.status.readyReplicas is not None
+            and sts.status.readyReplicas < sts.spec.replicas
+        ):
+            logger.debug(
+                f"Waiting for {sts.spec.replicas-sts.status.readyReplicas} pods to be ready..."
+            )
+            return True
+
+        if (
+            sts.spec.updateStrategy
+            and sts.spec.updateStrategy.type == "rollingUpdate"
+            and sts.spec.updateStrategy.rollingUpdate is not None
+        ):
+            if (
+                sts.spec.replicas is not None
+                and sts.spec.updateStrategy.rollingUpdate.partition is not None
+            ):
+                if sts.status.updatedReplicas and sts.status.updatedReplicas < (
+                    sts.spec.replicas - sts.spec.updateStrategy.rollingUpdate.partition
+                ):
+                    logger.debug(
+                        f"Waiting for partitioned roll out to finish: {sts.status.updatedReplicas} out of {sts.spec.replicas - sts.spec.updateStrategy.rollingUpdate.partition} new pods have been updated..."
+                    )
+                    return True
+                logger.debug(
+                    f"partitioned roll out complete: {sts.status.updatedReplicas} new pods have been updated..."
+                )
+                return False
+
+        if sts.status.updateRevision != sts.status.currentRevision:
+            logger.debug(
+                f"waiting for statefulset rolling update to complete {sts.status.updatedReplicas} pods at revision {sts.status.updateRevision}..."
+            )
+            return True
+
+        logger.debug(
+            f"statefulset rolling update complete pods at revision {sts.status.currentRevision}"
+        )
+        return False
+
     def is_ready(self, pod_name, resource_reqs: ResourceRequirements):
         """Reports if the resource patch has been applied and is in effect.
 
         Returns:
             bool: A boolean indicating if the service patch has been applied and is in effect.
         """
-        logger.info(
-            "reqs=%s, templated=%s, actual=%s",
-            resource_reqs,
-            self.get_templated(),
-            self.get_actual(pod_name),
-        )
         return self.is_patched(resource_reqs) and equals_canonically(  # pyright: ignore
             resource_reqs, self.get_actual(pod_name)  # pyright: ignore
         )
 
-    def apply(self, resource_reqs: ResourceRequirements) -> None:
+    def apply(self, resource_reqs: ResourceRequirements, dry_run=False) -> None:
         """Patch the Kubernetes resources created by Juju to limit cpu or mem."""
         # Need to ignore invalid input, otherwise the StatefulSet gives "FailedCreate" and the
         # charm would be stuck in unknown/lost.
-        if self.is_patched(resource_reqs):
+        if not dry_run and self.is_patched(resource_reqs):
+            logger.debug(f"Resource requests are already patched: {resource_reqs}")
             return
 
         self.client.patch(
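
The `is_in_progress` method added above mirrors `kubectl rollout status statefulset`. As a rough mental model, here is a condensed standalone version of that check; the helper name and the omission of the partitioned-rollout branch are our simplifications, not part of the commit.

```python
from lightkube import Client
from lightkube.resources.apps_v1 import StatefulSet


def rollout_in_progress(client: Client, name: str, namespace: str) -> bool:
    """Condensed sketch of the rollout check performed by is_in_progress."""
    sts = client.get(StatefulSet, name=name, namespace=namespace)
    status, spec, meta = sts.status, sts.spec, sts.metadata
    if status is None or spec is None:
        return False  # nothing observed yet, nothing to wait on
    if meta and meta.generation and status.observedGeneration and (
        meta.generation > status.observedGeneration
    ):
        return True  # the controller has not observed the new spec yet
    if spec.replicas is not None and (status.readyReplicas or 0) < spec.replicas:
        return True  # pods are still becoming ready
    # A revision mismatch means the rolling update is still replacing pods.
    return status.updateRevision != status.currentRevision


# Usage sketch (assumes an in-cluster or kubeconfig-reachable API server):
# client = Client()
# print(rollout_in_progress(client, "my-app", "my-namespace"))
```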
@@ -415,13 +550,17 @@ def apply(self, resource_reqs: ResourceRequirements) -> None:
             namespace=self.namespace,
             patch_type=PatchType.APPLY,
             field_manager=self.__class__.__name__,
+            dry_run=dry_run,
         )
 
 
 class KubernetesComputeResourcesPatch(Object):
     """A utility for patching the Kubernetes compute resources set up by Juju."""
 
     on = K8sResourcePatchEvents()  # pyright: ignore
+    PATCH_RETRY_STOP = tenacity.stop_after_delay(20)
+    PATCH_RETRY_WAIT = tenacity.wait_fixed(5)
+    PATCH_RETRY_IF = tenacity.retry_if_exception(_retry_on_condition)
 
     def __init__(
         self,
@@ -468,7 +607,11 @@ def _on_config_changed(self, _):
         self._patch()
 
     def _patch(self) -> None:
-        """Patch the Kubernetes resources created by Juju to limit cpu or mem."""
+        """Patch the Kubernetes resources created by Juju to limit cpu or mem.
+
+        This method will keep on retrying to patch the kubernetes resource for a default duration of 20 seconds
+        if the patching failure is due to a recoverable error (e.g: Network Latency).
+        """
         try:
             resource_reqs = self.resource_reqs_func()
             limits = resource_reqs.limits
@@ -492,7 +635,18 @@ def _patch(self) -> None:
         )
 
         try:
-            self.patcher.apply(resource_reqs)
+            for attempt in tenacity.Retrying(
+                retry=self.PATCH_RETRY_IF,
+                stop=self.PATCH_RETRY_STOP,
+                wait=self.PATCH_RETRY_WAIT,
+                # if you don't succeed raise the last caught exception when you're done
+                reraise=True,
+            ):
+                with attempt:
+                    logger.debug(
+                        f"attempt #{attempt.retry_state.attempt_number} to patch resource limits"
+                    )
+                    self.patcher.apply(resource_reqs)
 
         except exceptions.ConfigError as e:
             msg = f"Error creating k8s client: {e}"
@@ -503,6 +657,7 @@ def _patch(self) -> None:
         except ApiError as e:
             if e.status.code == 403:
                 msg = f"Kubernetes resources patch failed: `juju trust` this application. {e}"
+
             else:
                 msg = f"Kubernetes resources patch failed: {e}"
 
@@ -554,6 +709,29 @@ def is_ready(self) -> bool:
             self.on.patch_failed.emit(message=msg)
             return False
 
+    def get_status(self) -> StatusBase:
+        """Return the status of patching the resource limits in a `StatusBase` format.
+
+        Returns:
+            StatusBase: There is a 1:1 mapping between the state of the patching operation and a `StatusBase` value that the charm can be set to.
+            Possible values are:
+            - ActiveStatus: The patch was applied successfully.
+            - BlockedStatus: The patch failed and requires a human intervention.
+            - WaitingStatus: The patch is still in progress.
+
+        Example:
+            - ActiveStatus("Patch applied successfully")
+            - BlockedStatus("Failed due to missing permissions")
+            - WaitingStatus("Patch is in progress")
+        """
+        failed, msg = self.patcher.is_failed(self.resource_reqs_func)
+        if failed:
+            return BlockedStatus(msg)
+        if self.patcher.is_in_progress():
+            return WaitingStatus("waiting for resources patch to apply")
+        # patch successful or nothing has been patched yet
+        return ActiveStatus()
+
     @property
     def _app(self) -> str:
         """Name of the current Juju application.
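Because `get_status()` maps cleanly onto Blocked/Waiting/Active, it composes well with the collect-status pattern from the docstring: a charm can report the patch status alongside its other checks and let ops surface the highest-priority one (blocked outranks waiting, which outranks active). An illustrative fragment, assuming the `resources_patch` attribute from the docstring example:

```python
from ops import ActiveStatus, CharmBase, CollectStatusEvent


class SomeCharm(CharmBase):
    def __init__(self, *args):
        super().__init__(*args)
        self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status)

    def _on_collect_unit_status(self, event: CollectStatusEvent):
        # ops keeps the highest-priority status among everything added here.
        event.add_status(self.resources_patch.get_status())
        event.add_status(ActiveStatus())  # placeholder for other component checks
```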
requirements.txt

Lines changed: 2 additions & 1 deletion
@@ -3,4 +3,5 @@
 
 ops
 PyYAML
-lightkube
+lightkube>=v0.15.4
+tenacity
