Skip to content

Conversation

@manast
Copy link
Contributor

@manast manast commented Jan 26, 2025

Why

To be able to efficiently get all the bullmq queues that are available in a Redis host. This feature makes
queue discovery a mostly O(1) operation instead of O(n).

How

Added a new key with a fixed name: "bullmq:registry", that holds a ZSET with the fully queue qualified names. This queue names will be kept in the registry until the queue is completely obliterated.

Additional Notes (Optional)

@manast manast requested a review from roggervalf January 26, 2025 10:55
@manast manast requested a review from fgozdz January 26, 2025 11:46
@@ -0,0 +1 @@
export const BullMQRegistryKey = 'bullmq:registry';
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use bull as prefix for all our keys, do we really need to change that pattern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, because this is a global key, if we used a prefix then we would not be able to auto-discover the queues defeating the whole purpose. The idea is that a UI can easily get all the queues available in a Redis instance.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no sure why we are not be able to auto-discover queues. I think we should reuse same prefix that users provides with prefix option. Imagine the scenario where 2 different prefixes are use in the same redis instance. With this key, auto-discovery will return a combination of all queues independent of these prefixes, I think discovery should depend on the prefix that is passed by users or if not passed, use bull as the default value that is used by our other keys

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to segment the queues in prefixes you can still use this approach as the queue names are stored in the registry fully qualified. The problem we are having is that auto discovery is too slow as you must check every key in Redis to match a certain pattern, currently this one *:*:id so some users complain that it takes too much time it can even time out.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to segment the queues in prefixes you can still use this approach as the queue names are stored in the registry fully qualified

That means that users need to get all queue qualified names from discovery, then filter by prefix and then they will get all the queues related to that prefix.

The problem we are having is that auto discovery is too slow as you must check every key in Redis to match a certain pattern, currently this one ::id so some users complain that it takes too much time it can even time out.

I'm in favor of this functionality, my only concern is that we should respect the use of the prefix that could be provided by the user. I also use prefix for separation of queues and seeing some other users using custom prefixes. Please take in count that it would be the same functionality, the only consideration is to use the prefix that users provide when creating queues, our default value is bull

const client = await this.queue.client;

const keys: (string | number)[] = [
BullMQRegistryKey,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have this class for handling queue keys, https://github.com/taskforcesh/bullmq/blob/master/src/classes/queue-keys.ts we can add a new method to handle global keys

Copy link
Contributor Author

@manast manast Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, this is not really a Queue key, so I am not sure it fits so well on a class that generates queue keys :/


await delay(100);

const results = await Queue.getRegistry(client, 0, -1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need some documentation about this new method. Also a warning that if users do only use FlowProducers, discovery won't work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, it should work for flow producers also, this was an oversight from my part.

let errorCode: number | null = null;
try {
const result = await queue.obliterate();
console.log(result);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these console logs can be removed

* @param start - zero based index from where to start returning jobs.
* @param end - zero based index where to stop returning jobs.
*/
static getRegistry(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

following my previous comment, what if we pass custom prefix here. So we can get the discovery related to that custom value

roggervalf
roggervalf previously approved these changes Feb 9, 2025
Copilot AI review requested due to automatic review settings September 2, 2025 01:14
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces a queue registry feature for BullMQ that enables efficient queue discovery through a Redis ZSET. Instead of scanning all Redis keys to find queues (O(n) operation), the registry provides O(1) queue lookup by maintaining a centralized registry.

Key changes:

  • Adds a global queue registry using Redis ZSET at key "bullmq:registry"
  • Automatically registers queues on creation and deregisters on obliteration
  • Provides a static method Queue.getRegistry() for paginated queue discovery

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
src/consts/bullmq-registry-key.ts Defines the constant for the registry key name
src/classes/queue.ts Adds queue registration logic and getRegistry() method
src/classes/scripts.ts Updates obliterate script to include registry key
src/commands/obliterate-3.lua Modified Lua script to remove queues from registry during obliteration
python/bullmq/scripts.py Updates Python implementation to use new obliterate script
tests/test_queue.ts Comprehensive test coverage for registry functionality
tests/test_repeat.ts Timeout adjustment for test stability
tests/test_job_scheduler.ts Timeout adjustment for test stability

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines +831 to +845
// not working as expected.
it.skip('should fail to obliterate if queue is not paused', async () => {
const queue = new Queue('test-registry-not-paused', {
connection: client,
prefix,
});
await queue.waitUntilReady();

let errorCode: number | null = null;
try {
const result = await queue.obliterate();
console.log(result);
} catch (err: any) {
console.log(err);
errorCode = err.message.includes('-1') ? -1 : null;
Copy link

Copilot AI Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test contains debug console.log statements that should be removed from the final code. Additionally, using it.skip suggests this test is not working as expected and should either be fixed or removed entirely.

Copilot uses AI. Check for mistakes.
Comment on lines +103 to +104
-- Remove from BullMQ registry. baseKey has an ending colon that needs to be removed
rcall("ZREM", KEYS[1], string.sub(baseKey, 1, -2))
Copy link

Copilot AI Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment about removing the ending colon is helpful, but the string manipulation using string.sub(baseKey, 1, -2) could be fragile. Consider adding a safety check to ensure baseKey actually ends with a colon before removing it.

Suggested change
-- Remove from BullMQ registry. baseKey has an ending colon that needs to be removed
rcall("ZREM", KEYS[1], string.sub(baseKey, 1, -2))
local baseKeyNoColon = baseKey
if string.sub(baseKey, -1) == ":" then
baseKeyNoColon = string.sub(baseKey, 1, -2)
end
rcall("ZREM", KEYS[1], baseKeyNoColon)

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants