Rate-Limit recipes in NodeJS using BullMQ

Rate-Limit recipes in NodeJS using BullMQ

As the communication between microservices increases and becomes more complex, we often have to deal with limitations on how fast we can call internal or external APIs. When the services are distributed and scaled horizontally, we find that limiting the speed while preserving high availability and robustness can become quite challenging.

In this post, we will describe strategies that are useful when dealing with processes that need to be rate-limited using BullMQ.

What is rate limiting?

When we talk about rate limiting we often refer to the requirement of a given service to not be called too fast or too often. The reason is that, in practice, all services have limitations, and by putting a rate limit they protect themselves from going down due to too high traffic that they are just not design to cope with.

As a consumer of a service that is rate limited, we need to comply with the service's constraints to avoid the we risk of being banned or rate limited even harder.

A basic rate-limiter

Let's say that we have some service from which we want to consume some data by calling an API. The service specifies a limit on how fast we are allowed to call its API. The limit is usually specified in requests per second. So let's say for the sake of this example that we are only allowed to perform up to 300 requests per second.

Using BullMQ we can achieve this requirement using the following queue:

import { Worker } from "bullmq";

const rateLimitWorker = new Worker(
  "rate-limit",
  async (job) => {
    await callExternalAPI(job.data);
  },
  {
    connection: { host: "my.redis-host.com" },
    limiter: { max: 300, duration: 1000 },
  }
);
A simple rate-limited worker

The worker above will call the API at most 300 times per second. So now we can add as many jobs as we want using the code below, and we will have the guarantee that we will not call the external API more often than allowed.

import { Queue } from "bullmq";

const rateLimitQueue = new Queue("rate-limit", {
  connection: { host: "my.redis-host.com" },
});

await rateLimitQueue.add("api-call", { foo: "bar" });

Since we are using queues, we would like to get some extra robustness in case the call to the external API fails for any reason. We can simply add a retry option so that we can handle temporal failures as well:

import { Worker } from "bullmq";

const rateLimitWorker = new Worker(
  "rate-limit",
  async (job) => {
    await callExternalAPI(job.data);
  },
  {
    connection: { host: "my.redis-host.com" },
    limiter: { max: 300, duration: 1000 },
    attempts: 5,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
  }
);
Rate-limit with exponential retries.

The worker will now retry in the case the API call fails, since we used the "exponential" back-off it will retry after 1, 2, 4, 8, and 16 seconds. If the job still fails after the last attempt, then the job will be marked as failed. You can still manually inspect the failed jobs (or retry manually), either using the Queue API of BullMQ or using a dashboard such as Taskforce.sh.

Scaling up

The avid reader may have discovered an issue in the example above. If the API call would take, for example, 1 second to complete, then even though we have a rate-limiter of 300 requests per second, we would just perform 1 request per second, since only one job will be processed at a any given time.

Thankfully we can easily increase the concurrency of our worker by specifying the concurrency factor. Lets crank it up to 500 in this case, this means that at most 500 calls will be processed in parallel (but no more than 300 per second):

import { Worker } from "bullmq";

const rateLimitWorker = new Worker(
  "rate-limit",
  async (job) => {
    await callExternalAPI(job.data);
  },
  {
    connection: { host: "my.redis-host.com" },
    concurrency: 500,
    limiter: { max: 300, duration: 1000 },
    attempts: 5,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
  }
);
A worker with rate-limit and high concurrency enabled.

When the work to be done is mostly IO, it is efficient to increase the concurrency factor as much as we did in this example. If, on the other hand, the work had been more CPU intensive, like for example transcoding an image from one format to another, then we would certainly not use such a large concurrency value, since it would only add overhead without any gains in performance.
In order to increase performance on a CPU bound job, it is more effective to increase the number of workers instead, as we get more CPUs to do the actual work.

Adding redundancy

We have so far one single worker with a lot of concurrency. The worker will process jobs as fast as 300 requests per second, and retry in the case of failure. This is quite robust, but often times you need even more robustness. For instance, the worker may go offline, and in that case the jobs will stop processing. The queue will grow as new jobs are added to it, but none will be processed until the worker comes up online again.

The simplest solution to this problem is to spawn more workers such as the one shown above. You can have as many workers running in parallel on different machines as you want. With the added redundancy, you minimize the risk of not processing your jobs in time. Also, for jobs that require a lot of CPU, having several workers will increase the throughput of your queue.

Smoothing out requests

When defining a rate-limiter as we did in our example above, by specifying a number of requests per second, the workers will always try to process as fast as possible until they get rate limited. This means that the 300 requests may all be executed almost at once, and then the workers will be mostly idling during one second where they will once again perform a very fast burst of calls.

This behaviour may not be the most desirable one in many cases, fortunately it is quite easy to fix. Instead of defining the duration of the rate-limiter in seconds we can divide one second by the number of jobs, so that the workers process jobs during the whole second instead of in bursts.

So for example, in our case where we limited to 300 jobs per second, we can instead write the limiter max value as 1 and the duration like this: 1000 / 300 = 3.33:

{
  limiter: {
    max: 1,
    duration: 3.33
  }
}
Smoothing out a rate-limiter

With such rate-limiter, the jobs will now be processed at regular 3.33ms intervals, instead of being processed in bursts.

Dynamic rate-limit

There are occasions when a fixed rate-limit is not enough. For example, some services do rate limit you based on more complex rules and your HTTP call will return a 429 (Too Many) response. In this case the service will tell you how much you need to wait until you are allowed to perform the next call, for example in the response header we can find something like this:

Retry-After: 3600
A rate-limited service giving the delay for the next call.

BullMQ also supports dynamic rate-limiting, by signalling in the processor that a given queue is rate limited for some specified time:

const rateLimitWorker = new Worker(
  "rate-limit",
  async (job) => {
    const result = await callExternalAPI(job.data);
    if (rateLimited(result)) {
      await rateLimitWorker.rateLimit(1000);
      throw Worker.RateLimitError();
    }
  },
  {
    connection: { host: "my.redis-host.com" },
    concurrency: 500,
    limiter: { max: 300, duration: 1000 },
    attempts: 5,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
  }
);
Dynamic rate-limiter

In this example, we assume that we get some result from the call to the external API that tells us that we should be rate limited. So we call the "rateLimit" method in the worker specifying, in milliseconds, how much to wait until we can process a job again.

We can additionally throw a special exception that tells the worker that this job has been rate limited, so we should process it again when the rate limit time has expired.

Groups

If you require even more advanced rate limit functionality, the profesional version of BullMQ provides support for groups.

Groups are necessary when you want to provide a separate rate-limiter to a given subset of jobs. For example, you may have a Queue where you put jobs for all the users in your system. If you used a global rate-limiter for the queue, then some users may get to use more of the system resources than others, as all are affected by the same limiter.

Using groups however, every user could have its own separate rate-limiter, and therefore they would not be affected by other user's high volume of work, as the busy users will be rate limited independently of the less "busy" users.

Furthermore, it is also possible to combine a global rate-limit with the per-group rate-limiter, so even in the case when you have a lot of jobs for many users all at the same time, you can still limit the total overall amount of jobs that get to be processed per unit of time.

const rateLimitWorker = new Worker(
  "rate-limit",
  async (job) => {
    await callExternalAPI(job.data);
  },
  {
    connection: { host: "my.redis-host.com" },
    concurrency: 500,
    limiter: { max: 300, duration: 1000 },
    attempts: 5,
    backoff: {
      type: "exponential",
      delay: 1000,
    },
    groups: {
      limit: {
        max: 20,
        duration: 1000,
      },
    },
  }
);
Rate-limit using groups

For more information about groups check the documentation.



Follow me on twitter if you want to be the first to know when I publish new tutorials and tips for Bull/BullMQ.

And remember, subscribing to Taskforce.sh is the greatest way to help supporting future BullMQ development!