Implementing a mail microservice in NodeJS with BullMQ (2/3)

In this second post we show how to add rate limiting, retries after failure, and delayed jobs so that emails can be sent at a future point in time.

The code for this tutorial is available at https://github.com/taskforcesh/bullmq-mailbot (branch part2). There is also a plain JS version of the tutorial here: https://github.com/igolskyi/bullmq-mailbot-js.

If you haven't read the first post in this series, you should start there: https://blog.taskforce.sh/implementing-mail-microservice-with-bullmq/

Rate limiting

Most services implement some kind of rate limit that you need to honor so that your calls are not restricted or, in some cases, to avoid being banned. With BullMQ you can simply define the maximum rate for processing your jobs, independently of how many parallel workers you have running.

We build on the previous code by adding a rate limiter to the worker instance:

export const worker = new Worker(
  config.queueName,
  __dirname + "/mail.proccessor.js",
  {
    connection: config.connection,
    concurrency: config.concurrency,
    limiter: config.limiter,
  }
);
mail.worker.ts

We factor out the rate limiter to the config object:

export default {
  queueName: process.env.QUEUE_NAME || "mailbot",
  concurrency: parseInt(process.env.QUEUE_CONCURRENCY || "1"),
  connection: {
    host: process.env.REDIS_HOST,
    port: parseInt(process.env.REDIS_PORT || "6379"),
  },
  region: process.env.AWS_DEFAULT_REGION || "us-west-2",
  limiter: {
    max: parseInt(process.env.MAX_LIMIT || "1"),
    duration: parseInt(process.env.DURATION_LIMIT || "1000")
  }
};
config.ts
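The config above passes whatever the environment contains straight to parseInt, which yields NaN for a value such as "abc". If you want a safer fallback, a small helper along these lines could be used. This is only a sketch: intFromEnv is a hypothetical name, not part of BullMQ or the mailbot repository, and the environment is passed in as a plain object so the snippet stays self-contained.

```typescript
// Hypothetical helper (not part of BullMQ or the mailbot repo).
// Reads a positive integer from an environment-like object and falls back
// to a default when the variable is missing, not numeric, or not positive.
function intFromEnv(
  env: Record<string, string | undefined>,
  name: string,
  fallback: number
): number {
  const raw = env[name];
  const parsed = raw === undefined ? NaN : parseInt(raw, 10);
  // NaN > 0 is false, so invalid input also ends up at the fallback.
  return parsed > 0 ? parsed : fallback;
}

// Example with a hand-written environment object:
const sampleEnv = { MAX_LIMIT: "5", DURATION_LIMIT: "abc" };
console.log(intFromEnv(sampleEnv, "MAX_LIMIT", 1)); // 5
console.log(intFromEnv(sampleEnv, "DURATION_LIMIT", 1000)); // 1000
```

In config.ts you would pass process.env as the first argument.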

Note that the limiter has two options: max, which is the maximum number of jobs, and duration, a time window in milliseconds. With the default settings provided above, the queue will process at most 1 job every second.

We are not quite ready yet: we also need a special class called QueueScheduler. This class takes care of moving delayed jobs back to the wait status when the time is right. Since the rate limiter will delay the jobs that become limited, we need to have this instance running or the jobs will never be processed at all. (This applies to BullMQ 1.x; from BullMQ 2.0 onwards QueueScheduler is deprecated and no longer needed.) We just instantiate it in the same file where we instantiate the worker:
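To get a feel for what max and duration mean for a batch of emails, here is a rough back-of-the-envelope helper. It is hypothetical and not part of BullMQ, and it assumes the limiter admits at most max jobs per window of duration milliseconds and that nothing else slows the queue down.

```typescript
// Hypothetical helper for illustration only; not part of BullMQ.
interface LimiterSettings {
  max: number; // maximum number of jobs per window
  duration: number; // window length in milliseconds
}

// Estimates the earliest start time (in ms from now) of the last job in a
// batch, assuming `max` jobs are admitted at the start of every window.
function lastJobStartMs(jobCount: number, limiter: LimiterSettings): number {
  if (jobCount <= 0) return 0;
  const windowsNeeded = Math.ceil(jobCount / limiter.max);
  return (windowsNeeded - 1) * limiter.duration;
}

// 10 emails with the default settings (1 job per 1000 ms):
console.log(lastJobStartMs(10, { max: 1, duration: 1000 })); // 9000
```

So with the defaults, a burst of 10 emails would take roughly 9 seconds before the last one is picked up.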

export const scheduler = new QueueScheduler(config.queueName, {
  connection: config.connection,
});
mail.worker.ts

You can now start the workers with:

$ MAX_LIMIT=1 DURATION_LIMIT=2000 yarn start

And they will now only process 1 job every 2 seconds. To test it you can run:

$ node dist/tests/send-limited.js hello@example.com

Retry failed emails

Our processor function is very simple, just a call to transporter.send. However, if this call fails unexpectedly the email will not be sent. Instead, we want to perform some automatic retries before we give up on that send operation. BullMQ has a flexible retry mechanism that is configured with two options: the maximum number of attempts, and which backoff function to use. For this tutorial we will use exponential backoff, which is a good choice for most cases.

One important difference now is that the retry options are not configured on the workers but when adding jobs to the queue, i.e. they are decided by the producer of the jobs. This allows us to have a different retry mechanism for every job if we wish.

When writing a module like the one for this tutorial, you would probably divide it into two modules, one for the producer of jobs (which adds jobs to the queue) and another for the consumer of the jobs (which processes them). For simplicity we will just create a helper class and keep it in the same repository:

import { Queue, QueueOptions, JobsOptions } from "bullmq";
import { Mail } from "./mail.interface";
import config from "./config";

export class MailbotClient {
  private queue: Queue;

  constructor(opts: QueueOptions) {
    this.queue = new Queue<Mail>(config.queueName, opts);
  }

  // jobOpts carries per-job settings such as attempts, backoff and delay.
  async enqueue(jobName: string, mail: Mail, jobOpts?: JobsOptions) {
    await this.queue.add(jobName, mail, jobOpts);

    console.log(`Enqueued an email sending to ${mail.to}`);
  }

  close() {
    return this.queue.close();
  }
}
mail.client.ts

Of course we could use the Queue class exported by BullMQ directly, but wrapping it in our own class helps in adding some extra type safety and maybe some app specific defaults.

We can now test adding jobs with retry functionality. For example, let's allow a maximum of 5 attempts with an exponential backoff starting with a 3-second delay before the first retry:

import { MailbotClient } from "../mail.client";
import config from "../config";

const args = process.argv.slice(2);

const client = new MailbotClient({
  connection: config.connection,
});

client.enqueue(
  "welcome-mail",
  {
    from: "manast@taskforce.sh",
    to: args[0],
    subject: "Welcome to BullMQ",
    text: "This is a welcome email!",
  },
  { attempts: 5, backoff: { type: "exponential", delay: 3000 } }
);
send-retry.ts

If a job fails on all 5 attempts it will not be retried automatically anymore. However, it will be kept in the "failed" status, so it can be examined and/or retried manually later, once the cause of the failure has been resolved.
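To make the retry schedule concrete, the sketch below lists the wait before each retry. It assumes that the built-in exponential strategy waits 2^(attemptsMade - 1) * delay milliseconds, which is how the BullMQ documentation describes it; the function name is made up for this example and you should verify the formula against the version you run.

```typescript
// Illustrative only: assumes exponential backoff waits
// 2^(attemptsMade - 1) * delay milliseconds before the next attempt.
function exponentialBackoffDelays(attempts: number, delay: number): number[] {
  const delays: number[] = [];
  // With `attempts` total tries there are at most attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(Math.pow(2, attemptsMade - 1) * delay));
  }
  return delays;
}

// attempts: 5, delay: 3000, as in send-retry.ts
console.log(exponentialBackoffDelays(5, 3000)); // [ 3000, 6000, 12000, 24000 ]
```

Under that assumption, an email that keeps failing is given up on about 45 seconds after the first attempt, not counting the time each attempt itself takes.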

Delayed emails

It is quite common to want to send an email some time after a user has performed an operation. For example, maybe we want to send a follow-up to a new user one week after the first login. This is very easy to accomplish with our "mailbot" module: we just enqueue a new email with a one-week delay:

import { MailbotClient } from "../mail.client";
import config from "../config";

const args = process.argv.slice(2);

const client = new MailbotClient({
  connection: config.connection,
});

const ONE_MINUTE = 1000 * 60;
const ONE_HOUR = 60 * ONE_MINUTE;
const ONE_DAY = 24 * ONE_HOUR;
const ONE_WEEK = 7 * ONE_DAY;

client.enqueue(
  "We are here to help!",
  {
    from: "manast@taskforce.sh",
    to: args[0],
    subject: "Your first week with BullMQ",
    text: "This is an engagement email!",
  },
  { delay: ONE_WEEK },
);
send-delay.ts

If you instead want to delay the job to a specific point in time, just take the difference between now and the desired time and use that as the delay:

const delay = new Date("2023-03-25T12:00:00Z").getTime() - Date.now();
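If the target time may already have passed, the difference becomes negative. A small wrapper can clamp it to zero so the email is sent right away instead. The helper name delayUntil is hypothetical, and the optional now parameter exists only to make the example deterministic.

```typescript
// Hypothetical helper: milliseconds from `now` until `target`, never negative.
function delayUntil(target: Date, now: number = Date.now()): number {
  return Math.max(0, target.getTime() - now);
}

const sendAt = new Date("2023-03-25T12:00:00Z");

// One hour before the target time:
console.log(delayUntil(sendAt, Date.parse("2023-03-25T11:00:00Z"))); // 3600000

// One hour after the target time: clamp to zero.
console.log(delayUntil(sendAt, Date.parse("2023-03-25T13:00:00Z"))); // 0
```

The result can then be passed as the delay option when enqueuing, for example { delay: delayUntil(sendAt) }.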

Note that in the example above we did not specify any retry options, so in case of failure that particular email will not be retried. Since the retry options will probably be the same for all jobs, we can move them into the queue's "defaultJobOptions", so that all jobs are retried by default while we can still override the options for individual jobs if we wish. So, back to our MailbotClient class:

  constructor(opts: QueueOptions) {
    this.queue = new Queue<Mail>(config.queueName, {
      defaultJobOptions: {
        attempts: 5,
        backoff: { type: "exponential", delay: 3000 },
      },
      ...opts,
    });
  }
mail.client.ts
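One detail of this constructor is worth spelling out: because ...opts is spread after the defaults, a caller that passes its own defaultJobOptions replaces the whole default object rather than merging with it. The sketch below demonstrates this with plain objects; the interfaces are simplified stand-ins for the BullMQ option types, not the real ones.

```typescript
// Simplified stand-ins for the BullMQ option types, for illustration only.
interface RetryDefaults {
  attempts?: number;
  backoff?: { type: string; delay: number };
}
interface ClientOptions {
  defaultJobOptions?: RetryDefaults;
}

const builtInDefaults: RetryDefaults = {
  attempts: 5,
  backoff: { type: "exponential", delay: 3000 },
};

// Same spread order as the constructor: defaults first, caller options last.
function resolveOptions(opts: ClientOptions): ClientOptions {
  return { defaultJobOptions: builtInDefaults, ...opts };
}

// No override: the built-in retry defaults apply.
console.log(resolveOptions({}).defaultJobOptions);

// Override: the caller's object wins as a whole, so `backoff` is gone.
console.log(resolveOptions({ defaultJobOptions: { attempts: 2 } }).defaultJobOptions);
```

If you would rather merge field by field, spread the caller's defaultJobOptions inside the defaults object instead.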

This is all for this post. In the next post we will show how to add PDF attachments to the emails: https://blog.taskforce.sh/implementing-a-mail-microservice-in-nodejs-with-bullmq-part-3/

Follow me on Twitter if you want to be the first to know when I publish new tutorials and tips for Bull/BullMQ.

And remember, subscribing to Taskforce.sh is the best way to support future BullMQ development!