Skip to content

Commit 3b3a7fe

Browse files
committed
fix(postgres): jobs may never get processed if no workers hear the initial announcement
When jobs are inserted into the `neoq_jobs` table, the `announce_job` trigger announces each job to listeners of the job's queue. However, if no workers are available to receive the announcement, or the announcement is somehow lost, those jobs are marooned on the queue. This fix adds a periodic check for past-due pending jobs to ensure that a neoq restart is not necessary to catch marooned, overdue jobs.
1 parent 245711b commit 3b3a7fe

File tree

3 files changed

+49
-24
lines changed

3 files changed

+49
-24
lines changed

backends/postgres/postgres_backend.go

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,14 @@ const (
4343
AND status NOT IN ('processed')
4444
FOR UPDATE SKIP LOCKED
4545
LIMIT 1`
46-
PendingJobIDQuery = `SELECT id
46+
PendingJobIDsQuery = `SELECT id
4747
FROM neoq_jobs
4848
WHERE queue = $1
4949
AND status NOT IN ('processed')
5050
AND run_after <= NOW()
51+
ORDER BY created_at ASC
5152
FOR UPDATE SKIP LOCKED
52-
LIMIT 1`
53+
LIMIT 100`
5354
FutureJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
5455
FROM neoq_jobs
5556
WHERE queue = $1
@@ -674,8 +675,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
674675
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue)
675676
}
676677

677-
pendingJobsChan := p.pendingJobs(ctx, h.Queue) // process overdue jobs *at startup*
678-
678+
pendingJobsChan := p.processPendingJobs(ctx, h.Queue)
679679
// wait for the listener to connect and be ready to listen
680680
for q := range p.readyQueues {
681681
if q == h.Queue {
@@ -688,7 +688,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
688688
p.readyQueues <- q
689689
}
690690

691-
// process all future jobs and retries
691+
// process all future jobs
692692
go func() { p.scheduleFutureJobs(ctx, h.Queue) }()
693693

694694
for i := 0; i < h.Concurrency; i++ {
@@ -754,7 +754,7 @@ func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error
754754
return
755755
}
756756

757-
// scheduleFutureJobs announces future jobs using NOTIFY on an interval
757+
// scheduleFutureJobs monitors the future job list for upcoming jobs and announces them to be processed by available workers
758758
func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
759759
err := p.initFutureJobs(ctx, queue)
760760
if err != nil {
@@ -772,8 +772,8 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
772772
if timeUntillRunAfter <= p.config.FutureJobWindow {
773773
delete(p.futureJobs, jobID)
774774
go func(jid string, j *jobs.Job) {
775-
scheduleCh := time.After(timeUntillRunAfter)
776-
<-scheduleCh
775+
jobDue := time.After(timeUntillRunAfter)
776+
<-jobDue
777777
p.announceJob(ctx, j.Queue, jid)
778778
}(jobID, job)
779779
}
@@ -823,9 +823,11 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
823823
}
824824
}
825825

826-
func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) {
827-
jobsCh = make(chan *pgconn.Notification)
828-
826+
// processPendingJobs starts a goroutine that periodically fetches pending jobs and announces them to workers.
827+
//
828+
// Past due jobs are fetched on the interval [neoq.Config.JobCheckInterval]
829+
// nolint: cyclop
830+
func (p *PgBackend) processPendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) {
829831
conn, err := p.acquire(ctx)
830832
if err != nil {
831833
p.logger.Error(
@@ -836,28 +838,37 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
836838
return
837839
}
838840

841+
// check for new past-due jobs on an interval
842+
ticker := time.NewTicker(p.config.JobCheckInterval)
839843
go func(ctx context.Context) {
840844
defer conn.Release()
841-
845+
// check for pending jobs on an interval until the context is canceled
842846
for {
843-
jobID, err := p.getPendingJobID(ctx, conn, queue)
844-
if err != nil {
845-
if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) {
846-
break
847-
}
847+
jobIDs, err := p.getPendingJobIDs(ctx, conn, queue)
848+
if errors.Is(err, context.Canceled) {
849+
return
850+
}
848851

852+
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
849853
p.logger.Error(
850854
"failed to fetch pending job",
851855
slog.String("queue", queue),
852856
slog.Any("error", err),
853-
slog.String("job_id", jobID),
854857
)
855-
} else {
856-
jobsCh <- &pgconn.Notification{Channel: queue, Payload: jobID}
858+
}
859+
860+
for _, jid := range jobIDs {
861+
jobsCh <- &pgconn.Notification{Channel: queue, Payload: jid}
862+
}
863+
select {
864+
case <-ctx.Done():
865+
return
866+
case <-ticker.C:
857867
}
858868
}
859869
}(ctx)
860870

871+
jobsCh = make(chan *pgconn.Notification)
861872
return jobsCh
862873
}
863874

@@ -1030,8 +1041,17 @@ func (p *PgBackend) getJob(ctx context.Context, tx pgx.Tx, jobID string) (job *j
10301041
return
10311042
}
10321043

1033-
func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID string, err error) {
1034-
err = conn.QueryRow(ctx, PendingJobIDQuery, queue).Scan(&jobID)
1044+
func (p *PgBackend) getPendingJobIDs(ctx context.Context, conn *pgxpool.Conn, queue string) (jobIDs []string, err error) {
1045+
var rows pgx.Rows
1046+
var jid int64
1047+
rows, err = conn.Query(ctx, PendingJobIDsQuery, queue)
1048+
for rows.Next() {
1049+
err = rows.Scan(&jid)
1050+
if err != nil {
1051+
return
1052+
}
1053+
jobIDs = append(jobIDs, fmt.Sprint(jid))
1054+
}
10351055
return
10361056
}
10371057

flake.nix

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
{
3232
packages = with pkgs; [
3333
automake
34+
gcc
3435
go_1_21
3536
gomod2nix.legacyPackages.${system}.gomod2nix
3637
gotools
@@ -41,7 +42,7 @@
4142
];
4243

4344
enterShell = ''
44-
export TEST_DATABASE_URL="postgres://postgres:postgres@localhost:${toString postgresPort}/neoq?sslmode=disable&pool_max_conns=100"
45+
export TEST_DATABASE_URL="postgres://postgres:postgres@localhost:${toString postgresPort}/neoq?sslmode=disable&pool_max_conns=250"
4546
export TEST_REDIS_URL=localhost:${toString redisPort}
4647
export REDIS_PASSWORD=
4748
'';
@@ -64,6 +65,10 @@
6465
CREATE USER postgres WITH PASSWORD 'postgres' SUPERUSER;
6566
CREATE DATABASE neoq;
6667
'';
68+
settings = {
69+
max_connections = 250;
70+
log_statement = "all";
71+
};
6772
};
6873

6974
redis = {

neoq.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type Config struct {
3232
BackendAuthPassword string // password with which to authenticate to the backend
3333
BackendConcurrency int // total number of backend processes available to process jobs
3434
ConnectionString string // a string containing connection details for the backend
35-
JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs
35+
JobCheckInterval time.Duration // the interval of time between checking for new future/past-due jobs
3636
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that future jobs get scheduled
3737
IdleTransactionTimeout int // number of milliseconds PgBackend transaction may idle before the connection is killed
3838
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown

0 commit comments

Comments
 (0)