Skip to content

Commit dde6b2f

Browse files
authored
fix: fetch pods status for pending Deployment,DaemonSet and StatefulSet (#392)
* fetch pod status for pending Deployment, DaemonSet and StatefulSet
* fix linter
* fix selector
* add limit
* add progress timestamp to annotation
* refactor
* linter
1 parent 16d82a8 commit dde6b2f

File tree

5 files changed

+148
-23
lines changed

5 files changed

+148
-23
lines changed

internal/controller/clusterdrain_controller.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const defaultBatchSize = 50
3434
// Constants used by the cluster drain controller.
const (
3535
// drainAnnotation is an annotation key in the deployment.plural.sh namespace.
// NOTE(review): where it is read or written is not visible in this hunk.
drainAnnotation = "deployment.plural.sh/drain"
3636
// healthStatusJitter presumably bounds the jitter applied by healthStatusDelay;
// its use is not visible in this hunk — TODO confirm.
healthStatusJitter = 5
37+
// threshold is the health-wait timeout in minutes: waitForHealthStatus
// multiplies it by time.Minute to obtain its deadline.
threshold = 15
3738
)
3839

3940
// ClusterDrainReconciler reconciles a ClusterDrain object
@@ -218,28 +219,42 @@ func healthStatusDelay() time.Duration {
218219
}
219220

220221
func waitForHealthStatus(ctx context.Context, c client.Client, obj *unstructured.Unstructured) error {
221-
startTime := time.Now()
222+
timeout := threshold * time.Minute // Timeout duration
223+
ticker := time.NewTicker(healthStatusDelay()) // Ticker to periodically check health status
224+
defer ticker.Stop()
225+
226+
// Create a timeout channel that will trigger after the specified timeout
227+
timeoutChan := time.After(timeout)
228+
222229
for {
223-
time.Sleep(healthStatusDelay())
224-
if err := c.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
225-
return err
226-
}
227-
status := common.ToStatus(obj)
228-
if status == nil {
229-
return fmt.Errorf("status is nil")
230-
}
230+
select {
231+
case <-ticker.C:
232+
// Fetch the latest object
233+
if err := c.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
234+
return err
235+
}
231236

232-
switch *status {
233-
case console.ComponentStateRunning:
234-
return nil
235-
case console.ComponentStateFailed:
236-
return fmt.Errorf("component %s failed", obj.GetName())
237-
}
237+
// Check the status of the object
238+
status := common.ToStatus(obj)
239+
if status == nil {
240+
return fmt.Errorf("status is nil")
241+
}
238242

239-
if time.Since(startTime).Minutes() > 5 {
240-
return fmt.Errorf("timeout after %f minutes", time.Since(startTime).Minutes())
243+
// Handle the different states of the component
244+
switch *status {
245+
case console.ComponentStateRunning:
246+
return nil
247+
case console.ComponentStateFailed:
248+
return fmt.Errorf("component %s failed", obj.GetName())
249+
}
250+
251+
case <-timeoutChan:
252+
return fmt.Errorf("timeout after %f minutes", timeout.Seconds()/60)
253+
case <-ctx.Done():
254+
return ctx.Err()
241255
}
242256
}
257+
243258
}
244259

245260
func splitIntoWaves[T any](items []T, batchSize int) [][]T {

internal/utils/kubernetes.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,24 @@ func TryAddControllerRef(ctx context.Context, client ctrlruntimeclient.Client, o
5252
})
5353
}
5454

55+
func TryToUpdate(ctx context.Context, client ctrlruntimeclient.Client, object ctrlruntimeclient.Object) error {
56+
key := ctrlruntimeclient.ObjectKeyFromObject(object)
57+
58+
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
59+
original := object.DeepCopyObject().(ctrlruntimeclient.Object)
60+
if err := client.Get(ctx, key, object); err != nil {
61+
return fmt.Errorf("could not fetch current %s/%s state, got error: %w", object.GetName(), object.GetNamespace(), err)
62+
}
63+
64+
if reflect.DeepEqual(object, original) {
65+
return nil
66+
}
67+
68+
return client.Patch(ctx, original, ctrlruntimeclient.MergeFrom(object))
69+
})
70+
71+
}
72+
5573
func TryAddOwnerRef(ctx context.Context, client ctrlruntimeclient.Client, owner ctrlruntimeclient.Object, object ctrlruntimeclient.Object, scheme *runtime.Scheme) error {
5674
key := ctrlruntimeclient.ObjectKeyFromObject(object)
5775

pkg/common/common.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
package common
22

33
import (
4+
"context"
5+
"time"
6+
7+
"github.com/pluralsh/deployment-operator/internal/utils"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
49
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
510
"k8s.io/apimachinery/pkg/labels"
611
"k8s.io/apimachinery/pkg/runtime"
12+
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"
713
"sigs.k8s.io/yaml"
814
)
915

1016
const (
11-
ManagedByLabel = "plural.sh/managed-by"
12-
AgentLabelValue = "agent"
17+
// ManagedByLabel is a label key in the plural.sh namespace.
// NOTE(review): presumably paired with AgentLabelValue by
// ManagedByAgentLabelSelector, whose body is not visible here — confirm.
ManagedByLabel = "plural.sh/managed-by"
18+
// AgentLabelValue is the label value "agent".
AgentLabelValue = "agent"
19+
// LastProgressTimeAnnotation is the annotation key under which
// GetLastProgressTimestamp stores an RFC3339-formatted timestamp.
LastProgressTimeAnnotation = "plural.sh/last-progress-time"
1320
)
1421

1522
func ManagedByAgentLabelSelector() labels.Selector {
@@ -34,3 +41,37 @@ func Unmarshal(s string) (map[string]interface{}, error) {
3441

3542
return result, nil
3643
}
44+
45+
func GetLastProgressTimestamp(ctx context.Context, k8sClient ctrclient.Client, obj *unstructured.Unstructured) (progressTime metav1.Time, err error) {
46+
progressTime = metav1.Now()
47+
48+
if obj.GetAnnotations() == nil {
49+
obj.SetAnnotations(make(map[string]string))
50+
}
51+
annotations := obj.GetAnnotations()
52+
timeStr, ok := annotations[LastProgressTimeAnnotation]
53+
54+
defer func() {
55+
if !ok {
56+
err = utils.TryToUpdate(ctx, k8sClient, obj)
57+
if err != nil {
58+
return
59+
}
60+
key := ctrclient.ObjectKeyFromObject(obj)
61+
err = k8sClient.Get(ctx, key, obj)
62+
}
63+
}()
64+
65+
if !ok {
66+
annotations[LastProgressTimeAnnotation] = progressTime.Time.Format(time.RFC3339)
67+
obj.SetAnnotations(annotations)
68+
return
69+
}
70+
parsedTime, err := time.Parse(time.RFC3339, timeStr)
71+
if err != nil {
72+
return
73+
}
74+
progressTime = metav1.Time{Time: parsedTime}
75+
76+
return
77+
}

pkg/common/health.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,16 @@ func getPodHealth(obj *unstructured.Unstructured) (*HealthStatus, error) {
429429
}
430430
}
431431

432+
func FindPodStatusCondition(conditions []corev1.PodCondition, conditionType corev1.PodConditionType) *corev1.PodCondition {
433+
for i := range conditions {
434+
if conditions[i].Type == conditionType {
435+
return &conditions[i]
436+
}
437+
}
438+
439+
return nil
440+
}
441+
432442
func getCorev1PodHealth(pod *corev1.Pod) (*HealthStatus, error) {
433443
// This logic cannot be applied when the pod.Spec.RestartPolicy is: corev1.RestartPolicyOnFailure,
434444
// corev1.RestartPolicyNever, otherwise it breaks the resource hook logic.
@@ -449,6 +459,17 @@ func getCorev1PodHealth(pod *corev1.Pod) (*HealthStatus, error) {
449459
}
450460
}
451461

462+
if cond := FindPodStatusCondition(pod.Status.Conditions, corev1.PodScheduled); cond != nil {
463+
if status == "" && cond.Status == corev1.ConditionFalse {
464+
// status older than 5min
465+
cutoffTime := metav1.NewTime(time.Now().Add(-5 * time.Minute))
466+
if cond.LastTransitionTime.Before(&cutoffTime) {
467+
status = HealthStatusDegraded
468+
messages = append(messages, cond.Message)
469+
}
470+
}
471+
}
472+
452473
if status != "" {
453474
return &HealthStatus{
454475
Status: status,

pkg/scraper/component_scraper.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@ import (
66
"sync"
77
"time"
88

9-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"github.com/pluralsh/deployment-operator/internal/utils"
1010

1111
"github.com/cert-manager/cert-manager/pkg/apis/certmanager"
12-
1312
"github.com/pluralsh/deployment-operator/internal/helpers"
1413
"github.com/pluralsh/deployment-operator/pkg/common"
1514
agentcommon "github.com/pluralsh/deployment-operator/pkg/common"
@@ -18,6 +17,7 @@ import (
1817
"github.com/pluralsh/polly/containers"
1918
"github.com/samber/lo"
2019
corev1 "k8s.io/api/core/v1"
20+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2121
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2222
"k8s.io/apimachinery/pkg/runtime/schema"
2323
"k8s.io/client-go/discovery"
@@ -132,15 +132,15 @@ func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClien
132132
}
133133
}
134134

135-
func setUnhealthyComponents(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) error {
135+
func setUnhealthyComponents(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind) error {
136136
pager := listResources(ctx, k8sClient, gvk)
137137
for pager.HasNext() {
138138
items, err := pager.NextPage()
139139
if err != nil {
140140
return err
141141
}
142142
for _, item := range items {
143-
health, err := common.GetResourceHealth(&item)
143+
health, err := getResourceHealthStatus(ctx, k8sClient, &item)
144144
if err != nil {
145145
return err
146146
}
@@ -156,6 +156,36 @@ func setUnhealthyComponents(ctx context.Context, k8sClient ctrclient.Client, gvk
156156
return nil
157157
}
158158

159+
func getResourceHealthStatus(ctx context.Context, k8sClient ctrclient.Client, obj *unstructured.Unstructured) (*common.HealthStatus, error) {
160+
health, err := common.GetResourceHealth(obj)
161+
if err != nil {
162+
return nil, err
163+
}
164+
165+
progressTime, err := common.GetLastProgressTimestamp(ctx, k8sClient, obj)
166+
if err != nil {
167+
return nil, err
168+
}
169+
170+
// remove entry if no longer progressing
171+
if health.Status != common.HealthStatusProgressing {
172+
// cleanup progress timestamp
173+
annotations := obj.GetAnnotations()
174+
delete(annotations, common.LastProgressTimeAnnotation)
175+
obj.SetAnnotations(annotations)
176+
return health, utils.TryToUpdate(ctx, k8sClient, obj)
177+
}
178+
179+
// mark as failed if it exceeds a threshold
180+
cutoffTime := metav1.NewTime(time.Now().Add(-30 * time.Minute))
181+
182+
if progressTime.Before(&cutoffTime) {
183+
health.Status = common.HealthStatusDegraded
184+
}
185+
186+
return health, nil
187+
}
188+
159189
func listResources(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind) *algorithms.Pager[unstructured.Unstructured] {
160190
var opts []ctrclient.ListOption
161191
manageByOperatorLabels := map[string]string{

0 commit comments

Comments
 (0)