Do not wait for your jobs to complete

Do not wait for your jobs to complete

A common pattern that arises when working with queues in general and BullMQ in particular, is the one of waiting for a job to complete. The reasoning goes, that if the job is doing something useful we need to wait for it to be complete so that we can get the result of its work.

So an intuitive way to code this behaviour would be something like this:

const job = queue.add("my-job", { foo: 'bar' });

const result = await job.waitUntilFinished();

doSomethingWithResult(result);

This is intuitive because it follows a sequence of actions and it's what you may be used to do on a single process, sequential program. However when using distributed queues on a highly tolerant system you do not want to solve the problems like this.

First of all, the code above does not provide any guarantees that the call to "doSomethingWithResult(result)" will ever be performed successfully, and if it fails, the result of that job will be lost forever.

Secondly, we do not get any guarantees on how long time it will take for a given job to complete. The point of the queue in the first place was to offload job to workers, so it could very easily be the case the workers are quite busy and the duration of the call to "waitUntilFinished" be unacceptable. You could add a timeout, but that would also make us lose that job's result forever.

Finally, it is much less efficient, imagine that for every job that you add to the queue you are also creating a promise and listeners and wait them to resolve. If you are adding many jobs per second this will be a slow way to handle them and will most likely become a performance and memory bottleneck.

So "waitUntilFinished" has very limited use, it is mostly used by test code but not on production code unless the above limitations do not matter for your specific case, but most likely you will be better off using a different approach.

How to do it instead?

The solution is to think about completing jobs in a different way. You have to consider that once you add a job to a queue, the "producer" of that job must let go. Now the job will live its own life processed by one of the potentially hundreds of workers running in parallel.

Let's illustrate this different way of thinking with a couple of examples.

HTTP Call that must return the job result

This one is quite a common case where you have for example a POST HTTP call to a service, the service will do some heavy work so you want to offload this job to a queue and when the job completes return this result to the caller.

If these are your requirements, then you will need to actually change them a little bit because waiting for a job to complete inside an HTTP handler does not scale and suffers from all the problems highlighted above.

Instead what you typically do in this case is to break the call into 2 (or more) calls.

The first call will just enqueue the job and return to the caller a Job ID or any other ID that the caller can later use for asking about this particular job status.

app.post("/jobs", json(), async (req, res) => {
  try {
    const job = await queue.add("my-queue", req.body);
    res.status(201).send(job.id);
  } catch(err) {
    res.status(500).send(err.message);
  }
});

The job has been queued and the user gets a response from the HTTP call immediately. With the provided Job ID, the caller can now ask for the job status, something like:

app.get("/jobs/:id/status", async (req, res) => {
  try {
    const status = await queue.getJobStatus("my-queue", req.params.id);
    res.status(201).send(status);
  } catch(err) {
    res.status(500).send(err.message);
  }
});

Long running process

Sometimes you have jobs that will need to work for a long time until completing. In this case it is quite common to report progress for some interested parties. The way to do this is by using the job api "progress":

export default async function (job: Job) {
  await job.log("Start processing job");
  
  for(let i=0; i < 100; i++) {
     await processChunk(i);
     await job.progress({ percentage: i, userId: job.data.userId });
  }
}

You can listen to all the progress events for a given queue, and when some progress is detected report to the interested party.

const queueEvents = new QueueEvents("my-queue", { connection });

queueEvents.on("progress", async ({ jobId, data }) => {
  const { userId } = data;
  
  // Notify userId about the progress
  await notifyUser(userId, data);
});

Now the question which falls a bit outside of the scope of this post is how to practically notify the relevant user with the progress information. Typically it would be via some WebSocket associated with a certain user id, so you could just send a message to that socket. Now, the important thing to notice here is that notifying the progress is usually not a critical thing to do, so it is not the end of the world if for some temporal network issue the user does not receive a particular progress event.

However, it may be critical to notify the completion of the job to some other system. In this case, a robust pattern is to use another queue only to notify completions. As we will be adding a job to a different queue inside the process function, we get the guarantee that if the job is added successfully, the "message" will be delivered:

export default async function (job: Job) {
  await job.log("Start processing job");
  
  for(let i=0; i < 100; i++) {
     await processChunk(i);
     await job.progress({ percentage: i, userId: job.data.userId });
  }
  await otherQueue.add("completed", { id: job.id, data: job.data });
}

The otherQueue would be handled by the service interested in knowing that the job has been performed, just process the message and do something useful, maybe update a status in a database, or whatever.

export default async function (job: Job) {  
  await updateDatabaseWithResult(job.data);
}


Follow me on twitter if you want to be the first to know when I publish new tutorials and tips for Bull/BullMQ.

And remember, subscribing to Taskforce.sh is the greatest way to help supporting future BullMQ development!