Skip to content

Commit bcdc73b

Browse files
feat: implement drain dependency ordering in agent (#570)
* implement drain dependency ordering in agent * fix update status * improve update status * register services befor delete * linter * prevent situation when dependent svc was detached * update check if service exists * typo * improve naming * code review * code review --------- Co-authored-by: michaeljguarino <[email protected]>
1 parent 36e8c8f commit bcdc73b

File tree

4 files changed

+102
-3
lines changed

4 files changed

+102
-3
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ require (
3737
github.com/openshift/api v0.0.0-20250908150922-8634aa495a26
3838
github.com/orcaman/concurrent-map/v2 v2.0.1
3939
github.com/pkg/errors v0.9.1
40-
github.com/pluralsh/console/go/client v1.54.0
40+
github.com/pluralsh/console/go/client v1.54.1
4141
github.com/pluralsh/controller-reconcile-helper v0.1.0
4242
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34
4343
github.com/pluralsh/polly v0.3.3

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -879,8 +879,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
879879
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
880880
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
881881
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
882-
github.com/pluralsh/console/go/client v1.54.0 h1:kRPZvynUgDnx4KnWXOrJW0P+SYplYOvrCqA+GyF+OCA=
883-
github.com/pluralsh/console/go/client v1.54.0/go.mod h1:EwMrcI23s61oLY3etCVc3NE5RYxgfoiokvU0O7KDOQg=
882+
github.com/pluralsh/console/go/client v1.54.1 h1:+qYVU2zYP+AkU3me4PPF8Oxuck8LY9z5nzHug/MKL0I=
883+
github.com/pluralsh/console/go/client v1.54.1/go.mod h1:EwMrcI23s61oLY3etCVc3NE5RYxgfoiokvU0O7KDOQg=
884884
github.com/pluralsh/controller-reconcile-helper v0.1.0 h1:BV3dYZFH5rn8ZvZjtpkACSv/GmLEtRftNQj/Y4ddHEo=
885885
github.com/pluralsh/controller-reconcile-helper v0.1.0/go.mod h1:RxAbvSB4/jkvx616krCdNQXPbpGJXW3J1L3rASxeFOA=
886886
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw=
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package service
2+
3+
import (
4+
"sync"
5+
6+
console "github.com/pluralsh/console/go/client"
7+
"github.com/pluralsh/polly/containers"
8+
)
9+
10+
var (
11+
allServices = containers.NewSet[string]()
12+
servicePresent = make(map[string][]*console.ServiceDependencyFragment)
13+
cacheMu sync.RWMutex
14+
)
15+
16+
func (s *ServiceReconciler) registerDependencies(svc *console.ServiceDeploymentForAgent) {
17+
cacheMu.Lock()
18+
defer cacheMu.Unlock()
19+
20+
// Update or add the service with its latest dependencies
21+
servicePresent[svc.Name] = svc.Dependencies
22+
23+
// Sideload dependencies: ensure they exist in the map
24+
for _, dep := range svc.Dependencies {
25+
if _, exists := servicePresent[dep.Name]; !exists {
26+
depSvc, err := s.svcCache.Get(dep.ID)
27+
if err != nil && depSvc != nil && depSvc.DeletedAt == nil {
28+
servicePresent[dep.Name] = depSvc.Dependencies
29+
}
30+
}
31+
}
32+
}
33+
34+
// getActiveDependents returns a list of service names that depend on the given service
35+
func (s *ServiceReconciler) getActiveDependents(svcName string) []string {
36+
cacheMu.RLock()
37+
defer cacheMu.RUnlock()
38+
39+
var dependents []string
40+
for name, deps := range servicePresent {
41+
if name == svcName {
42+
continue
43+
}
44+
for _, d := range deps {
45+
if d.Name == svcName {
46+
// check if the service is still in the cache
47+
// it could have been detached
48+
if allServices.Has(name) {
49+
dependents = append(dependents, name)
50+
break
51+
}
52+
}
53+
}
54+
}
55+
56+
return dependents
57+
}
58+
59+
// unregisterDependencies removes a service from the map
60+
func unregisterDependencies(svc *console.ServiceDeploymentForAgent) {
61+
cacheMu.Lock()
62+
defer cacheMu.Unlock()
63+
delete(servicePresent, svc.Name)
64+
}
65+
66+
func updateAllServices(newSet containers.Set[string]) {
67+
cacheMu.Lock()
68+
defer cacheMu.Unlock()
69+
// Add all newly seen services
70+
allServices = allServices.Union(newSet)
71+
// Remove any that are no longer present
72+
allServices = allServices.Intersect(newSet)
73+
}

pkg/controller/service/reconciler.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strings"
88
"time"
99

10+
"github.com/pluralsh/polly/containers"
1011
"golang.org/x/time/rate"
1112
"k8s.io/client-go/dynamic"
1213
"k8s.io/klog/v2"
@@ -354,6 +355,8 @@ func (s *ServiceReconciler) Poll(ctx context.Context) error {
354355
return nil
355356
}
356357

358+
// Build a new set of services seen in this poll
359+
currentServices := containers.NewSet[string]()
357360
pager := s.ListServices(ctx)
358361

359362
for pager.HasNext() {
@@ -371,10 +374,13 @@ func (s *ServiceReconciler) Poll(ctx context.Context) error {
371374

372375
logger.V(4).Info("enqueueing update for", "service", svc.Node.ID)
373376
s.svcCache.Add(svc.Node.ID, svc.Node)
377+
currentServices.Add(svc.Node.Name)
374378
s.svcQueue.AddAfter(svc.Node.ID, utils.Jitter(15*time.Second))
375379
}
376380
}
377381

382+
updateAllServices(currentServices)
383+
378384
return nil
379385
}
380386

@@ -422,8 +428,24 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, id string) (result re
422428
return
423429
}
424430

431+
s.registerDependencies(svc)
432+
425433
if svc.DeletedAt != nil {
426434
logger.V(2).Info("deleting service", "name", svc.Name, "namespace", svc.Namespace)
435+
activeDependents := s.getActiveDependents(svc.Name)
436+
if len(activeDependents) > 0 {
437+
if err := s.UpdateErrors(id, &console.ServiceErrorAttributes{
438+
Message: "service is being deleted, but there are active dependents: " + strings.Join(activeDependents, ", "),
439+
Warning: lo.ToPtr(true),
440+
Source: "delete",
441+
}); err != nil {
442+
logger.Error(err, "failed to update errors")
443+
return ctrl.Result{}, err
444+
}
445+
done = true
446+
return ctrl.Result{}, nil
447+
}
448+
427449
components, err := s.applier.Destroy(ctx, id)
428450
metrics.Record().ServiceDeletion(id)
429451
s.svcCache.Expire(id)
@@ -438,6 +460,10 @@ func (s *ServiceReconciler) Reconcile(ctx context.Context, id string) (result re
438460
return ctrl.Result{}, err
439461
}
440462

463+
if len(components) == 0 {
464+
unregisterDependencies(svc)
465+
}
466+
441467
// delete service when components len == 0 (no new statuses, inventory file is empty, all deleted)
442468
if err := s.UpdateStatus(ctx, svc.ID, svc.Revision.ID, svc.Sha, svc.Status, lo.ToSlicePtr(components), []*console.ServiceErrorAttributes{}, nil); err != nil {
443469
logger.Error(err, "Failed to update service status, ignoring for now")

0 commit comments

Comments
 (0)