Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 18 additions & 20 deletions test/integration/recurring_tasks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,17 @@ class RecurringTasksTest < ActiveSupport::TestCase
self.use_transactional_tests = false

setup do
@pid = run_supervisor_as_fork(skip_recurring: false)
# 1 supervisor + 2 workers + 1 dispatcher + 1 scheduler
wait_for_registered_processes(5, timeout: 3.second)
@pid = run_recurring_supervisor
end

teardown do
terminate_process(@pid) if process_exists?(@pid)

SolidQueue::Process.destroy_all
SolidQueue::Job.destroy_all
SolidQueue::RecurringTask.delete_all
JobResult.delete_all
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this from teardown since it's already handled in test_helper.rb teardown

terminate_gracefully(@pid)
end

test "enqueue and process periodic tasks" do
wait_for_jobs_to_be_enqueued(2, timeout: 2.5.seconds)
wait_for_jobs_to_finish_for(2.5.seconds)

terminate_process(@pid)

skip_active_record_query_cache do
assert SolidQueue::Job.count >= 2
SolidQueue::Job.all.each do |job|
Expand All @@ -43,20 +34,14 @@ class RecurringTasksTest < ActiveSupport::TestCase

test "persist and delete configured tasks" do
configured_task = { periodic_store_result: { class: "StoreResultJob", schedule: "every second" } }
# Wait for concurrency schedule loading after process registration
sleep(0.5)

assert_recurring_tasks configured_task
terminate_process(@pid)

task = SolidQueue::RecurringTask.find_by(key: "periodic_store_result")
task.update!(class_name: "StoreResultJob", schedule: "every minute", arguments: [ 42 ])

@pid = run_supervisor_as_fork(skip_recurring: false)
wait_for_registered_processes(5, timeout: 3.second)
terminate_gracefully(@pid)

# Wait for concurrency schedule loading after process registration
sleep(0.5)
@pid = run_recurring_supervisor

assert_recurring_tasks configured_task

Expand All @@ -72,7 +57,6 @@ class RecurringTasksTest < ActiveSupport::TestCase

assert_recurring_tasks updated_task

terminate_process(@pid)
scheduler1.stop
scheduler2.stop
end
Expand All @@ -92,4 +76,18 @@ def assert_recurring_tasks(expected_tasks)
end
end
end

def run_recurring_supervisor
pid = run_supervisor_as_fork(skip_recurring: false)
wait_for_registered_processes(5, timeout: 3.seconds) # 1 supervisor + 2 workers + 1 dispatcher + 1 scheduler
sleep 1.second # Wait for concurrency schedule loading after process registration
pid
end

def terminate_gracefully(pid)
return if pid.nil? || !process_exists?(pid)

terminate_process(pid)
wait_for_registered_processes(0, timeout: SolidQueue.shutdown_timeout)
end
end