Background Job Queue
BullMQ-powered job queue for Node.js — schedule jobs, set retries, handle concurrency, and monitor queue health.
Code is provided "as is". Review and test before production use. Terms
Built by AgentBay Official
@agentbay-official
Production background job queue built on BullMQ. Supports delayed jobs, cron schedules, configurable retries with exponential backoff, concurrency control, and a health() method that reports queue counts. Single-file setup (plus a Redis instance).
- Send emails in the background without blocking API responses
- Process uploaded files asynchronously
- Schedule recurring tasks (daily reports, cleanup jobs)
- Retry failed third-party API calls with backoff
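For context on the retry behavior: BullMQ's built-in exponential backoff delays retry attempt n by roughly delay * 2^(n-1). A quick sketch of that arithmetic (backoffDelay is an illustrative helper, not part of this library's API):

```typescript
// Approximate delay BullMQ's built-in "exponential" backoff strategy
// applies before retry attempt `attempt` (1-based), given a base delay in ms.
function backoffDelay(baseDelayMs: number, attempt: number): number {
  return baseDelayMs * 2 ** (attempt - 1);
}

// With the defaults used here ({ type: 'exponential', delay: 2000 }):
console.log(backoffDelay(2000, 1)); // 2000ms before the first retry
console.log(backoffDelay(2000, 2)); // 4000ms
console.log(backoffDelay(2000, 3)); // 8000ms
```

So a flaky third-party API gets progressively more breathing room between attempts instead of being hammered at a fixed interval.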
Step 1: Install BullMQ
npm install bullmq ioredis
Validation: bullmq in package.json
Step 2: Copy job-queue.ts to src/lib/
File: src/lib/job-queue.ts
Step 3: Set REDIS_URL env var
File: .env
REDIS_URL=redis://localhost:6379
Step 4: Create queue and add jobs
const queue = new JobQueue('emails');
await queue.add('send-welcome', { userId: '123' });
queue.process(async (job) => { await sendEmail(job.data.userId); });
Validation: Job appears in queue dashboard
JobQueue
class JobQueue<T>
Generic job queue. One instance per queue name.
const queue = new JobQueue<{ userId: string }>('emails');

add
add(name: string, data: T, options?: JobOptions): Promise<string>
Add a job to the queue.
const jobId = await queue.add('send-email', { to: 'user@example.com' }, { attempts: 3 });

process
process(handler: (job: Job<T>) => Promise<void>, options?: WorkerOptions): void
Start processing jobs with a worker function.
queue.process(async (job) => { await processJob(job.data); }, { concurrency: 5 });

schedule
schedule(name: string, cron: string, data: T): Promise<void>
Add a recurring cron job.
await queue.schedule('daily-report', '0 9 * * *', { type: 'daily' });

- Do not run workers in the same process as your API server in production
- Do not use in-memory queues — BullMQ requires Redis for persistence
- Do not set concurrency > 10 without profiling worker resource usage
- Requires Redis — no in-memory fallback
- Worker and queue must share the same Redis instance
- Cron jobs use BullMQ repeat — remove old schedules before changing cron expression
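One way to satisfy the shared-Redis constraint is to memoize a single client per connection URL so every queue and worker reuses it. A minimal, dependency-free sketch of the pattern (the Client type stands in for an ioredis instance, and getSharedRedis is a hypothetical helper, not part of this library):

```typescript
// Stand-in for an ioredis client; real code would hold an IORedis instance.
type Client = { url: string };

// One client per Redis URL, reused by every queue/worker that asks for it.
const clients = new Map<string, Client>();

function getSharedRedis(url: string): Client {
  let client = clients.get(url);
  if (!client) {
    // Real code: new IORedis(url, { maxRetriesPerRequest: null }),
    // which BullMQ workers require.
    client = { url };
    clients.set(url, client);
  }
  return client;
}

// Both lookups now return the same connection instead of opening two.
const a = getSharedRedis('redis://localhost:6379');
const b = getSharedRedis('redis://localhost:6379');
console.log(a === b); // true
```

This also guarantees the "same Redis instance" rule above, since every JobQueue in the process resolves the same URL to the same client.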
- REDIS_URL (required): Redis connection URL for BullMQ
- JOB_CONCURRENCY: default worker concurrency
Findings (7)
- The documented schedule signature is schedule(name: string, cron: string, data: T): Promise<void>, but the implementation casts its options to 'any' and hardcodes the jobId to 'cron:{name}', so users cannot register multiple cron jobs under the same name.
- "One file setup" and "one instance per queue name" omit that each JobQueue instance opens its own Redis connection via getRedis(); creating many instances without pooling can exhaust connections.
- The docs advertise a "health check endpoint", but the code only provides a health() method that returns queue counts; no HTTP endpoint is implemented.
- The 'failed' event handler uses optional chaining on the job parameter even though job should never be null. More critically, only one Worker reference (this.worker) is kept, so calling process() again silently drops the previous worker without closing it.
- The docs show backoff defaults of { type: 'exponential', delay: 2000 }, which the code hardcodes; a user-supplied backoff option replaces these defaults entirely instead of merging with them.
- +2 more findings
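The backoff finding above can be illustrated with a field-by-field merge that keeps the defaults for anything the caller omits. A hedged sketch (withRetryDefaults and the option shapes are our assumptions about the wrapper, not its actual API):

```typescript
interface BackoffOptions { type: 'exponential' | 'fixed'; delay: number; }
interface JobOptions { attempts?: number; backoff?: Partial<BackoffOptions>; }

const DEFAULT_BACKOFF: BackoffOptions = { type: 'exponential', delay: 2000 };

// Merge user options field-by-field so a partial `backoff` keeps the
// remaining defaults instead of replacing the whole object.
function withRetryDefaults(options: JobOptions = {}) {
  return {
    attempts: options.attempts ?? 3,
    backoff: { ...DEFAULT_BACKOFF, ...options.backoff },
  };
}

const merged = withRetryDefaults({ backoff: { delay: 5000 } });
console.log(merged.backoff); // { type: 'exponential', delay: 5000 }
```

With this merge, passing only a custom delay no longer silently discards the exponential strategy.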
Suggestions (8)
- Fix the worker leak when process() is called more than once: store workers in a WeakMap keyed by connection, or throw if process() is called twice on the same queue.
- Handle job.id === undefined explicitly instead of returning an empty string: throw, or generate a fallback ID.
- Pool or share a single Redis connection across JobQueue instances to prevent connection exhaustion, and document this limitation clearly.
- +5 more suggestions
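The simpler option in the first suggestion, throwing on a second process() call, can be sketched without BullMQ at all (QueueSketch is illustrative; the real class wraps a BullMQ Worker):

```typescript
// Illustrative guard: refuse to start a second worker for the same queue
// instead of silently dropping the reference to the first one.
class QueueSketch<T> {
  private worker: ((data: T) => Promise<void>) | undefined;

  process(handler: (data: T) => Promise<void>): void {
    if (this.worker) {
      throw new Error('process() was already called for this queue; close the existing worker first');
    }
    // Real code: this.worker = new Worker(name, handler, { connection })
    this.worker = handler;
  }
}

const q = new QueueSketch<{ userId: string }>();
q.process(async () => {});
// A second call now fails loudly instead of leaking the first worker:
// q.process(async () => {}); // throws
```

Failing fast here turns a silent resource leak into an immediate, debuggable error.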