Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
)

var (
RunIndex = int64(0)
PartitionMarkerGroupIdUuid = uuid.MustParse(PartitionMarkerGroupId)
PriorityClassName = "test-priority"
Groups = []string{"group1", "group2"}
Expand Down
7 changes: 6 additions & 1 deletion internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,16 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
}
}

var runIndex uint32 = 0
if lease.RunIndex != nil {
runIndex = uint32(*lease.RunIndex)
}

err := stream.Send(&executorapi.LeaseStreamMessage{
Event: &executorapi.LeaseStreamMessage_Lease{
Lease: &executorapi.JobRunLease{
JobRunId: lease.RunID,
JobRunIndex: lease.RunIndex,
JobRunIndex: runIndex,
Queue: lease.Queue,
Jobset: lease.JobSet,
User: lease.UserID,
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/database/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type hasSerial interface {

type JobRunLease struct {
RunID string
RunIndex uint32
RunIndex *int64
Queue string
Pool string
JobSet string
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ALTER TABLE runs ADD COLUMN IF NOT EXISTS run_index bigint NOT NULL DEFAULT 0;
ALTER TABLE runs ADD COLUMN IF NOT EXISTS run_index bigint;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During testing we found that the old scheduler ingester was trying to write rows with a null value which blocked ingestion from occurring. In order to make this smoother we've made this null-able and then handle null-able values in the app layer.

2 changes: 1 addition & 1 deletion internal/scheduler/database/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion internal/scheduler/jobdb/reconciliation.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,13 @@ func (jobDb *JobDb) schedulerJobFromDatabaseJob(dbJob *database.Job) (*Job, erro
// schedulerRunFromDatabaseRun creates a new scheduler job run from a database job run
func (jobDb *JobDb) schedulerRunFromDatabaseRun(dbRun *database.Run) *JobRun {
nodeId := api.NodeIdFromExecutorAndNodeName(dbRun.Executor, dbRun.Node)
var runIndex uint32 = 0
if dbRun.RunIndex != nil {
runIndex = uint32(*dbRun.RunIndex)
}
return jobDb.CreateRun(
dbRun.RunID,
uint32(dbRun.RunIndex),
runIndex,
dbRun.JobID,
dbRun.Created,
dbRun.Executor,
Expand Down
3 changes: 2 additions & 1 deletion internal/scheduleringester/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func (c *JobSetEventsInstructionConverter) handleJobRunLeased(jobRunLeased *arma
if jobRunLeased.HasScheduledAtPriority {
scheduledAtPriority = &jobRunLeased.ScheduledAtPriority
}
runIndex := int64(jobRunLeased.RunIndex)
PodRequirementsOverlay, err := proto.Marshal(jobRunLeased.GetPodRequirementsOverlay())
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -195,7 +196,7 @@ func (c *JobSetEventsInstructionConverter) handleJobRunLeased(jobRunLeased *arma
Queue: meta.queue,
DbRun: &schedulerdb.Run{
RunID: runId,
RunIndex: int64(jobRunLeased.RunIndex),
RunIndex: &runIndex,
JobID: jobRunLeased.JobId,
Created: eventTime.UnixNano(),
JobSet: meta.jobset,
Expand Down
1 change: 1 addition & 0 deletions internal/scheduleringester/instructions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestConvertEventSequence(t *testing.T) {
expected: []DbOperation{
InsertRuns{f.RunId: &JobRunDetails{Queue: f.Queue, DbRun: &schedulerdb.Run{
RunID: f.RunId,
RunIndex: &f.RunIndex,
JobID: f.JobId,
JobSet: f.JobsetName,
Queue: f.Queue,
Expand Down