@@ -133,17 +133,21 @@ impl WatchManager {
133133 resource_type : & str ,
134134 namespaces : Option < Vec < String > > ,
135135 ) -> Result < ( ) , anyhow:: Error > {
136- // Generate watch key from namespaces (sorted for consistency)
136+ // Get current cluster context to include in watch key for proper isolation
137+ let cluster_context = K8sClient :: get_current_context ( ) . await
138+ . unwrap_or_else ( |_| "unknown" . to_string ( ) ) ;
139+
140+ // Generate watch key from cluster context and namespaces (sorted for consistency)
137141 let watch_key = if let Some ( ref ns_list) = namespaces {
138142 if ns_list. is_empty ( ) {
139- format ! ( "{resource_type}: all" )
143+ format ! ( "{}:{}: all" , cluster_context , resource_type )
140144 } else {
141145 let mut sorted_ns = ns_list. clone ( ) ;
142146 sorted_ns. sort ( ) ;
143- format ! ( "{}:{}" , resource_type, sorted_ns. join( "," ) )
147+ format ! ( "{}:{}:{}" , cluster_context , resource_type, sorted_ns. join( "," ) )
144148 }
145149 } else {
146- format ! ( "{resource_type}: all" )
150+ format ! ( "{}:{}: all" , cluster_context , resource_type )
147151 } ;
148152
149153 let mut watches = self . active_watches . lock ( ) . await ;
@@ -299,17 +303,21 @@ impl WatchManager {
299303 }
300304
301305 pub async fn stop_watch ( & self , resource_type : & str , namespaces : Option < Vec < String > > ) -> Result < ( ) , anyhow:: Error > {
306+ // Get current cluster context to generate the same watch key as in start_watch
307+ let cluster_context = K8sClient :: get_current_context ( ) . await
308+ . unwrap_or_else ( |_| "unknown" . to_string ( ) ) ;
309+
302310 // Generate the same watch key as in start_watch
303311 let watch_key = if let Some ( ref ns_list) = namespaces {
304312 if ns_list. is_empty ( ) {
305- format ! ( "{resource_type}: all" )
313+ format ! ( "{}:{}: all" , cluster_context , resource_type )
306314 } else {
307315 let mut sorted_ns = ns_list. clone ( ) ;
308316 sorted_ns. sort ( ) ;
309- format ! ( "{}:{}" , resource_type, sorted_ns. join( "," ) )
317+ format ! ( "{}:{}:{}" , cluster_context , resource_type, sorted_ns. join( "," ) )
310318 }
311319 } else {
312- format ! ( "{resource_type}: all" )
320+ format ! ( "{}:{}: all" , cluster_context , resource_type )
313321 } ;
314322
315323 let mut watches = self . active_watches . lock ( ) . await ;
@@ -328,6 +336,31 @@ impl WatchManager {
328336 }
329337 Ok ( ( ) )
330338 }
339+
340+ /// Stop all watches for the current cluster context only
341+ /// This is useful when switching cluster contexts to avoid cross-cluster contamination
342+ pub async fn stop_cluster_watches ( & self ) -> Result < ( ) , anyhow:: Error > {
343+ let cluster_context = K8sClient :: get_current_context ( ) . await
344+ . unwrap_or_else ( |_| "unknown" . to_string ( ) ) ;
345+
346+ let mut watches = self . active_watches . lock ( ) . await ;
347+ let prefix = format ! ( "{}:" , cluster_context) ;
348+
349+ // Collect keys to remove (to avoid borrowing issues)
350+ let keys_to_remove: Vec < String > = watches. keys ( )
351+ . filter ( |key| key. starts_with ( & prefix) )
352+ . cloned ( )
353+ . collect ( ) ;
354+
355+ // Stop and remove watches for this cluster
356+ for key in keys_to_remove {
357+ if let Some ( handle) = watches. remove ( & key) {
358+ handle. abort ( ) ;
359+ }
360+ }
361+
362+ Ok ( ( ) )
363+ }
331364}
332365
333366async fn watch_resource < K > ( api : Api < K > , app_handle : AppHandle , resource_type : String )
@@ -751,11 +784,74 @@ mod tests {
751784 // Test the watch key format used internally
752785 let resource_type = "pods" ;
753786 let namespace = Some ( "default" . to_string ( ) ) ;
754- let watch_key = format ! ( "{}:{}" , resource_type, namespace. as_deref( ) . unwrap_or( "all" ) ) ;
755- assert_eq ! ( watch_key, "pods:default" ) ;
787+ let cluster_context = "test-cluster" ;
788+ let watch_key = format ! ( "{}:{}:{}" , cluster_context, resource_type, namespace. as_deref( ) . unwrap_or( "all" ) ) ;
789+ assert_eq ! ( watch_key, "test-cluster:pods:default" ) ;
790+
791+ let cluster_wide_key = format ! ( "{}:{}:{}" , cluster_context, "nodes" , None :: <String >. as_deref( ) . unwrap_or( "all" ) ) ;
792+ assert_eq ! ( cluster_wide_key, "test-cluster:nodes:all" ) ;
793+ }
794+
795+ #[ tokio:: test]
796+ async fn test_cluster_aware_watch_keys ( ) {
797+ // Test that different clusters generate different watch keys for the same resource
798+ let resource_type = "nodes" ;
799+ let _namespaces: Option < Vec < String > > = None ;
800+
801+ // Simulate different cluster contexts
802+ let cluster_a = "cluster-a" ;
803+ let cluster_b = "cluster-b" ;
804+
805+ let key_a = format ! ( "{}:{}:all" , cluster_a, resource_type) ;
806+ let key_b = format ! ( "{}:{}:all" , cluster_b, resource_type) ;
807+
808+ assert_eq ! ( key_a, "cluster-a:nodes:all" ) ;
809+ assert_eq ! ( key_b, "cluster-b:nodes:all" ) ;
810+ assert_ne ! ( key_a, key_b, "Watch keys for different clusters should be different" ) ;
811+ }
812+
813+ #[ tokio:: test]
814+ async fn test_cluster_aware_namespaced_watch_keys ( ) {
815+ // Test that different clusters generate different watch keys for namespaced resources
816+ let resource_type = "pods" ;
817+ let namespace = "default" ;
818+
819+ let cluster_a = "cluster-a" ;
820+ let cluster_b = "cluster-b" ;
821+
822+ let key_a = format ! ( "{}:{}:{}" , cluster_a, resource_type, namespace) ;
823+ let key_b = format ! ( "{}:{}:{}" , cluster_b, resource_type, namespace) ;
824+
825+ assert_eq ! ( key_a, "cluster-a:pods:default" ) ;
826+ assert_eq ! ( key_b, "cluster-b:pods:default" ) ;
827+ assert_ne ! ( key_a, key_b, "Watch keys for different clusters should be different even for same namespace" ) ;
828+ }
829+
830+ #[ tokio:: test]
831+ async fn test_cluster_prefix_filtering ( ) {
832+ // Test the cluster prefix filtering logic used in stop_cluster_watches
833+ let cluster_context = "test-cluster" ;
834+ let prefix = format ! ( "{}:" , cluster_context) ;
835+
836+ let test_keys = vec ! [
837+ "test-cluster:pods:default" ,
838+ "test-cluster:nodes:all" ,
839+ "other-cluster:pods:default" ,
840+ "other-cluster:nodes:all" ,
841+ "test-cluster:services:kube-system"
842+ ] ;
843+
844+ let matching_keys: Vec < & str > = test_keys. iter ( )
845+ . filter ( |key| key. starts_with ( & prefix) )
846+ . copied ( )
847+ . collect ( ) ;
756848
757- let cluster_wide_key = format ! ( "{}:{}" , "nodes" , None :: <String >. as_deref( ) . unwrap_or( "all" ) ) ;
758- assert_eq ! ( cluster_wide_key, "nodes:all" ) ;
849+ assert_eq ! ( matching_keys. len( ) , 3 ) ;
850+ assert ! ( matching_keys. contains( & "test-cluster:pods:default" ) ) ;
851+ assert ! ( matching_keys. contains( & "test-cluster:nodes:all" ) ) ;
852+ assert ! ( matching_keys. contains( & "test-cluster:services:kube-system" ) ) ;
853+ assert ! ( !matching_keys. contains( & "other-cluster:pods:default" ) ) ;
854+ assert ! ( !matching_keys. contains( & "other-cluster:nodes:all" ) ) ;
759855 }
760856
761857 #[ tokio:: test]
@@ -889,6 +985,118 @@ mod tests {
889985 }
890986 }
891987
988+ #[ tokio:: test]
989+ async fn test_cross_cluster_watch_isolation ( ) {
990+ // This is a comprehensive integration test for cross-cluster contamination prevention
991+
992+ // Mock two different cluster contexts by directly testing the key generation
993+ // In a real scenario, this would involve switching actual Kubernetes contexts
994+
995+ let test_resources = vec ! [ "nodes" , "pods" , "services" ] ;
996+ let cluster_a = "cluster-production" ;
997+ let cluster_b = "cluster-staging" ;
998+
999+ for resource_type in test_resources {
1000+ // Generate keys for the same resource type in different clusters
1001+ let key_a = format ! ( "{}:{}:all" , cluster_a, resource_type) ;
1002+ let key_b = format ! ( "{}:{}:all" , cluster_b, resource_type) ;
1003+
1004+ // Ensure isolation - keys should be different
1005+ assert_ne ! ( key_a, key_b,
1006+ "Resource {} should have different keys in different clusters" , resource_type) ;
1007+
1008+ // Test namespaced resources too
1009+ let ns_key_a = format ! ( "{}:{}:default" , cluster_a, resource_type) ;
1010+ let ns_key_b = format ! ( "{}:{}:default" , cluster_b, resource_type) ;
1011+ assert_ne ! ( ns_key_a, ns_key_b,
1012+ "Namespaced resource {} should have different keys in different clusters" , resource_type) ;
1013+ }
1014+ }
1015+
1016+ #[ tokio:: test]
1017+ async fn test_cluster_cleanup_on_context_switch ( ) {
1018+ // Test that switching cluster context properly cleans up old watches
1019+ use std:: collections:: HashMap ;
1020+
1021+ // Simulate watch registry with mixed cluster watches
1022+ let mut mock_watches = HashMap :: new ( ) ;
1023+
1024+ // Add watches for cluster A
1025+ mock_watches. insert ( "cluster-a:nodes:all" . to_string ( ) , "handle1" ) ;
1026+ mock_watches. insert ( "cluster-a:pods:default" . to_string ( ) , "handle2" ) ;
1027+ mock_watches. insert ( "cluster-a:services:kube-system" . to_string ( ) , "handle3" ) ;
1028+
1029+ // Add watches for cluster B
1030+ mock_watches. insert ( "cluster-b:nodes:all" . to_string ( ) , "handle4" ) ;
1031+ mock_watches. insert ( "cluster-b:pods:default" . to_string ( ) , "handle5" ) ;
1032+
1033+ // Simulate cleanup for cluster-a (current cluster context)
1034+ let cluster_context = "cluster-a" ;
1035+ let prefix = format ! ( "{}:" , cluster_context) ;
1036+
1037+ let keys_to_remove: Vec < String > = mock_watches. keys ( )
1038+ . filter ( |key| key. starts_with ( & prefix) )
1039+ . cloned ( )
1040+ . collect ( ) ;
1041+
1042+ // Verify we identified the correct keys to remove
1043+ assert_eq ! ( keys_to_remove. len( ) , 3 ) ;
1044+ assert ! ( keys_to_remove. contains( & "cluster-a:nodes:all" . to_string( ) ) ) ;
1045+ assert ! ( keys_to_remove. contains( & "cluster-a:pods:default" . to_string( ) ) ) ;
1046+ assert ! ( keys_to_remove. contains( & "cluster-a:services:kube-system" . to_string( ) ) ) ;
1047+
1048+ // Remove cluster-a watches (simulating actual cleanup)
1049+ for key in keys_to_remove {
1050+ mock_watches. remove ( & key) ;
1051+ }
1052+
1053+ // Verify only cluster-b watches remain
1054+ assert_eq ! ( mock_watches. len( ) , 2 ) ;
1055+ assert ! ( mock_watches. contains_key( "cluster-b:nodes:all" ) ) ;
1056+ assert ! ( mock_watches. contains_key( "cluster-b:pods:default" ) ) ;
1057+ assert ! ( !mock_watches. contains_key( "cluster-a:nodes:all" ) ) ;
1058+ }
1059+
1060+ #[ tokio:: test]
1061+ async fn test_multi_cluster_resource_contamination_prevention ( ) {
1062+ // Test that watches for the same resource type in different clusters are isolated
1063+ let scenarios = vec ! [
1064+ // (cluster, resource_type, namespace)
1065+ ( "production" , "nodes" , None ) ,
1066+ ( "staging" , "nodes" , None ) ,
1067+ ( "development" , "nodes" , None ) ,
1068+ ( "production" , "pods" , Some ( "default" ) ) ,
1069+ ( "staging" , "pods" , Some ( "default" ) ) ,
1070+ ( "production" , "services" , Some ( "kube-system" ) ) ,
1071+ ( "staging" , "services" , Some ( "kube-system" ) ) ,
1072+ ] ;
1073+
1074+ let mut generated_keys = std:: collections:: HashSet :: new ( ) ;
1075+
1076+ for ( cluster, resource_type, namespace) in scenarios {
1077+ let watch_key = if let Some ( ns) = namespace {
1078+ format ! ( "{}:{}:{}" , cluster, resource_type, ns)
1079+ } else {
1080+ format ! ( "{}:{}:all" , cluster, resource_type)
1081+ } ;
1082+
1083+ // Ensure no duplicate keys (which would indicate contamination)
1084+ assert ! ( generated_keys. insert( watch_key. clone( ) ) ,
1085+ "Duplicate watch key detected: {} - this indicates cross-cluster contamination risk" ,
1086+ watch_key) ;
1087+ }
1088+
1089+ // Verify we have the expected number of unique keys
1090+ assert_eq ! ( generated_keys. len( ) , 7 ) ;
1091+
1092+ // Verify specific patterns
1093+ assert ! ( generated_keys. contains( "production:nodes:all" ) ) ;
1094+ assert ! ( generated_keys. contains( "staging:nodes:all" ) ) ;
1095+ assert ! ( generated_keys. contains( "development:nodes:all" ) ) ;
1096+ assert ! ( generated_keys. contains( "production:pods:default" ) ) ;
1097+ assert ! ( generated_keys. contains( "staging:pods:default" ) ) ;
1098+ }
1099+
8921100 #[ tokio:: test]
8931101 async fn test_concurrent_watch_operations ( ) {
8941102 let client = K8sClient :: new ( ) ;
0 commit comments