Skip to content

Commit ec85c2b

Browse files
zreigz, maciaszczyk, and mfloreks
authored
feat: Replace cli-utils (#516)
* init global watcher * add watch interface * example * add cleanup * add resources * add store interface * add map store * add synchronizers with supervisor and store * add status * add initial applier * add to delete * task runner * feat(applier): enhance service reconciliation with dynamic client and store integration * fixes * fixes * add db store storage and options * update store interface * add database store * feat(store): add SaveComponentAttributes method and refactor component saving logic * add filters * add SHA mechanism * update store interface * add interface docs * refactor store package * fix global store * fix filters * add metrics to cache filter * fix global store * add missing check to cache filter * add initial test for cache filter * refactor filters * refactor tests * feat(wave-processor): enhance logging and optimize concurrent applies * feat(applier): add resource filtering and improve wave processing with enhanced logging and options * refactor(applier): remove debug logging for resource filtering * refactor tests * add todo * feat: enhance resource application logic and add TODOs for future improvements * feat(applier): add dry run support base for apply operations * refactor * refactor component state * minor perf improvement * refactor * refactor * refactor: enhance logging and improve resource application handling * refactor(discovery): streamline discovery cache implementation and improve service mesh handling * feat(discovery): add discovery cache management and refresh interval configuration * refactor(logging): enhance log levels and messages across services, remove unused code, streamline discovery cache usage, and improve synchronizers with supervisor to properly wait for initial cache sync * fix: merge conflicts * fix(controller): correct variable assignment in poller function * refactor: add better support for discovery cache events, fix service mesh logic on deletion, add retry logic to cache gvr mapping * refactor: add 
better support for discovery cache events, fix service mesh logic on deletion, add retry logic to cache gvr mapping, fix applier error propagation to console * fix after merge * fix after merge * prevent destroy CRD * delete CRD from cache * update onDelete wave * refactor: standardize lifecycle annotations, improve deletion logic and add dry run support * Add proper cache expiration for database store * fix(db_queries): update uid uniqueness constraint and modify conflict resolution * fix(db_queries): update WHERE clause to use datetime function for updated_at * fix after rebase * Handle cache miss on status gathering and get from API w/ cache fill * Add support for delay between applying resource waves * refactor: improve logging and synchronization in various components * feat(store): add configurable storage options and cleaner intervals * feat(args): introduce jitter factor and supervisor options for enhanced control * refactor(args): update default store storage to string and improve logging * docs: improve function documentation * fix service reconciler test * fix(tests): update and skip some failing tests * refactor: make fix * test: skip failing CRD and MetricsAggregate tests, update cache initialization * fix TestCacheFilter * feat: enhance resource management with service ID tracking and logging improvements * feat(wave): send component attributes to channel when resource is already managed * fix(wave): set resource UID when already managed by another service * fix unit tests * cleanup * fix some linters * fix TODO refactor * fix(wave, db_store): force apply resources and handle pod deletion errors * refactor service * remove unused files * remove old applier * feat(synchronizer): add initial resync implementation * chore: update golangci-lint version and enable experimental greenteagc * calculate component sha on save and save skipped components in store * add delete components func to store * delete components on synchronizer stop * cache gvr to gvk 
mapping * finish kindfor implementation * fix lint * feat(metrics): add synchronization event metrics and update applier options * update golangci-lint * feat: add supervisor option to ServiceReconciler and update related components * fix(synchronizer): handle error events in watch synchronization * chore: remove unused code * chore: update Go version to 1.25 and bump alpine version in Dockerfile * refactor(supervisor): improve cache sync timeout error message and remove os.Exit * update dependencies * fix linter * fix nolint * refactor(applier): enhance filter engine with named filters and dry run support * remove cli-utils * fix linter * add unit tests * add unit tests * refactor(crd_controller): remove unused label check in discovery cache update * add unit tests * add unit tests * add unit tests * update unit tests * update unit tests * refactor: update applier initialization and improve cache filter function * fix lua health status * test(filter-engine): update filter addition to include names for clarity * fix conditions conversion * refactor(store): update component deletion to use StoreKey instead of UID * test: update filter engine tests and add store key creation options * refactor(e2e): comment out cache tests and update dependencies * fix: lint * fix bug with crd service id * refactor: change svcQueue to svcQueueGetter in socketPublisher * refresh rest mapper * fix(wave-processor): allow taking over resource when service no longer exists * detect new crds outside services * fix cyclic dependency * refactor: update reconciler functions to use RESTMapper and Kubernetes client interface * refactor(reconciler): streamline ServiceReconciler initialization * feat(discovery): add MaybeResetRESTMapper method to reset RESTMapper for CRDs * remove empty * bump API go client * fix(crd_controller): conditionally update DiscoveryCache based on Served status * fix(wave): add warning flag to service error attributes * fix: unit tests * refactor(discovery-cache): 
simplify cache refresh logic * fix: lint * fix: fix pinger store integration and improve runtime services logging * fix(ping): update New function signature to include additional parameter * bump console go client * update pr template * update pr template * wait for CRD deployment * fix linter * fix wait for crd * check CRD object list * fix wait for crd * wait fo CRD * create client * remove wait for CRD wave * change defaultApplierWaveDelay to 200ms * chore: add wave processing options for max concurrent applies and dequeue delay and refactor code * try to fix the namespace error * revert * refactor(wave, supervisor): improve logging and streamline resource management * fix unit test * linter * refactor(supervisor): improve supervisor management and synchronization logic * refactor(supervisor): improve channel handling and add buffer for resource registration * fix helm race condition * fix race condition * feat(kubernetes): enhance watch error handling and improve logging verbosity * fix: deadlock between supervisor, discovery cache and synchronizer * linter * fix null pointer * refactor: improve concurrency handling and cleanup in Kubernetes controllers * fix: remove unused time import * fix(discovery): improve error handling and logging in discovery manager * improve WaitForCacheSync counter * feat(supervisor): implement batch saving of components and improve cache sync error logging * linter * refactor(logging): improve log messages and remove unused ticker * add TestComponentCache_SaveComponents * fix(docker): update golang base image to 1.25-alpine3.22 * fix: lint * bump fips image v1.25.1 * disable harness fips arm * feat(metrics): add ResourceCacheWatchRemove method for metrics tracking * remove unused file * revert platform: linux/arm64 * remove unused file * update go fips image * feat(metrics): add ResourceCacheWatchRemove method for metrics tracking * refactor(controller): simplify error handling in reconciler logic * fix: avoid using multiple 
connections for db store SaveComponents * fix(synchronizer): improve locking mechanism in Stop method * fix(discoverycache): update logging level and make notifications asynchronous to avoid situations where callback code calls back discovery cache and results in deadlock * fix(helm): use both gv and gvk to build APIVersions set for helm templating * fix dependents deletion * add `metrics.k8s.io/v1beta1` to non-watchable resources --------- Co-authored-by: Marcin Maciaszczyk <[email protected]> Co-authored-by: Sebastian Florek <[email protected]>
1 parent 3164114 commit ec85c2b

File tree

156 files changed

+8463
-11381
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

156 files changed

+8463
-11381
lines changed

.github/PULL_REQUEST_TEMPLATE.md

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,20 @@ Test environment: https://console.your-env.onplural.sh/
1717

1818
- [ ] I have added a meaningful title and summary to convey the impact of this PR to a user.
1919
- [ ] I have deployed the agent to a test environment and verified that it works as expected.
20-
- [ ] Agent started successfully.
21-
- [ ] Logs are clean and do not contain errors.
22-
- [ ] Component trees are working as expected.
20+
- [ ] Agent starts successfully.
21+
- [ ] Service creation works without any issues when using raw manifests and Helm templates.
22+
- [ ] Service creation works when resources contain both CRD and CRD instances.
23+
- [ ] Service templating works correctly.
24+
- [ ] Service errors are reported properly and visible in the UI.
25+
- [ ] Service updates are reflected properly in the cluster.
26+
- [ ] Service resync triggers immediately and works as expected.
27+
- [ ] Service deletion works and cleanups resources properly.
28+
- [ ] Services can be recreated after deletion.
29+
- [ ] Service detachment works and keeps resources unaffected.
30+
- [ ] Services can be recreated after detachment.
31+
- [ ] Service component trees are working as expected.
32+
- [ ] Cluster health statuses are being updated.
33+
- [ ] Agent logs do not contain any errors.
2334
- [ ] I have added tests to cover my changes.
2435
- [ ] If required, I have updated the Plural documentation accordingly.
2536

.github/workflows/ci.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ jobs:
4040
with:
4141
go-version-file: go.mod
4242
check-latest: true
43-
- uses: golangci/golangci-lint-action@v7.0.0
43+
- uses: golangci/golangci-lint-action@v8.0.0
4444
with:
45-
version: v2.1.2
45+
version: v2.4.0
4646
build-image:
4747
name: Build image
4848
needs: [build, test]

.github/workflows/e2e.yaml

Lines changed: 0 additions & 22 deletions
This file was deleted.

.github/workflows/publish-fips.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,6 @@ jobs:
6060
cache-from: type=gha
6161
cache-to: type=gha,mode=max
6262
build-args: |
63-
GO_FIPS_IMAGE_REPO=ghcr.io/pluralsh/go-fips
64-
GO_FIPS_IMAGE_TAG=1.23.2
63+
GO_FIPS_IMAGE_REPO=ghcr.io/pluralsh/fips/go-fips
64+
GO_FIPS_IMAGE_TAG=1.25.1
6565

.github/workflows/publish-harness-fips.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ jobs:
7373
cache-to: type=gha,mode=max
7474
build-args: |
7575
VERSION=${{ steps.meta.outputs.version }}
76-
GO_FIPS_IMAGE_REPO=ghcr.io/pluralsh/go-fips
77-
GO_FIPS_IMAGE_TAG=1.23.2
76+
GO_FIPS_IMAGE_REPO=ghcr.io/pluralsh/fips/go-fips
77+
GO_FIPS_IMAGE_TAG=1.25.1
7878
- name: Export digest
7979
run: |
8080
mkdir -p ${{ runner.temp }}/digests
@@ -205,5 +205,5 @@ jobs:
205205
PYTHON_VERSION=${{ matrix.versions.python }}
206206
HARNESS_BASE_IMAGE_REPO=ghcr.io/pluralsh/stackrun-harness-base
207207
HARNESS_BASE_IMAGE_TAG=${{ needs.publish-base-image.outputs.version }}
208-
GO_FIPS_IMAGE_REPO=ghcr.io/pluralsh/go-fips
209-
GO_FIPS_IMAGE_TAG=1.23.2
208+
GO_FIPS_IMAGE_REPO=ghcr.io/pluralsh/fips/go-fips
209+
GO_FIPS_IMAGE_TAG=1.25.1

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM golang:1.24-alpine3.21 AS builder
1+
FROM golang:1.25-alpine3.22 AS builder
22

33
ARG HELM_VERSION=v3.17.3
44
ARG TARGETARCH
@@ -21,7 +21,7 @@ COPY /api api/
2121
COPY /internal internal/
2222

2323
# Build
24-
RUN CGO_ENABLED=0 GOOS=linux GOARCH=${TARGETARCH} GO111MODULE=on go build -a -o deployment-agent cmd/agent/*.go
24+
RUN CGO_ENABLED=0 GOOS=linux GOARCH=${TARGETARCH} GO111MODULE=on GOEXPERIMENT=greenteagc go build -a -o deployment-agent cmd/agent/*.go
2525

2626
# Get helm binary for kustomize helm inflate to work
2727
RUN curl -L https://get.helm.sh/helm-${HELM_VERSION}-linux-${TARGETARCH}.tar.gz | tar xz && \

cmd/agent/args/args.go

Lines changed: 210 additions & 26 deletions
Large diffs are not rendered by default.

cmd/agent/console.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,21 @@ import (
44
"os"
55
"time"
66

7+
"k8s.io/apimachinery/pkg/api/meta"
78
"k8s.io/apimachinery/pkg/runtime"
9+
"k8s.io/client-go/dynamic"
10+
"k8s.io/client-go/kubernetes"
811

912
"github.com/pluralsh/deployment-operator/cmd/agent/args"
1013
"github.com/pluralsh/deployment-operator/internal/utils"
14+
discoverycache "github.com/pluralsh/deployment-operator/pkg/cache/discovery"
1115
"github.com/pluralsh/deployment-operator/pkg/client"
1216
consolectrl "github.com/pluralsh/deployment-operator/pkg/controller"
1317
"github.com/pluralsh/deployment-operator/pkg/controller/stacks"
1418
v1 "github.com/pluralsh/deployment-operator/pkg/controller/v1"
19+
"github.com/pluralsh/deployment-operator/pkg/streamline"
20+
"github.com/pluralsh/deployment-operator/pkg/streamline/store"
1521

16-
"k8s.io/client-go/rest"
1722
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"
1823

1924
"github.com/pluralsh/deployment-operator/pkg/controller/namespaces"
@@ -47,18 +52,43 @@ const (
4752

4853
func registerConsoleReconcilersOrDie(
4954
mgr *consolectrl.Manager,
50-
config *rest.Config,
55+
mapper meta.RESTMapper,
56+
clientSet kubernetes.Interface,
5157
k8sClient ctrclient.Client,
58+
dynamicClient dynamic.Interface,
59+
store store.Store,
5260
scheme *runtime.Scheme,
5361
consoleClient client.Client,
62+
supervisor *streamline.Supervisor,
63+
discoveryCache discoverycache.Cache,
5464
) {
5565
mgr.AddReconcilerOrDie(service.Identifier, func() (v1.Reconciler, error) {
56-
r, err := service.NewServiceReconciler(consoleClient, k8sClient, config, args.ControllerCacheTTL(), args.ManifestCacheTTL(), args.ManifestCacheJitter(), args.WorkqueueBaseDelay(), args.WorkqueueMaxDelay(), args.PollInterval(), args.RestoreNamespace(), args.ConsoleUrl(), args.WorkqueueQPS(), args.WorkqueueBurst())
66+
r, err := service.NewServiceReconciler(consoleClient,
67+
k8sClient,
68+
mapper,
69+
clientSet,
70+
dynamicClient,
71+
discoveryCache,
72+
store,
73+
service.WithRefresh(args.ControllerCacheTTL()),
74+
service.WithManifestTTL(args.ManifestCacheTTL()),
75+
service.WithManifestTTLJitter(args.ManifestCacheJitter()),
76+
service.WithWorkqueueBaseDelay(args.WorkqueueBaseDelay()),
77+
service.WithWorkqueueMaxDelay(args.WorkqueueMaxDelay()),
78+
service.WithWorkqueueQPS(args.WorkqueueQPS()),
79+
service.WithWorkqueueBurst(args.WorkqueueBurst()),
80+
service.WithRestoreNamespace(args.RestoreNamespace()),
81+
service.WithConsoleURL(args.ConsoleUrl()),
82+
service.WithPollInterval(args.PollInterval()),
83+
service.WithWaveDelay(args.ApplierWaveDelay()),
84+
service.WithWaveDeQueueDelay(args.WaveDeQueueDelay()),
85+
service.WithWaveMaxConcurrentApplies(args.WaveMaxConcurrentApplies()),
86+
service.WithSupervisor(supervisor))
5787
return r, err
5888
})
5989

6090
mgr.AddReconcilerOrDie(pipelinegates.Identifier, func() (v1.Reconciler, error) {
61-
r, err := pipelinegates.NewGateReconciler(consoleClient, k8sClient, config, args.PipelineGatesInterval())
91+
r, err := pipelinegates.NewGateReconciler(consoleClient, k8sClient, args.PipelineGatesInterval())
6292
return r, err
6393
})
6494

cmd/agent/kubernetes.go

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"os"
88
"path/filepath"
99
"strings"
10-
"time"
1110

1211
trivy "github.com/aquasecurity/trivy-operator/pkg/apis/aquasecurity/v1alpha1"
1312
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts"
@@ -16,12 +15,15 @@ import (
1615
cmap "github.com/orcaman/concurrent-map/v2"
1716
"github.com/prometheus/client_golang/prometheus/promhttp"
1817
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
18+
apierrors "k8s.io/apimachinery/pkg/api/errors"
19+
"k8s.io/apimachinery/pkg/api/meta"
1920
"k8s.io/apimachinery/pkg/runtime/schema"
20-
"k8s.io/client-go/discovery"
2121
"k8s.io/client-go/dynamic"
2222
"k8s.io/client-go/kubernetes"
2323
"k8s.io/client-go/rest"
24+
clientgocache "k8s.io/client-go/tools/cache"
2425
ctrl "sigs.k8s.io/controller-runtime"
26+
crcache "sigs.k8s.io/controller-runtime/pkg/cache"
2527
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
2628
"sigs.k8s.io/controller-runtime/pkg/healthz"
2729
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -30,13 +32,12 @@ import (
3032
"github.com/pluralsh/deployment-operator/cmd/agent/args"
3133
"github.com/pluralsh/deployment-operator/internal/controller"
3234
"github.com/pluralsh/deployment-operator/pkg/cache"
35+
discoverycache "github.com/pluralsh/deployment-operator/pkg/cache/discovery"
3336
consoleclient "github.com/pluralsh/deployment-operator/pkg/client"
3437
consolectrl "github.com/pluralsh/deployment-operator/pkg/controller"
3538
"github.com/pluralsh/deployment-operator/pkg/controller/service"
3639
)
3740

38-
const serviceIDCacheExpiry = 12 * time.Hour
39-
4041
func emptyDiskHealthCheck(_ *http.Request) error {
4142
testFile := filepath.Join("/tmp", "healthcheck.tmp")
4243
data := []byte("ok")
@@ -48,11 +49,22 @@ func emptyDiskHealthCheck(_ *http.Request) error {
4849
}
4950

5051
func initKubeManagerOrDie(config *rest.Config) manager.Manager {
52+
watchErrHandler := func(ctx context.Context, r *clientgocache.Reflector, err error) {
53+
switch {
54+
case apierrors.IsNotFound(err), apierrors.IsGone(err), meta.IsNoMatchError(err):
55+
setupLog.V(2).Error(err, "ignoring watch error for removed resource")
56+
return
57+
default:
58+
clientgocache.DefaultWatchErrorHandler(ctx, r, err)
59+
}
60+
}
61+
5162
mgr, err := ctrl.NewManager(config, ctrl.Options{
5263
NewClient: ctrlclient.New, // client reads directly from the API server
5364
Logger: setupLog,
5465
Scheme: scheme,
5566
LeaderElection: args.EnableLeaderElection(),
67+
Cache: crcache.Options{DefaultWatchErrorHandler: watchErrHandler},
5668
LeaderElectionID: "dep12loy45.plural.sh",
5769
HealthProbeBindAddress: args.ProbeAddr(),
5870
Metrics: server.Options{
@@ -83,7 +95,10 @@ func initKubeManagerOrDie(config *rest.Config) manager.Manager {
8395
return mgr
8496
}
8597

86-
func initKubeClientsOrDie(config *rest.Config) (rolloutsClient *roclientset.Clientset, dynamicClient *dynamic.DynamicClient, kubeClient *kubernetes.Clientset) {
98+
func initKubeClientsOrDie(config *rest.Config) (rolloutsClient *roclientset.Clientset,
99+
dynamicClient *dynamic.DynamicClient,
100+
kubeClient *kubernetes.Clientset,
101+
) {
87102
rolloutsClient, err := roclientset.NewForConfig(config)
88103
if err != nil {
89104
setupLog.Error(err, "unable to create rollouts client")
@@ -111,7 +126,7 @@ func registerKubeReconcilersOrDie(
111126
consoleManager *consolectrl.Manager,
112127
config *rest.Config,
113128
extConsoleClient consoleclient.Client,
114-
discoveryClient discovery.DiscoveryInterface,
129+
discoveryCache discoverycache.Cache,
115130
enableKubecostProxy bool,
116131
) {
117132
rolloutsClient, dynamicClient, kubeClient := initKubeClientsOrDie(config)
@@ -184,6 +199,7 @@ func registerKubeReconcilersOrDie(
184199
Scheme: manager.GetScheme(),
185200
ReconcilerGroups: reconcileGroups,
186201
Mgr: manager,
202+
DiscoveryCache: discoveryCache,
187203
}).SetupWithManager(manager); err != nil {
188204
setupLog.Error(err, "unable to create controller", "controller", "CRDRegisterController")
189205
}
@@ -227,15 +243,7 @@ func registerKubeReconcilersOrDie(
227243
setupLog.Error(err, "unable to create controller", "controller", "UpgradeInsights")
228244
}
229245

230-
statusController, err := controller.NewStatusReconciler(manager.GetClient())
231-
if err != nil {
232-
setupLog.Error(err, "unable to create controller", "controller", "StatusController")
233-
}
234-
if err := statusController.SetupWithManager(manager); err != nil {
235-
setupLog.Error(err, "unable to setup controller", "controller", "StatusController")
236-
}
237-
238-
if err = (&controller.PipelineGateReconciler{
246+
if err := (&controller.PipelineGateReconciler{
239247
Client: manager.GetClient(),
240248
ConsoleClient: consoleclient.New(args.ConsoleUrl(), args.DeployToken()),
241249
Scheme: manager.GetScheme(),
@@ -246,9 +254,9 @@ func registerKubeReconcilersOrDie(
246254
}
247255

248256
if err := (&controller.MetricsAggregateReconciler{
249-
Client: manager.GetClient(),
250-
Scheme: manager.GetScheme(),
251-
DiscoveryClient: discoveryClient,
257+
Client: manager.GetClient(),
258+
Scheme: manager.GetScheme(),
259+
DiscoveryCache: discoveryCache,
252260
}).SetupWithManager(ctx, manager); err != nil {
253261
setupLog.Error(err, "unable to create controller", "controller", "MetricsAggregate")
254262
}
@@ -260,7 +268,7 @@ func registerKubeReconcilersOrDie(
260268
ExtConsoleClient: extConsoleClient,
261269
Tasks: cmap.New[context.CancelFunc](),
262270
Proxy: enableKubecostProxy,
263-
ServiceIDCache: controller.NewServiceIDCache(serviceIDCacheExpiry),
271+
ServiceIDCache: controller.NewServiceIDCache(args.KubeCostExtractorCacheTTL()),
264272
}).SetupWithManager(manager); err != nil {
265273
setupLog.Error(err, "unable to create controller", "controller", "MetricsAggregate")
266274
}

0 commit comments

Comments (0)