In the Node.js Bull Tutorial , We’ll delve into the power of Bull for queue management, exploring its key features, providing code examples, and sharing troubleshooting tips. As a prominent NPM package, Bull excels in handling queue operations in Node.js applications. It stands out for being feature-rich, highly performant, and built on top of Redis, seamlessly integrating with the Node.js environment.
Table of Contents
- Features of Bull
- Creating Jobs
- Repeated Jobs
- Delayed Jobs
- Priority Queuing
- Concurrency Control
- Global Events
- Rate Limiter
- Pause and Resume
- Persistence
Features of Bull
Bull provides several key features:
- Priority queueing
- Delayed jobs
- Rate limiting
- Repeatable jobs
- Concurrency control
- Global events listeners
- Persistence capabilities
- Pause and resume queues
- and many more…
Creating Jobs
const Queue = require('bull'); let myQueue = new Queue('myQueue'); function createJob(data) { return myQueue.add(data); } createJob({ task: 'send email' }) .then(() => console.log('Job has been added')) .catch(err => console.error('Failed to add job', err));
Here we import the Bull package, create a queue named ‘myQueue’, and add a job to it. The returned promise resolves when the job has been successfully added.
Repeated Jobs
function scheduleReport(data, repeatInterval) { return myQueue.add(data, { repeat: { cron: repeatInterval } }); } scheduleReport({ reportId: 'weekly-sales' }, '0 * * * *') .then(() => console.log('Weekly sales report job scheduled')) .catch(err => console.error('Failed to schedule job', err));
In the above example, a job is scheduled to run every hour according to the cron string.
Delayed Jobs
To delay a job, you pass an options object with a delay property in milliseconds.
function createDelayedJob(data, delay) { return myQueue.add(data, { delay: delay }); } createDelayedJob({ task: 'send reminder' }, 60000) .then(() => console.log('Reminder job will run in 1 minute')) .catch(err => console.error('Failed to add delayed job', err));
This code snippet will add a job that is going to be processed after a 1-minute delay.
Priority Queuing
function createPriorityJob(data, priority) { return myQueue.add(data, { priority: priority }); } createPriorityJob({ task: 'urgent fix' }, 1) .then(() => console.log('Urgent job added with highest priority')) .catch(err => console.error('Failed to add priority job', err));
Each job’s priority is considered whenever multiple jobs are queued, ensuring important tasks are executed first.
Concurrency Control
const MAX_CONCURRENT = 5; myQueue.process(MAX_CONCURRENT, function(job){ // Processing logic here });
This function will only process a maximum of 5 jobs concurrently.
Global Events
myQueue.on('completed', (job, result) => { console.log(`Job completed with result ${result}`); }); myQueue.on('failed', (job, err) => { console.error(`Job failed with error ${err}`); });
These event listeners will output console logs whenever a job is completed or fails.
Rate Limiter
Bull can also limit the rate at which jobs are processed.
// Only 10 jobs per minute myQueue.add(data, { rateLimiter: { max: 10, duration: 60000 } });
This option limits the processing rate to 10 jobs per minute for the queue.
Pause and Resume
You can pause and resume processing of the queue at any time.
async function pauseQueue() { await myQueue.pause(); console.log('Queue is paused'); } async function resumeQueue() { await myQueue.resume(); console.log('Queue is resumed'); } pauseQueue(); resumeQueue();
Persistence