@@ -19,6 +19,7 @@ import (
1919 "k8s.io/apimachinery/pkg/fields"
2020 "k8s.io/apimachinery/pkg/runtime"
2121 "k8s.io/apimachinery/pkg/types"
22+ "k8s.io/apimachinery/pkg/util/wait"
2223 ctrl "sigs.k8s.io/controller-runtime"
2324 k8scache "sigs.k8s.io/controller-runtime/pkg/cache"
2425 "sigs.k8s.io/controller-runtime/pkg/client"
@@ -511,11 +512,14 @@ func (r *DisruptionReconciler) manageInstanceSelectorCache(instance *chaosv1beta
511512
512513 // start the cache with a cancelable context, and attach it to the controller as a watch source
513514 ch := make (chan error )
514- cacheCtx , cacheCancelFunc := context .WithTimeout (context .Background (), instance .Spec .Duration .Duration ()+ * r .ExpiredDisruptionGCDelay * 2 )
515515
516- go func () { ch <- cache .Start (cacheCtx ) }()
516+ cacheCtx , cacheCancelFunc := context .WithCancel (context .Background ())
517+ ctxTuple := CtxTuple {cacheCtx , cacheCancelFunc , disNamespacedName }
518+
519+ r .CacheContextStore [disCacheHash ] = ctxTuple
517520
518- r .CacheContextStore [disCacheHash ] = CtxTuple {cacheCtx , cacheCancelFunc }
521+ go func () { ch <- cache .Start (cacheCtx ) }()
522+ go r .cacheDeletionSafety (ctxTuple , disCacheHash )
519523
520524 var cacheSource source.SyncingSource
521525 if instance .Spec .Level == chaostypes .DisruptionLevelNode {
@@ -562,10 +566,34 @@ func (r *DisruptionReconciler) clearExpiredCacheContexts() {
562566 for key , contextTuple := range r .CacheContextStore {
563567 if contextTuple .Ctx .Err () != nil {
564568 deletionList = append (deletionList , key )
569+ continue
570+ }
571+
572+ if err := r .Get (contextTuple .Ctx , contextTuple .DisruptionNamespacedName , & chaosv1beta1.Disruption {}); err != nil {
573+ if client .IgnoreNotFound (err ) == nil {
574+ contextTuple .CancelFunc ()
575+
576+ deletionList = append (deletionList , key )
577+ }
565578 }
566579 }
567580
568581 for _ , key := range deletionList {
569582 delete (r .CacheContextStore , key )
570583 }
571584}
585+
586+ // cacheDeletionSafety is thought to be run in a goroutine to assert a cache is not running without its disruption
587+ // the polling is living on the cache context, meaning if it's deleted elsewhere this function will return early.
588+ func (r * DisruptionReconciler ) cacheDeletionSafety (ctxTpl CtxTuple , disHash string ) {
589+ _ = wait .PollInfiniteWithContext (ctxTpl .Ctx , time .Minute , func (context.Context ) (bool , error ) {
590+ if err := r .Get (ctxTpl .Ctx , ctxTpl .DisruptionNamespacedName , & chaosv1beta1.Disruption {}); err != nil {
591+ if client .IgnoreNotFound (err ) == nil {
592+ defer ctxTpl .CancelFunc ()
593+ delete (r .CacheContextStore , disHash )
594+ return true , nil
595+ }
596+ }
597+ return false , nil
598+ })
599+ }
0 commit comments