internal/scheduler/scheduling/jobiteration.go (26 changes: 19 additions & 7 deletions)
@@ -1,6 +1,7 @@
 package scheduling
 
 import (
+	"math"
 	"sync"
 
 	"golang.org/x/exp/slices"
@@ -110,16 +111,23 @@ func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobContextIterator {

 // QueuedJobsIterator is an iterator over all jobs in a queue.
 type QueuedJobsIterator struct {
-	jobIter jobdb.JobIterator
-	pool    string
-	ctx     *armadacontext.Context
+	jobIter     jobdb.JobIterator
+	pool        string
+	maxLookback uint
+	jobsSeen    uint
+	ctx         *armadacontext.Context
 }
 
-func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, repo JobRepository) *QueuedJobsIterator {
+func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, pool string, maxLookback uint, repo JobRepository) *QueuedJobsIterator {
+	if maxLookback == 0 {
+		maxLookback = math.MaxUint
+	}
 	return &QueuedJobsIterator{
-		jobIter: repo.QueuedJobs(queue),
-		pool:    pool,
-		ctx:     ctx,
+		jobIter:     repo.QueuedJobs(queue),
+		pool:        pool,
+		maxLookback: maxLookback,
+		jobsSeen:    0,
+		ctx:         ctx,
 	}
 }

@@ -129,11 +137,15 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, error) {
 	case <-it.ctx.Done():
 		return nil, it.ctx.Err()
 	default:
+		if it.jobsSeen >= it.maxLookback {
+			return nil, nil
+		}
 		job, _ := it.jobIter.Next()
 		if job == nil {
 			return nil, nil
 		}
 		if slices.Contains(job.Pools(), it.pool) {
+			it.jobsSeen++
 			return schedulercontext.JobSchedulingContextFromJob(job), nil
 		}
 	}
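Note: taken together, these changes move the queue-lookback cap into QueuedJobsIterator itself. A maxLookback of 0 is normalized to math.MaxUint (no limit), only jobs matching the iterator's pool count towards the cap, and Next returns nil, nil once the cap is hit. A minimal usage sketch of the new constructor follows; the function, queue and pool names, and the cap of 1000 are illustrative and not part of this PR:

```go
// Sketch only: drainQueue is hypothetical. It shows the new signature,
// where maxLookback caps how many pool-matched jobs are yielded.
func drainQueue(ctx *armadacontext.Context, repo JobRepository) error {
	it := NewQueuedJobsIterator(ctx, "queue-a", "cpu", 1000, repo)
	for {
		jctx, err := it.Next()
		if err != nil {
			return err // e.g. context cancelled or deadline exceeded
		}
		if jctx == nil {
			return nil // queue exhausted, or the 1000-job cap was hit
		}
		// try to schedule jctx ...
	}
}
```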
internal/scheduler/scheduling/jobiteration_test.go (17 changes: 9 additions & 8 deletions)
@@ -2,6 +2,7 @@ package scheduling

 import (
 	"context"
+	"math"
 	"testing"
 	"time"

@@ -64,7 +65,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) {
 	ctx := armadacontext.Background()
 	its := make([]JobContextIterator, 3)
 	for i, queue := range []string{"A", "B", "C"} {
-		it := NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, repo)
+		it := NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, repo)
 		its[i] = it
 	}
 	it := NewMultiJobsIterator(its...)
@@ -93,7 +94,7 @@ func TestQueuedJobsIterator_OneQueue(t *testing.T) {
 		expected = append(expected, job.Id())
 	}
 	ctx := armadacontext.Background()
-	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
+	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo)
 	actual := make([]string, 0)
 	for {
 		jctx, err := it.Next()
@@ -115,7 +116,7 @@ func TestQueuedJobsIterator_ExceedsBufferSize(t *testing.T) {
 		expected = append(expected, job.Id())
 	}
 	ctx := armadacontext.Background()
-	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
+	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo)
 	actual := make([]string, 0)
 	for {
 		jctx, err := it.Next()
@@ -137,7 +138,7 @@ func TestQueuedJobsIterator_ManyJobs(t *testing.T) {
 		expected = append(expected, job.Id())
 	}
 	ctx := armadacontext.Background()
-	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
+	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo)
 	actual := make([]string, 0)
 	for {
 		jctx, err := it.Next()
@@ -164,7 +165,7 @@ func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) {
 		repo.Enqueue(job)
 	}
 	ctx := armadacontext.Background()
-	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
+	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo)
 	actual := make([]string, 0)
 	for {
 		jctx, err := it.Next()
@@ -187,7 +188,7 @@ func TestCreateQueuedJobsIterator_RespectsTimeout(t *testing.T) {
 	ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), time.Millisecond)
 	time.Sleep(20 * time.Millisecond)
 	defer cancel()
-	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
+	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo)
 	job, err := it.Next()
 	assert.Nil(t, job)
 	assert.ErrorIs(t, err, context.DeadlineExceeded)
@@ -205,7 +206,7 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) {
 		repo.Enqueue(job)
 	}
 	ctx := armadacontext.Background()
-	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, repo)
+	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, math.MaxUint, repo)
 	for job, err := it.Next(); job != nil; job, err = it.Next() {
 		require.NoError(t, err)
 	}
@@ -266,7 +267,7 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) {
 }
 
 func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobContextIterator {
-	return NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, repo)
+	return NewQueuedJobsIterator(ctx, queue, testfixtures.TestPool, math.MaxUint, repo)
 }
 
 func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job {
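Note: every updated call site in the tests passes math.MaxUint, preserving the old unlimited behaviour, so none of them exercise a finite cap. A hypothetical test along the following lines could cover it; the fixture helpers exist in this package, but the mock-repository constructor name is assumed:

```go
func TestQueuedJobsIterator_MaxLookback(t *testing.T) {
	repo := newMockJobRepository() // assumed constructor for mockJobRepository
	for _, job := range testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 5) {
		repo.Enqueue(job)
	}
	ctx := armadacontext.Background()
	// Cap the iterator at 2 of the 5 queued jobs.
	it := NewQueuedJobsIterator(ctx, "A", testfixtures.TestPool, 2, repo)
	for i := 0; i < 2; i++ {
		jctx, err := it.Next()
		require.NoError(t, err)
		require.NotNil(t, jctx)
	}
	// The third call hits the lookback cap and yields nil without error.
	jctx, err := it.Next()
	require.NoError(t, err)
	require.Nil(t, jctx)
}
```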
internal/scheduler/scheduling/preempting_queue_scheduler.go (3 changes: 1 addition & 2 deletions)
@@ -469,7 +469,6 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.SchedulingContext
 		gangItByQueue[qctx.Queue] = NewQueuedGangIterator(
 			sctx,
 			inMemoryJobRepo.GetJobIterator(qctx.Queue),
-			0,
 			false,
 		)
 	}
@@ -511,7 +510,7 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemo
 		if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() {
 			jobIteratorByQueue[qctx.Queue] = evictedIt
 		} else {
-			queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, sch.schedulingContext.Pool, jobRepo)
+			queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, sch.schedulingContext.Pool, sch.constraints.GetMaxQueueLookBack(), jobRepo)
 			jobIteratorByQueue[qctx.Queue] = NewMultiJobsIterator(evictedIt, queueIt)
 		}
 	}
internal/scheduler/scheduling/queue_scheduler.go (29 changes: 3 additions & 26 deletions)
@@ -47,7 +47,7 @@ func NewQueueScheduler(
 	}
 	gangIteratorsByQueue := make(map[string]*QueuedGangIterator)
 	for queue, it := range jobIteratorByQueue {
-		gangIteratorsByQueue[queue] = NewQueuedGangIterator(sctx, it, constraints.GetMaxQueueLookBack(), true)
+		gangIteratorsByQueue[queue] = NewQueuedGangIterator(sctx, it, true)
 	}
 	candidateGangIterator, err := NewCandidateGangIterator(sctx.Pool, sctx, sctx.FairnessCostProvider, gangIteratorsByQueue, considerPriorityClassPriority)
 	if err != nil {
@@ -220,20 +220,15 @@ type QueuedGangIterator struct {
 	queuedJobsIterator JobContextIterator
 	// Groups jctxs by the gang they belong to.
 	jctxsByGangId map[string][]*schedulercontext.JobSchedulingContext
-	// Maximum number of jobs to look at before giving up.
-	maxLookback uint
 	// If true, do not yield jobs known to be unschedulable.
 	skipKnownUnschedulableJobs bool
-	// Number of jobs we have seen so far.
-	jobsSeen uint
-	next     *schedulercontext.GangSchedulingContext
+	next *schedulercontext.GangSchedulingContext
 }
 
-func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, maxLookback uint, skipKnownUnschedulableJobs bool) *QueuedGangIterator {
+func NewQueuedGangIterator(sctx *schedulercontext.SchedulingContext, it JobContextIterator, skipKnownUnschedulableJobs bool) *QueuedGangIterator {
 	return &QueuedGangIterator{
 		schedulingContext:          sctx,
 		queuedJobsIterator:         it,
-		maxLookback:                maxLookback,
 		skipKnownUnschedulableJobs: skipKnownUnschedulableJobs,
 		jctxsByGangId:              make(map[string][]*schedulercontext.JobSchedulingContext),
 	}
@@ -256,9 +251,6 @@ func (it *QueuedGangIterator) Clear() error {
 }
 
 func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error) {
-	if it.hitLookbackLimit() {
-		return nil, nil
-	}
 	if it.next != nil {
 		return it.next, nil
 	}
@@ -274,14 +266,6 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error) {
 		return nil, nil
 	}
 
-	// Queue lookback limits. Rescheduled jobs don't count towards the limit.
-	if !jctx.IsEvicted {
-		it.jobsSeen++
-	}
-	if it.hitLookbackLimit() {
-		return nil, nil
-	}
-
 	// Skip this job if it's known to be unschedulable.
 	if it.skipKnownUnschedulableJobs && len(it.schedulingContext.UnfeasibleSchedulingKeys) > 0 {
 		schedulingKey, ok := jctx.SchedulingKey()
@@ -317,13 +301,6 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error) {
 	}
 }
 
-func (it *QueuedGangIterator) hitLookbackLimit() bool {
-	if it.maxLookback == 0 {
-		return false
-	}
-	return it.jobsSeen > it.maxLookback
-}
-
 // CandidateGangIterator determines which gang to try scheduling next across queues.
 // Specifically, it yields the next gang in the queue with smallest fraction of its fair share,
 // where the fraction of fair share computation includes the yielded gang.
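Note: with hitLookbackLimit removed, the cap's mechanics shift slightly. The old check lived in QueuedGangIterator.Peek, skipped evicted jobs when counting, and compared jobsSeen > maxLookback after incrementing; the new check lives in QueuedJobsIterator.Next, counts pool-matched queued jobs, and stops once jobsSeen >= maxLookback. Both yield at most maxLookback jobs, and evicted jobs still bypass the cap, since they are replayed through InMemoryJobRepository iterators rather than QueuedJobsIterator (see preempting_queue_scheduler.go above).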
internal/scheduler/scheduling/queue_scheduler_test.go (12 changes: 0 additions & 12 deletions)
@@ -358,18 +358,6 @@ func TestQueueScheduler(t *testing.T) {
Queues: testfixtures.SingleQueuePriorityOne("A"),
ExpectedScheduledIndices: []int{0},
},
"MaxQueueLookback": {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This no longer applies

SchedulingConfig: testfixtures.WithMaxQueueLookbackConfig(3, testfixtures.TestSchedulingConfig()),
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
Jobs: armadaslices.Concatenate(
testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1),
testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 3),
testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1),
),
Queues: testfixtures.SingleQueuePriorityOne("A"),
ExpectedScheduledIndices: []int{0},
ExpectedNeverAttemptedIndices: []int{3, 4},
},
"gang success": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),