From f91ec7c5779bdec30cd93e7ceb25f0e191721aa2 Mon Sep 17 00:00:00 2001 From: Peter Retzlaff Date: Mon, 27 Feb 2017 16:55:36 +0100 Subject: [PATCH 1/6] Support expected behavior of overlap option. If overlap is set to false, we don't want to enqueue jobs if there's already one running or enqueued. --- lib/resque/scheduler.rb | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index 8ca91367..8f333a1e 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -421,9 +421,25 @@ def logger def enqueue_recurring(name, config) if master? - log! "queueing #{config['class']} (#{name})" - Resque.last_enqueued_at(name, Time.now.to_s) - enqueue(config) + allow_overlap = (config[:overlap] != false && config['overlap'] != false) + klass_name = config['class'] || config[:class] + begin + klass = Resque::Scheduler::Util.constantize(klass_name) + rescue NameError + klass = klass_name + end + + queue_name = config['queue'] || + config[:queue] || + Resque.queue_from_class(klass) + + if allow_overlap || !in_progress?(queue_name) + log! "queueing #{config['class']} (#{name})" + Resque.last_enqueued_at(name, Time.now.to_s) + enqueue(config) + else + log! "No overlap allowed. Not enqueueing #{config['class']} (#{name})" + end end end @@ -442,6 +458,15 @@ def build_procline(string) def internal_name "resque-scheduler-#{Resque::Scheduler::VERSION}" end + + def in_progress?(queue_name) + currently_processing = Resque::Worker.working.map(&:job).any? do |job| + job['queue'] == queue_name.to_s + end + return true if currently_processing + + Resque.peek(queue_name, 0, 5000).any? + end end end end From 9c387e8c61e1e3931d7b9d11ba0919194eb8e17c Mon Sep 17 00:00:00 2001 From: Peter Retzlaff Date: Mon, 27 Feb 2017 17:39:12 +0100 Subject: [PATCH 2/6] Refactor code related to overlap option. Add new method to get the queue_name and check if enqueuing is allowed. --- lib/resque/scheduler.rb | 64 ++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index 8f333a1e..332c79fc 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -225,18 +225,9 @@ def handle_shutdown # Enqueues a job based on a config hash def enqueue_from_config(job_config) args = job_config['args'] || job_config[:args] - - klass_name = job_config['class'] || job_config[:class] - begin - klass = Resque::Scheduler::Util.constantize(klass_name) - rescue NameError - klass = klass_name - end - params = args.is_a?(Hash) ? [args] : Array(args) - queue = job_config['queue'] || - job_config[:queue] || - Resque.queue_from_class(klass) + klass, klass_name, queue = queue_info_from(job_config) + # Support custom job classes like those that inherit from # Resque::JobWithStatus (resque-status) job_klass = job_config['custom_job_class'] @@ -419,21 +410,37 @@ def logger private - def enqueue_recurring(name, config) - if master? - allow_overlap = (config[:overlap] != false && config['overlap'] != false) - klass_name = config['class'] || config[:class] - begin - klass = Resque::Scheduler::Util.constantize(klass_name) - rescue NameError - klass = klass_name - end + def queue_info_from(config) + klass_name = config['class'] || config[:class] + begin + klass = Resque::Scheduler::Util.constantize(klass_name) + rescue NameError + klass = klass_name + end + + queue_name = config['queue'] || config[:queue] || Resque.queue_from_class(klass) + return klass, klass_name, queue_name + end + + def in_progress?(queue_name) + currently_processing = Resque::Worker.working.map(&:job).any? do |job| + job['queue'] == queue_name.to_s + end + return true if currently_processing - queue_name = config['queue'] || - config[:queue] || - Resque.queue_from_class(klass) + Resque.peek(queue_name, 0, 5000).any? + end + + def should_enqueue?(config) + allow_overlap = (config[:overlap] != false && config['overlap'] != false) + _, _, queue_name = queue_info_from(config) - if allow_overlap || !in_progress?(queue_name) + allow_overlap || !in_progress?(queue_name) + end + + def enqueue_recurring(name, config) + if master? + if should_enqueue?(config) log! "queueing #{config['class']} (#{name})" Resque.last_enqueued_at(name, Time.now.to_s) enqueue(config) @@ -458,15 +465,6 @@ def build_procline(string) def internal_name "resque-scheduler-#{Resque::Scheduler::VERSION}" end - - def in_progress?(queue_name) - currently_processing = Resque::Worker.working.map(&:job).any? do |job| - job['queue'] == queue_name.to_s - end - return true if currently_processing - - Resque.peek(queue_name, 0, 5000).any? - end end end end From cb3a7d50aeabef0055cf50a79251cde297d0fed9 Mon Sep 17 00:00:00 2001 From: Peter Retzlaff Date: Mon, 27 Feb 2017 18:00:56 +0100 Subject: [PATCH 3/6] Add myself to AUTHORS.md. --- AUTHORS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS.md b/AUTHORS.md index fd0b877d..d61ad812 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -49,6 +49,7 @@ Resque Scheduler authors - Nickolas Means - Olek Janiszewski - Olivier Brisse +- Peter Retzlaff - Petteri Räty - Phil Cohen - Rob Olson From 81655e1b4b00c98ca50999f921f9db70891f0461 Mon Sep 17 00:00:00 2001 From: Peter Retzlaff Date: Mon, 27 Feb 2017 18:14:03 +0100 Subject: [PATCH 4/6] Avoid explicit return. --- lib/resque/scheduler.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index 332c79fc..00dc9357 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -419,7 +419,7 @@ def queue_info_from(config) end queue_name = config['queue'] || config[:queue] || Resque.queue_from_class(klass) - return klass, klass_name, queue_name + [klass, klass_name, queue_name] end def in_progress?(queue_name) From 0476e8bb4cb52ffa76711494ea9acf11d20e98d7 Mon Sep 17 00:00:00 2001 From: Peter Retzlaff Date: Sun, 19 Mar 2017 19:13:59 +0100 Subject: [PATCH 5/6] Refactor in_progress? method a little. --- lib/resque/scheduler.rb | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index 00dc9357..fc0dca0b 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -423,12 +423,11 @@ def queue_info_from(config) end def in_progress?(queue_name) - currently_processing = Resque::Worker.working.map(&:job).any? do |job| + currently_processing = Resque.working.map(&:job).any? do |job| job['queue'] == queue_name.to_s end - return true if currently_processing - Resque.peek(queue_name, 0, 5000).any? + currently_processing || (Resque.size(queue_name.to_s) > 0) end def should_enqueue?(config) From 52a6150e92f61f9c621c2829bbf6ef5886053a30 Mon Sep 17 00:00:00 2001 From: Peter Retzlaff Date: Thu, 13 Apr 2017 16:25:00 +0200 Subject: [PATCH 6/6] Replace accidental unicode character instead of space. --- lib/resque/scheduler.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index fc0dca0b..24491819 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -427,7 +427,7 @@ def in_progress?(queue_name) job['queue'] == queue_name.to_s end - currently_processing || (Resque.size(queue_name.to_s) > 0) + currently_processing || (Resque.size(queue_name.to_s) > 0) end def should_enqueue?(config)