Skip to content

Commit f95f923

Browse files
authored
[bug] Fix reconciliation events setting the wrong run id (#4605)
If jobs are preempted/failed due to reconciliation the run id is getting set incorrect (set to job id) - meaning the run isn't actually getting failed/preempted This PR fixes that mistake + makes the testing more robust to ensure run ids are set correct on events generated by the scheduler --------- Signed-off-by: JamesMurkin <[email protected]>
1 parent 47dfb7a commit f95f923

File tree

2 files changed

+113
-40
lines changed

2 files changed

+113
-40
lines changed

internal/scheduler/scheduler.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,10 @@ func EventsFromSchedulerResult(result *scheduling.SchedulerResult, time time.Tim
622622
return nil, err
623623
}
624624

625-
eventSequences = AppendEventSequencesFromReconciliationFailureJobs(eventSequences, result.FailedReconciliationJobs, time)
625+
eventSequences, err = AppendEventSequencesFromReconciliationFailureJobs(eventSequences, result.FailedReconciliationJobs, time)
626+
if err != nil {
627+
return nil, err
628+
}
626629

627630
return eventSequences, nil
628631
}
@@ -718,11 +721,15 @@ func createEventsForPreemptedJob(jobId string, runId string, reason string, time
718721
}
719722
}
720723

721-
func AppendEventSequencesFromReconciliationFailureJobs(eventSequences []*armadaevents.EventSequence, reconciliationResult *scheduling.ReconciliationResult, time time.Time) []*armadaevents.EventSequence {
724+
func AppendEventSequencesFromReconciliationFailureJobs(eventSequences []*armadaevents.EventSequence, reconciliationResult *scheduling.ReconciliationResult, time time.Time) ([]*armadaevents.EventSequence, error) {
722725
if reconciliationResult == nil {
723-
return eventSequences
726+
return eventSequences, nil
724727
}
725728
for _, jobInfo := range reconciliationResult.FailedJobs {
729+
run := jobInfo.Job.LatestRun()
730+
if run == nil {
731+
return nil, errors.Errorf("attempting to generate reconciliation failed eventSequences for job %s with no associated runs", jobInfo.Job.Id())
732+
}
726733
reconciliationError := &armadaevents.Error{
727734
Terminal: true,
728735
Reason: &armadaevents.Error_ReconciliationError{
@@ -734,21 +741,25 @@ func AppendEventSequencesFromReconciliationFailureJobs(eventSequences []*armadae
734741
es := &armadaevents.EventSequence{
735742
Queue: jobInfo.Job.Queue(),
736743
JobSetName: jobInfo.Job.Jobset(),
737-
Events: createEventsForFailedJob(jobInfo.Job.Id(), jobInfo.Job.Id(), reconciliationError, time),
744+
Events: createEventsForFailedJob(jobInfo.Job.Id(), run.Id(), reconciliationError, time),
738745
}
739746
eventSequences = append(eventSequences, es)
740747
}
741748

742749
for _, jobInfo := range reconciliationResult.PreemptedJobs {
750+
run := jobInfo.Job.LatestRun()
751+
if run == nil {
752+
return nil, errors.Errorf("attempting to generate reconciliation preemption eventSequences for job %s with no associated runs", jobInfo.Job.Id())
753+
}
743754
es := &armadaevents.EventSequence{
744755
Queue: jobInfo.Job.Queue(),
745756
JobSetName: jobInfo.Job.Jobset(),
746-
Events: createEventsForPreemptedJob(jobInfo.Job.Id(), jobInfo.Job.Id(), jobInfo.Reason, time),
757+
Events: createEventsForPreemptedJob(jobInfo.Job.Id(), run.Id(), jobInfo.Reason, time),
747758
}
748759
eventSequences = append(eventSequences, es)
749760
}
750761

751-
return eventSequences
762+
return eventSequences, nil
752763
}
753764

754765
func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventSequence, jctxs []*schedulercontext.JobSchedulingContext) ([]*armadaevents.EventSequence, error) {

internal/scheduler/scheduler_test.go

Lines changed: 96 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"k8s.io/utils/pointer"
1919

2020
"github.com/armadaproject/armada/internal/common/armadacontext"
21+
"github.com/armadaproject/armada/internal/common/armadaerrors"
2122
apiconfig "github.com/armadaproject/armada/internal/common/constants"
2223
"github.com/armadaproject/armada/internal/common/ingest/utils"
2324
protoutil "github.com/armadaproject/armada/internal/common/proto"
@@ -341,6 +342,11 @@ var (
341342
))
342343
)
343344

345+
type jobRunId struct {
346+
jobId string
347+
runId string
348+
}
349+
344350
// Test a single scheduler cycle
345351
func TestScheduler_TestCycle(t *testing.T) {
346352
tests := map[string]struct {
@@ -357,11 +363,11 @@ func TestScheduler_TestCycle(t *testing.T) {
357363
jobIdsToFailDueToReconciliation []string // job ids that will be failed by the scheduler due to reconciliation issues
358364
jobIdsToPreemptDueToReconciliation []string // job ids that will be preempted by the scheduler due to reconciliation issues
359365
expectedJobRunLeased []string // ids of jobs we expect to have produced leased messages
360-
expectedJobRunErrors []string // ids of jobs we expect to have produced jobRunErrors messages
366+
expectedJobRunErrors []jobRunId // ids of jobs we expect to have produced jobRunErrors messages
361367
expectedJobErrors []string // ids of jobs we expect to have produced jobErrors messages
362368
expectedJobsRunsToPreempt []string // ids of jobs we expect to be preempted by the scheduler
363-
expectedJobRunPreempted []string // ids of jobs we expect to have produced jobRunPreempted messages
364-
expectedJobRunCancelled []string // ids of jobs we expect to have produced jobRunPreempted messages
369+
expectedJobRunPreempted []jobRunId // ids of jobs we expect to have produced jobRunPreempted messages
370+
expectedJobRunCancelled []jobRunId // ids of jobs we expect to have produced jobRunPreempted messages
365371
expectedJobCancelled []string // ids of jobs we expect to have produced cancelled messages
366372
expectedJobRequestCancel []string // ids of jobs we expect to have produced request cancel
367373
expectedJobReprioritised []string // ids of jobs we expect to have produced reprioritised messages
@@ -631,7 +637,7 @@ func TestScheduler_TestCycle(t *testing.T) {
631637
Serial: 1,
632638
},
633639
},
634-
expectedJobRunCancelled: []string{leasedJob.Id()},
640+
expectedJobRunCancelled: []jobRunId{{jobId: leasedJob.Id(), runId: leasedJob.LatestRun().Id()}},
635641
expectedJobCancelled: []string{leasedJob.Id()},
636642
expectedTerminal: []string{leasedJob.Id()},
637643
expectedQueuedVersion: leasedJob.QueuedVersion(),
@@ -648,9 +654,9 @@ func TestScheduler_TestCycle(t *testing.T) {
648654
Serial: 1,
649655
},
650656
},
651-
expectedJobRunPreempted: []string{preemptibleLeasedJob.Id()},
657+
expectedJobRunPreempted: []jobRunId{{jobId: preemptibleLeasedJob.Id(), runId: preemptibleLeasedJob.LatestRun().Id()}},
652658
expectedJobErrors: []string{preemptibleLeasedJob.Id()},
653-
expectedJobRunErrors: []string{preemptibleLeasedJob.Id()},
659+
expectedJobRunErrors: []jobRunId{{jobId: preemptibleLeasedJob.Id(), runId: preemptibleLeasedJob.LatestRun().Id()}},
654660
expectedTerminal: []string{preemptibleLeasedJob.Id()},
655661
expectedQueuedVersion: preemptibleLeasedJob.QueuedVersion(),
656662
},
@@ -666,11 +672,17 @@ func TestScheduler_TestCycle(t *testing.T) {
666672
Serial: 1,
667673
},
668674
},
669-
expectedJobRunPreempted: []string{preemptibleGangJob1.Id(), preemptibleGangJob2.Id()},
670-
expectedJobErrors: []string{preemptibleGangJob1.Id(), preemptibleGangJob2.Id()},
671-
expectedJobRunErrors: []string{preemptibleGangJob1.Id(), preemptibleGangJob2.Id()},
672-
expectedTerminal: []string{preemptibleGangJob1.Id(), preemptibleGangJob2.Id()},
673-
expectedQueuedVersion: preemptibleGangJob1.QueuedVersion(),
675+
expectedJobRunPreempted: []jobRunId{
676+
{jobId: preemptibleGangJob1.Id(), runId: preemptibleGangJob1.LatestRun().Id()},
677+
{jobId: preemptibleGangJob2.Id(), runId: preemptibleGangJob2.LatestRun().Id()},
678+
},
679+
expectedJobErrors: []string{preemptibleGangJob1.Id(), preemptibleGangJob2.Id()},
680+
expectedJobRunErrors: []jobRunId{
681+
{jobId: preemptibleGangJob1.Id(), runId: preemptibleGangJob1.LatestRun().Id()},
682+
{jobId: preemptibleGangJob2.Id(), runId: preemptibleGangJob2.LatestRun().Id()},
683+
},
684+
expectedTerminal: []string{preemptibleGangJob1.Id(), preemptibleGangJob2.Id()},
685+
expectedQueuedVersion: preemptibleGangJob1.QueuedVersion(),
674686
},
675687
"Job Run preemption requested - job not pre-emptible - no action expected": {
676688
initialJobs: []*jobdb.Job{leasedJob},
@@ -785,7 +797,7 @@ func TestScheduler_TestCycle(t *testing.T) {
785797
"Lease expired": {
786798
initialJobs: []*jobdb.Job{leasedJob},
787799
staleExecutor: true,
788-
expectedJobRunErrors: []string{leasedJob.Id()},
800+
expectedJobRunErrors: []jobRunId{{jobId: leasedJob.Id(), runId: leasedJob.LatestRun().Id()}},
789801
expectedJobErrors: []string{leasedJob.Id()},
790802
expectedTerminal: []string{leasedJob.Id()},
791803
expectedQueuedVersion: leasedJob.QueuedVersion(),
@@ -805,17 +817,17 @@ func TestScheduler_TestCycle(t *testing.T) {
805817
"Reconciliation failure - preempted": {
806818
initialJobs: []*jobdb.Job{preemptibleLeasedJob},
807819
jobIdsToPreemptDueToReconciliation: []string{preemptibleLeasedJob.Id()},
808-
expectedJobRunPreempted: []string{preemptibleLeasedJob.Id()},
820+
expectedJobRunPreempted: []jobRunId{{jobId: preemptibleLeasedJob.Id(), runId: preemptibleLeasedJob.LatestRun().Id()}},
809821
expectedJobErrors: []string{preemptibleLeasedJob.Id()},
810-
expectedJobRunErrors: []string{preemptibleLeasedJob.Id()},
822+
expectedJobRunErrors: []jobRunId{{jobId: preemptibleLeasedJob.Id(), runId: preemptibleLeasedJob.LatestRun().Id()}},
811823
expectedTerminal: []string{preemptibleLeasedJob.Id()},
812824
expectedQueuedVersion: preemptibleLeasedJob.QueuedVersion(),
813825
},
814826
"Reconciliation failure - failed": {
815827
initialJobs: []*jobdb.Job{leasedJob},
816828
jobIdsToFailDueToReconciliation: []string{leasedJob.Id()},
817829
expectedJobErrors: []string{leasedJob.Id()},
818-
expectedJobRunErrors: []string{leasedJob.Id()},
830+
expectedJobRunErrors: []jobRunId{{jobId: leasedJob.Id(), runId: leasedJob.LatestRun().Id()}},
819831
expectedTerminal: []string{leasedJob.Id()},
820832
expectedQueuedVersion: leasedJob.QueuedVersion(),
821833
},
@@ -824,7 +836,7 @@ func TestScheduler_TestCycle(t *testing.T) {
824836
expectedJobRunLeased: []string{queuedJob.Id()},
825837
jobIdsToFailDueToReconciliation: []string{queuedJob.Id()},
826838
expectedJobErrors: []string{queuedJob.Id()},
827-
expectedJobRunErrors: []string{queuedJob.Id()},
839+
expectedJobRunErrors: []jobRunId{{jobId: queuedJob.Id()}},
828840
expectedTerminal: []string{queuedJob.Id()},
829841
expectedQueuedVersion: leasedJob.QueuedVersion(),
830842
},
@@ -866,9 +878,9 @@ func TestScheduler_TestCycle(t *testing.T) {
866878
"Job preempted": {
867879
initialJobs: []*jobdb.Job{leasedJob},
868880
expectedJobsRunsToPreempt: []string{leasedJob.Id()},
869-
expectedJobRunPreempted: []string{leasedJob.Id()},
881+
expectedJobRunPreempted: []jobRunId{{jobId: leasedJob.Id(), runId: leasedJob.LatestRun().Id()}},
870882
expectedJobErrors: []string{leasedJob.Id()},
871-
expectedJobRunErrors: []string{leasedJob.Id()},
883+
expectedJobRunErrors: []jobRunId{{jobId: leasedJob.Id(), runId: leasedJob.LatestRun().Id()}},
872884
expectedTerminal: []string{leasedJob.Id()},
873885
expectedQueuedVersion: leasedJob.QueuedVersion(),
874886
},
@@ -961,22 +973,22 @@ func TestScheduler_TestCycle(t *testing.T) {
961973
}
962974

963975
// Assert that all expected eventSequences are generated and that all eventSequences are expected.
964-
outstandingEventsByType := map[string]map[string]bool{
965-
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobRunLeased{}): stringSet(tc.expectedJobRunLeased),
966-
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobErrors{}): stringSet(tc.expectedJobErrors),
967-
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobRunErrors{}): stringSet(tc.expectedJobRunErrors),
968-
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobRunPreempted{}): stringSet(tc.expectedJobRunPreempted),
969-
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobRunCancelled{}): stringSet(tc.expectedJobRunCancelled),
970-
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_CancelledJob{}): stringSet(tc.expectedJobCancelled),
971-
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_ReprioritisedJob{}): stringSet(tc.expectedJobReprioritised),
972-
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobSucceeded{}): stringSet(tc.expectedJobSucceeded),
973-
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobRequeued{}): stringSet(tc.expectedRequeued),
974-
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_CancelJob{}): stringSet(tc.expectedJobRequestCancel),
975-
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobValidated{}): stringSet(tc.expectedValidated),
976+
outstandingJobEventsByType := map[string]map[string]eventDetails{
977+
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobRunLeased{}): stringsToSetWithEventDetails(tc.expectedJobRunLeased),
978+
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobErrors{}): stringsToSetWithEventDetails(tc.expectedJobErrors),
979+
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobRunErrors{}): jobRunIdToSetWithEventDetails(tc.expectedJobRunErrors),
980+
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobRunPreempted{}): jobRunIdToSetWithEventDetails(tc.expectedJobRunPreempted),
981+
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobRunCancelled{}): jobRunIdToSetWithEventDetails(tc.expectedJobRunCancelled),
982+
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_CancelledJob{}): stringsToSetWithEventDetails(tc.expectedJobCancelled),
983+
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_ReprioritisedJob{}): stringsToSetWithEventDetails(tc.expectedJobReprioritised),
984+
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobSucceeded{}): stringsToSetWithEventDetails(tc.expectedJobSucceeded),
985+
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobRequeued{}): stringsToSetWithEventDetails(tc.expectedRequeued),
986+
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_CancelJob{}): stringsToSetWithEventDetails(tc.expectedJobRequestCancel),
987+
fmt.Sprintf("%T", &armadaevents.EventSequence_Event_JobValidated{}): stringsToSetWithEventDetails(tc.expectedValidated),
976988
}
977-
err = subtractEventsFromOutstandingEventsByType(publisher.eventSequences, outstandingEventsByType)
989+
err = subtractEventsFromOutstandingEventsByType(publisher.eventSequences, outstandingJobEventsByType)
978990
require.NoError(t, err)
979-
for eventType, m := range outstandingEventsByType {
991+
for eventType, m := range outstandingJobEventsByType {
980992
assert.Empty(t, m, "%d outstanding eventSequences of type %s", len(m), eventType)
981993
}
982994

@@ -1054,24 +1066,54 @@ func createAntiAffinity(t *testing.T, key string, values []string) *v1.Affinity
10541066
return newAffinity
10551067
}
10561068

1057-
func subtractEventsFromOutstandingEventsByType(eventSequences []*armadaevents.EventSequence, outstandingEventsByType map[string]map[string]bool) error {
1069+
func subtractEventsFromOutstandingEventsByType(eventSequences []*armadaevents.EventSequence, outstandingEventsByType map[string]map[string]eventDetails) error {
10581070
for _, eventSequence := range eventSequences {
10591071
for _, event := range eventSequence.Events {
10601072
jobId, err := armadaevents.JobIdFromEvent(event)
10611073
if err != nil {
10621074
return err
10631075
}
10641076
key := fmt.Sprintf("%T", event.Event)
1065-
_, ok := outstandingEventsByType[key][jobId]
1077+
details, ok := outstandingEventsByType[key][jobId]
10661078
if !ok {
10671079
return errors.Errorf("received unexpected event for job %s: %T - %v", jobId, event.Event, event.Event)
10681080
}
1081+
1082+
if details.runId != "" {
1083+
runId, err := RunIdFromEvent(event)
1084+
if err != nil {
1085+
return err
1086+
}
1087+
1088+
if runId != details.runId {
1089+
return errors.Errorf("received expected event for job with unexpected runId %s: %T - %v", jobId, event.Event, event.Event)
1090+
}
1091+
}
1092+
10691093
delete(outstandingEventsByType[key], jobId)
10701094
}
10711095
}
10721096
return nil
10731097
}
10741098

1099+
func RunIdFromEvent(event *armadaevents.EventSequence_Event) (string, error) {
1100+
switch e := event.Event.(type) {
1101+
case *armadaevents.EventSequence_Event_JobRunErrors:
1102+
return e.JobRunErrors.RunId, nil
1103+
case *armadaevents.EventSequence_Event_JobRunPreempted:
1104+
return e.JobRunPreempted.PreemptedRunId, nil
1105+
case *armadaevents.EventSequence_Event_JobRunCancelled:
1106+
return e.JobRunCancelled.RunId, nil
1107+
default:
1108+
err := errors.WithStack(&armadaerrors.ErrInvalidArgument{
1109+
Name: "event.Event",
1110+
Value: e,
1111+
Message: "event doesn't contain a jobId",
1112+
})
1113+
return "", err
1114+
}
1115+
}
1116+
10751117
// Test running multiple scheduler cycles
10761118
func TestRun(t *testing.T) {
10771119
// Test objects
@@ -2059,6 +2101,26 @@ func stringSet(src []string) map[string]bool {
20592101
return set
20602102
}
20612103

2104+
type eventDetails struct {
2105+
runId string
2106+
}
2107+
2108+
func stringsToSetWithEventDetails(src []string) map[string]eventDetails {
2109+
set := make(map[string]eventDetails, len(src))
2110+
for _, s := range src {
2111+
set[s] = eventDetails{}
2112+
}
2113+
return set
2114+
}
2115+
2116+
func jobRunIdToSetWithEventDetails(src []jobRunId) map[string]eventDetails {
2117+
set := make(map[string]eventDetails, len(src))
2118+
for _, s := range src {
2119+
set[s.jobId] = eventDetails{runId: s.runId}
2120+
}
2121+
return set
2122+
}
2123+
20622124
var (
20632125
queuedJobA = &database.Job{
20642126
JobID: util.NewULID(),

0 commit comments

Comments
 (0)