Replies: 4 comments 5 replies
-
Some other things I have tried:
|
Beta Was this translation helpful? Give feedback.
0 replies
-
My workaround now is to use a separate queue per job - this way, I can simply call |
Beta Was this translation helpful? Give feedback.
5 replies
-
I did it in an ugly way... @manast, how can this code be improved? const config = this.getQueueMiddleware(queueName).getConfig()?.[queueName];
// Manually fetch and process a single job from `queueName`.
// The worker is created with a null processor, so BullMQ does not run jobs
// on its own; this code fetches one job, runs `handler` on it and then moves
// the job to completed/failed itself.
const redisConfig = config?.redis;
const worker = new Worker(queueName, null, {
  connection: {
    host: redisConfig?.host,
    password: redisConfig?.password,
    port: redisConfig?.port,
    db: redisConfig?.db,
    tls: redisConfig?.tls,
  },
  lockDuration: 1000 * 60 * 3, // 3 min lock duration
});
// The token identifies this consumer as the owner of the job lock.
const token = getUuid();
try {
  // Get a job; nothing to do when the queue has none available.
  const job = await worker.getNextJob(token);
  if (!job) {
    return;
  }
  // Extend the lock, make sure the job is not lost
  await job.extendLock(token, 60 * 1000 * 5);

  let succeeded = false;
  let errorMessage = '';
  try {
    // `await` handles sync results, native promises and thenables alike,
    // so one try/catch covers both synchronous throws and rejections.
    await handler(job);
    succeeded = true;
  } catch (err) {
    this.logger.error(err);
    errorMessage = err instanceof Error ? err.message : String(err);
  }

  if (succeeded) {
    try {
      await job.moveToCompleted('Success', token, false);
    } catch (err) {
      // Workaround: if the job cannot be moved to completed (presumably the
      // lock expired), drop it from the active list by hand so it does not
      // stay stuck there.
      this.logger.error(err);
      this.logger.info('lock expired not found.');
      // Key is derived from the queue name instead of a hard-coded queue;
      // assumes the default `bull` key prefix (no prefix is set on the worker).
      await this.app.redis.lrem(`bull:${queueName}:active`, 1, `${job.id}`);
    }
  } else {
    await job.moveToFailed(
      new Error(`Consume Queue <${queueName}> error: \n` + errorMessage),
      token,
      false,
    );
  }
} catch (err) {
  this.logger.error(err);
} finally {
  // Always release the worker's Redis connection, including on early return.
  await worker.close();
}
Beta Was this translation helpful? Give feedback.
0 replies
-
// bullMQUtils.ts
import { delay, Job, Queue, Worker } from "bullmq";
/**
 * Creates a queue and a processor-less worker for `queueName`, then starts a
 * background loop that fetches jobs manually and settles each one according
 * to `options.successCallback`.
 *
 * Each loop step waits `options.delayTime` milliseconds (default 3000; an
 * explicit 0 is honoured). A step either fetches the next job or settles the
 * job in hand: a truthy callback result completes it, a falsy result or a
 * thrown error fails it. The loop stops once the worker emits `closing`.
 *
 * @param queueName Name of the queue to create and consume.
 * @param options `connection`, optional `concurrency` (default 5), optional
 *   `delayTime` in ms, and `successCallback(jobData)` returning (a promise of)
 *   a boolean.
 * @param token Lock token used when fetching and settling jobs.
 * @returns The created `queue` and `worker`.
 *
 * @example
 * const token = nanoid();
 * const { queue: manuallyQueue, worker } = processManualQueue(
 *   "manuallyQueue",
 *   {
 *     connection: connection,
 *     concurrency: 5,
 *     delayTime: 3000,
 *     successCallback: async (data) => {
 *       if (data.video == 3) return false;
 *       return true;
 *     },
 *   },
 *   token
 * );
 */
export function processManualQueue(queueName, options, token) {
  const { connection, concurrency, delayTime, successCallback } = options;
  const queue = new Queue(queueName, { connection });
  // A null processor means jobs are only processed when fetched manually.
  const worker = new Worker(queueName, null, {
    connection,
    concurrency: concurrency || 5,
  });
  // `??` rather than `||` so that delayTime: 0 is not replaced by the default.
  const stepDelay = delayTime ?? 3000;

  // Stop polling once the worker is being closed; otherwise the loop would
  // keep running forever against a closed worker.
  let stopped = false;
  worker.once("closing", () => {
    stopped = true;
  });

  // Fire-and-forget, but never leave the rejection unhandled.
  workerRun().catch((err) => {
    console.error(
      `Manual processing loop for queue "${queueName}" stopped:`,
      err
    );
  });

  /** Polling loop: one fetch or one settle per step. */
  async function workerRun() {
    let job = null;
    while (!stopped) {
      await delay(stepDelay);
      try {
        job = job
          ? await settle(job)
          : (await worker.getNextJob(token)) ?? null;
      } catch (err) {
        // One failing step (e.g. a lost lock) must not kill the whole loop.
        console.error(
          `Manual processing step failed for queue "${queueName}":`,
          err
        );
        job = null;
      }
    }
  }

  /** Completes or fails `current`; returns the next job when one was handed back. */
  async function settle(current) {
    let failure = null;
    try {
      if (!(await successCallback(current.data))) {
        failure = new Error("some error message");
      }
    } catch (err) {
      // A throwing callback fails the job with that error.
      failure = err instanceof Error ? err : new Error(String(err));
    }
    if (failure) {
      await current.moveToFailed(failure, token);
      return null;
    }
    const next = await current.moveToCompleted("some return value", token);
    // Guard against an empty result before destructuring.
    const [nextData, nextId] = next ?? [];
    return nextData ? Job.fromJSON(worker, nextData, nextId) : null;
  }

  return { queue, worker };
} // Call the manual-queue processing function
// Lock token shared by every fetch/settle call of this manual worker.
const token = nanoid();
// Start manual processing: a job succeeds unless its payload has video == 3.
const { queue: manuallyQueue, worker } = processManualQueue(
  "manuallyQueue",
  {
    connection,
    concurrency: 5,
    delayTime: 3000,
    successCallback: async (data) => data.video != 3,
  },
  token
); // Manual job, retried on failure
/**
 * Enqueues five demo jobs (`video` 1 through 5) on `manuallyQueue`, each with
 * up to 3 attempts and exponential backoff.
 *
 * Resolves once every job has been added; rejects if any `add` call fails,
 * so enqueue errors reach the caller instead of becoming unhandled rejections.
 */
async manuallyQueue() {
  const jobCount = 5;
  await Promise.all(
    Array.from({ length: jobCount }, (_, index) =>
      manuallyQueue.add(
        "jobName",
        { video: index + 1 },
        {
          attempts: 3, // at most 3 attempts in total
          backoff: {
            type: "exponential",
            delay: 3000, // base backoff delay: 3 seconds
          },
        }
      )
    )
  );
},
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
I am trying to find a way to manually process a specific job. I am aware of the documentation at https://docs.bullmq.io/patterns/manually-fetching-jobs - but the documented use-case is not suitable to process a specific job (
getNextJob()
returns a random job). The code I thought of is:
But this throws the error:
Error: Missing lock for job <ID>. finished
I haven't found a way to skip the job lock or manually set it.
Any ideas?
Beta Was this translation helpful? Give feedback.
All reactions