Implementing a Job Scheduler into a newsletter application

A while back, I developed a simple newsletter application to demonstrate telemetry integration in a real-world scenario. If you haven't already seen this post, I highly recommend checking it out. This application will now serve as a foundation for exploring a new feature in BullMQ. Version 5.16.0 introduced the Job Scheduler, designed to replace repeatable jobs. We'll be incorporating this functionality into our newsletter application.

Before diving into the implementation, let's understand why repeatable jobs are deprecated and explore the key differences between the old and new approaches.

Before:

  • You add a repeatable job by providing a “repeat” option to Queue.add
  • You manage repeatable jobs by using removeRepeatable or removeRepeatableByKey

Now:

  • You add a repeatable job by calling Queue.upsertJobScheduler
  • You manage repeatable jobs by using removeJobScheduler or getJobSchedulers

Similarities:

  • Both schedulers add new jobs when the currently active job begins processing.
  • A job will always remain in a delayed state as long as the scheduler continues to produce jobs.
  • Both support custom repeat strategies, which must be configured in both the Worker and the Queue.

You can read more about why the new Job Schedulers API is much more useful than the old one here: https://docs.bullmq.io/guide/job-schedulers
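To make the Before/Now comparison concrete, here is a minimal sketch of both calls side by side. The LegacyQueue and SchedulerQueue interfaces are hypothetical stand-ins that describe only the method shapes used here, so the snippet runs without Redis; a real BullMQ Queue is assumed to satisfy both. The job name and payload are illustrative.

```typescript
// Hypothetical structural types: just enough of the Queue surface for this sketch.
interface RepeatOpts {
  pattern?: string; // cron expression
  every?: number;   // interval in milliseconds
}

interface MailData {
  to: string;
}

interface LegacyQueue {
  add(name: string, data: MailData, opts?: { repeat?: RepeatOpts }): Promise<unknown>;
}

interface SchedulerQueue {
  upsertJobScheduler(
    schedulerId: string,
    repeat: RepeatOpts,
    template?: { name?: string; data?: MailData },
  ): Promise<unknown>;
}

const WEEKLY = '0 0 12 * * 5';

// Before: the repeat settings ride along as an option of Queue.add.
async function scheduleLegacy(queue: LegacyQueue, email: string): Promise<void> {
  await queue.add('send-weekly-newsletter', { to: email }, { repeat: { pattern: WEEKLY } });
}

// Now: the scheduler has its own id, and the job is described by a template.
async function scheduleWithJobScheduler(queue: SchedulerQueue, email: string): Promise<void> {
  await queue.upsertJobScheduler(
    email,
    { pattern: WEEKLY },
    { name: 'send-weekly-newsletter', data: { to: email } },
  );
}
```

Because the new call is an upsert keyed by the scheduler id, calling it again with the same id should update the existing scheduler rather than create a second one.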

Equipped with an understanding of the new job scheduler, let's integrate it into our project. Keep in mind that this is a theoretical example for learning purposes only. In real life, a newsletter would have to be written during the week; otherwise there would be nothing to send to subscribers. Also, in a simple use case like this you would most likely need only one job scheduler for all users, since every newsletter goes out on the same day and at the same time of the week. A more practical setup might instead use different cron patterns for different groups of users, for example depending on their timezones.
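As a rough illustration of the per-timezone idea, the sketch below groups subscribers by timezone and derives one scheduler definition per group. The Subscriber shape, the weekly-newsletter: id prefix, and the helper name are all hypothetical. The tz field is assumed to be accepted among the repeat options, so check it against the BullMQ version you use.

```typescript
// Hypothetical subscriber record; the real application only stores the email.
interface Subscriber {
  email: string;
  timezone: string; // IANA name, e.g. 'Europe/Warsaw'
}

interface SchedulerSpec {
  schedulerId: string;
  repeat: { pattern: string; tz: string };
  recipients: string[];
}

// Build one scheduler definition per timezone instead of one per subscriber.
function buildSchedulerSpecs(
  subscribers: Subscriber[],
  pattern = '0 0 12 * * 5',
): SchedulerSpec[] {
  const byTimezone = new Map<string, string[]>();
  for (const { email, timezone } of subscribers) {
    const recipients = byTimezone.get(timezone) ?? [];
    recipients.push(email);
    byTimezone.set(timezone, recipients);
  }
  return Array.from(byTimezone.entries()).map(([tz, recipients]) => ({
    schedulerId: `weekly-newsletter:${tz}`, // assumed naming scheme
    repeat: { pattern, tz },
    recipients,
  }));
}
```

Each resulting spec could then be passed to upsertJobScheduler as the scheduler id and repeat options, with the recipients carried in the job template's data.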

The implementation will be straightforward, requiring minimal code modifications. Notably, the new job scheduler seamlessly integrates with the OpenTelemetry instrumentation we added earlier. We'll begin by updating the main service:

import { NodemailerInterface } from '../interfaces/nodemailer.interface';
import { Queue } from 'bullmq';
import config from '../config';
import SubscribedUserCrud from '../crud/subscribedUser.crud';
import { BullMQOtel } from 'bullmq-otel';

const { bullmqConfig } = config;

class NewsletterService {
    private queue: Queue;
    private subscribedUserCRUD: typeof SubscribedUserCrud;
    // Six-field cron (seconds first): every Friday at 12:00:00.
    private cronPattern = '0 0 12 * * 5';

    constructor() {
        this.queue = new Queue<NodemailerInterface>(bullmqConfig.queueName, {
            connection: bullmqConfig.connection,
            telemetry: new BullMQOtel('newsletter-tracer'),
        });

        this.subscribedUserCRUD = SubscribedUserCrud;
    }

    async subscribeToNewsletter(email: string) {
        const subscribedUser = await this.subscribedUserCRUD.create(email);
        if (!subscribedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: 'newsletter@example.email',
            subject: 'Subscribed to newsletter',
            text: 'You have successfully subscribed to a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);

        await this.startSendingWeeklyEmails(email);

        return subscribedUser;
    }

    async unsubscribeFromNewsletter(email: string) {
        const removedUser = await this.subscribedUserCRUD.delete(email);
        if (!removedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: 'newsletter@example.email',
            subject: 'Unsubscribed from a newsletter',
            text: 'You have successfully unsubscribed from a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);

        const result = await this.stopSendingWeeklyEmails(email);
        console.log(result ? `scheduler for email: ${email} removed` : `scheduler for email: ${email} not found`);

        return removedUser;
    }

    /* Add this new method */
    private async startSendingWeeklyEmails(email: string) {
        await this.queue.upsertJobScheduler(
            `${email}`,
            { pattern: this.cronPattern },
            {
                name: 'send-weekly-newsletter',
                data: {
                    from: 'newsletter@example.email',
                    subject: 'weekly newsletter',
                    text: 'newsletter',
                    to: `${email}`,
                }
            }
        );
    }

    /* Add this new method */
    private async stopSendingWeeklyEmails(email: string) {
        return this.queue.removeJobScheduler(`${email}`);
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

As you can see, we added two new methods: startSendingWeeklyEmails and stopSendingWeeklyEmails.

In the first method, startSendingWeeklyEmails, we use the upsertJobScheduler function. The user's email address serves as the unique job scheduler ID. Then, we define the schedule with a cron pattern so that emails are sent every Friday at 12 PM (noon). Finally, we provide a job template that will be executed weekly, sending a simple email notification.
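The pattern 0 0 12 * * 5 has six fields because it starts with a seconds field, which is easy to misread. The small helper below is a hypothetical illustration, not part of BullMQ; it only labels the fields of a five- or six-field expression so the hour and weekday are explicit.

```typescript
const CRON_FIELD_NAMES = [
  'second',
  'minute',
  'hour',
  'dayOfMonth',
  'month',
  'dayOfWeek',
] as const;

type CronFieldName = typeof CRON_FIELD_NAMES[number];

// Label each field of a cron expression; a five-field expression has no
// seconds field, so it is treated as second 0.
function labelCronFields(pattern: string): Record<CronFieldName, string> {
  const parts = pattern.trim().split(/\s+/);
  const fields = parts.length === 5 ? ['0', ...parts] : parts;
  if (fields.length !== 6) {
    throw new Error(`Expected 5 or 6 cron fields, got ${parts.length}`);
  }
  const labelled = {} as Record<CronFieldName, string>;
  CRON_FIELD_NAMES.forEach((name, index) => {
    labelled[name] = fields[index];
  });
  return labelled;
}
```

For the pattern used in the service, the hour field is 12 (noon) and the dayOfWeek field is 5 (Friday).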

In the second method, we utilize the removeJobScheduler function to delete the recurring job based on its unique ID. For better insight, we've included a console log to confirm successful deletion.
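If you want more than a log line, the removal can be double-checked by listing the remaining schedulers. This is a minimal sketch under stated assumptions: SchedulerAdmin is a hypothetical structural type covering only the two methods used, and it assumes that each entry returned by getJobSchedulers exposes the scheduler id as key, which may differ between BullMQ versions.

```typescript
// Hypothetical structural type: the two Queue methods this sketch relies on.
interface SchedulerAdmin {
  removeJobScheduler(schedulerId: string): Promise<boolean>;
  getJobSchedulers(
    start?: number,
    end?: number,
    asc?: boolean,
  ): Promise<Array<{ key: string }>>;
}

// Remove a scheduler, then confirm it no longer shows up in the listing.
async function removeAndVerify(
  queue: SchedulerAdmin,
  schedulerId: string,
): Promise<{ removed: boolean; stillListed: boolean }> {
  const removed = await queue.removeJobScheduler(schedulerId);
  const remaining = await queue.getJobSchedulers(0, -1, true);
  // Assumption: `key` holds the id that was passed to upsertJobScheduler.
  const stillListed = remaining.some((scheduler) => scheduler.key === schedulerId);
  return { removed, stillListed };
}
```

Calling it a second time for the same id should report removed as false, which matches the "not found" branch of the console log in the service.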

We then call these methods right after enqueuing the subscription and unsubscription confirmation emails, so users still get immediate feedback. With that, our implementation is complete, and we can proceed to testing.

Send a POST request to /api/newsletter/subscribe. Once a scheduled job has been processed, the console should show that it completed successfully: "Completed job repeat:example@gmail.com:1732577580000 successfully".
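The job id in that log line appears to follow the shape repeat:<scheduler id>:<timestamp in milliseconds>. The helper below is a hypothetical sketch that relies only on this observed shape; it is an internal detail that may change between BullMQ versions, so treat it as a debugging aid rather than a contract.

```typescript
interface SchedulerJobId {
  schedulerId: string;
  scheduledFor: Date;
}

// Split an id such as "repeat:example@gmail.com:1732577580000" into its parts.
// Returns null when the id does not match the observed shape.
function parseSchedulerJobId(jobId: string): SchedulerJobId | null {
  const prefix = 'repeat:';
  const lastColon = jobId.lastIndexOf(':');
  if (!jobId.startsWith(prefix) || lastColon < prefix.length) {
    return null;
  }
  const schedulerId = jobId.slice(prefix.length, lastColon);
  const rawMillis = jobId.slice(lastColon + 1);
  if (!schedulerId || !/^\d+$/.test(rawMillis)) {
    return null;
  }
  return { schedulerId, scheduledFor: new Date(Number(rawMillis)) };
}
```

For the id shown above, this yields the scheduler id example@gmail.com and the time slot the job was created for.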

Now, let's examine the Jaeger results:

Let's tweak the code a bit. We'll change the cron expression so the telemetry is easier to observe. Change the cron pattern to * * * * *, which sends the newsletter every minute, and examine the results. In the menu on the left, set the service to consumer and the operation to add mailbot.send-weekly-newsletter, then hit search:

This will show us the output of the newsletter scheduler:

You'll notice a difference in the number of consumer services utilized between the two operations. Let's investigate the reason behind this discrepancy. Select the more recent trace to begin our analysis:

This particular trace shows how the scheduling mechanism works and contains five distinct operations. One of them originates directly from BullMQ; the remaining four stem from Redis, the in-memory data store BullMQ uses to persist and manage job data. Let's look closer at the BullMQ operation. Its primary function is to add a scheduled job in the "delayed" state, queuing it for future execution. As discussed earlier, BullMQ's job schedulers keep a single job in the delayed state at all times, so there is always a job ready to be processed when its time comes. This job waits for one minute before it is promoted to the active state. Once active, it is picked up by a worker and processed according to the defined logic.
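The "one delayed job at all times" behaviour can be pictured with a toy in-memory model. This is purely a mental model written for this post, not BullMQ's implementation: ToyScheduler and its methods are hypothetical, time is passed in by hand, and no Redis is involved.

```typescript
interface ToyJob {
  id: string;
  runAt: number; // epoch milliseconds
}

// Toy model: exactly one job waits in "delayed"; when it turns active,
// its successor is queued before the active job is processed.
class ToyScheduler {
  delayed: ToyJob;
  readonly processed: ToyJob[] = [];
  private readonly schedulerId: string;
  private readonly everyMs: number;

  constructor(schedulerId: string, everyMs: number, startAt: number) {
    this.schedulerId = schedulerId;
    this.everyMs = everyMs;
    this.delayed = this.nextJob(startAt);
  }

  // Next slot strictly after the given time, aligned to the interval.
  private nextJob(after: number): ToyJob {
    const runAt = Math.floor(after / this.everyMs) * this.everyMs + this.everyMs;
    return { id: `repeat:${this.schedulerId}:${runAt}`, runAt };
  }

  // Move the clock forward and run every job whose slot has been reached.
  advanceTo(now: number): void {
    while (this.delayed.runAt <= now) {
      const active = this.delayed;
      this.delayed = this.nextJob(active.runAt); // successor goes to "delayed"
      this.processed.push(active);               // then the active job runs
    }
  }
}
```

With a one-minute interval, advancing the clock by three minutes processes three jobs and leaves a fourth one waiting in the delayed slot.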

Now, let's compare this to the older trace:

In this trace, we observe two additional operations, one from Redis and one from BullMQ. As expected, these operations are processed one minute later, aligning with the cron schedule we defined. Let's expand the process mailbot operation to examine its details:

Notice the new Logs label. Let's expand it to see what it contains:

This reveals an event generated by the application, indicating that the job has been successfully completed. Since the scheduler always maintains a delayed job, it immediately adds another job with the same configuration as the one we just examined, thus continuing the cycle.

Now, let's unsubscribe from the newsletter and observe the process. Send a POST request to /api/newsletter/unsubscribe. You should see a confirmation message in the console: scheduler for email: example@gmail.com removed, indicating a successful operation. The same confirmation can be found in Jaeger:

This concludes our exploration of the newsletter application, which only scratches the surface of what job schedulers offer. These powerful tools provide a wealth of additional functionality. For instance, we could leverage the advanced options available for recurring jobs, such as:

  • limit - caps the number of times the job is repeated:
queue.upsertJobScheduler(
    'scheduler-id',
    { every: 10000, limit: 10 },
    { name: 'job-name', data: jobData }
)
  • immediately - the job scheduler normally adds jobs in the delayed state; this option makes the first job run as soon as possible instead of waiting for the first scheduled slot:
queue.upsertJobScheduler(
    'scheduler-id',
    { pattern: '0 0 12 * * 5', immediately: true },
    { name: 'job-name', data: jobData }
)
  • count - start value of the repeat iteration count:
queue.upsertJobScheduler(
    'scheduler-id',
    { every: 10000, count: 2 },
    { name: 'job-name', data: jobData }
)

In the application we used a cron expression, but we could use every instead, which takes an interval in milliseconds. This one repeats the job every 10 seconds:

queue.upsertJobScheduler(
    'scheduler-id',
    { every: 10000 },
    { name: 'job-name', data: jobData }
)

We can combine it with startDate and endDate so that the job repeats only after and before certain dates:

queue.upsertJobScheduler(
    'scheduler-id',
    {
      every: 10000,
      startDate: new Date('2024-10-15T00:00:00Z'),
      endDate: new Date('2024-10-16T00:00:00Z'),
    },
    { name: 'job-name', data: jobData }
)
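Not every combination of these options is accepted. As far as I understand the current behaviour, BullMQ expects exactly one of pattern or every, and it rejects immediately together with startDate. The helper below is a hypothetical pre-flight check that mirrors those rules; the function name, the messages, and the final date-order check are my own additions, not part of BullMQ.

```typescript
// Subset of the repeat options discussed above (a sketch, not the library type).
interface RepeatSketch {
  pattern?: string;
  every?: number;
  immediately?: boolean;
  startDate?: Date;
  endDate?: Date;
  limit?: number;
  count?: number;
}

// Return a list of human-readable problems; an empty list means "looks fine".
function findRepeatOptionProblems(opts: RepeatSketch): string[] {
  const problems: string[] = [];
  if (opts.pattern && opts.every) {
    problems.push('use either pattern or every, not both');
  }
  if (!opts.pattern && !opts.every) {
    problems.push('one of pattern or every is required');
  }
  if (opts.immediately && opts.startDate) {
    problems.push('immediately and startDate cannot be combined');
  }
  // Extra sanity check added for this sketch only.
  if (opts.startDate && opts.endDate && opts.startDate.getTime() >= opts.endDate.getTime()) {
    problems.push('startDate must be before endDate');
  }
  return problems;
}
```

Running such a check before calling upsertJobScheduler turns a runtime error from the queue into an early, readable message.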

While this blog post covers the essentials of using job schedulers, there's much more to explore. For a comprehensive understanding of their capabilities, please check the official BullMQ documentation for more information.



Follow me on Twitter if you want to be the first to know when I publish new tutorials and tips for Bull/BullMQ.

And remember, subscribing to Taskforce.sh is the best way to help support future BullMQ development!