1
1
import asyncio
2
2
import logging
3
+ import re
3
4
from collections import defaultdict
4
5
from concurrent .futures import ThreadPoolExecutor
5
- from typing import Any , Awaitable , Callable , Iterable , Optional , Union
6
+ from typing import Any , Awaitable , Callable , Iterable , Optional , Union , Literal
6
7
7
8
from kubernetes import client , config # type: ignore
8
9
from kubernetes .client import ApiException
@@ -47,6 +48,43 @@ def __init__(self, cluster: Optional[str]=None):
47
48
48
49
self .__jobs_for_cronjobs : dict [str , list [V1Job ]] = {}
49
50
self .__jobs_loading_locks : defaultdict [str , asyncio .Lock ] = defaultdict (asyncio .Lock )
51
+ self .__namespaces : Union [list [str , None ]] = None
52
+
53
+ @property
54
+ def namespaces (self ) -> Union [list [str ], Literal ["*" ]]:
55
+ """wrapper for settings.namespaces, which will do expand namespace list if some regex pattern included
56
+
57
+ Returns:
58
+ A list of namespace that will be scanned
59
+ """
60
+ # just return the list if it's already initialized
61
+ if self .__namespaces != None :
62
+ return self .__namespaces
63
+
64
+ setting_ns = settings .namespaces
65
+ if setting_ns == "*" :
66
+ self .__namespaces = setting_ns
67
+ return self .__namespaces
68
+
69
+ self .__namespaces = []
70
+ expand_list : list [re .Pattern ] = []
71
+ ns_regex_chars = re .compile (r"[\\*|\(.*?\)|\[.*?\]|\^|\$]" )
72
+ for ns in setting_ns :
73
+ if ns_regex_chars .search (ns ):
74
+ logger .debug (f"{ ns } is detected as regex pattern in expanding namespace list" )
75
+ expand_list .append (re .compile (ns ))
76
+ else :
77
+ self .__namespaces .append (ns )
78
+
79
+ if expand_list :
80
+ logger .info ("found regex pattern in provided namespace argument, expanding namespace list" )
81
+ all_ns = [ ns .metadata .name for ns in self .core .list_namespace ().items ]
82
+ for expand_ns in expand_list :
83
+ for ns in all_ns :
84
+ if expand_ns .fullmatch (ns ) and ns not in self .__namespaces :
85
+ self .__namespaces .append (ns )
86
+
87
+ return self .__namespaces
50
88
51
89
async def list_scannable_objects (self ) -> list [K8sObjectData ]:
52
90
"""List all scannable objects.
@@ -56,7 +94,7 @@ async def list_scannable_objects(self) -> list[K8sObjectData]:
56
94
"""
57
95
58
96
logger .info (f"Listing scannable objects in { self .cluster } " )
59
- logger .debug (f"Namespaces: { settings .namespaces } " )
97
+ logger .debug (f"Namespaces: { self .namespaces } " )
60
98
logger .debug (f"Resources: { settings .resources } " )
61
99
62
100
self .__hpa_list = await self ._try_list_hpa ()
@@ -75,7 +113,7 @@ async def list_scannable_objects(self) -> list[K8sObjectData]:
75
113
for workload_objects in workload_object_lists
76
114
for object in workload_objects
77
115
# NOTE: By default we will filter out kube-system namespace
78
- if not (settings .namespaces == "*" and object .namespace == "kube-system" )
116
+ if not (self .namespaces == "*" and object .namespace == "kube-system" )
79
117
]
80
118
81
119
async def _list_jobs_for_cronjobs (self , namespace : str ) -> list [V1Job ]:
@@ -189,7 +227,7 @@ async def _list_namespaced_or_global_objects(
189
227
logger .debug (f"Listing { kind } s in { self .cluster } " )
190
228
loop = asyncio .get_running_loop ()
191
229
192
- if settings .namespaces == "*" :
230
+ if self .namespaces == "*" :
193
231
requests = [
194
232
loop .run_in_executor (
195
233
self .executor ,
@@ -209,7 +247,7 @@ async def _list_namespaced_or_global_objects(
209
247
label_selector = settings .selector ,
210
248
),
211
249
)
212
- for namespace in settings .namespaces
250
+ for namespace in self .namespaces
213
251
]
214
252
215
253
result = [
0 commit comments