Skip to content

Commit 1778d4f

Browse files
Chunk constraint and vulnerability report uploads (#442)
This should improve overall system scalability by eliminating some very long running xactions
1 parent 8384203 commit 1778d4f

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

internal/controller/constraint_controller.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package controller
33
import (
44
"context"
55
"fmt"
6+
"math/rand"
67
"strings"
8+
"time"
79

810
"gopkg.in/yaml.v3"
911

@@ -26,6 +28,7 @@ import (
2628
const (
2729
missingLabelError = "missing label while attempting to map a constraint status resource"
2830
bundleDataAnnotation = "policy.plural.sh/constraintData"
31+
constraintChunkSize = 15
2932
)
3033

3134
type BundleData struct {
@@ -89,11 +92,15 @@ func (r *ConstraintReconciler) Reconcile(ctx context.Context, req ctrl.Request)
8992

9093
logger.Info("recording constraint", "name", pca.Name)
9194
r.Constraints[pca.Name] = *pca
92-
res, err := r.ConsoleClient.UpsertConstraints(algorithms.MapValues[string, console.PolicyConstraintAttributes](r.Constraints))
93-
if err != nil {
94-
return ctrl.Result{}, err
95+
96+
for _, chunk := range lo.Chunk(algorithms.MapValues[string, console.PolicyConstraintAttributes](r.Constraints), constraintChunkSize) {
97+
res, err := r.ConsoleClient.UpsertConstraints(chunk)
98+
if err != nil {
99+
return ctrl.Result{}, err
100+
}
101+
logger.Info("upsert constraint", "number", *res.UpsertPolicyConstraints)
102+
time.Sleep(time.Duration(rand.Int63n(int64(500 * time.Millisecond))))
95103
}
96-
logger.Info("upsert constraint", "number", *res.UpsertPolicyConstraints)
97104
return ctrl.Result{}, nil
98105
}
99106

internal/controller/vulnerabilityreports_controller.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package controller
33
import (
44
"context"
55
"fmt"
6+
"math/rand"
67
"strings"
78
"time"
89

@@ -33,6 +34,8 @@ const (
3334

3435
// vulnerabilityJitter defines the random jitter duration added to vulnerability processing to prevent synchronized operations.
3536
vulnerabilityJitter = 2 * time.Minute
37+
38+
reportChunkSize = 15
3639
)
3740

3841
// VulnerabilityReportReconciler reconciles a Trivy VulnerabilityReport resource.
@@ -188,13 +191,18 @@ func (r *VulnerabilityReportReconciler) SetupWithManager(mgr ctrl.Manager) error
188191
err := helpers.BackgroundPollUntilContextCancel(r.Ctx, reportUploadInterval, false, true, func(_ context.Context) (done bool, err error) {
189192
if !r.reports.IsEmpty() {
190193
apiReports := algorithms.Map(lo.Values(r.reports.Items()), func(s console.VulnerabilityReportAttributes) *console.VulnerabilityReportAttributes { return &s })
191-
if _, err := r.ConsoleClient.UpsertVulnerabilityReports(apiReports); err != nil {
192-
logger.Error(err, "unable to upsert vulnerability reports")
193-
} else {
194-
// Clear the reports map and allow dangling elements to be garbage collected
195-
r.reports.Clear()
196-
r.reports = cmap.New[console.VulnerabilityReportAttributes]()
197-
logger.Info("upsert vulnerability reports")
194+
195+
for _, chunk := range lo.Chunk(apiReports, reportChunkSize) {
196+
if _, err := r.ConsoleClient.UpsertVulnerabilityReports(chunk); err != nil {
197+
logger.Error(err, "unable to upsert vulnerability reports")
198+
} else {
199+
// Clear the reports map and allow dangling elements to be garbage collected
200+
r.reports.Clear()
201+
r.reports = cmap.New[console.VulnerabilityReportAttributes]()
202+
logger.Info("upsert vulnerability reports")
203+
}
204+
205+
time.Sleep(time.Duration(rand.Int63n(int64(500 * time.Millisecond))))
198206
}
199207
}
200208
return false, nil

0 commit comments

Comments
 (0)