1
+ import json
1
2
import os
2
3
import random
3
4
import string
@@ -65,19 +66,30 @@ def create_namespace(self):
65
66
return RuntimeError (e )
66
67
67
68
68
def create_new_resource_flavor(self, num_flavors):
    """Create ``num_flavors`` randomly named ResourceFlavors.

    The generated names are collected, in creation order, in
    ``self.resource_flavors`` (the list is reset on every call).  Only the
    first flavor is created with ``default=True``; every later one is
    created with ``default=False``.
    """
    self.resource_flavors = []
    for position in range(num_flavors):
        flavor_name = f"test-resource-flavor-{random_choice()}"
        # Only the very first flavor is flagged as the default one.
        create_resource_flavor(self, flavor_name, position == 0)
        self.resource_flavors.append(flavor_name)
71
76
72
77
73
def create_new_cluster_queue(self, num_queues):
    """Create ``num_queues`` randomly named ClusterQueues.

    The queue at position ``i`` is created against
    ``self.resource_flavors[i]``, so the flavors must already have been
    created.  The generated names are collected, in creation order, in
    ``self.cluster_queues`` (the list is reset on every call).
    """
    self.cluster_queues = []
    for position in range(num_queues):
        queue_name = f"test-cluster-queue-{random_choice()}"
        # Queues and flavors are paired up by position.
        paired_flavor = self.resource_flavors[position]
        create_cluster_queue(self, queue_name, paired_flavor)
        self.cluster_queues.append(queue_name)
76
84
77
85
78
def create_new_local_queue(self, num_queues):
    """Create ``num_queues`` randomly named LocalQueues.

    The queue at position ``i`` is bound to ``self.cluster_queues[i]``, so
    the cluster queues must already have been created.  Only the first local
    queue is created with ``is_default=True``.  The generated names are
    collected, in creation order, in ``self.local_queues`` (the list is reset
    on every call).
    """
    self.local_queues = []
    for position in range(num_queues):
        first_queue = position == 0
        queue_name = f"test-local-queue-{random_choice()}"
        # Local queues and cluster queues are paired up by position.
        parent_queue = self.cluster_queues[position]
        create_local_queue(self, parent_queue, queue_name, first_queue)
        self.local_queues.append(queue_name)
81
93
82
94
83
95
def create_namespace_with_name (self , namespace_name ):
@@ -132,7 +144,7 @@ def create_cluster_queue(self, cluster_queue, flavor):
132
144
{"name" : "memory" , "nominalQuota" : "36Gi" },
133
145
{"name" : "nvidia.com/gpu" , "nominalQuota" : 1 },
134
146
],
135
- }
147
+ },
136
148
],
137
149
}
138
150
],
@@ -161,11 +173,33 @@ def create_cluster_queue(self, cluster_queue, flavor):
161
173
self .cluster_queue = cluster_queue
162
174
163
175
164
- def create_resource_flavor (self , flavor ):
176
+ def create_resource_flavor (self , flavor , default = True ):
177
+ worker_label , worker_value = os .getenv ("WORKER_LABEL" , "worker-1=true" ).split ("=" )
178
+ control_label , control_value = os .getenv (
179
+ "CONTROL_LABEL" , "ingress-ready=true"
180
+ ).split ("=" )
181
+ toleration_key = os .getenv (
182
+ "TOLERATION_KEY" , "node-role.kubernetes.io/control-plane"
183
+ )
184
+
185
+ node_labels = (
186
+ {worker_label : worker_value } if default else {control_label : control_value }
187
+ )
188
+
165
189
resource_flavor_json = {
166
190
"apiVersion" : "kueue.x-k8s.io/v1beta1" ,
167
191
"kind" : "ResourceFlavor" ,
168
192
"metadata" : {"name" : flavor },
193
+ "spec" : {
194
+ "nodeLabels" : node_labels ,
195
+ "tolerations" : [
196
+ {
197
+ "key" : toleration_key ,
198
+ "operator" : "Exists" ,
199
+ "effect" : "NoSchedule" ,
200
+ }
201
+ ],
202
+ },
169
203
}
170
204
171
205
try :
@@ -190,14 +224,14 @@ def create_resource_flavor(self, flavor):
190
224
self .resource_flavor = flavor
191
225
192
226
193
- def create_local_queue (self , cluster_queue , local_queue ):
227
+ def create_local_queue (self , cluster_queue , local_queue , is_default = True ):
194
228
local_queue_json = {
195
229
"apiVersion" : "kueue.x-k8s.io/v1beta1" ,
196
230
"kind" : "LocalQueue" ,
197
231
"metadata" : {
198
232
"namespace" : self .namespace ,
199
233
"name" : local_queue ,
200
- "annotations" : {"kueue.x-k8s.io/default-queue" : "true" },
234
+ "annotations" : {"kueue.x-k8s.io/default-queue" : str ( is_default ). lower () },
201
235
},
202
236
"spec" : {"clusterQueue" : cluster_queue },
203
237
}
@@ -226,34 +260,77 @@ def create_local_queue(self, cluster_queue, local_queue):
226
260
self .local_queue = local_queue
227
261
228
262
229
def create_kueue_resources(self, resource_ammount=1):
    """Create ``resource_ammount`` flavors, cluster queues and local queues.

    The three kinds are created in dependency order: resource flavors first,
    then the cluster queues that reference them, then the local queues that
    reference the cluster queues.
    """
    print("creating Kueue resources ...")
    creation_steps = (
        create_new_resource_flavor,
        create_new_cluster_queue,
        create_new_local_queue,
    )
    for create_step in creation_steps:
        create_step(self, resource_ammount)
234
268
235
269
236
270
def delete_kueue_resources(self):
    """Best-effort removal of the ClusterQueues and ResourceFlavors on ``self``.

    Every name in ``self.cluster_queues`` is deleted first, then every name
    in ``self.resource_flavors``.  A failed deletion is reported on stdout
    and does not stop the remaining deletions.

    If either list attribute was never set, it is treated as empty rather
    than raising ``AttributeError``, so that teardown after a failed or
    partial setup does not fail itself.
    """
    # Delete if given cluster-queue exists
    for cq in getattr(self, "cluster_queues", None) or []:
        try:
            self.custom_api.delete_cluster_custom_object(
                group="kueue.x-k8s.io",
                plural="clusterqueues",
                version="v1beta1",
                name=cq,
            )
            print(f"\n'{cq}' cluster-queue deleted")
        except Exception as e:
            # Deliberately broad: cleanup must carry on with the next item.
            print(f"\nError deleting cluster-queue '{cq}' : {e}")

    # Delete if given resource-flavor exists
    for flavor in getattr(self, "resource_flavors", None) or []:
        try:
            self.custom_api.delete_cluster_custom_object(
                group="kueue.x-k8s.io",
                plural="resourceflavors",
                version="v1beta1",
                name=flavor,
            )
            print(f"'{flavor}' resource-flavor deleted")
        except Exception as e:
            # Deliberately broad: cleanup must carry on with the next item.
            print(f"\nError deleting resource-flavor '{flavor}': {e}")
296
+
297
+
298
def get_pod_node(self, namespace, name):
    """Return the node name of the first pod labelled ``ray.io/cluster=<name>``.

    Pods are listed in ``namespace`` through ``self.api_instance``.

    Raises:
        ValueError: if no pod matches the label selector, or if the first
            matching pod has no node assigned (``spec.node_name`` is None).
    """
    matching_pods = self.api_instance.list_namespaced_pod(
        namespace, label_selector=f"ray.io/cluster={name}"
    )
    if not matching_pods.items:
        raise ValueError(
            f"Unable to retrieve node name for pod '{name}' in namespace '{namespace}'"
        )

    # Only the first pod in the listing is inspected.
    assigned_node = matching_pods.items[0].spec.node_name
    if assigned_node is None:
        raise ValueError(
            f"No node selected for pod '{name}' in namespace '{namespace}'"
        )
    return assigned_node
314
+
315
+
316
def get_flavor_spec(self, flavor_name):
    """Fetch the ResourceFlavor called ``flavor_name`` and return it.

    The object is read through ``self.custom_api`` from the
    ``kueue.x-k8s.io/v1beta1`` ``resourceflavors`` collection and returned
    exactly as the API client hands it back.

    Raises:
        client.exceptions.ApiException: re-raised after a diagnostic line is
            printed; a 404 status gets a dedicated "not found" message.
    """
    lookup = {
        "group": "kueue.x-k8s.io",
        "version": "v1beta1",
        "plural": "resourceflavors",
        "name": flavor_name,
    }
    try:
        return self.custom_api.get_cluster_custom_object(**lookup)
    except client.exceptions.ApiException as api_error:
        if api_error.status != 404:
            print(f"Error retrieving ResourceFlavor '{flavor_name}': {api_error}")
        else:
            print(f"ResourceFlavor '{flavor_name}' not found.")
        # The caller still sees the original exception.
        raise
331
+
332
+
333
def get_nodes_by_label(self, node_labels):
    """Return the names of the nodes matching every label in ``node_labels``.

    ``node_labels`` is a mapping of label key to label value.  Its entries
    are rendered as ``key=value`` terms, joined with commas, and passed as
    the label selector to ``self.api_instance.list_node``.
    """
    selector_terms = []
    for label_key, label_value in node_labels.items():
        selector_terms.append(f"{label_key}={label_value}")

    listing = self.api_instance.list_node(label_selector=",".join(selector_terms))

    node_names = []
    for entry in listing.items:
        node_names.append(entry.metadata.name)
    return node_names
0 commit comments