Skip to content

job: Assign scoped health to jobs added to scoped job groups #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 28, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"log/slog"
"runtime/pprof"
"slices"
"sync"

"github.com/cilium/hive"
Expand Down Expand Up @@ -102,6 +103,11 @@ type Job interface {
start(ctx context.Context, wg *sync.WaitGroup, health cell.Health, options options)
}

type queuedJob struct {
job Job
health cell.Health
}

type group struct {
options options

Expand All @@ -110,7 +116,7 @@ type group struct {
mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
queuedJobs []Job
queuedJobs []queuedJob

health cell.Health
}
Expand Down Expand Up @@ -156,9 +162,9 @@ func (jg *group) Start(_ cell.HookContext) error {
jg.ctx, jg.cancel = context.WithCancel(context.Background())

jg.wg.Add(len(jg.queuedJobs))
for _, job := range jg.queuedJobs {
for _, queuedJob := range jg.queuedJobs {
pprof.Do(jg.ctx, jg.options.pprofLabels, func(ctx context.Context) {
go job.start(ctx, jg.wg, jg.health, jg.options)
go queuedJob.job.start(ctx, jg.wg, queuedJob.health, jg.options)
})
}
// Nil the queue once we start so it can be GC'ed
Expand Down Expand Up @@ -199,7 +205,13 @@ func (jg *group) add(health cell.Health, jobs ...Job) {

// The context is only set once the group has been started. If we have not yet started, queue the jobs.
if jg.ctx == nil {
jg.queuedJobs = append(jg.queuedJobs, jobs...)
jg.queuedJobs = slices.Grow(jg.queuedJobs, len(jobs))
for _, job := range jobs {
jg.queuedJobs = append(jg.queuedJobs, queuedJob{
job: job,
health: health,
})
}
return
}

Expand Down