Skip to content

Commit 0797aee

Browse files
authored
fix: resource health status (#397)
* fix resource health status
* fix e2e cache tests
* fix e2e cache tests
1 parent 80630ad commit 0797aee

File tree

15 files changed

+119
-86
lines changed

15 files changed

+119
-86
lines changed

cmd/agent/console.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func registerConsoleReconcilersOrDie(
5454
consoleClient client.Client,
5555
) {
5656
mgr.AddReconcilerOrDie(service.Identifier, func() (v1.Reconciler, error) {
57-
r, err := service.NewServiceReconciler(consoleClient, config, args.ControllerCacheTTL(), args.ManifestCacheTTL(), args.RestoreNamespace(), args.ConsoleUrl())
57+
r, err := service.NewServiceReconciler(k8sClient, consoleClient, config, args.ControllerCacheTTL(), args.ManifestCacheTTL(), args.RestoreNamespace(), args.ConsoleUrl())
5858
return r, err
5959
})
6060

cmd/agent/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func main() {
7070

7171
// Start resource cache in background if enabled.
7272
if args.ResourceCacheEnabled() {
73-
cache.Init(ctx, config, args.ResourceCacheTTL())
73+
cache.Init(ctx, kubeManager.GetClient(), config, args.ResourceCacheTTL())
7474
}
7575

7676
// Start the discovery cache in background.

internal/controller/clusterdrain_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func waitForHealthStatus(ctx context.Context, c client.Client, obj *unstructured
235235
}
236236

237237
// Check the status of the object
238-
status := common.ToStatus(obj)
238+
status := common.ToStatus(ctx, c, obj)
239239
if status == nil {
240240
return fmt.Errorf("status is nil")
241241
}

pkg/applier/filters/cache_filter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ var _ = Describe("Test filters", func() {
4242
}
4343

4444
It("check cache filter", func() {
45-
cache.Init(context.Background(), cfg, 100*time.Second)
45+
cache.Init(context.Background(), kClient, cfg, 100*time.Second)
4646
cacheFilter := CacheFilter{}
4747
res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod)
4848
Expect(err).ToNot(HaveOccurred())

pkg/cache/resource_cache.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"k8s.io/klog/v2"
1212
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
1313
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
14+
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"
1415

1516
"github.com/pluralsh/polly/containers"
1617
"github.com/samber/lo"
@@ -66,6 +67,9 @@ type ResourceCache struct {
6667
// control the lifecycle of opened watches and is using RetryListWatcher
6768
// instead of informers to minimize the memory footprint.
6869
watcher kwatcher.StatusWatcher
70+
71+
// k8sClient is required for resource statuses.
72+
k8sClient ctrclient.Client
6973
}
7074

7175
var (
@@ -76,7 +80,7 @@ var (
7680
// Init must be executed early in [main] in order to ensure that the
7781
// [ResourceCache] will be initialized properly during the application
7882
// startup.
79-
func Init(ctx context.Context, config *rest.Config, ttl time.Duration) {
83+
func Init(ctx context.Context, k8sClient ctrclient.Client, config *rest.Config, ttl time.Duration) {
8084
dynamicClient, err := dynamic.NewForConfig(config)
8185
if err != nil {
8286
klog.Error(err, "unable to create dynamic client")
@@ -114,6 +118,7 @@ func Init(ctx context.Context, config *rest.Config, ttl time.Duration) {
114118
cache: NewCache[*ResourceCacheEntry](ctx, ttl),
115119
resourceKeySet: containers.NewSet[ResourceKey](),
116120
watcher: w,
121+
k8sClient: k8sClient,
117122
}
118123

119124
initialized = true
@@ -223,7 +228,7 @@ func (in *ResourceCache) GetCacheStatus(key object.ObjMetadata) (*console.Compon
223228
return nil, err
224229
}
225230
in.saveResourceStatus(obj)
226-
return common.StatusEventToComponentAttributes(*s, make(map[schema.GroupName]string)), nil
231+
return common.StatusEventToComponentAttributes(in.ctx, in.k8sClient, *s, make(map[schema.GroupName]string)), nil
227232
}
228233

229234
func (in *ResourceCache) saveResourceStatus(resource *unstructured.Unstructured) {
@@ -235,7 +240,7 @@ func (in *ResourceCache) saveResourceStatus(resource *unstructured.Unstructured)
235240

236241
key := object.UnstructuredToObjMetadata(resource).String()
237242
cacheEntry, _ := resourceCache.GetCacheEntry(key)
238-
cacheEntry.SetStatus(*e)
243+
cacheEntry.SetStatus(in.ctx, in.k8sClient, *e)
239244
resourceCache.SetCacheEntry(key, cacheEntry)
240245

241246
}

pkg/cache/resource_cache_entry.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package cache
22

33
import (
4-
console "github.com/pluralsh/console/go/client"
5-
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
6-
"sigs.k8s.io/cli-utils/pkg/apply/event"
4+
"context"
75

6+
console "github.com/pluralsh/console/go/client"
87
"github.com/pluralsh/deployment-operator/internal/kubernetes/schema"
98
"github.com/pluralsh/deployment-operator/pkg/common"
9+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
10+
"sigs.k8s.io/cli-utils/pkg/apply/event"
11+
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"
1012
)
1113

1214
type SHAType string
@@ -80,6 +82,6 @@ func (in *ResourceCacheEntry) RequiresApply(manifestSHA string) bool {
8082

8183
// SetStatus saves the last seen resource [event.StatusEvent] and converts it to a simpler
8284
// [console.ComponentAttributes] structure.
83-
func (in *ResourceCacheEntry) SetStatus(se event.StatusEvent) {
84-
in.status = common.StatusEventToComponentAttributes(se, make(map[schema.GroupName]string))
85+
func (in *ResourceCacheEntry) SetStatus(ctx context.Context, k8sClient ctrclient.Client, se event.StatusEvent) {
86+
in.status = common.StatusEventToComponentAttributes(ctx, k8sClient, se, make(map[schema.GroupName]string))
8587
}

pkg/cache/resource_cache_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ var _ = Describe("Resource cache", Ordered, func() {
104104
})
105105

106106
It("should successfully create resource cache", func() {
107-
Init(ctx, cfg, 100*time.Second)
107+
Init(ctx, kClient, cfg, 100*time.Second)
108108
toAdd := containers.NewSet[ResourceKey]()
109109

110110
// register resource and watch for changes
@@ -129,7 +129,7 @@ var _ = Describe("Resource cache", Ordered, func() {
129129
})
130130

131131
It("should successfully watch CRD object", func() {
132-
Init(ctx, cfg, 100*time.Second)
132+
Init(ctx, kClient, cfg, 100*time.Second)
133133
toAdd := containers.NewSet[ResourceKey]()
134134

135135
err = applyYamlFile(ctx, kClient, "../../config/crd/bases/deployments.plural.sh_customhealths.yaml")

pkg/common/health_test.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package common_test
22

33
import (
4+
"context"
45
"os"
56
"path/filepath"
67
"time"
@@ -9,21 +10,29 @@ import (
910
. "github.com/onsi/gomega"
1011
deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1"
1112
"github.com/pluralsh/deployment-operator/pkg/common"
13+
testcommon "github.com/pluralsh/deployment-operator/pkg/test/common"
1214
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1315
)
1416

1517
var _ = Describe("Health Test", Ordered, func() {
1618
Context("Test health functions", func() {
1719
customResource := &deploymentsv1alpha1.MetricsAggregate{
1820
ObjectMeta: metav1.ObjectMeta{
19-
Name: "test",
21+
Name: "test",
22+
Namespace: "default",
2023
},
2124
}
2225

26+
BeforeAll(func() {
27+
Expect(testcommon.MaybeCreate(kClient, customResource, nil)).To(Succeed())
28+
})
29+
2330
It("should get default status from CRD without condition block", func() {
2431
obj, err := common.ToUnstructured(customResource)
32+
obj.SetKind("MetricsAggregate")
33+
obj.SetAPIVersion("deployments.plural.sh/v1alpha1")
2534
Expect(err).NotTo(HaveOccurred())
26-
status, err := common.GetResourceHealth(obj)
35+
status, err := common.GetResourceHealth(context.Background(), kClient, obj)
2736
Expect(err).NotTo(HaveOccurred())
2837
Expect(status).To(Not(BeNil()))
2938
Expect(*status).To(Equal(common.HealthStatus{
@@ -41,7 +50,9 @@ var _ = Describe("Health Test", Ordered, func() {
4150
}
4251
obj, err := common.ToUnstructured(customResource)
4352
Expect(err).NotTo(HaveOccurred())
44-
status, err := common.GetResourceHealth(obj)
53+
obj.SetKind("MetricsAggregate")
54+
obj.SetAPIVersion("deployments.plural.sh/v1alpha1")
55+
status, err := common.GetResourceHealth(context.Background(), kClient, obj)
4556
Expect(err).NotTo(HaveOccurred())
4657
Expect(status).To(Not(BeNil()))
4758
Expect(*status).To(Equal(common.HealthStatus{
@@ -61,7 +72,9 @@ var _ = Describe("Health Test", Ordered, func() {
6172
}
6273
obj, err := common.ToUnstructured(customResource)
6374
Expect(err).NotTo(HaveOccurred())
64-
status, err := common.GetResourceHealth(obj)
75+
obj.SetKind("MetricsAggregate")
76+
obj.SetAPIVersion("deployments.plural.sh/v1alpha1")
77+
status, err := common.GetResourceHealth(context.Background(), kClient, obj)
6578
Expect(err).NotTo(HaveOccurred())
6679
Expect(status).To(Not(BeNil()))
6780
Expect(*status).To(Equal(common.HealthStatus{
@@ -84,7 +97,7 @@ var _ = Describe("Health Test", Ordered, func() {
8497
obj.SetAPIVersion("deployments.plural.sh/v1alpha1")
8598
obj.SetKind("MetricsAggregate")
8699
Expect(err).NotTo(HaveOccurred())
87-
status, err := common.GetResourceHealth(obj)
100+
status, err := common.GetResourceHealth(context.Background(), kClient, obj)
88101
Expect(err).NotTo(HaveOccurred())
89102
Expect(status).To(Not(BeNil()))
90103
Expect(*status).To(Equal(common.HealthStatus{
@@ -98,7 +111,9 @@ var _ = Describe("Health Test", Ordered, func() {
98111
}
99112
obj, err := common.ToUnstructured(customResource)
100113
Expect(err).NotTo(HaveOccurred())
101-
status, err := common.GetResourceHealth(obj)
114+
obj.SetKind("MetricsAggregate")
115+
obj.SetAPIVersion("deployments.plural.sh/v1alpha1")
116+
status, err := common.GetResourceHealth(context.Background(), kClient, obj)
102117
Expect(err).NotTo(HaveOccurred())
103118
Expect(status).To(Not(BeNil()))
104119
Expect(*status).To(Equal(common.HealthStatus{
@@ -111,11 +126,13 @@ var _ = Describe("Health Test", Ordered, func() {
111126
customResource.DeletionTimestamp = nil
112127
obj, err := common.ToUnstructured(customResource)
113128
Expect(err).NotTo(HaveOccurred())
129+
obj.SetKind("MetricsAggregate")
130+
obj.SetAPIVersion("deployments.plural.sh/v1alpha1")
114131
scriptPath := filepath.Join("..", "..", "test", "lua", "test.lua")
115132
script, err := os.ReadFile(scriptPath)
116133
Expect(err).NotTo(HaveOccurred())
117134
common.GetLuaScript().SetValue(string(script))
118-
status, err := common.GetResourceHealth(obj)
135+
status, err := common.GetResourceHealth(context.Background(), kClient, obj)
119136
Expect(err).NotTo(HaveOccurred())
120137
Expect(status).To(Not(BeNil()))
121138
Expect(*status).To(Equal(common.HealthStatus{

pkg/common/status.go

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
11
package common
22

33
import (
4+
"context"
5+
"time"
6+
47
console "github.com/pluralsh/console/go/client"
8+
internalschema "github.com/pluralsh/deployment-operator/internal/kubernetes/schema"
9+
"github.com/pluralsh/deployment-operator/internal/utils"
510
"github.com/samber/lo"
11+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
612
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
713
"k8s.io/apimachinery/pkg/runtime/schema"
814
"k8s.io/klog/v2"
915
"sigs.k8s.io/cli-utils/pkg/apply/event"
1016
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
11-
12-
internalschema "github.com/pluralsh/deployment-operator/internal/kubernetes/schema"
17+
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"
1318
)
1419

15-
func StatusEventToComponentAttributes(e event.StatusEvent, vcache map[internalschema.GroupName]string) *console.ComponentAttributes {
20+
func StatusEventToComponentAttributes(ctx context.Context, k8sClient ctrclient.Client, e event.StatusEvent, vcache map[internalschema.GroupName]string) *console.ComponentAttributes {
1621
if e.Resource == nil {
1722
return nil
1823
}
@@ -31,7 +36,7 @@ func StatusEventToComponentAttributes(e event.StatusEvent, vcache map[internalsc
3136
synced := e.PollResourceInfo.Status == status.CurrentStatus
3237

3338
if e.PollResourceInfo.Status == status.UnknownStatus {
34-
if ToStatus(e.Resource) != nil {
39+
if ToStatus(ctx, k8sClient, e.Resource) != nil {
3540
synced = true
3641
}
3742
}
@@ -42,12 +47,12 @@ func StatusEventToComponentAttributes(e event.StatusEvent, vcache map[internalsc
4247
Name: e.Resource.GetName(),
4348
Version: version,
4449
Synced: synced,
45-
State: ToStatus(e.Resource),
50+
State: ToStatus(ctx, k8sClient, e.Resource),
4651
}
4752
}
4853

49-
func ToStatus(obj *unstructured.Unstructured) *console.ComponentState {
50-
h, err := GetResourceHealth(obj)
54+
func ToStatus(ctx context.Context, k8sClient ctrclient.Client, obj *unstructured.Unstructured) *console.ComponentState {
55+
h, err := GetResourceHealth(ctx, k8sClient, obj)
5156
if err != nil {
5257
klog.ErrorS(err, "failed to get resource health status", "name", obj.GetName(), "namespace", obj.GetNamespace())
5358
}
@@ -71,7 +76,7 @@ func ToStatus(obj *unstructured.Unstructured) *console.ComponentState {
7176
}
7277

7378
// GetResourceHealth returns the health of a k8s resource
74-
func GetResourceHealth(obj *unstructured.Unstructured) (health *HealthStatus, err error) {
79+
func GetResourceHealth(ctx context.Context, k8sClient ctrclient.Client, obj *unstructured.Unstructured) (health *HealthStatus, err error) {
7580
if obj.GetDeletionTimestamp() != nil {
7681
return &HealthStatus{
7782
Status: HealthStatusProgressing,
@@ -87,6 +92,34 @@ func GetResourceHealth(obj *unstructured.Unstructured) (health *HealthStatus, er
8792
}
8893
}
8994
}
95+
96+
if health == nil {
97+
health = &HealthStatus{
98+
Status: HealthStatusUnknown,
99+
}
100+
}
101+
102+
progressTime, err := GetLastProgressTimestamp(ctx, k8sClient, obj)
103+
if err != nil {
104+
return nil, err
105+
}
106+
107+
// remove the last-progress annotation and persist the object if it is no longer progressing
108+
if health.Status != HealthStatusProgressing {
109+
// cleanup progress timestamp
110+
annotations := obj.GetAnnotations()
111+
delete(annotations, LastProgressTimeAnnotation)
112+
obj.SetAnnotations(annotations)
113+
return health, utils.TryToUpdate(ctx, k8sClient, obj)
114+
}
115+
116+
// mark as degraded if the last recorded progress time is more than 15 minutes ago
117+
cutoffTime := metav1.NewTime(time.Now().Add(-15 * time.Minute))
118+
119+
if progressTime.Before(&cutoffTime) {
120+
health.Status = HealthStatusDegraded
121+
}
122+
90123
return health, err
91124

92125
}

pkg/common/suite_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
. "github.com/onsi/ginkgo/v2"
2626
. "github.com/onsi/gomega"
27+
deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1"
2728
"k8s.io/client-go/kubernetes/scheme"
2829
"sigs.k8s.io/controller-runtime/pkg/client"
2930
"sigs.k8s.io/controller-runtime/pkg/envtest"
@@ -46,6 +47,7 @@ var _ = BeforeSuite(func() {
4647

4748
By("bootstrapping test environment")
4849
testEnv = &envtest.Environment{
50+
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases"), filepath.Join("..", "..", "test", "crd")},
4951
BinaryAssetsDirectory: filepath.Join("..", "..", "bin", "k8s",
5052
fmt.Sprintf("1.28.3-%s-%s", runtime.GOOS, runtime.GOARCH)),
5153
}
@@ -54,6 +56,9 @@ var _ = BeforeSuite(func() {
5456
Expect(err).NotTo(HaveOccurred())
5557
Expect(cfg).NotTo(BeNil())
5658

59+
err = deploymentsv1alpha1.AddToScheme(scheme.Scheme)
60+
Expect(err).NotTo(HaveOccurred())
61+
5762
kClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
5863
Expect(err).NotTo(HaveOccurred())
5964
Expect(kClient).NotTo(BeNil())

0 commit comments

Comments
 (0)