diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 25c532c..159066c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,7 @@ jobs: strategy: matrix: # Test only supported versions: https://endoflife.date/ruby - ruby-version: [ '3.0', '3.1', '3.2', '3.3', head, jruby, jruby-head, truffleruby, truffleruby-head ] + ruby-version: [ '3.0', '3.1', '3.2', '3.3', jruby, truffleruby ] steps: - uses: actions/checkout@v3 diff --git a/README.md b/README.md index 0c0e76b..c8825d4 100644 --- a/README.md +++ b/README.md @@ -324,6 +324,52 @@ class MyJob < ActiveJob::Base end ``` +### Grouping jobs into batches + +```ruby +job = MyJob.perform_later +other_job = OtherJob.perform_later + +batch = ActiveJob::Status::Batch.new([job, other_job]) +batch.status +# "queued" +``` + +The batch status can be `queued`, `failed`, `completed` or `working`. + +1. The batch is considered `queued` if **all** of the jobs are `queued` +2. The batch is considered `failed` if **one** of the jobs is `failed` +3. The batch is considered `completed` if **all** of the jobs are `completed` +4. The batch is considered `working` in all other circumstances + +### Callbacks + +You can implement callbacks, by listening to the completion of a batch with a +simple ActiveJob job. + +```ruby +# frozen_string_literal: true + +require 'activejob-status' + +class CallbacksJob < ApplicationJob + queue_as :real_time + + def perform(*job_ids) + batch = ActiveJob::Status::Batch.new(job_ids) + + case batch.status + when :queued, :working + MonitorAnalysisBatchJob.set(wait: 5.seconds).perform_later(*job_ids) + when :completed + # Completed callback + when :failed + # Failed callback + end + end +end +``` + ## ActiveJob::Status and exceptions Internally, ActiveJob::Status uses `ActiveSupport#rescue_from` to catch every `Exception` to apply the `failed` status diff --git a/lib/activejob-status.rb b/lib/activejob-status.rb index 16ca5c5..c263ff7 100644 --- a/lib/activejob-status.rb +++ b/lib/activejob-status.rb @@ -8,6 +8,7 @@ require "activejob-status/status" require "activejob-status/progress" require "activejob-status/throttle" +require "activejob-status/batch" module ActiveJob module Status diff --git a/lib/activejob-status/batch.rb b/lib/activejob-status/batch.rb new file mode 100644 index 0000000..d0d4801 --- /dev/null +++ b/lib/activejob-status/batch.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +module ActiveJob + module Status + class Batch + def initialize(jobs) + @jobs = jobs + @storage = ActiveJob::Status::Storage.new + end + + def status + statuses = read.values.pluck(:status) + + if statuses.include?(:failed) + :failed + elsif statuses.all?(:queued) + :queued + elsif statuses.all?(:completed) + :completed + else + :working + end + end + + def read + @storage.read_multi(@jobs) + end + alias_method :to_h, :read + end + end +end diff --git a/lib/activejob-status/storage.rb b/lib/activejob-status/storage.rb index da0f7d6..d0c9235 100644 --- a/lib/activejob-status/storage.rb +++ b/lib/activejob-status/storage.rb @@ -26,6 +26,12 @@ def read(job) store.read(key(job)) || {} end + def read_multi(jobs) + keys = jobs.map { |job| key(job) } + data = store.read_multi(*keys) + keys.index_with { |k| data.fetch(k, {}) } + end + def write(job, message, force: false) @throttle.wrap(force: force) do store.write(key(job), message, expires_in: @expires_in) diff --git a/spec/specs/active_job/status/batch_spec.rb b/spec/specs/active_job/status/batch_spec.rb new file mode 100644 index 0000000..8704ec5 --- /dev/null +++ b/spec/specs/active_job/status/batch_spec.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +require_relative "../../../spec_helper" +require_relative "../../../jobs/test_jobs" + +RSpec.describe ActiveJob::Status::Batch do + describe "#status" do + it "returns queued when all jobs are queued" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new([first_job, second_job]) + + ActiveJob::Status.get(first_job).update(status: :queued) + ActiveJob::Status.get(second_job).update(status: :queued) + + expect(batch.status).to eq(:queued) + end + + it "returns failed when one job is failed" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new([first_job, second_job]) + + ActiveJob::Status.get(first_job).update(status: :failed) + ActiveJob::Status.get(second_job).update(status: :completed) + + expect(batch.status).to eq(:failed) + end + + it "returns completed when all jobs are completed" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new([first_job, second_job]) + + ActiveJob::Status.get(first_job).update(status: :completed) + ActiveJob::Status.get(second_job).update(status: :completed) + + expect(batch.status).to eq(:completed) + end + + it "returns working in other cases" do + first_job = BaseJob.perform_later + second_job = BaseJob.perform_later + batch = described_class.new([first_job, second_job]) + + ActiveJob::Status.get(first_job).update(status: :queued) + ActiveJob::Status.get(second_job).update(status: :working) + + expect(batch.status).to eq(:working) + end + end +end