Skip to content

Commit 4a4a113

Browse files
committed
feat: move gang scheduling metadata to typed protobuf fields
Signed-off-by: Dejan Zele Pejchev <[email protected]>
1 parent 0a7087d commit 4a4a113

File tree

28 files changed

+2786
-876
lines changed

28 files changed

+2786
-876
lines changed

internal/executor/util/kubernetes_object.go

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/armadaproject/armada/internal/common/util"
1414
"github.com/armadaproject/armada/internal/executor/configuration"
1515
"github.com/armadaproject/armada/internal/executor/domain"
16+
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
1617
"github.com/armadaproject/armada/pkg/api"
1718
"github.com/armadaproject/armada/pkg/armadaevents"
1819
"github.com/armadaproject/armada/pkg/executorapi"
@@ -128,10 +129,25 @@ func CreatePodFromExecutorApiJob(job *executorapi.JobRunLease, defaults *configu
128129
domain.Owner: job.User,
129130
})
130131

132+
// Add gang scheduling annotations from SchedulingMetadata if not already present
133+
// (for backward compatibility with old jobs that have annotations in the job spec)
134+
if job.SchedulingMetadata != nil && job.SchedulingMetadata.GangInfo != nil {
135+
gangInfo := job.SchedulingMetadata.GangInfo
136+
if _, exists := annotation[serverconfiguration.GangIdAnnotation]; !exists && gangInfo.GangId != "" {
137+
annotation[serverconfiguration.GangIdAnnotation] = gangInfo.GangId
138+
}
139+
if _, exists := annotation[serverconfiguration.GangCardinalityAnnotation]; !exists && gangInfo.Cardinality > 0 {
140+
annotation[serverconfiguration.GangCardinalityAnnotation] = strconv.FormatUint(uint64(gangInfo.Cardinality), 10)
141+
}
142+
if _, exists := annotation[serverconfiguration.GangNodeUniformityLabelAnnotation]; !exists && gangInfo.NodeUniformityLabelName != "" {
143+
annotation[serverconfiguration.GangNodeUniformityLabelAnnotation] = gangInfo.NodeUniformityLabelName
144+
}
145+
}
146+
131147
applyDefaults(podSpec, defaults)
132148
setRestartPolicyNever(podSpec)
133149

134-
injectArmadaEnvVars(podSpec, jobId, job.Queue, job.Jobset, annotation)
150+
injectArmadaEnvVars(podSpec, jobId, job.Queue, job.Jobset, job.SchedulingMetadata, annotation)
135151

136152
pod := &v1.Pod{
137153
ObjectMeta: metav1.ObjectMeta{
@@ -173,7 +189,7 @@ func CreatePod(job *api.Job, defaults *configuration.PodDefaults) *v1.Pod {
173189

174190
setRestartPolicyNever(podSpec)
175191

176-
injectArmadaEnvVars(podSpec, job.Id, job.Queue, job.JobSetId, annotation)
192+
injectArmadaEnvVars(podSpec, job.Id, job.Queue, job.JobSetId, nil, annotation)
177193

178194
pod := &v1.Pod{
179195
ObjectMeta: metav1.ObjectMeta{
@@ -203,41 +219,18 @@ func setRestartPolicyNever(podSpec *v1.PodSpec) {
203219

204220
// injectArmadaEnvVars injects Armada system environment variables into all containers.
205221
// It injects base variables for all jobs and additional variables for gang-scheduled jobs.
206-
func injectArmadaEnvVars(podSpec *v1.PodSpec, jobId string, queue string, jobsetId string, annotations map[string]string) {
207-
// Base environment variables for all jobs
222+
func injectArmadaEnvVars(podSpec *v1.PodSpec, jobId string, queue string, jobsetId string, schedulingMetadata *armadaevents.SchedulingMetadata, annotations map[string]string) {
208223
baseEnvVars := []v1.EnvVar{
209224
{Name: serverconfiguration.JobIdEnvVar, Value: jobId},
210225
{Name: serverconfiguration.QueueEnvVar, Value: queue},
211226
{Name: serverconfiguration.JobSetIdEnvVar, Value: jobsetId},
212227
}
213228

214-
// Gang-specific environment variables
215-
var gangEnvVars []v1.EnvVar
216-
217-
if gangId, ok := annotations[serverconfiguration.GangIdAnnotation]; ok && gangId != "" {
218-
gangEnvVars = append(gangEnvVars, v1.EnvVar{
219-
Name: serverconfiguration.GangIdEnvVar,
220-
Value: gangId,
221-
})
222-
}
223-
if gangCardinality, ok := annotations[serverconfiguration.GangCardinalityAnnotation]; ok && gangCardinality != "" {
224-
gangEnvVars = append(gangEnvVars, v1.EnvVar{
225-
Name: serverconfiguration.GangCardinalityEnvVar,
226-
Value: gangCardinality,
227-
})
229+
allEnvVars := baseEnvVars
230+
if schedulingMetadata != nil && schedulingMetadata.GangInfo != nil {
231+
allEnvVars = append(allEnvVars, buildGangEnvVars(schedulingMetadata.GangInfo)...)
228232
}
229233

230-
labelName, hasLabelName := annotations[serverconfiguration.GangNodeUniformityLabelNameEnvVar]
231-
labelValue, hasLabelValue := annotations[serverconfiguration.GangNodeUniformityLabelValueEnvVar]
232-
if hasLabelName && hasLabelValue {
233-
gangEnvVars = append(gangEnvVars,
234-
v1.EnvVar{Name: serverconfiguration.GangNodeUniformityLabelNameEnvVar, Value: labelName},
235-
v1.EnvVar{Name: serverconfiguration.GangNodeUniformityLabelValueEnvVar, Value: labelValue},
236-
)
237-
}
238-
239-
allEnvVars := append(baseEnvVars, gangEnvVars...)
240-
241234
for i := range podSpec.InitContainers {
242235
addEnvVarsIfNotExist(&podSpec.InitContainers[i], allEnvVars)
243236
}
@@ -246,6 +239,16 @@ func injectArmadaEnvVars(podSpec *v1.PodSpec, jobId string, queue string, jobset
246239
}
247240
}
248241

242+
// buildGangEnvVars builds gang environment variables from GangPlacement.
243+
func buildGangEnvVars(gangInfo *schedulerobjects.GangPlacement) []v1.EnvVar {
244+
return []v1.EnvVar{
245+
{Name: serverconfiguration.GangIdEnvVar, Value: gangInfo.GangId},
246+
{Name: serverconfiguration.GangCardinalityEnvVar, Value: strconv.FormatUint(uint64(gangInfo.Cardinality), 10)},
247+
{Name: serverconfiguration.GangNodeUniformityLabelNameEnvVar, Value: gangInfo.NodeUniformityLabelName},
248+
{Name: serverconfiguration.GangNodeUniformityLabelValueEnvVar, Value: gangInfo.NodeUniformityLabelValue},
249+
}
250+
}
251+
249252
// addEnvVarsIfNotExist adds environment variables to a container if they don't already exist.
250253
func addEnvVarsIfNotExist(container *v1.Container, newEnvVars []v1.EnvVar) {
251254
for _, newVar := range newEnvVars {

internal/executor/util/kubernetes_objects_test.go

Lines changed: 62 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/armadaproject/armada/internal/common/util"
1515
"github.com/armadaproject/armada/internal/executor/configuration"
1616
"github.com/armadaproject/armada/internal/executor/domain"
17+
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
1718
"github.com/armadaproject/armada/pkg/api"
1819
"github.com/armadaproject/armada/pkg/armadaevents"
1920
"github.com/armadaproject/armada/pkg/executorapi"
@@ -255,7 +256,7 @@ func TestInjectArmadaEnvVars(t *testing.T) {
255256
jobId string
256257
queue string
257258
jobsetId string
258-
annotations map[string]string
259+
schedulingMetadata *armadaevents.SchedulingMetadata
259260
existingEnvs []v1.EnvVar
260261
wantEnvs map[string]string
261262
dontWantEnvs []string
@@ -310,11 +311,13 @@ func TestInjectArmadaEnvVars(t *testing.T) {
310311
jobId: "job-123",
311312
queue: "queue",
312313
jobsetId: "jobset",
313-
annotations: map[string]string{
314-
serverconfiguration.GangIdAnnotation: "gang-789",
315-
serverconfiguration.GangCardinalityAnnotation: "3",
316-
serverconfiguration.GangNodeUniformityLabelNameEnvVar: "rack",
317-
serverconfiguration.GangNodeUniformityLabelValueEnvVar: "rack-1",
314+
schedulingMetadata: &armadaevents.SchedulingMetadata{
315+
GangInfo: &schedulerobjects.GangPlacement{
316+
GangId: "gang-789",
317+
Cardinality: 3,
318+
NodeUniformityLabelName: "rack",
319+
NodeUniformityLabelValue: "rack-1",
320+
},
318321
},
319322
wantEnvs: map[string]string{
320323
serverconfiguration.JobIdEnvVar: "job-123",
@@ -335,33 +338,6 @@ func TestInjectArmadaEnvVars(t *testing.T) {
335338
serverconfiguration.GangNodeUniformityLabelValueEnvVar: "rack-1",
336339
},
337340
},
338-
{
339-
name: "skips node uniformity env vars when only label name annotation exists",
340-
jobId: "job-123",
341-
queue: "queue",
342-
jobsetId: "jobset",
343-
annotations: map[string]string{
344-
serverconfiguration.GangNodeUniformityLabelNameEnvVar: "rack",
345-
},
346-
wantEnvs: map[string]string{
347-
serverconfiguration.JobIdEnvVar: "job-123",
348-
serverconfiguration.QueueEnvVar: "queue",
349-
serverconfiguration.JobSetIdEnvVar: "jobset",
350-
},
351-
dontWantEnvs: []string{
352-
serverconfiguration.GangNodeUniformityLabelNameEnvVar,
353-
serverconfiguration.GangNodeUniformityLabelValueEnvVar,
354-
},
355-
wantInitContainerEnvs: map[string]string{
356-
serverconfiguration.JobIdEnvVar: "job-123",
357-
serverconfiguration.QueueEnvVar: "queue",
358-
serverconfiguration.JobSetIdEnvVar: "jobset",
359-
},
360-
dontWantInitContainerEnvs: []string{
361-
serverconfiguration.GangNodeUniformityLabelNameEnvVar,
362-
serverconfiguration.GangNodeUniformityLabelValueEnvVar,
363-
},
364-
},
365341
}
366342

367343
for _, tc := range tests {
@@ -371,7 +347,7 @@ func TestInjectArmadaEnvVars(t *testing.T) {
371347
InitContainers: []v1.Container{{Name: "init"}},
372348
}
373349

374-
injectArmadaEnvVars(podSpec, tc.jobId, tc.queue, tc.jobsetId, tc.annotations)
350+
injectArmadaEnvVars(podSpec, tc.jobId, tc.queue, tc.jobsetId, tc.schedulingMetadata, nil)
375351

376352
for _, container := range podSpec.InitContainers {
377353
envMap := make(map[string]string, len(container.Env))
@@ -401,3 +377,55 @@ func TestInjectArmadaEnvVars(t *testing.T) {
401377
})
402378
}
403379
}
380+
381+
func TestInjectArmadaEnvVars_GangScheduling(t *testing.T) {
382+
tests := []struct {
383+
name string
384+
schedulingMetadata *armadaevents.SchedulingMetadata
385+
wantGangId string
386+
wantCardinality string
387+
}{
388+
{
389+
name: "injects gang env vars from GangInfo",
390+
schedulingMetadata: &armadaevents.SchedulingMetadata{
391+
GangInfo: &schedulerobjects.GangPlacement{GangId: "gang-123", Cardinality: 5},
392+
},
393+
wantGangId: "gang-123",
394+
wantCardinality: "5",
395+
},
396+
{
397+
name: "no gang env vars when GangInfo is nil",
398+
schedulingMetadata: nil,
399+
wantGangId: "",
400+
wantCardinality: "",
401+
},
402+
{
403+
name: "no gang env vars when SchedulingMetadata.GangInfo is nil",
404+
schedulingMetadata: &armadaevents.SchedulingMetadata{GangInfo: nil},
405+
wantGangId: "",
406+
wantCardinality: "",
407+
},
408+
}
409+
410+
for _, tt := range tests {
411+
t.Run(tt.name, func(t *testing.T) {
412+
podSpec := &v1.PodSpec{Containers: []v1.Container{{Name: "main"}}}
413+
injectArmadaEnvVars(podSpec, "job", "queue", "jobset", tt.schedulingMetadata, nil)
414+
415+
envMap := make(map[string]string)
416+
for _, env := range podSpec.Containers[0].Env {
417+
envMap[env.Name] = env.Value
418+
}
419+
420+
if tt.wantGangId != "" {
421+
assert.Equal(t, tt.wantGangId, envMap[serverconfiguration.GangIdEnvVar])
422+
assert.Equal(t, tt.wantCardinality, envMap[serverconfiguration.GangCardinalityEnvVar])
423+
} else {
424+
_, hasGangId := envMap[serverconfiguration.GangIdEnvVar]
425+
assert.False(t, hasGangId)
426+
_, hasCardinality := envMap[serverconfiguration.GangCardinalityEnvVar]
427+
assert.False(t, hasCardinality)
428+
}
429+
})
430+
}
431+
}

internal/scheduler/api.go

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
152152
addAnnotations(submitMsg, PodRequirementsOverlay.Annotations)
153153
}
154154

155+
// Build SchedulingMetadata from annotations and run data
156+
schedulingMetadata := buildSchedulingMetadataFromLease(submitMsg, lease.GangNodeUniformityLabelValue)
157+
155158
srv.addPreemptibleLabel(submitMsg)
156159

157160
srv.dropDisallowedResources(submitMsg.MainObject.GetPodSpec().PodSpec)
@@ -172,12 +175,13 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
172175
err := stream.Send(&executorapi.LeaseStreamMessage{
173176
Event: &executorapi.LeaseStreamMessage_Lease{
174177
Lease: &executorapi.JobRunLease{
175-
JobRunId: lease.RunID,
176-
Queue: lease.Queue,
177-
Jobset: lease.JobSet,
178-
User: lease.UserID,
179-
Groups: groups,
180-
Job: submitMsg,
178+
JobRunId: lease.RunID,
179+
Queue: lease.Queue,
180+
Jobset: lease.JobSet,
181+
User: lease.UserID,
182+
Groups: groups,
183+
Job: submitMsg,
184+
SchedulingMetadata: schedulingMetadata,
181185
},
182186
},
183187
})
@@ -410,3 +414,52 @@ func unmarshalFromCompressedBytes(bytes []byte, decompressor compress.Decompress
410414
}
411415
return proto.Unmarshal(decompressedBytes, msg)
412416
}
417+
418+
// buildSchedulingMetadata extracts gang scheduling information from the SubmitJob message
419+
// and builds a SchedulingMetadata protobuf message for the executor.
420+
421+
// buildSchedulingMetadataFromLease extracts gang scheduling information from the SubmitJob message
422+
// and builds a SchedulingMetadata protobuf message for the executor.
423+
// labelValue is the actual value determined during scheduling for the gang's uniformity label.
424+
func buildSchedulingMetadataFromLease(submitMsg *armadaevents.SubmitJob, labelValue string) *armadaevents.SchedulingMetadata {
425+
if submitMsg == nil {
426+
return nil
427+
}
428+
429+
var gangId string
430+
var cardinality uint32
431+
var labelName string
432+
433+
if submitMsg.Gang != nil {
434+
gangId = submitMsg.Gang.GangId
435+
cardinality = submitMsg.Gang.Cardinality
436+
labelName = submitMsg.Gang.NodeUniformityLabelName
437+
} else if submitMsg.ObjectMeta != nil && submitMsg.ObjectMeta.Annotations != nil {
438+
annotations := submitMsg.ObjectMeta.Annotations
439+
gangId = annotations[constants.GangIdAnnotation]
440+
if gangId == "" {
441+
return nil
442+
}
443+
444+
if cardinalityStr := annotations[constants.GangCardinalityAnnotation]; cardinalityStr != "" {
445+
if val, err := strconv.ParseUint(cardinalityStr, 10, 32); err == nil {
446+
cardinality = uint32(val)
447+
}
448+
}
449+
450+
labelName = annotations[constants.GangNodeUniformityLabelAnnotation]
451+
} else {
452+
return nil
453+
}
454+
455+
gangPlacement := &schedulerobjects.GangPlacement{
456+
GangId: gangId,
457+
Cardinality: cardinality,
458+
NodeUniformityLabelName: labelName,
459+
NodeUniformityLabelValue: labelValue,
460+
}
461+
462+
return &armadaevents.SchedulingMetadata{
463+
GangInfo: gangPlacement,
464+
}
465+
}

0 commit comments

Comments
 (0)