Skip to content

Commit 0b93390

Browse files
feat: cancel function in goroutine
feat: document function
1 parent 63eb17b commit 0b93390

File tree

2 files changed

+32
-9
lines changed

2 files changed

+32
-9
lines changed

controllers/cache_handler.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"k8s.io/apimachinery/pkg/fields"
2020
"k8s.io/apimachinery/pkg/runtime"
2121
"k8s.io/apimachinery/pkg/types"
22+
"k8s.io/apimachinery/pkg/util/wait"
2223
ctrl "sigs.k8s.io/controller-runtime"
2324
k8scache "sigs.k8s.io/controller-runtime/pkg/cache"
2425
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -511,17 +512,14 @@ func (r *DisruptionReconciler) manageInstanceSelectorCache(instance *chaosv1beta
511512

512513
// start the cache with a cancelable context and duration, and attach it to the controller as a watch source
513514
ch := make(chan error)
514-
deletionDelay := time.Minute * 2
515515

516-
if r.ExpiredDisruptionGCDelay != nil {
517-
deletionDelay = *r.ExpiredDisruptionGCDelay * 2
518-
}
516+
cacheCtx, cacheCancelFunc := context.WithCancel(context.Background())
517+
ctxTuple := CtxTuple{cacheCtx, cacheCancelFunc, disNamespacedName}
519518

520-
cacheCtx, cacheCancelFunc := context.WithTimeout(context.Background(), instance.Spec.Duration.Duration()+deletionDelay)
519+
r.CacheContextStore[disCacheHash] = ctxTuple
521520

522521
go func() { ch <- cache.Start(cacheCtx) }()
523-
524-
r.CacheContextStore[disCacheHash] = CtxTuple{cacheCtx, cacheCancelFunc}
522+
go r.cacheDeletionSafety(ctxTuple, disCacheHash)
525523

526524
var cacheSource source.SyncingSource
527525
if instance.Spec.Level == chaostypes.DisruptionLevelNode {
@@ -568,10 +566,34 @@ func (r *DisruptionReconciler) clearExpiredCacheContexts() {
568566
for key, contextTuple := range r.CacheContextStore {
569567
if contextTuple.Ctx.Err() != nil {
570568
deletionList = append(deletionList, key)
569+
continue
570+
}
571+
572+
if err := r.Get(contextTuple.Ctx, contextTuple.DisruptionNamespacedName, &chaosv1beta1.Disruption{}); err != nil {
573+
if client.IgnoreNotFound(err) == nil {
574+
contextTuple.CancelFunc()
575+
576+
deletionList = append(deletionList, key)
577+
}
571578
}
572579
}
573580

574581
for _, key := range deletionList {
575582
delete(r.CacheContextStore, key)
576583
}
577584
}
585+
586+
// cacheDeletionSafety is thought to be run in a goroutine to assert a cache is not running without its disruption
587+
// the polling is living on the cache context, meaning if it's deleted elsewhere this function will return early.
588+
func (r *DisruptionReconciler) cacheDeletionSafety(ctxTpl CtxTuple, disHash string) {
589+
_ = wait.PollInfiniteWithContext(ctxTpl.Ctx, time.Minute, func(context.Context) (bool, error) {
590+
if err := r.Get(ctxTpl.Ctx, ctxTpl.DisruptionNamespacedName, &chaosv1beta1.Disruption{}); err != nil {
591+
if client.IgnoreNotFound(err) == nil {
592+
defer ctxTpl.CancelFunc()
593+
delete(r.CacheContextStore, disHash)
594+
return true, nil
595+
}
596+
}
597+
return false, nil
598+
})
599+
}

controllers/disruption_controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ type DisruptionReconciler struct {
7171
}
7272

7373
type CtxTuple struct {
74-
Ctx context.Context
75-
CancelFunc context.CancelFunc
74+
Ctx context.Context
75+
CancelFunc context.CancelFunc
76+
DisruptionNamespacedName types.NamespacedName
7677
}
7778

7879
//+kubebuilder:rbac:groups=chaos.datadoghq.com,resources=disruptions,verbs=get;list;watch;create;update;patch;delete

0 commit comments

Comments
 (0)