diff --git a/lib/kue.js b/lib/kue.js index 533bf20c..74f131f4 100755 --- a/lib/kue.js +++ b/lib/kue.js @@ -278,19 +278,23 @@ Queue.prototype.watchStuckJobs = function( ms ) { prefix = '{' + prefix + '}'; } var script = - 'local msg = redis.call( "keys", "' + prefix + ':jobs:*:inactive" )\n\ + 'local cursor = "0"\n\ local need_fix = 0\n\ - for i,v in ipairs(msg) do\n\ - local queue = redis.call( "zcard", v )\n\ - local jt = string.match(v, "' + prefix + ':jobs:(.*):inactive")\n\ - local pending = redis.call( "LLEN", "' + prefix + ':" .. jt .. ":jobs" )\n\ - if queue > pending then\n\ - need_fix = need_fix + 1\n\ - for j=1,(queue-pending) do\n\ - redis.call( "lpush", "' + prefix + ':"..jt..":jobs", 1 )\n\ + repeat\n\ + local window = redis.call( "scan", cursor, "match", "' + prefix + ':jobs:*:inactive", "COUNT", "100")\n\ + cursor = window[1]\n\ + for i,v in ipairs(window[2]) do\n\ + local queue = redis.call( "zcard", v )\n\ + local jt = string.match(v, "' + prefix + ':jobs:(.*):inactive")\n\ + local pending = redis.call( "LLEN", "' + prefix + ':" .. jt .. ":jobs" )\n\ + if queue > pending then\n\ + need_fix = need_fix + 1\n\ + for j=1,(queue-pending) do\n\ + redis.call( "lpush", "' + prefix + ':"..jt..":jobs", 1 )\n\ + end\n\ + end\n\ end\n\ - end\n\ - end\n\ + until (cursor == "0")\n\ return need_fix'; clearInterval(this.stuck_job_watch); client.script('LOAD', script, function( err, sha ) {