Skip to content

Commit 75306bd

Browse files
Plumb run index to pod name
Signed-off-by: Jason Parraga <[email protected]>
1 parent 287daaa commit 75306bd

File tree

19 files changed

+386
-277
lines changed

19 files changed

+386
-277
lines changed

internal/executor/domain/pod_metadata.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package domain
33
const (
44
JobId = "armada_job_id"
55
JobRunId = "armada_job_run_id"
6+
JobRunIndex = "armada_job_run_index"
67
PodNumber = "armada_pod_number"
78
PodCount = "armada_pod_count"
89
JobSetId = "armada_jobset_id"

internal/executor/util/kubernetes_object.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,12 @@ func CreatePodFromExecutorApiJob(job *executorapi.JobRunLease, defaults *configu
116116
}
117117

118118
labels := util.MergeMaps(job.Job.ObjectMeta.Labels, map[string]string{
119-
domain.JobId: jobId,
120-
domain.JobRunId: runId,
121-
domain.Queue: job.Queue,
122-
domain.PodNumber: strconv.Itoa(0),
123-
domain.PodCount: strconv.Itoa(1),
119+
domain.JobId: jobId,
120+
domain.JobRunId: runId,
121+
domain.JobRunIndex: strconv.Itoa(int(job.JobRunIndex)),
122+
domain.Queue: job.Queue,
123+
domain.PodNumber: strconv.Itoa(0),
124+
domain.PodCount: strconv.Itoa(1),
124125
})
125126
annotation := util.MergeMaps(job.Job.ObjectMeta.Annotations, map[string]string{
126127
domain.JobSetId: job.Jobset,
@@ -132,7 +133,7 @@ func CreatePodFromExecutorApiJob(job *executorapi.JobRunLease, defaults *configu
132133

133134
pod := &v1.Pod{
134135
ObjectMeta: metav1.ObjectMeta{
135-
Name: common.PodNamePrefix + job.Job.JobId + "-" + strconv.Itoa(0),
136+
Name: common.PodNamePrefix + job.Job.JobId + "-" + strconv.Itoa(0) + "-" + strconv.FormatUint(uint64(job.JobRunIndex), 10),
136137
Labels: labels,
137138
Annotations: annotation,
138139
Namespace: job.Job.ObjectMeta.Namespace,

internal/executor/util/kubernetes_objects_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package util
22

33
import (
44
"fmt"
5+
"strconv"
56
"testing"
67

78
"github.com/armadaproject/armada/internal/common/util"
@@ -142,6 +143,7 @@ func makePodSpec() *v1.PodSpec {
142143
func TestCreatePodFromExecutorApiJob(t *testing.T) {
143144
runId := uuid.NewString()
144145
jobId := util.NewULID()
146+
runIndex := 0
145147

146148
validJobLease := &executorapi.JobRunLease{
147149
JobRunId: runId,
@@ -167,14 +169,15 @@ func TestCreatePodFromExecutorApiJob(t *testing.T) {
167169

168170
expectedPod := &v1.Pod{
169171
ObjectMeta: metav1.ObjectMeta{
170-
Name: fmt.Sprintf("armada-%s-0", jobId),
172+
Name: fmt.Sprintf("armada-%s-0-%d", jobId, runIndex),
171173
Namespace: "test-namespace",
172174
Labels: map[string]string{
173-
domain.JobId: jobId,
174-
domain.JobRunId: runId,
175-
domain.Queue: "queue",
176-
domain.PodNumber: "0",
177-
domain.PodCount: "1",
175+
domain.JobId: jobId,
176+
domain.JobRunId: runId,
177+
domain.JobRunIndex: strconv.Itoa(runIndex),
178+
domain.Queue: "queue",
179+
domain.PodNumber: "0",
180+
domain.PodCount: "1",
178181
},
179182
Annotations: map[string]string{
180183
domain.JobSetId: "job-set",

internal/scheduler/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
174174
Event: &executorapi.LeaseStreamMessage_Lease{
175175
Lease: &executorapi.JobRunLease{
176176
JobRunId: lease.RunID,
177+
JobRunIndex: lease.R
177178
Queue: lease.Queue,
178179
Jobset: lease.JobSet,
179180
User: lease.UserID,

internal/scheduler/database/job_repository.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type hasSerial interface {
2424

2525
type JobRunLease struct {
2626
RunID string
27+
RunIndex uint32
2728
Queue string
2829
Pool string
2930
JobSet string
@@ -352,7 +353,7 @@ func (r *PostgresJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, ex
352353
}
353354

354355
query := `
355-
SELECT jr.run_id, jr.node, j.queue, j.job_set, jr.pool, j.user_id, j.groups, j.submit_message, jr.pod_requirements_overlay
356+
SELECT jr.run_id, jr.run_index, jr.node, j.queue, j.job_set, jr.pool, j.user_id, j.groups, j.submit_message, jr.pod_requirements_overlay
356357
FROM runs jr
357358
LEFT JOIN %s as tmp ON (tmp.run_id = jr.run_id)
358359
JOIN jobs j
@@ -374,7 +375,7 @@ func (r *PostgresJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, ex
374375
defer rows.Close()
375376
for rows.Next() {
376377
run := JobRunLease{}
377-
err = rows.Scan(&run.RunID, &run.Node, &run.Queue, &run.JobSet, &run.Pool, &run.UserID, &run.Groups, &run.SubmitMessage, &run.PodRequirementsOverlay)
378+
err = rows.Scan(&run.RunID, &run.RunIndex, &run.Node, &run.Queue, &run.JobSet, &run.Pool, &run.UserID, &run.Groups, &run.SubmitMessage, &run.PodRequirementsOverlay)
378379
if err != nil {
379380
return errors.WithStack(err)
380381
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TABLE runs ADD COLUMN IF NOT EXISTS run_index bigint NOT NULL DEFAULT 0;

internal/scheduler/database/models.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/scheduler/database/query.sql.go

Lines changed: 6 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/scheduler/jobdb/job.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,8 +670,10 @@ func (job *Job) ValidateResourceRequests() error {
670670
// WithNewRun creates a copy of the job with a new run on the given executor.
671671
func (job *Job) WithNewRun(executor, nodeId, nodeName, pool string, scheduledAtPriority int32) *Job {
672672
now := job.jobDb.clock.Now()
673+
nextRunIndex := len(job.runsById)
673674
return job.WithUpdatedRun(job.jobDb.CreateRun(
674675
job.jobDb.uuidProvider.New(),
676+
uint32(nextRunIndex),
675677
job.Id(),
676678
now.UnixNano(),
677679
executor,

internal/scheduler/jobdb/job_run.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
type JobRun struct {
1616
// Unique identifier for the run.
1717
id string
18+
19+
index uint32
1820
// Id of the job this run is associated with.
1921
jobId string
2022
// Time at which the run was created.
@@ -157,6 +159,9 @@ func (run *JobRun) Equal(other *JobRun) bool {
157159
if run.id != other.id {
158160
return false
159161
}
162+
if run.index != other.index {
163+
return false
164+
}
160165
if run.jobId != other.jobId {
161166
return false
162167
}
@@ -213,6 +218,7 @@ func MinimalRun(id string, creationTime int64) *JobRun {
213218
// CreateRun creates a new scheduler job run from a database job run
214219
func (jobDb *JobDb) CreateRun(
215220
id string,
221+
index uint32,
216222
jobId string,
217223
creationTime int64,
218224
executor string,
@@ -238,6 +244,7 @@ func (jobDb *JobDb) CreateRun(
238244
) *JobRun {
239245
return &JobRun{
240246
id: id,
247+
index: index,
241248
jobId: jobId,
242249
created: creationTime,
243250
executor: jobDb.stringInterner.Intern(executor),
@@ -268,6 +275,11 @@ func (run *JobRun) Id() string {
268275
return run.id
269276
}
270277

278+
// Index returns the index of the JobRun with respect to the Job.
279+
func (run *JobRun) Index() uint32 {
280+
return run.index
281+
}
282+
271283
// JobId returns the id of the job this run is associated with.
272284
func (run *JobRun) JobId() string {
273285
return run.jobId

0 commit comments

Comments
 (0)