Skip to content

Commit 78e4e34

Browse files
authored
Fixes 4927: add flag to prevent requeuing canceled tasks (#875)
* Fixes 4927: add flag to prevent requeuing canceled tasks * set cancel_attempted in cancel query instead * only include tasks that were not cancelled in requeueFailedTasks query * address comments
1 parent 084d2ca commit 78e4e34

6 files changed

+91
-27
lines changed

db/migrations.latest

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
20241018154315
1+
20241104115955
22

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
BEGIN;
2+
3+
ALTER TABLE tasks DROP COLUMN IF EXISTS cancel_attempted;
4+
5+
COMMIT;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
BEGIN;
2+
3+
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS cancel_attempted BOOLEAN DEFAULT FALSE;
4+
5+
UPDATE tasks SET cancel_attempted = true WHERE status = 'canceled';
6+
7+
COMMIT;

pkg/models/task_info.go

+20-19
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,26 @@ import (
1212
// Shared by DAO and queue packages
1313
// GORM only used in DAO to read from table
1414
type TaskInfo struct {
15-
Id uuid.UUID `gorm:"primary_key;column:id"`
16-
Typename string `gorm:"column:type"` // "introspect" or "snapshot"
17-
Payload json.RawMessage `gorm:"type:jsonb"`
18-
OrgId string
19-
AccountId string
20-
ObjectUUID uuid.UUID
21-
ObjectType *string
22-
Dependencies pq.StringArray `gorm:"->;column:t_dependencies;type:text[]"`
23-
Dependents pq.StringArray `gorm:"->;column:t_dependents;type:text[]"`
24-
Token uuid.UUID
25-
Queued *time.Time `gorm:"column:queued_at"`
26-
Started *time.Time `gorm:"column:started_at"`
27-
Finished *time.Time `gorm:"column:finished_at"`
28-
Error *string
29-
Status string
30-
RequestID string
31-
Retries int
32-
NextRetryTime *time.Time
33-
Priority int
15+
Id uuid.UUID `gorm:"primary_key;column:id"`
16+
Typename string `gorm:"column:type"` // "introspect" or "snapshot"
17+
Payload json.RawMessage `gorm:"type:jsonb"`
18+
OrgId string
19+
AccountId string
20+
ObjectUUID uuid.UUID
21+
ObjectType *string
22+
Dependencies pq.StringArray `gorm:"->;column:t_dependencies;type:text[]"`
23+
Dependents pq.StringArray `gorm:"->;column:t_dependents;type:text[]"`
24+
Token uuid.UUID
25+
Queued *time.Time `gorm:"column:queued_at"`
26+
Started *time.Time `gorm:"column:started_at"`
27+
Finished *time.Time `gorm:"column:finished_at"`
28+
Error *string
29+
Status string
30+
RequestID string
31+
Retries int
32+
NextRetryTime *time.Time
33+
Priority int
34+
CancelAttempted bool
3435
}
3536

3637
type TaskInfoRepositoryConfiguration struct {

pkg/tasks/queue/pgqueue.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/rs/zerolog/log"
2525
)
2626

27-
const taskInfoReturning = ` id, type, payload, queued_at, started_at, finished_at, status, error, org_id, object_uuid, object_type, token, request_id, retries, next_retry_time, priority ` // fields to return when returning taskInfo
27+
const taskInfoReturning = ` id, type, payload, queued_at, started_at, finished_at, status, error, org_id, object_uuid, object_type, token, request_id, retries, next_retry_time, priority, cancel_attempted ` // fields to return when returning taskInfo
2828

2929
const (
3030
sqlNotify = `NOTIFY tasks`
@@ -54,14 +54,14 @@ const (
5454
sqlRequeueFailedTasks = `
5555
WITH v1 AS (
5656
SELECT * FROM tasks t LEFT JOIN task_dependencies td ON (t.id = td.dependency_id)
57-
WHERE (started_at IS NOT NULL AND finished_at IS NOT NULL AND status = 'failed' AND retries < 3 AND next_retry_time <= clock_timestamp() AND type = ANY($1::text[]))
57+
WHERE (started_at IS NOT NULL AND finished_at IS NOT NULL AND status = 'failed' AND retries < 3 AND next_retry_time <= clock_timestamp() AND type = ANY($1::text[]) AND cancel_attempted = false)
5858
)
5959
UPDATE tasks SET started_at = NULL, finished_at = NULL, token = NULL, status = 'pending', retries = retries + 1, queued_at = clock_timestamp()
6060
FROM (
6161
SELECT tasks.id
6262
FROM tasks, v1
6363
WHERE v1.task_id = tasks.id
64-
OR (tasks.started_at IS NOT NULL AND tasks.finished_at IS NOT NULL AND tasks.status = 'failed' AND tasks.retries < 3 AND tasks.next_retry_time <= clock_timestamp() AND tasks.type = ANY($1::text[]))
64+
OR (tasks.started_at IS NOT NULL AND tasks.finished_at IS NOT NULL AND tasks.status = 'failed' AND tasks.retries < 3 AND tasks.next_retry_time <= clock_timestamp() AND tasks.type = ANY($1::text[]) AND tasks.cancel_attempted = false)
6565
) t1
6666
WHERE tasks.id = t1.id`
6767

@@ -96,7 +96,7 @@ const (
9696
RETURNING finished_at`
9797
sqlCancelTask = `
9898
UPDATE tasks
99-
SET status = 'canceled', error = (left($2, 4000))
99+
SET status = 'canceled', error = (left($2, 4000)), cancel_attempted = true
100100
WHERE id = $1 AND finished_at IS NULL`
101101
// sqlUpdatePayload
102102
sqlUpdatePayload = `
@@ -409,7 +409,7 @@ func (p *PgQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, taskTypes [
409409
err = tx.QueryRow(ctx, sqlDequeue, token, taskTypes).Scan(
410410
&info.Id, &info.Typename, &info.Payload, &info.Queued, &info.Started, &info.Finished, &info.Status,
411411
&info.Error, &info.OrgId, &info.ObjectUUID, &info.ObjectType, &info.Token, &info.RequestID,
412-
&info.Retries, &info.NextRetryTime, &info.Priority,
412+
&info.Retries, &info.NextRetryTime, &info.Priority, &info.CancelAttempted,
413413
)
414414
if err != nil {
415415
return nil, fmt.Errorf("error during dequeue query: %w", err)
@@ -473,7 +473,7 @@ func (p *PgQueue) Status(taskId uuid.UUID) (*models.TaskInfo, error) {
473473
err = conn.QueryRow(context.Background(), sqlQueryTaskStatus, taskId).Scan(
474474
&info.Id, &info.Typename, &info.Payload, &info.Queued, &info.Started, &info.Finished, &info.Status,
475475
&info.Error, &info.OrgId, &info.ObjectUUID, &info.ObjectType, &info.Token, &info.RequestID,
476-
&info.Retries, &info.NextRetryTime, &info.Priority,
476+
&info.Retries, &info.NextRetryTime, &info.Priority, &info.CancelAttempted,
477477
)
478478
if err != nil {
479479
return nil, err
@@ -534,7 +534,7 @@ func (p *PgQueue) Finish(taskId uuid.UUID, taskError error) error {
534534
if err != nil {
535535
return fmt.Errorf("error removing task %s from heartbeats: %v", taskId, err)
536536
}
537-
if tag.RowsAffected() != 1 {
537+
if tag.RowsAffected() != 1 && info.Status != config.TaskStatusCanceled {
538538
logger := log.Logger.With().Str("task_id", taskId.String()).Logger()
539539
logger.Warn().Msgf("error finishing task: error deleting heartbeat: heartbeat not found. was this task requeued recently?")
540540
}
@@ -669,6 +669,9 @@ func (p *PgQueue) Requeue(taskId uuid.UUID) error {
669669
if err == pgx.ErrNoRows {
670670
return ErrNotExist
671671
}
672+
if info.CancelAttempted && info.Status != config.TaskStatusRunning {
673+
return ErrTaskCanceled
674+
}
672675
if info.Started == nil || info.Finished != nil {
673676
return ErrNotRunning
674677
}

pkg/tasks/queue/pgqueue_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,54 @@ func (s *QueueSuite) TestRequeueFailedTasks() {
315315
assert.True(s.T(), info.Queued.After(*originalQueueTime))
316316
}
317317

318+
func (s *QueueSuite) TestCannotRequeueCanceledTasks() {
319+
id, err := s.queue.Enqueue(&testTask)
320+
require.NoError(s.T(), err)
321+
assert.NotEqual(s.T(), uuid.Nil, id)
322+
323+
_, err = s.queue.Status(id)
324+
require.NoError(s.T(), err)
325+
326+
_, err = s.queue.Dequeue(context.Background(), []string{testTaskType})
327+
require.NoError(s.T(), err)
328+
329+
err = s.queue.Cancel(context.Background(), id)
330+
require.NoError(s.T(), err)
331+
332+
err = s.queue.Requeue(id)
333+
assert.ErrorIs(s.T(), err, ErrTaskCanceled)
334+
}
335+
336+
func (s *QueueSuite) TestCannotRequeueCanceledFailedTasks() {
337+
config.Get().Tasking.RetryWaitUpperBound = 0
338+
339+
id, err := s.queue.Enqueue(&testTask)
340+
require.NoError(s.T(), err)
341+
assert.NotEqual(s.T(), uuid.Nil, id)
342+
343+
info, err := s.queue.Status(id)
344+
require.NoError(s.T(), err)
345+
originalQueueTime := info.Queued
346+
347+
_, err = s.queue.Dequeue(context.Background(), []string{testTaskType})
348+
require.NoError(s.T(), err)
349+
350+
err = s.queue.Cancel(context.Background(), id)
351+
require.NoError(s.T(), err)
352+
353+
err = s.queue.Finish(id, fmt.Errorf("something went wrong"))
354+
require.NoError(s.T(), err)
355+
356+
err = s.queue.RequeueFailedTasks([]string{testTaskType})
357+
assert.NoError(s.T(), err)
358+
359+
info, err = s.queue.Status(id)
360+
require.NoError(s.T(), err)
361+
assert.Equal(s.T(), config.TaskStatusFailed, info.Status)
362+
assert.Equal(s.T(), true, info.CancelAttempted)
363+
assert.True(s.T(), info.Queued.Equal(*originalQueueTime))
364+
}
365+
318366
func (s *QueueSuite) TestRequeueFailedTasksExceedRetries() {
319367
config.Get().Tasking.RetryWaitUpperBound = 0
320368

0 commit comments

Comments
 (0)