Skip to content

Commit 23723d9

Browse files
maciaszczykm, floreks, zreigz
authored
feat: Add sync phases and waves customizability (#537)
* move `asJSON` to utils and rename to `UnstructuredAsJSON` * add `SyncWaveAnnotation` for resource ordering * add `GetSyncWave` function for handling `SyncWaveAnnotation` * move and integrate the default wave logic into the `GetSyncWave` function for resource ordering * remove unused `GetLastProgressTimestamp` function and cleanup imports in `common.go` * remove unused `ServiceID` function and cleanup imports in `common.go` * add `SyncPhase` constants and `GetSyncPhase` function for handling resource apply phases * move `const.go` contents to `annotations.go` for better file organization * integrate wave construction into `Phase` and improve resource synchronization handling * add skipped resource tracking and integrate skip/delete filters into phase construction * remove `CacheFilter` from applier initialization in service reconciler * add `WithFilter` to applier initialization in service reconciler * commit work in progress * feat(phase): add support for phased resource application and enhanced health evaluation * rename `kindToDefaultSyncWave` to `kindSyncPriorities` and replace `lastDefaultWave` with `defaultSyncPriority` * fix `DeletedCount` to use `deleteWave.Len` instead of the last wave length * improve phase management by adding `HasWaves` method, ensuring phase initialization, and refactoring `Next` logic for better clarity * add `sync_phase` tracking in database schema, queries, and data structures * replace base32 with base64 for hash encoding in `utils` package to optimize output format * add check to skip insert query execution if no values are provided in `db_store` * feat(applier): improve logging, enhance resource filtering, and refine phase application logic * replace `AreResourcesHealthy` with `GetResourceHealth` to improve health status tracking and error handling across store and applier components * improve logging in `applier` to provide better visibility into phase health and failure handling * revise `GetResourceHealth` test cases to 
ensure accurate tracking of pending and failed states * fix unit tests * fix unit tests * add initial version of resource check function * add tests for `HasSomeResources` to validate handling of different resource scenarios and edge cases * feat(annotations): add Helm hook annotations and extend sync phase definitions * handle Helm hooks * update `defaultPhase` to accept annotations map directly instead of unstructured type * implement `HelmFilter` to handle resource application logic based on Helm hook annotations during installs and upgrades * fix(tests): correct resource references in db_store_test to align with updated test scenarios * feat(sync-wave): add Helm hook weight support and enhance default sync wave determination based on resource kind * add service warnings * add manifest table * add manifests query * rename entry to component * add SaveManifests and GetManifests functions to handle storing and retrieving service-applied manifests * rename `component` to `manifest` in insert statement and adjust field mappings in query * add tests for SaveManifests and GetManifests to validate manifest storage and retrieval behavior * update docs * add GetPhaseHookDeletePolicy function to retrieve sync phase hook delete policy from annotations * rename manifest storage to cleanup candidates and update related functions, queries, and tests * replace direct schema initialization with GroupVersionKind method and refactor associated code * refactor ToUnstructured logic into Component and update usage in applier to streamline resource conversion * simplify resource existence check in applier by combining variable declaration and conditional * simplify resource filtering by inverting conditional check to reduce nesting * add UID field to cleanup candidate and update related queries and functions to handle unique resource identification * replace `NewStoreKeyFromEntry` with `ToStoreKey` method in `Component` and `CleanupCandidate`, and refactor related logic for 
consistency and clarity * refactor phase deletion logic to filter resources by sync phase directly in NewPhase for improved clarity and alignment with existing patterns * rename cleanup candidates to processed hook components and update associated methods, queries, and tests for enhanced semantic clarity * add status field to processed hook component and update queries, functions, and conflict resolution logic to manage and persist component state * add ExpireProcessedHookComponents method to DatabaseStore and store interface to enable removal of processed hook components by service ID * call ExpireProcessedHookComponents during service reconciliation to remove processed components and ensure consistency * refactor SyncPhaseHookDeletePolicy handling to use switch-case for improved clarity and extensibility * extend processed hook component logic with status-based handling and update related applier logic for resource filtering and deletion * improve comment clarity in applier logic for resource deletion policies * remove unnecessary comment in SetAnnotations call for streamlined code readability * simplify applier logic by consolidating deletion policy checks and optimizing resource filtering and deletion handling * clarify error log message in component saving logic to improve debugging * refactor processed hook component saving logic to optimize checks, simplify queries, and enhance error handling * extend processed hook component saving logic with status-based delete policy checks and update related error handling * clarify log messages in processed hook saving logic for improved traceability and debugging * extend `maybeSaveProcessedHookComponent` with state and serviceID parameters to refine delete policy checks and enhance logging accuracy * refactor delete policy checks by introducing `HasStateDesiredByDeletePolicy` helper function and simplifying conditional logic in `maybeSaveProcessedHookComponent` * fix import cycle * refactor 
`maybeSaveProcessedHookComponent` to accept database connection as a parameter, removing redundant connection handling logic * introduce `maybeSaveProcessedHookComponents` to handle bulk saving of processed hook components, replacing outdated TODO and improving logic readability * remove redundant and outdated test cases for resource health and existence checks, replace with streamlined `createHookJob` utility, and refactor `TestComponentCache_ProcessedHookComponents` * add comprehensive test cases for `GetProcessedHookComponents` to validate behavior with isolation, resource types, UIDs, and large datasets * fix lint * update pull request template * support helm.sh/hook-delete-policy * bump gql client v1.51.3 * optimize `maybeSaveProcessedHookComponents` by reducing redundant string operations and improving clarity with pre-allocated slice for value assembly * replace fallthrough with explicit return for SyncPhaseSync in annotations * replace fallthrough with call to defaultPhase in annotations * recreate hooks if their manifest change * fix cyclic dependency * update comment * mark service as failed only if it has errors, not warnings * refactor * do not sleep after last wave * refactor * fix query * reformat * fix manifest sha handling * refactor SaveHookComponentWithManifestSHA signature and update call sites * fix manifest sha handling * add required checks * support multiple phases * fix multiple phases support * add tests for phase recognition * fix tests for phase recognition * add tests for wave recognition * fix delete filtering * update comment * update comment * rename methods for consistency * remove unused NewKeyFromEntry function * rename methods and variables for consistency * do not ignore helm hooks by default * remove duplicated check * replace `GetPhaseHookDeletePolicy` with `HasSyncPhaseHookDeletePolicy` for clarity and improve readability * handle multiple delete policies * support argo sync wave annotation * skip applying hook resource if 
their manifest did not change * reformat * fix linter * add continue * fix multi-phase resource deletion * fix for trivy * fix import cycle * fix import cycle * improve logging * fix install/upgrade bug * fix install/upgrade bug --------- Co-authored-by: Sebastian Florek <[email protected]> Co-authored-by: Lukasz Zajaczkowski <[email protected]>
1 parent 36d6db3 commit 23723d9

32 files changed

+1965
-443
lines changed

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ Test environment: https://console.your-env.onplural.sh/
2424
- [ ] Service errors are reported properly and visible in the UI.
2525
- [ ] Service updates are reflected properly in the cluster.
2626
- [ ] Service resync triggers immediately and works as expected.
27+
- [ ] Sync wave annotations are respected.
28+
- [ ] Sync phase annotations are respected. Phases are executed in the correct order.
29+
- [ ] Sync hook delete policies are respected. Resources are not recreated once they reach the desired state.
2730
- [ ] Service deletion works and cleans up resources properly.
2831
- [ ] Services can be recreated after deletion.
2932
- [ ] Service detachment works and keeps resources unaffected.

internal/controller/ingressreplica_controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ var _ = Describe("IngressReplica Controller", Ordered, func() {
102102
err = kClient.Get(ctx, namespacedName, ingressReplica)
103103
Expect(err).NotTo(HaveOccurred())
104104
Expect(SanitizeStatusConditions(ingressReplica.Status)).To(Equal(SanitizeStatusConditions(v1alpha1.Status{
105-
SHA: lo.ToPtr("ACBBWIKK74ACGAK5NWAXYTTIYI2GDOSXGCJ65UGOLOPFCB24PKUQ===="),
105+
SHA: lo.ToPtr("AIIbIUr_ACMBXW2BfE5owjRhulcwk-7QzlueUQdceqk"),
106106
Conditions: []metav1.Condition{
107107
{
108108
Type: v1alpha1.ReadyConditionType.String(),
@@ -132,7 +132,7 @@ var _ = Describe("IngressReplica Controller", Ordered, func() {
132132
err = kClient.Get(ctx, namespacedName, ingressReplica)
133133
Expect(err).NotTo(HaveOccurred())
134134
Expect(SanitizeStatusConditions(ingressReplica.Status)).To(Equal(SanitizeStatusConditions(v1alpha1.Status{
135-
SHA: lo.ToPtr("ACBBWIKK74ACGAK5NWAXYTTIYI2GDOSXGCJ65UGOLOPFCB24PKUQ===="),
135+
SHA: lo.ToPtr("AIIbIUr_ACMBXW2BfE5owjRhulcwk-7QzlueUQdceqk"),
136136
Conditions: []metav1.Condition{
137137
{
138138
Type: v1alpha1.ReadyConditionType.String(),

internal/utils/hash.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,48 @@ package utils
22

33
import (
44
"crypto/sha256"
5-
"encoding/base32"
5+
"encoding/base64"
66
"encoding/json"
7+
8+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
79
)
810

9-
func HashObject(any interface{}) (string, error) {
10-
out, err := json.Marshal(any)
11+
func HashObject(object any) (string, error) {
12+
out, err := json.Marshal(object)
1113
if err != nil {
1214
return "", err
1315
}
1416
sha := sha256.Sum256(out)
15-
return base32.StdEncoding.EncodeToString(sha[:]), nil
17+
return base64.RawURLEncoding.EncodeToString(sha[:]), nil
1618
}
1719

18-
func HashString(s string) string {
19-
sha := sha256.Sum256([]byte(s))
20-
return base32.StdEncoding.EncodeToString(sha[:])
20+
// HashResource calculates SHA for an unstructured resource.
21+
// It uses object metadata (name, namespace, labels, annotations, deletion timestamp)
22+
// and all other top-level fields except status.
23+
func HashResource(resource unstructured.Unstructured) (string, error) {
24+
resourceCopy := resource.DeepCopy()
25+
object := struct {
26+
Name string `json:"name"`
27+
Namespace string `json:"namespace"`
28+
Labels map[string]string `json:"labels"`
29+
Annotations map[string]string `json:"annotations"`
30+
DeletionTimestamp string `json:"deletionTimestamp"`
31+
Other map[string]any `json:"other"`
32+
}{
33+
Name: resourceCopy.GetName(),
34+
Namespace: resourceCopy.GetNamespace(),
35+
Labels: resourceCopy.GetLabels(),
36+
Annotations: resourceCopy.GetAnnotations(),
37+
}
38+
39+
deletionTimestamp := resourceCopy.GetDeletionTimestamp()
40+
if deletionTimestamp != nil {
41+
object.DeletionTimestamp = deletionTimestamp.String()
42+
}
43+
44+
unstructured.RemoveNestedField(resourceCopy.Object, "metadata")
45+
unstructured.RemoveNestedField(resourceCopy.Object, "status")
46+
object.Other = resourceCopy.Object
47+
48+
return HashObject(object)
2149
}

internal/utils/kubernetes.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1213
"k8s.io/apimachinery/pkg/runtime"
1314
"k8s.io/cli-runtime/pkg/genericclioptions"
1415
"k8s.io/client-go/rest"
@@ -17,6 +18,7 @@ import (
1718
"k8s.io/kubectl/pkg/cmd/util"
1819
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
1920
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
21+
"sigs.k8s.io/yaml"
2022

2123
"github.com/pluralsh/deployment-operator/pkg/flowcontrol"
2224

@@ -50,23 +52,6 @@ func TryAddControllerRef(ctx context.Context, client ctrlruntimeclient.Client, o
5052
})
5153
}
5254

53-
func TryToUpdate(ctx context.Context, client ctrlruntimeclient.Client, object ctrlruntimeclient.Object) error {
54-
key := ctrlruntimeclient.ObjectKeyFromObject(object)
55-
56-
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
57-
original := object.DeepCopyObject().(ctrlruntimeclient.Object)
58-
if err := client.Get(ctx, key, object); err != nil {
59-
return fmt.Errorf("could not fetch current %s/%s state, got error: %w", object.GetName(), object.GetNamespace(), err)
60-
}
61-
62-
if reflect.DeepEqual(object, original) {
63-
return nil
64-
}
65-
66-
return client.Patch(ctx, original, ctrlruntimeclient.MergeFrom(object))
67-
})
68-
}
69-
7055
func TryAddOwnerRef(ctx context.Context, client ctrlruntimeclient.Client, owner ctrlruntimeclient.Object, object ctrlruntimeclient.Object, scheme *runtime.Scheme) error {
7156
key := ctrlruntimeclient.ObjectKeyFromObject(object)
7257

@@ -228,3 +213,8 @@ func UnstructuredToConditions(c []interface{}) []metav1.Condition {
228213
}
229214
return conditions
230215
}
216+
217+
func UnstructuredAsJSON(resource *unstructured.Unstructured) string {
218+
data, _ := yaml.Marshal(resource)
219+
return string(data)
220+
}

pkg/common/common.go

Lines changed: 2 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,19 @@
11
package common
22

33
import (
4-
"context"
54
"math/rand"
65
"time"
76

8-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
97
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
108
"k8s.io/apimachinery/pkg/runtime"
11-
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"
129
"sigs.k8s.io/yaml"
1310

1411
"github.com/pluralsh/deployment-operator/cmd/agent/args"
15-
"github.com/pluralsh/deployment-operator/internal/utils"
16-
smcommon "github.com/pluralsh/deployment-operator/pkg/streamline/common"
1712
)
1813

1914
const (
20-
ManagedByLabel = "plural.sh/managed-by"
21-
AgentLabelValue = "agent"
22-
LastProgressTimeAnnotation = "plural.sh/last-progress-time"
15+
ManagedByLabel = "plural.sh/managed-by"
16+
AgentLabelValue = "agent"
2317
)
2418

2519
func ToUnstructured(obj runtime.Object) (*unstructured.Unstructured, error) {
@@ -53,48 +47,6 @@ func Unmarshal(s string) (map[string]interface{}, error) {
5347
return result, nil
5448
}
5549

56-
func GetLastProgressTimestamp(ctx context.Context, k8sClient ctrclient.Client, obj *unstructured.Unstructured) (progressTime metav1.Time, err error) {
57-
progressTime = metav1.Now()
58-
59-
if obj.GetAnnotations() == nil {
60-
obj.SetAnnotations(make(map[string]string))
61-
}
62-
annotations := obj.GetAnnotations()
63-
timeStr, ok := annotations[LastProgressTimeAnnotation]
64-
65-
defer func() {
66-
if !ok {
67-
err = utils.TryToUpdate(ctx, k8sClient, obj)
68-
if err != nil {
69-
return
70-
}
71-
key := ctrclient.ObjectKeyFromObject(obj)
72-
err = k8sClient.Get(ctx, key, obj)
73-
}
74-
}()
75-
76-
if !ok {
77-
annotations[LastProgressTimeAnnotation] = progressTime.Format(time.RFC3339)
78-
obj.SetAnnotations(annotations)
79-
return
80-
}
81-
parsedTime, err := time.Parse(time.RFC3339, timeStr)
82-
if err != nil {
83-
return
84-
}
85-
progressTime = metav1.Time{Time: parsedTime}
86-
87-
return
88-
}
89-
90-
func ServiceID(obj *unstructured.Unstructured) string {
91-
if annotations := obj.GetAnnotations(); annotations != nil {
92-
return annotations[smcommon.OwningInventoryKey]
93-
}
94-
95-
return ""
96-
}
97-
9850
// WithJitter adds a random jitter to the interval based on the global jitter factor.
9951
func WithJitter(interval time.Duration) time.Duration {
10052
maxJitter := int64(float64(interval) * args.JitterFactor())

pkg/common/health.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts"
1010
rolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
1111
flaggerv1beta1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
12-
"github.com/pluralsh/deployment-operator/internal/utils"
1312
appsv1 "k8s.io/api/apps/v1"
1413
autoscalingv1 "k8s.io/api/autoscaling/v1"
1514
autoscalingv2 "k8s.io/api/autoscaling/v2"
@@ -23,6 +22,8 @@ import (
2322
"k8s.io/apimachinery/pkg/runtime"
2423
"k8s.io/apimachinery/pkg/runtime/schema"
2524
"k8s.io/kubectl/pkg/util/podutils"
25+
26+
"github.com/pluralsh/deployment-operator/internal/utils"
2627
)
2728

2829
const (

pkg/controller/service/reconciler.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,11 @@ func (s *ServiceReconciler) init() (*ServiceReconciler, error) {
134134
return s.consoleClient.GetService(id)
135135
})
136136
s.manifestCache = manis.NewCache(s.manifestTTL, s.manifestTTLJitter, deployToken, s.consoleURL)
137-
s.applier = applier.NewApplier(s.dynamicClient, s.discoveryCache, s.store, applier.WithWaveDelay(s.waveDelay), applier.WithFilter(applier.FilterCache, applier.CacheFilter()))
137+
s.applier = applier.NewApplier(s.dynamicClient, s.discoveryCache, s.store,
138+
applier.WithWaveDelay(s.waveDelay),
139+
applier.WithFilter(applier.FilterCache, applier.CacheFilter()),
140+
applier.WithFilter(applier.FilterSkip, applier.SkipFilter()),
141+
)
138142

139143
return s, nil
140144
}
@@ -429,6 +433,11 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, id string) (result re
429433
return ctrl.Result{}, err
430434
}
431435

436+
if err = s.store.ExpireHookComponents(svc.ID); err != nil {
437+
logger.Error(err, "failed to expire processed hook components", "service", svc.Name)
438+
return ctrl.Result{}, err
439+
}
440+
432441
// delete service when components len == 0 (no new statuses, inventory file is empty, all deleted)
433442
if err := s.UpdateStatus(ctx, svc.ID, svc.Revision.ID, svc.Sha, lo.ToSlicePtr(components), []*console.ServiceErrorAttributes{}); err != nil {
434443
logger.Error(err, "Failed to update service status, ignoring for now")

pkg/manifests/template/helm.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ func (h *helm) Render(svc *console.ServiceDeploymentForAgent, mapper meta.RESTMa
137137
if err != nil {
138138
return nil, err
139139
}
140-
if svc.Helm != nil && svc.Helm.IgnoreHooks != nil && !*svc.Helm.IgnoreHooks {
140+
141+
if svc.Helm != nil && !lo.FromPtr(svc.Helm.IgnoreHooks) {
141142
for _, h := range rel.Hooks {
142143
_, err = fmt.Fprintln(&buffer, "---")
143144
if err != nil {

0 commit comments

Comments
 (0)