Skip to content

Commit

Permalink
Merge pull request #32 from matteeyah/main
Browse files Browse the repository at this point in the history
  • Loading branch information
inkstak committed Feb 18, 2024
2 parents 98b8c35 + d2a7f4f commit 1416b4c
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
strategy:
matrix:
# Test only supported versions: https://endoflife.date/ruby
ruby-version: [ '3.0', '3.1', '3.2', '3.3', head, jruby, jruby-head, truffleruby, truffleruby-head ]
ruby-version: [ '3.0', '3.1', '3.2', '3.3', jruby, truffleruby ]

steps:
- uses: actions/checkout@v3
Expand Down
46 changes: 46 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,52 @@ class MyJob < ActiveJob::Base
end
```

### Grouping jobs into batches

```ruby
job = MyJob.perform_later
other_job = OtherJob.perform_later

batch = ActiveJob::Status::Batch.new([job, other_job])
batch.status
# "queued"
```

The batch status can be `queued`, `failed`, `completed` or `working`.

1. The batch is considered `queued` if **all** of the jobs are `queued`
2. The batch is considered `failed` if **one** of the jobs is `failed`
3. The batch is considered `completed` if **all** of the jobs are `completed`
4. The batch is considered `working` in all other circumstances

### Callbacks

You can implement callbacks, by listening to the completion of a batch with a
simple ActiveJob job.

```ruby
# frozen_string_literal: true

require 'activejob-status'

class CallbacksJob < ApplicationJob
queue_as :real_time

def perform(*job_ids)
batch = ActiveJob::Status::Batch.new(job_ids)

case batch.status
when :queued, :working
MonitorAnalysisBatchJob.set(wait: 5.seconds).perform_later(*job_ids)
when :completed
# Completed callback
when :failed
# Failed callback
end
end
end
```

## ActiveJob::Status and exceptions

Internally, ActiveJob::Status uses `ActiveSupport#rescue_from` to catch every `Exception` to apply the `failed` status
Expand Down
1 change: 1 addition & 0 deletions lib/activejob-status.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require "activejob-status/status"
require "activejob-status/progress"
require "activejob-status/throttle"
require "activejob-status/batch"

module ActiveJob
module Status
Expand Down
31 changes: 31 additions & 0 deletions lib/activejob-status/batch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

module ActiveJob
module Status
class Batch
def initialize(jobs)
@jobs = jobs
@storage = ActiveJob::Status::Storage.new
end

def status
statuses = read.values.pluck(:status)

if statuses.include?(:failed)
:failed
elsif statuses.all?(:queued)
:queued
elsif statuses.all?(:completed)
:completed
else
:working
end
end

def read
@storage.read_multi(@jobs)
end
alias_method :to_h, :read
end
end
end
6 changes: 6 additions & 0 deletions lib/activejob-status/storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ def read(job)
store.read(key(job)) || {}
end

def read_multi(jobs)
keys = jobs.map { |job| key(job) }
data = store.read_multi(*keys)
keys.index_with { |k| data.fetch(k, {}) }
end

def write(job, message, force: false)
@throttle.wrap(force: force) do
store.write(key(job), message, expires_in: @expires_in)
Expand Down
52 changes: 52 additions & 0 deletions spec/specs/active_job/status/batch_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

require_relative "../../../spec_helper"
require_relative "../../../jobs/test_jobs"

RSpec.describe ActiveJob::Status::Batch do
describe "#status" do
it "returns queued when all jobs are queued" do
first_job = BaseJob.perform_later
second_job = BaseJob.perform_later
batch = described_class.new([first_job, second_job])

ActiveJob::Status.get(first_job).update(status: :queued)
ActiveJob::Status.get(second_job).update(status: :queued)

expect(batch.status).to eq(:queued)
end

it "returns failed when one job is failed" do
first_job = BaseJob.perform_later
second_job = BaseJob.perform_later
batch = described_class.new([first_job, second_job])

ActiveJob::Status.get(first_job).update(status: :failed)
ActiveJob::Status.get(second_job).update(status: :completed)

expect(batch.status).to eq(:failed)
end

it "returns completed when all jobs are completed" do
first_job = BaseJob.perform_later
second_job = BaseJob.perform_later
batch = described_class.new([first_job, second_job])

ActiveJob::Status.get(first_job).update(status: :completed)
ActiveJob::Status.get(second_job).update(status: :completed)

expect(batch.status).to eq(:completed)
end

it "returns working in other cases" do
first_job = BaseJob.perform_later
second_job = BaseJob.perform_later
batch = described_class.new([first_job, second_job])

ActiveJob::Status.get(first_job).update(status: :queued)
ActiveJob::Status.get(second_job).update(status: :working)

expect(batch.status).to eq(:working)
end
end
end

0 comments on commit 1416b4c

Please sign in to comment.