-
Notifications
You must be signed in to change notification settings - Fork 518
Open
Labels
bug — Something isn't working
Description
Version
5.58.5
Platform
NodeJS
What happened?
I have 6 queues running and I am not sure what is happening: jobs get removed automatically after some time without ever being executed.
The worker runs on ECS at about 3% CPU utilization and 18% memory usage.
Redis runs on an EC2 instance.
Worker config:
/**
 * Default job options that `createQueueMQ` spreads into each queue's
 * `defaultJobOptions`.
 *
 * NOTE: BullMQ's `KeepJobs.age` (used by `removeOnComplete` / `removeOnFail`)
 * is expressed in SECONDS, not milliseconds. Multiplying by 1000 here would
 * retain jobs for decades instead of days.
 */
export const defaultJobOptions = {
  // Total number of attempts BullMQ makes for a job (first run included).
  attempts: 3,
  backoff: {
    // Base delay in milliseconds; "exponential" grows it on every retry.
    delay: 2000,
    type: "exponential",
  },
  removeOnComplete: {
    age: 7 * 24 * 60 * 60, // keep completed jobs for 7 days (in seconds)
  },
  removeOnFail: {
    age: 30 * 24 * 60 * 60, // keep failed jobs for 30 days (in seconds)
  },
};
/**
 * Options spread into every BullMQ `Worker` built by `getWorker`.
 *
 * `removeOnComplete.age` / `removeOnFail.age` are BullMQ `KeepJobs.age`
 * values and are expressed in SECONDS, so the windows below are 7 and 30 days.
 */
export const defaultWorkerOptions = {
  // NOTE(review): `attempts` and `backoff` are job options, not Worker options;
  // a Worker does not appear to read them from here. Retries follow each job's
  // own options (see defaultJobOptions). Confirm, then consider removing them.
  attempts: 5,
  // Start consuming jobs as soon as the Worker is constructed.
  autorun: true,
  backoff: {
    delay: 1000 * 60 * 60 * 24, // 24 hours
    type: "exponential",
  },
  // Milliseconds a job lock is held; if it is not renewed in time the job is
  // considered stalled.
  lockDuration: 180000,
  lockRenewTime: 90000,
  // Times a job may be recovered from the stalled state before it is failed.
  maxStalledCount: 3,
  removeOnComplete: {
    age: 7 * 24 * 60 * 60, // 7 days (in seconds)
  },
  removeOnFail: {
    age: 30 * 24 * 60 * 60, // 30 days (in seconds)
  },
  stalledInterval: 120 * 1000, // 2 minutes between stalled checks (increased for EC2 Redis)
  useWorkerThreads: false,
};
/**
 * Construct a BullMQ `Worker` that consumes one of the queues in `QUEUE_NAMES`.
 *
 * @param name - `QUEUE_NAMES` key selecting which queue to consume.
 * @param processer - handler awaited for every job taken from the queue.
 * @param redis - connection options handed to BullMQ as `connection`.
 * @returns the new worker, built from `defaultWorkerOptions` plus per-queue
 *   concurrency and the given connection.
 */
export const getWorker = (
  name: keyof typeof QUEUE_NAMES,
  // biome-ignore lint/suspicious/noExplicitAny: processer type is dynamic
  processer: (job: any) => Promise<void>,
  redis: RedisOptions,
) => {
  // NOTE(review): WORKER_CONCURRENCY is keyed by JobType values, but `name` is
  // a QUEUE_NAMES key. The lookup only hits if JobType.X === "X"; otherwise
  // concurrency is undefined for this worker. Confirm.
  const options: WorkerOptions = {
    ...defaultWorkerOptions,
    concurrency: WORKER_CONCURRENCY[name],
    connection: redis,
  };
  return new Worker(QUEUE_NAMES[name], processer, options);
};
// Per-queue concurrency: how many jobs a single Worker processes in parallel.
// Read by getWorker as WORKER_CONCURRENCY[name].
// NOTE(review): keys here are JobType values, while getWorker indexes with
// QUEUE_NAMES keys ("EMAIL", "SMS", ...). These only line up if each JobType
// member's value equals its QUEUE_NAMES key. Confirm; otherwise the worker's
// concurrency is undefined.
export const WORKER_CONCURRENCY = {
[JobType.EMAIL]: 30,
[JobType.VOICEMAIL]: 50,
[JobType.WATCH]: 100,
[JobType.CAMPAIGN]: 100,
[JobType.CAMPAIGN_STEP]: 40,
[JobType.SMS]: 50,
};
// Redis queue name for each job category. The keys are the `name` argument
// accepted by getWorker and createQueueMQ (`keyof typeof QUEUE_NAMES`).
// NOTE(review): any other process connected to the same Redis that runs a
// Worker on one of these names (e.g. an older deployment still alive) will
// also take jobs from that queue. Check for that when jobs "disappear".
export const QUEUE_NAMES = {
CAMPAIGN: "campaignQueue",
CAMPAIGN_STEP: "campaignStepQueue",
EMAIL: "emailQueue",
SMS: "smsQueue",
VOICEMAIL: "voicemailQueue",
WATCH: "watchQueue",
} as const;
/**
 * Redis configuration for workers, in `{ connection }` form, carrying the raw
 * REDIS_URL string (said to match the campaign scheduler's config).
 *
 * NOTE(review): BullMQ's `connection` option expects ioredis options or an
 * ioredis instance rather than a URL string. Verify how callers use this.
 */
export const getRedisConfig = () => {
  const connection = env.REDIS_URL;
  return { connection };
};
// Get Redis connection options for BullMQ (shared by workers)

/** Connection fields extracted from the REDIS_URL setting. */
interface ParsedRedisEndpoint {
  host: string;
  port: number;
  username?: string;
  password?: string;
  db?: number;
  tls: boolean;
}

/** Port used when REDIS_URL does not specify one (the standard Redis port). */
const DEFAULT_REDIS_PORT = 6379;

/**
 * Convert a port string to a number, defaulting to DEFAULT_REDIS_PORT when
 * the string is missing or empty.
 *
 * @throws Error if the text is not an integer in 1..65535. The full REDIS_URL
 *   is deliberately left out of the message because it may contain a password.
 */
const parseRedisPort = (portText: string | undefined): number => {
  if (portText === undefined || portText === "") {
    return DEFAULT_REDIS_PORT;
  }
  const port = Number(portText);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid Redis port in REDIS_URL: "${portText}"`);
  }
  return port;
};

/**
 * Parse a Redis endpoint given either as a URL
 * (`redis://[user][:password]@host[:port][/db]`, or `rediss://...` for TLS)
 * or as a bare `host[:port]` pair. An empty or missing value resolves to
 * `localhost:6379`.
 *
 * @throws Error on an invalid port or database index, or TypeError when a
 *   `redis://` / `rediss://` value is not a parseable URL.
 */
const parseRedisEndpoint = (raw: string | undefined): ParsedRedisEndpoint => {
  const value = (raw ?? "").trim();

  if (/^rediss?:\/\//i.test(value)) {
    const url = new URL(value);

    // The optional path segment is the numeric database index ("/2" -> 2).
    const dbText = url.pathname.replace(/^\//, "");
    let db: number | undefined;
    if (dbText !== "") {
      db = Number(dbText);
      if (!Number.isInteger(db) || db < 0) {
        throw new Error(`Invalid Redis database index in REDIS_URL: "${dbText}"`);
      }
    }

    return {
      // URL keeps IPv6 brackets ("[::1]"); ioredis wants the bare address.
      host: url.hostname.replace(/^\[(.*)\]$/, "$1") || "localhost",
      port: parseRedisPort(url.port),
      // URL leaves userinfo percent-encoded; decode so special characters work.
      username: url.username ? decodeURIComponent(url.username) : undefined,
      password: url.password ? decodeURIComponent(url.password) : undefined,
      db,
      tls: url.protocol === "rediss:",
    };
  }

  // Bare "host[:port]" form.
  const [host, portText] = value.split(":");
  return { host: host || "localhost", port: parseRedisPort(portText), tls: false };
};

/**
 * Build the ioredis connection options shared by all BullMQ workers.
 *
 * REDIS_URL may be a `redis://` / `rediss://` URL, in which case the ACL
 * username/password, port and DB index are honoured and `rediss:` enables
 * TLS. It may also be a bare `host[:port]` pair. A real URL parser is used
 * because naively splitting a URL on ":" yields host "redis" and a NaN port,
 * and drops any credentials.
 *
 * @throws Error if REDIS_URL contains an invalid port or database index.
 */
export const getRedisConnectionOptions = () => {
  const endpoint = parseRedisEndpoint(env.REDIS_URL);

  const redisOptions: RedisOptions = {
    connectTimeout: 20000, // 20 seconds timeout for EC2 connections
    enableOfflineQueue: true,
    enableReadyCheck: true,
    family: 4, // Force IPv4
    host: endpoint.host,
    keepAlive: 30000, // Keep connection alive every 30 seconds
    maxRetriesPerRequest: null, // Required by BullMQ for blocking operations
    port: endpoint.port,
    // Retry strategy for network issues: linear backoff capped at 2 seconds.
    retryStrategy: (times: number) => {
      const delay = Math.min(times * 50, 2000);
      return delay;
    },
    // Only set the optional fields when REDIS_URL actually provides them.
    ...(endpoint.username !== undefined ? { username: endpoint.username } : {}),
    ...(endpoint.password !== undefined ? { password: endpoint.password } : {}),
    ...(endpoint.db !== undefined ? { db: endpoint.db } : {}),
    ...(endpoint.tls ? { tls: {} } : {}),
  };
  return redisOptions;
};
/**
 * Create a standalone ioredis client (kept for backward compatibility).
 *
 * The client uses getRedisConnectionOptions() plus the connection name
 * "worker-connection". It is created with `lazyConnect: true`, so it opens
 * no socket until it is first used or `connect()` is called.
 *
 * NOTE(review): no credentials are added here; any ACL username/password has
 * to come from getRedisConnectionOptions(). Confirm auth is configured there.
 */
export const createRedisConnection = () =>
  new Redis({
    ...getRedisConnectionOptions(),
    connectionName: "worker-connection",
    lazyConnect: true,
  });
/**
 * Create the BullMQ queue (imported as `QueueMQ`) for one of the QUEUE_NAMES.
 *
 * Every queue inherits defaultJobOptions. The CAMPAIGN queue overrides
 * `removeOnComplete` with `true`, so a completed campaign job is deleted from
 * Redis as soon as it finishes and will never appear among completed jobs.
 *
 * @param name - QUEUE_NAMES key selecting the queue.
 * @param redis - ioredis client shared as the queue's connection.
 */
export const createQueueMQ = (name: keyof typeof QUEUE_NAMES, redis: ReturnType<typeof createRedisConnection>) => {
  const campaignOverrides = name === "CAMPAIGN" ? { removeOnComplete: true } : {};
  return new QueueMQ(QUEUE_NAMES[name], {
    connection: redis,
    defaultJobOptions: { ...defaultJobOptions, ...campaignOverrides },
  });
};Worker
// Get Redis connection options for BullMQ
// (a plain options object: each Worker below opens its own connections from it)
const redisConnectionOptions = getRedisConnectionOptions();
// Create a dedicated Redis connection for health checks
// NOTE(review): in the visible code it is only used by closeWorkers (quit()).
// createRedisConnection sets lazyConnect, so it stays unconnected until used.
const healthCheckRedis = createRedisConnection();
// Create workers with enhanced processors using Redis connection options
// Construction happens at module load; with `autorun: true` in
// defaultWorkerOptions each worker starts consuming immediately. Make sure
// only processes meant to consume jobs import this module.
const workers = {
[JobType.EMAIL]: getWorker("EMAIL", emailProcessor, redisConnectionOptions),
[JobType.VOICEMAIL]: getWorker("VOICEMAIL", voicemailProcessor, redisConnectionOptions),
[JobType.WATCH]: getWorker("WATCH", processWatchJob, redisConnectionOptions),
[JobType.CAMPAIGN]: getWorker("CAMPAIGN", campaignProcessor, redisConnectionOptions),
[JobType.CAMPAIGN_STEP]: getWorker("CAMPAIGN_STEP", campaignStepProcessor, redisConnectionOptions),
[JobType.SMS]: getWorker("SMS", processSMSJob, redisConnectionOptions),
};
// Add event handlers for all workers.
// These listeners only log; they do not affect how jobs are processed.
for (const [jobType, worker] of Object.entries(workers)) {
  const typedJobType = jobType as JobType;

  worker.on("ready", () => {
    console.log(`🟢 [WORKER] ${typedJobType} worker is ready and waiting for jobs`);
  });

  worker.on("active", (job: Job) => {
    console.log(`🔄 [WORKER] ${typedJobType} job started: ${job.id}`);
  });

  worker.on("completed", (job: Job) => {
    console.log(`✅ [WORKER] ${typedJobType} job completed: ${job.id}`);
  });

  worker.on("failed", (job: Job | undefined, err: Error) => {
    console.error(`❌ [WORKER] ${typedJobType} job failed: ${job?.id || "unknown"}`);
    console.error(` Error: ${err.message}`);
    if (job) {
      // Comparing attemptsMade with the configured attempts shows whether a
      // retry is still pending or this was the job's last attempt.
      console.error(` Attempts: ${job.attemptsMade}/${job.opts.attempts ?? "unset"}`);
    }
    // The stack trace shows where inside the processor the error was thrown.
    if (err.stack) {
      console.error(err.stack);
    }
  });

  worker.on("error", (err: Error) => {
    // Match case-insensitively: error texts from Node, ioredis and BullMQ vary
    // in capitalisation (e.g. ioredis's "Connection is closed.").
    const message = err.message.toLowerCase();
    if (message.includes("timeout") || message.includes("etimedout")) {
      console.error(`⏰ [WORKER] ${typedJobType} worker timeout error: ${err.message}`);
    } else if (message.includes("econnreset") || message.includes("connection")) {
      console.error(`🔌 [WORKER] ${typedJobType} worker connection error: ${err.message}`);
    } else {
      console.error(`💥 [WORKER] ${typedJobType} worker error: ${err.message}`);
    }
  });

  worker.on("stalled", (jobId: string) => {
    console.warn(`⚠️ [WORKER] ${typedJobType} job stalled: ${jobId}`);
    console.warn(` This might indicate lock duration is too short or network issues with Redis`);
  });
}
// Graceful shutdown
/**
 * Close every worker, then the watch job scheduler, the health-check Redis
 * client and the database connection.
 *
 * Every step is attempted even if an earlier one fails, so one worker that
 * cannot close does not leave the scheduler, Redis client and DB connection
 * open. If any worker fails to close, the first such error is rethrown once
 * all cleanup has been attempted, so the returned promise still rejects.
 * Failures in the later steps are logged, and the final success message is
 * replaced by a warning.
 */
export async function closeWorkers(): Promise<void> {
  console.log("🛑 [WORKERS] Closing workers...");

  const workerErrors: unknown[] = [];
  let cleanupFailed = false;

  // Close all workers concurrently, recording failures instead of aborting.
  await Promise.all(
    Object.entries(workers).map(async ([jobType, worker]) => {
      try {
        await worker.close();
      } catch (error) {
        workerErrors.push(error);
        console.error(`❌ [WORKERS] Error closing ${jobType} worker:`, error);
      }
    }),
  );

  // Close watch job scheduler
  try {
    await watchJobScheduler.close();
    console.log("✅ [SCHEDULER] Watch job scheduler closed");
  } catch (error) {
    cleanupFailed = true;
    console.error("❌ [SCHEDULER] Error closing watch job scheduler:", error);
  }

  // Close health check Redis connection
  try {
    await healthCheckRedis.quit();
    console.log("✅ [REDIS] Health check Redis connection closed");
  } catch (error) {
    cleanupFailed = true;
    console.error("❌ [REDIS] Error closing health check Redis connection:", error);
  }

  // Worker connections: BullMQ manages the Redis connections it created for
  // each worker, so there is nothing further to close here.
  console.log("✅ [REDIS] Redis connections will be closed by BullMQ workers");

  // Disconnect from database
  try {
    await db.$disconnect();
    console.log("✅ [DB] Database disconnected");
  } catch (error) {
    cleanupFailed = true;
    console.error("❌ [DB] Error disconnecting from database:", error);
  }

  if (workerErrors.length > 0) {
    // A worker that failed to close still makes closeWorkers() reject.
    throw workerErrors[0];
  }
  if (cleanupFailed) {
    console.warn("⚠️ [WORKERS] Shutdown finished with errors (see above)");
    return;
  }
  console.log("✅ [WORKERS] All workers closed successfully");
}How to reproduce.
No response
Relevant log output
I can't find any logs that show what is happening. If anyone can tell me which logs I should share, I can provide them.
Code of Conduct
- I agree to follow this project's Code of Conduct
Metadata
Metadata
Assignees
Labels
bug — Something isn't working