-
Notifications
You must be signed in to change notification settings - Fork 0
Description
Let's say we are creating a group for each user. How to get the position in the queue for each user?
The main target is to be able to show the end user information about the wait time and their position in the queue.
How to achieve this with groups? It seems there is nothing specific implemented in bullmq that we could utilize so having realized that I have been diving into the docs and possible solution, but I am still quite the novice at BullMQ and it would be tremendously helpful if you guys could help verify some of this logic that I have been able to come up with in my own limited experience.
This is what I have right now, a function to add a new job for the user, to queue and return his global position in the queue and the estimated wait time. Notice that to keep it simple we are rate limiting only on the global queue level and not in the group. Now currently I set concurrency to 1 for simplicity, this is a big unknown for me, will this break with group concurrency higher than 1?
Anyways, here is what I have right now:
async addJob(
data,
): Promise<AddJobResult> {
const PREFIX = `[addJob][${data.userId}]`;
try {
const queue = this.queue;
// 1. Get initial state
const [existingJobs, groupStatus, globalRateLimitTTL, allGroups] =
await Promise.all([
queue.getGroupJobs(data.userId),
queue.getGroupsCountByStatus(),
queue.getRateLimitTtl(),
queue.getGroups(),
]);
// 2. Get job counts for all groups
const groupCounts = await Promise.all(
allGroups.map((g) =>
queue.getGroupJobsCount(g.id).then((count) => ({ id: g.id, count })),
),
);
// 3. Calculate core parameters
const K = existingJobs.length;
const R = this.rateLimit;
const intervalPerJob = 60000 / R;
const isNewGroup = K === 0;
const currentGroupCount = K + 1;
// 4. Calculate global position
const groupMap = new Map(groupCounts.map((g) => [g.id, g.count])); // g.id = ULID
let globalPosition = 0;
if (isNewGroup) groupMap.set(data.userId, currentGroupCount);
const sortedGroups = Array.from(groupMap.entries()).sort(([a], [b]) =>
a.localeCompare(b),
);
// Calculate actual jobs ahead
globalPosition = sortedGroups.reduce((acc, [id, count]) => {
return (
acc +
(id === data.userId
? Math.min(count, K) // Existing jobs in current group
: Math.min(count, currentGroupCount))
); // Jobs in other groups
}, 0);
// 5. Calculate time estimate
let timeEstimate = 0;
// Calculate base time for full cycles
const activeGroups = sortedGroups
.filter(([_, count]) => count > 0)
.map(([id]) => id);
timeEstimate = activeGroups.length * intervalPerJob * K;
// Add new group penalty if applicable
if (isNewGroup && activeGroups.length > 0) {
timeEstimate += (activeGroups.length - 1) * intervalPerJob;
}
// Apply rate limiting
const estimatedWaitMs = Math.max(globalRateLimitTTL, timeEstimate);
// 6. Add job to queue
const jobId = `${data.userId}-${ulid()}`;
const job = await queue.add('generate', data, {
group: { id: data.userId },
jobId,
});
return {
success: true,
jobId: job.id,
queuePosition: globalPosition + 1, // +1 for 1-based position
estimatedWaitMs: Math.round(estimatedWaitMs),
};
} catch (error) {
this.logger.error(`${PREFIX} Failed to add job`, error);
return {
success: false,
code: 'QUEUE_ERROR',
error: 'Failed to add job to queue',
};
}
}
From limited testing this estimates the time for the job in waiting status for the newly added job, but also it might be making some incorrect assumptions about how bullmq queuepro consumes the jobs. I just created it around the idea it will do fair round & robin. Also there is a new group penalty assuming that if we add a new job and it creates a new group then it first have to wait for one round of previous group's jobs to finish before being processed for the first time for a new group, is this true? Btw the code for queuepro is not available to inspect? It would be tremendously helpful if we could get a model for the algorithm that is used to consume the jobs by the worker in the case of groups, it's super complex to reason about! How can we give fair estimates for the end user about the time they have to wait before their current submitted job will be processed. There are a lot of nuances and it's kinda impossible to guess all of it, actual algoritm would be very very helpful.
Thank you for all the help you can provide me as I try to offer a good UX to our users using BullMQ Pro