From 1b8c9922d2a26e2151d9ca822c5e73ba71e1527c Mon Sep 17 00:00:00 2001 From: Sebastien Savater Date: Sun, 18 Feb 2024 17:06:55 +0100 Subject: [PATCH 1/6] Remove ruby head version from CI --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 25c532c..159066c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,7 @@ jobs: strategy: matrix: # Test only supported versions: https://endoflife.date/ruby - ruby-version: [ '3.0', '3.1', '3.2', '3.3', head, jruby, jruby-head, truffleruby, truffleruby-head ] + ruby-version: [ '3.0', '3.1', '3.2', '3.3', jruby, truffleruby ] steps: - uses: actions/checkout@v3 From 3a3ecfdff03fc31a4e599304d0a382799e77c654 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20=C4=8Cupi=C4=87?= Date: Sun, 14 Jan 2024 01:40:01 +0100 Subject: [PATCH 2/6] Implement job batches --- README.md | 18 ++++++++ lib/activejob-status.rb | 1 + lib/activejob-status/batch.rb | 23 ++++++++++ spec/specs/active_job/status/batch_spec.rb | 52 ++++++++++++++++++++++ 4 files changed, 94 insertions(+) create mode 100644 lib/activejob-status/batch.rb create mode 100644 spec/specs/active_job/status/batch_spec.rb diff --git a/README.md b/README.md index 0c0e76b..0f78d46 100644 --- a/README.md +++ b/README.md @@ -324,6 +324,24 @@ class MyJob < ActiveJob::Base end ``` +### Grouping jobs into batches + +```ruby +job = MyJob.perform_later +other_job = OtherJob.perform_later + +batch = ActiveJob::Status::Batch.new(job, other_job) +batch.status +# "queued" +``` + +The batch status can be `queued`, `failed`, `completed` or `working`. + +1. The batch is considered `queued` if **all** of the jobs are `queued` +2. The batch is considered `failed` if **one** of the jobs is `failed` +3. The batch is considered `completed` if **all** of the jobs are `completed` +4. The batch is considered `working` in all other circumstances + ## ActiveJob::Status and exceptions Internally, ActiveJob::Status uses `ActiveSupport#rescue_from` to catch every `Exception` to apply the `failed` status diff --git a/lib/activejob-status.rb b/lib/activejob-status.rb index 16ca5c5..c263ff7 100644 --- a/lib/activejob-status.rb +++ b/lib/activejob-status.rb @@ -8,6 +8,7 @@ require "activejob-status/status" require "activejob-status/progress" require "activejob-status/throttle" +require "activejob-status/batch" module ActiveJob module Status diff --git a/lib/activejob-status/batch.rb b/lib/activejob-status/batch.rb new file mode 100644 index 0000000..3c37001 --- /dev/null +++ b/lib/activejob-status/batch.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +module ActiveJob + module Status + class Batch + def initialize(*jobs) + @statuses = jobs.map { |job| ActiveJob::Status.get(job) } + end + + def status + if @statuses.all? { |status| status[:status] == :queued } + :queued + elsif @statuses.any? { |status| status[:status] == :failed } + :failed + elsif @statuses.all? { |status| status[:status] == :completed } + :completed + else + :working + end + end + end + end +end diff --git a/spec/specs/active_job/status/batch_spec.rb b/spec/specs/active_job/status/batch_spec.rb new file mode 100644 index 0000000..884cffa --- /dev/null +++ b/spec/specs/active_job/status/batch_spec.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +require_relative "../../../spec_helper" +require_relative "../../../jobs/test_jobs" + +RSpec.describe ActiveJob::Status::Batch do + describe "#status" do + it "returns queued when all jobs are queued" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new(first_job, second_job) + + ActiveJob::Status.get(first_job).update(status: :queued) + ActiveJob::Status.get(second_job).update(status: :queued) + + expect(batch.status).to eq(:queued) + end + + it "returns failed when one job is failed" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new(first_job, second_job) + + ActiveJob::Status.get(first_job).update(status: :failed) + ActiveJob::Status.get(second_job).update(status: :completed) + + expect(batch.status).to eq(:failed) + end + + it "returns completed when all jobs are completed" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new(first_job, second_job) + + ActiveJob::Status.get(first_job).update(status: :completed) + ActiveJob::Status.get(second_job).update(status: :completed) + + expect(batch.status).to eq(:completed) + end + + it "returns working in other cases" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new(first_job, second_job) + + ActiveJob::Status.get(first_job).update(status: :queued) + ActiveJob::Status.get(second_job).update(status: :working) + + expect(batch.status).to eq(:working) + end + end +end From 46fe181e0d41ccc844d8c57787b12c6910a9500f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20=C4=8Cupi=C4=87?= Date: Thu, 25 Jan 2024 15:59:06 +0100 Subject: [PATCH 3/6] Document callbacks --- README.md | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/README.md b/README.md index 0f78d46..0db1f54 100644 --- a/README.md +++ b/README.md @@ -342,6 +342,34 @@ The batch status can be `queued`, `failed`, `completed` or `working`. 3. The batch is considered `completed` if **all** of the jobs are `completed` 4. The batch is considered `working` in all other circumstances +### Callbacks + +You can implement callbacks, by listening to the completion of a batch with a +simple ActiveJob job. + +```ruby +# frozen_string_literal: true + +require 'activejob-status' + +class CallbacksJob < ApplicationJob + queue_as :real_time + + def perform(*job_ids) + batch = ActiveJob::Status::Batch.new(*job_ids) + + case batch.status + when :queued, :working + MonitorAnalysisBatchJob.set(wait: 5.seconds).perform_later(*job_ids) + when :completed + # Completed callback + when :failed + # Failed callback + end + end +end +``` + ## ActiveJob::Status and exceptions Internally, ActiveJob::Status uses `ActiveSupport#rescue_from` to catch every `Exception` to apply the `failed` status From 5224b67409eacecb0408b50c4400e3ab6b746a93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20=C4=8Cupi=C4=87?= Date: Sat, 27 Jan 2024 13:41:20 +0100 Subject: [PATCH 4/6] Pass jobs as an array --- README.md | 4 ++-- lib/activejob-status/batch.rb | 2 +- spec/specs/active_job/status/batch_spec.rb | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 0db1f54..c8825d4 100644 --- a/README.md +++ b/README.md @@ -330,7 +330,7 @@ end job = MyJob.perform_later other_job = OtherJob.perform_later -batch = ActiveJob::Status::Batch.new(job, other_job) +batch = ActiveJob::Status::Batch.new([job, other_job]) batch.status # "queued" ``` @@ -356,7 +356,7 @@ class CallbacksJob < ApplicationJob queue_as :real_time def perform(*job_ids) - batch = ActiveJob::Status::Batch.new(*job_ids) + batch = ActiveJob::Status::Batch.new(job_ids) case batch.status when :queued, :working diff --git a/lib/activejob-status/batch.rb b/lib/activejob-status/batch.rb index 3c37001..3027a1a 100644 --- a/lib/activejob-status/batch.rb +++ b/lib/activejob-status/batch.rb @@ -3,7 +3,7 @@ module ActiveJob module Status class Batch - def initialize(*jobs) + def initialize(jobs) @statuses = jobs.map { |job| ActiveJob::Status.get(job) } end diff --git a/spec/specs/active_job/status/batch_spec.rb b/spec/specs/active_job/status/batch_spec.rb index 884cffa..8704ec5 100644 --- a/spec/specs/active_job/status/batch_spec.rb +++ b/spec/specs/active_job/status/batch_spec.rb @@ -8,7 +8,7 @@ it "returns queued when all jobs are queued" do first_job = BaseJob.perform_later second_job = BaseJob.perform_later - batch = described_class.new(first_job, second_job) + batch = described_class.new([first_job, second_job]) ActiveJob::Status.get(first_job).update(status: :queued) ActiveJob::Status.get(second_job).update(status: :queued) @@ -19,7 +19,7 @@ it "returns failed when one job is failed" do first_job = BaseJob.perform_later second_job = BaseJob.perform_later - batch = described_class.new(first_job, second_job) + batch = described_class.new([first_job, second_job]) ActiveJob::Status.get(first_job).update(status: :failed) ActiveJob::Status.get(second_job).update(status: :completed) @@ -30,7 +30,7 @@ it "returns completed when all jobs are completed" do first_job = BaseJob.perform_later second_job = BaseJob.perform_later - batch = described_class.new(first_job, second_job) + batch = described_class.new([first_job, second_job]) ActiveJob::Status.get(first_job).update(status: :completed) ActiveJob::Status.get(second_job).update(status: :completed) @@ -41,7 +41,7 @@ it "returns working in other cases" do first_job = BaseJob.perform_later second_job = BaseJob.perform_later - batch = described_class.new(first_job, second_job) + batch = described_class.new([first_job, second_job]) ActiveJob::Status.get(first_job).update(status: :queued) ActiveJob::Status.get(second_job).update(status: :working) From c4e0f6e3547c356d3039b44d97772768c2ef9c2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20=C4=8Cupi=C4=87?= Date: Sat, 27 Jan 2024 14:45:30 +0100 Subject: [PATCH 5/6] Use read_multi for reading statuses --- lib/activejob-status/batch.rb | 19 +++++++++++++++---- lib/activejob-status/storage.rb | 4 ++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/lib/activejob-status/batch.rb b/lib/activejob-status/batch.rb index 3027a1a..fb05efa 100644 --- a/lib/activejob-status/batch.rb +++ b/lib/activejob-status/batch.rb @@ -4,20 +4,31 @@ module ActiveJob module Status class Batch def initialize(jobs) - @statuses = jobs.map { |job| ActiveJob::Status.get(job) } + @jobs = jobs + @storage = ActiveJob::Status::Storage.new end def status - if @statuses.all? { |status| status[:status] == :queued } + if @jobs.all? { |job| status_for(job) == :queued } :queued - elsif @statuses.any? { |status| status[:status] == :failed } + elsif @jobs.any? { |job| status_for(job) == :failed } :failed - elsif @statuses.all? { |status| status[:status] == :completed } + elsif @jobs.all? { |job| status_for(job) == :completed } :completed else :working end end + + private + + def statuses + @statuses ||= @storage.read_multi(@jobs) + end + + def status_for(job) + statuses.dig(@storage.key(job), :status) + end end end end diff --git a/lib/activejob-status/storage.rb b/lib/activejob-status/storage.rb index da0f7d6..455fb73 100644 --- a/lib/activejob-status/storage.rb +++ b/lib/activejob-status/storage.rb @@ -26,6 +26,10 @@ def read(job) store.read(key(job)) || {} end + def read_multi(jobs) + store.read_multi(*jobs.map { |job| key(job) }) + end + def write(job, message, force: false) @throttle.wrap(force: force) do store.write(key(job), message, expires_in: @expires_in) From d2a7f4faf83bddd38e44800e9fa71d79d481df15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20=C4=8Cupi=C4=87?= Date: Sat, 27 Jan 2024 17:44:25 +0100 Subject: [PATCH 6/6] Standardize read_multi output --- lib/activejob-status/batch.rb | 21 +++++++++------------ lib/activejob-status/storage.rb | 4 +++- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/activejob-status/batch.rb b/lib/activejob-status/batch.rb index fb05efa..d0d4801 100644 --- a/lib/activejob-status/batch.rb +++ b/lib/activejob-status/batch.rb @@ -9,26 +9,23 @@ def initialize(jobs) end def status - if @jobs.all? { |job| status_for(job) == :queued } - :queued - elsif @jobs.any? { |job| status_for(job) == :failed } + statuses = read.values.pluck(:status) + + if statuses.include?(:failed) :failed - elsif @jobs.all? { |job| status_for(job) == :completed } + elsif statuses.all?(:queued) + :queued + elsif statuses.all?(:completed) :completed else :working end end - private - - def statuses - @statuses ||= @storage.read_multi(@jobs) - end - - def status_for(job) - statuses.dig(@storage.key(job), :status) + def read + @storage.read_multi(@jobs) end + alias_method :to_h, :read end end end diff --git a/lib/activejob-status/storage.rb b/lib/activejob-status/storage.rb index 455fb73..d0c9235 100644 --- a/lib/activejob-status/storage.rb +++ b/lib/activejob-status/storage.rb @@ -27,7 +27,9 @@ def read(job) end def read_multi(jobs) - store.read_multi(*jobs.map { |job| key(job) }) + keys = jobs.map { |job| key(job) } + data = store.read_multi(*keys) + keys.index_with { |k| data.fetch(k, {}) } end def write(job, message, force: false)