How to integrate BullMQ’s telemetry on a newsletter’s registration application

In this tutorial we are going to demonstrate how to use the new telemetry support that is built into BullMQ by creating a simple newsletter registration application.

In order to show the potential of telemetry, we want an application composed of several components that interact with each other. In this case we will have a standard ExpressJS server to handle HTTP requests for newsletter subscriptions, a BullMQ queue to handle the registrations, and a PostgreSQL database to store the subscribers and their statuses.

The idea is to be able to follow a request all the way from the initial HTTP call to the final database update.

Why should you read this?

If you're using BullMQ and want to improve your application's monitoring and troubleshooting capabilities, this blog post is for you. The new telemetry functionality and this guide will save you time and effort by providing a streamlined approach to instrumenting BullMQ with OpenTelemetry.

What you will learn:

  • OpenTelemetry basics: Get a quick overview of OpenTelemetry and its core components (tracing, metrics, logs).
  • Introducing the new package: Explore the features and benefits of the new package that simplifies OpenTelemetry integration with BullMQ.
  • Step-by-step guide: Follow a practical tutorial to instrument your BullMQ queues and workers with OpenTelemetry.

Impatient? Jump to the solution:

If you prefer to dive straight into the code, check out the GitHub repository. Each chapter of the tutorial lives in a separate branch, which will be linked accordingly.

Project Scaffold

To begin, let's set up our project. Start by creating a package.json file using the command npm init -y.

Next, install the necessary dependencies:

  • tsx
  • typescript
  • dotenv

Now, let's configure TypeScript. Create a tsconfig.json file in your project's root directory, using the recommended settings from tsx.

{
    "compilerOptions": {
        "moduleDetection": "force",
        "module": "Preserve",
        "resolveJsonModule": true,
        "allowJs": true,
        "esModuleInterop": true,
        "isolatedModules": true,
        "experimentalDecorators": true,
        "emitDecoratorMetadata": true
    }
}

tsconfig.json

Next, create a src directory in the project's root directory. Inside the src directory, create an empty file named index.ts.

Express setup

With the project's foundation in place, let's set up the necessary files, including routes, controllers, and services. First, install the required dependencies:

  • @types/express
  • express

Now, let's populate the index.ts file with the following code:

import express from 'express';
import config from './config';
import router from './routes';

const app = express();

const port = config.port || 3000;

app.use(express.json());
app.use(router);

app.listen(port, () => {
  console.log(`Listening on port ${port}`);
});

src/index.ts

This is the configuration file where we'll store all the essential values for the project:

import dotenv from 'dotenv';

dotenv.config();

export default {
    port: process.env.PORT
};

src/config/index.ts

Router inside routes folder:

import express from 'express';
import newsletterRoutes from './newsletter.route';

const router = express.Router();

router.use('/api/newsletter', newsletterRoutes);

export default router;

src/routes/index.ts

import express from 'express';
import controllers from '../controllers';

const router = express.Router();

const {newsletterController} = controllers;

router.post('/subscribe', newsletterController.subscribeToNewsletter);
router.post('/unsubscribe', newsletterController.unsubcribeFromNewsletter);

export default router;

src/routes/newsletter.route.ts

Controller inside controllers folder:

import newsletterController from './newsletter.controller';

export default {
    newsletterController
};

src/controllers/index.ts

import { Request, Response, NextFunction } from 'express';
import NewsletterService from '../services/newsletter.service';

class NewsletterController {
    constructor() {
        this.subscribeToNewsletter = this.subscribeToNewsletter.bind(this);
        this.unsubcribeFromNewsletter = this.unsubcribeFromNewsletter.bind(this);
    }

    async subscribeToNewsletter(req: Request, res: Response, next: NextFunction) {
        try {
            const subscribedUser = await NewsletterService.subscribeToNewsletter(req.body.email);
            if (!subscribedUser) {
                return res.json({message: 'user already subscribed'});
            }

            return res.json(subscribedUser);
        } catch(err) {
            return next(err);
        }
    }
   
    async unsubcribeFromNewsletter(req: Request, res: Response, next: NextFunction) {
        try {
            const unsubscribedUser = await NewsletterService.unsubcribeFromNewsletter(req.body.email);
            if (!unsubscribedUser) {
                return res.json({message: 'user is not a member of a newsletter'});
            }

            return res.json(unsubscribedUser);
        } catch(err) {
            return next(err);
        }
    }
}

export default new NewsletterController();

src/controllers/newsletter.controller.ts

Service for future logic inside services folder:

class NewsletterService {
    constructor() {}

    async subscribeToNewsletter(email: string) {
        return false;
    }

    async unsubcribeFromNewsletter(email: string) {
        return false;
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

Next, create an .env file to store the application's port:

PORT=3000

.env

The only remaining step is to add a script to run the application. In your package.json file, within the scripts section, add:

"start": "tsx ./src/index.ts"

package.json

We now have a basic project structure for newsletter subscriptions. This includes a dedicated route, a controller to manage various service outcomes, and an empty service that we'll implement later with the core logic.
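
At this point, the src directory should look roughly like this:

src/
├── index.ts
├── config/
│   └── index.ts
├── controllers/
│   ├── index.ts
│   └── newsletter.controller.ts
├── routes/
│   ├── index.ts
│   └── newsletter.route.ts
└── services/
    └── newsletter.service.ts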

Nodemailer setup

To implement the service, we need a way to send emails. We'll use Nodemailer for this purpose, as it's user-friendly and offers a free testing account where you can inspect outgoing emails through its UI. A guide for setting that up is here!

Install:

  • @types/nodemailer
  • nodemailer

To connect to Nodemailer, add the following values to your .env file, referencing the guide above for specific instructions:

NODEMAILER_HOST='smtp.ethereal.email'
NODEMAILER_AUTH_USER=''
NODEMAILER_AUTH_PASS=''
NODEMAILER_PORT=587

.env

Load them in the config file:

import dotenv from 'dotenv';

dotenv.config();

export default {
    port: process.env.PORT,
    nodemailerConfig: {
        host: process.env.NODEMAILER_HOST,
        port: parseInt(process.env.NODEMAILER_PORT || '587'),
        auth: {
            user: process.env.NODEMAILER_AUTH_USER,
            pass: process.env.NODEMAILER_AUTH_PASS
        }
    },
};

src/config/index.ts

With our Nodemailer credentials in place, let's integrate it into our application. Create a new directory named nodemailer within the src directory, and inside it, create an index.ts file:

import nodemailer from 'nodemailer';
import config from '../config';

const { nodemailerConfig } = config;

const transporter = nodemailer.createTransport({
    host: nodemailerConfig.host,
    port: nodemailerConfig.port,
    auth: {
        user: nodemailerConfig.auth.user,
        pass: nodemailerConfig.auth.pass
    }
});

export default transporter;

src/nodemailer/index.ts

This code will enable our service to send emails. Let's update the service accordingly:

import nodemailer from '../nodemailer';

class NewsletterService {
    constructor() {}

    async subscribeToNewsletter(email: string) {
        await nodemailer.sendMail({
            from: 'newsletter@example.email',
            subject: 'Subscribed to newsletter',
            text: 'You have successfully subscribed to a newsletter',
            to: `${email}`,
        });
        return true;
    }

    async unsubcribeFromNewsletter(email: string) {
        await nodemailer.sendMail({
            from: 'newsletter@example.email',
            subject: 'Unsubscribed from a newsletter',
            text: 'You have successfully unsubscribed from a newsletter',
            to: `${email}`,
        })
        return true;
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

With these changes, sending a POST request to /api/newsletter/subscribe with an email field, or to /api/newsletter/unsubscribe, will trigger a response and generate an email notification visible in the Nodemailer UI.
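
If you want a quick way to exercise these endpoints, you can drop something like the following into a scratch file (say scripts/subscribe-test.ts, a name made up for this example) and run it with tsx. It assumes the server is running locally on port 3000 and that your Node version ships the built-in fetch:

(async () => {
    const response = await fetch('http://localhost:3000/api/newsletter/subscribe', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ email: 'reader@example.com' }),
    });

    // Logs whatever the controller responds with.
    console.log(await response.json());
})();

Of course, curl or any HTTP client such as Postman works just as well.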

Postgres Setup

To persist user data, we'll utilize PostgreSQL. Install the following dependencies:

  • pg
  • typeorm
  • @types/pg

Next, create a docker-compose.yaml file in the project root to run the database:

services:
  pg:
    image: postgres:16
    container_name: opentelemetry_bullmq_pg
    ports:
      - '5432:5432'
    environment:
      POSTGRES_USER: "${POSTGRES_USER}"
      POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}"
      POSTGRES_DB: "${POSTGRES_DB}"

docker-compose.yaml

This configuration exposes the database port for external access and retrieves configuration values from the .env file.

POSTGRES_USER=
POSTGRES_PASSWORD=
POSTGRES_DB=
POSTGRES_HOST=127.0.0.1
POSTGRES_PORT=5432

.env

import dotenv from 'dotenv';

dotenv.config();

export default {
    port: process.env.PORT,
    nodemailerConfig: {
        host: process.env.NODEMAILER_HOST,
        port: parseInt(process.env.NODEMAILER_PORT || '587'),
        auth: {
            user: process.env.NODEMAILER_AUTH_USER,
            pass: process.env.NODEMAILER_AUTH_PASS
        }
    },
    postgresConfig: {
        user: process.env.POSTGRES_USER,
        pass: process.env.POSTGRES_PASSWORD,
        db: process.env.POSTGRES_DB,
        host: process.env.POSTGRES_HOST,
        port: parseInt(process.env.POSTGRES_PORT || '5432')
    }
};

src/config/index.ts

Run docker-compose up to ensure everything is working correctly. If the database starts successfully, proceed to create the model, CRUD operations, and setup file.

import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';

@Entity()
export class SubscribedUser {
    @PrimaryGeneratedColumn('uuid')
    id: string

    @Column('text')
    email: string
}

src/models/subscribedUser.model.ts

import PGClient from '../db';
import { SubscribedUser } from '../models/subscribedUser.model';

class SubscribedUserCRUD {
    private pgClient: typeof PGClient.manager;
    private model: typeof SubscribedUser = SubscribedUser;

    constructor() {
        this.pgClient = PGClient.manager;
    }

    async create(email: string) {
        const exist = await this.read(email);
        if (!!exist) {
            return false;
        }

        const newSubscribedUser = new this.model();
        newSubscribedUser.email = email;

        return await this.pgClient.save(newSubscribedUser);
    }

    async read(email: string) {
        return await this.pgClient.getRepository(this.model).findOneBy({
            email: email
        });
    }

    async delete(email: string) {
        const subscribedUser = await this.read(email);
        if (!subscribedUser) {
            return false;
        }

        return await this.pgClient.getRepository(this.model).remove(subscribedUser);
    }
}

export default new SubscribedUserCRUD();

src/crud/subscribedUser.crud.ts

import { DataSource } from 'typeorm';
import { SubscribedUser } from '../models/subscribedUser.model';
import config from '../config';

const { postgresConfig } = config;

class PGClient {
    private client: DataSource;

    constructor() {
        this.client = new DataSource({
            type: 'postgres',
            host: postgresConfig.host,
            port: postgresConfig.port,
            username: postgresConfig.user,
            password: postgresConfig.pass,
            database: postgresConfig.db,
            entities: [SubscribedUser],
            synchronize: true,
            logging: false
        });
    }

    async init() {
        try {
            await this.client.initialize();
            console.log('database connected');
        } catch (err) {
            console.log('database connection error: ', err)
        }
       
    }

    async disconnect() {
        await this.client.destroy();
    }

    get clientInstance() {
        return this.client;
    }

    get manager() {
        return this.client.manager;
    }
}

export default new PGClient();

src/db/index.ts

And update the project's main entry file, index.ts, to initialize the Postgres connection:

import express from 'express';
import config from './config';
import router from './routes';
import pgClient from './db';

const app = express();

const port = config.port || 3000;

(async () => {
  await pgClient.init();
})();

app.use(express.json());
app.use(router);

app.listen(port, () => {
  console.log(`Listening on port ${port}`);
});

src/index.ts

This code defines a simple model with an email field and CRUD operations to create, read, and delete users for newsletter management. Now, let's update the service to utilize these functionalities:

import nodemailer from '../nodemailer';
import SubscribedUserCrud from '../crud/subscribedUser.crud';

class NewsletterService {
    private subscribedUserCRUD: typeof SubscribedUserCrud;

    constructor() {
        this.subscribedUserCRUD = SubscribedUserCrud;
    }

    async subscribeToNewsletter(email: string) {
        const subscribedUser = await this.subscribedUserCRUD.create(email);
        if (!subscribedUser) {
            return false;
        }

        await nodemailer.sendMail({
            from: 'newsletter@example.email',
            subject: 'Subscribed to newsletter',
            text: 'You have successfully subscribed to a newsletter',
            to: `${email}`,
        });

        return subscribedUser;
    }

    async unsubcribeFromNewsletter(email: string) {
        const removedUser = await this.subscribedUserCRUD.delete(email);
        if (!removedUser) {
            return false;
        }

        await nodemailer.sendMail({
            from: 'newsletter@example.email',
            subject: 'Unsubscribed from a newsletter',
            text: 'You have successfully unsubscribed from a newsletter',
            to: `${email}`,
        })
       
        return removedUser;
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

BullMQ setup

Finally, let's integrate BullMQ. Install the necessary dependency:

  • bullmq

Next, add a Redis service to your docker-compose.yaml file:

services:
  pg:
    image: postgres:16
    container_name: opentelemetry_bullmq_pg
    ports:
      - '5432:5432'
    environment:
      POSTGRES_USER: "${POSTGRES_USER}"
      POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}"
      POSTGRES_DB: "${POSTGRES_DB}"


  redis:
    image: redis:latest
    container_name: opentelemetry_bullmq_redis
    ports:
      - '6379:6379'

docker-compose.yaml

import dotenv from 'dotenv';

dotenv.config();

export default {
    port: process.env.PORT,
    nodemailerConfig: {
        host: process.env.NODEMAILER_HOST,
        port: parseInt(process.env.NODEMAILER_PORT || '587'),
        auth: {
            user: process.env.NODEMAILER_AUTH_USER,
            pass: process.env.NODEMAILER_AUTH_PASS
        }
    },
    postgresConfig: {
        user: process.env.POSTGRES_USER,
        pass: process.env.POSTGRES_PASSWORD,
        db: process.env.POSTGRES_DB,
        host: process.env.POSTGRES_HOST,
        port: parseInt(process.env.POSTGRES_PORT || '5432')
    },
    bullmqConfig: {
        concurrency: parseInt(process.env.BULLMQ_QUEUE_CONCURRENCY || '1'),
        queueName: process.env.BULLMQ_QUEUE_NAME || 'mailbot',
        connection: {
          host: process.env.BULLMQ_REDIS_HOST || '127.0.0.1',
          port: parseInt(process.env.BULLMQ_REDIS_PORT || '6379'),
        },
    },
};

src/config/index.ts

To integrate queueing into our application, we need to make a few modifications. First, let's create an interface for Nodemailer. Create a new directory named interfaces:

export interface NodemailerInterface {
    from: string;
    to: string;
    subject: string;
    text: string;
}

src/interfaces/nodemailer.interface.ts

Next, modify the index.ts file within the nodemailer directory to export a job for processing emails:

import nodemailer from 'nodemailer';
import { Job } from 'bullmq';
import config from '../config';
import { NodemailerInterface } from '../interfaces/nodemailer.interface';

const { nodemailerConfig } = config;

const transporter = nodemailer.createTransport({
    host: nodemailerConfig.host,
    port: nodemailerConfig.port,
    auth: {
        user: nodemailerConfig.auth.user,
        pass: nodemailerConfig.auth.pass
    }
});

export default (job: Job<NodemailerInterface>) => transporter.sendMail(job.data);

src/nodemailer/index.ts

In the same directory, create a worker.ts file with a worker to consume the job. We'll add some helpful event handlers and console logs to verify that everything is functioning as expected:

import { Worker } from 'bullmq';
import config from '../config';
import processor from './';

const { bullmqConfig } = config;

export function initWorker() {
    const worker = new Worker(bullmqConfig.queueName, processor, {
        connection: bullmqConfig.connection,
        concurrency: bullmqConfig.concurrency,
    });

    worker.on('completed', (job) =>
        console.log(`Completed job ${job.id} successfully`)
    );
   
    worker.on('failed', (job, err) =>
        console.log(`Failed job ${job.id} with ${err}`)
    );
}

src/nodemailer/worker.ts

And modify the service to use our queue system:

import { NodemailerInterface } from '../interfaces/nodemailer.interface';
import { Queue } from 'bullmq';
import config from '../config';
import SubscribedUserCrud from '../crud/subscribedUser.crud';

const { bullmqConfig } = config;

class NewsletterService {
    private queue: Queue;
    private subscribedUserCRUD: typeof SubscribedUserCrud;

    constructor() {
        this.queue = new Queue<NodemailerInterface>(bullmqConfig.queueName, {
            connection: bullmqConfig.connection,
        });


        this.subscribedUserCRUD = SubscribedUserCrud;
    }

    async subscribeToNewsletter(email: string) {
        const subscribedUser = await this.subscribedUserCRUD.create(email);
        if (!subscribedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: 'newsletter@example.email',
            subject: 'Subscribed to newsletter',
            text: 'You have successfully subscribed to a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);
        return subscribedUser;
    }

    async unsubcribeFromNewsletter(email: string) {
        const removedUser = await this.subscribedUserCRUD.delete(email);
        if (!removedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: 'newsletter@example.email',
            subject: 'Unsubscribed from a newsletter',
            text: 'You have successfully unsubscribed from a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);
        return removedUser;
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

Additionally, create a file to initialize the worker:

import { initWorker } from './nodemailer/worker';

initWorker();

console.log('worker listening');

src/worker.ts

To run the worker, add a new script to your package.json file:

"start:worker": "tsx ./src/worker.ts"

package.json

With the queue system integrated, our application now stores jobs in Redis and processes them using the worker. You can run the application using the scripts we defined. First, start the worker:

npm run start:worker

Once the worker is initialized, start the main application:

npm run start

You can then test the application as before.

OpenTelemetry setup

Let's move on to the main part: integrating telemetry. First, install the required packages:

  • @opentelemetry/instrumentation-express
  • @opentelemetry/instrumentation-http
  • @opentelemetry/instrumentation-ioredis
  • @opentelemetry/instrumentation-pg
  • @opentelemetry/sdk-metrics
  • @opentelemetry/sdk-node
  • @opentelemetry/sdk-trace-node
  • bullmq-otel

bullmq-otel is the official library for seamlessly integrating OpenTelemetry with BullMQ. I'll demonstrate how to use it and create a setup file for OpenTelemetry shortly.

To begin, we need to update all instances where queues and workers are initialized, adding a new telemetry option that receives an instance of the BullMQOtel class:

import { Worker } from 'bullmq';
import config from '../config';
import processor from './';
import { BullMQOtel } from 'bullmq-otel';

const { bullmqConfig } = config;

export function initWorker() {
    const worker = new Worker(bullmqConfig.queueName, processor, {
        connection: bullmqConfig.connection,
        concurrency: bullmqConfig.concurrency,
        telemetry: new BullMQOtel('example-tracer')
    });

    worker.on('completed', (job) =>
        console.log(`Completed job ${job.id} successfully`)
    );
   
    worker.on('failed', (job, err) =>
        console.log(`Failed job ${job.id} with ${err}`)
    );
}

src/nodemailer/worker.ts

import { NodemailerInterface } from '../interfaces/nodemailer.interface';
import { Queue } from 'bullmq';
import config from '../config';
import SubscribedUserCrud from '../crud/subscribedUser.crud';
import { BullMQOtel } from 'bullmq-otel';

const { bullmqConfig } = config;

class NewsletterService {
    private queue: Queue;
    private subscribedUserCRUD: typeof SubscribedUserCrud;

    constructor() {
        this.queue = new Queue<NodemailerInterface>(bullmqConfig.queueName, {
            connection: bullmqConfig.connection,
            telemetry: new BullMQOtel('example-tracer')
        });


        this.subscribedUserCRUD = SubscribedUserCrud;
    }

    async subscribeToNewsletter(email: string) {
        const subscribedUser = await this.subscribedUserCRUD.create(email);
        if (!subscribedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: 'newsletter@example.email',
            subject: 'Subscribed to newsletter',
            text: 'You have successfully subscribed to a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);

        return subscribedUser;
    }

    async unsubcribeFromNewsletter(email: string) {
        const removedUser = await this.subscribedUserCRUD.delete(email);
        if (!removedUser) {
            return false;
        }

        await this.queue.add('send-simple', {
            from: 'newsletter@example.email',
            subject: 'Unsubscribed from a newsletter',
            text: 'You have successfully unsubscribed from a newsletter',
            to: `${email}`,
        });

        console.log(`Enqueued an email sending`);

        return removedUser;
    }
}

export default new NewsletterService();

src/services/newsletter.service.ts

Next, let's set up OpenTelemetry. Create a new directory named instrumentation inside the src directory.

Within the instrumentation directory, create two files: instrumentation.producer.ts and instrumentation.consumer.ts. These files will handle instrumentation for the producer and the consumer, respectively:

import { NodeSDK } from '@opentelemetry/sdk-node';
import { PeriodicExportingMetricReader, ConsoleMetricExporter } from '@opentelemetry/sdk-metrics';
import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-node';
import { PgInstrumentation } from '@opentelemetry/instrumentation-pg';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { IORedisInstrumentation } from '@opentelemetry/instrumentation-ioredis';

const sdk = new NodeSDK({
  serviceName: 'producer',
  traceExporter: new ConsoleSpanExporter(),
  metricReader: new PeriodicExportingMetricReader({
    exporter: new ConsoleMetricExporter(),
  }),
  instrumentations: [
    new ExpressInstrumentation(),
    new HttpInstrumentation(),
    new PgInstrumentation(),
    new IORedisInstrumentation()
  ],
});

sdk.start();

src/instrumentation/instrumentation.producer.ts

import { NodeSDK } from '@opentelemetry/sdk-node';
import { PeriodicExportingMetricReader, ConsoleMetricExporter } from '@opentelemetry/sdk-metrics';
import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-node';
import { PgInstrumentation } from '@opentelemetry/instrumentation-pg';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { IORedisInstrumentation } from '@opentelemetry/instrumentation-ioredis';

const sdk = new NodeSDK({
  serviceName: 'consumer',
  traceExporter: new ConsoleSpanExporter(),
  metricReader: new PeriodicExportingMetricReader({
    exporter: new ConsoleMetricExporter(),
  }),
  instrumentations: [
    new ExpressInstrumentation(),
    new HttpInstrumentation(),
    new PgInstrumentation(),
    new IORedisInstrumentation()
  ],
});

sdk.start();

src/instrumentation/instrumentation.consumer.ts

This code configures OpenTelemetry with a basic setup. We've assigned a service name for easy identification of trace origins and enabled console output for all telemetry data. Additionally, we've included automatic instrumentation libraries for various parts of our application: HTTP, Express, PostgreSQL, and Redis.

You might wonder how this differs from the bullmq-otel instrumentation. The key distinction is that these libraries utilize monkey patching to observe the application, while bullmq-otel doesn't. Observability in BullMQ is achieved through direct source code integration, providing greater control, flexibility, and maintainability. Rest assured, these libraries work together seamlessly.
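
These automatic instrumentations also compose with manual spans. If you ever want to wrap a specific piece of business logic in its own span, you can call the OpenTelemetry API directly. The helper below is only an illustrative sketch (the tracer name and function are made up, and you may need to add @opentelemetry/api to your dependencies if it isn't already pulled in by the SDK packages):

import { trace } from '@opentelemetry/api';

const tracer = trace.getTracer('newsletter-manual');

// Runs an async function inside its own span, nested under whatever span is
// currently active (for example the Express request span).
export async function withSpan<T>(name: string, fn: () => Promise<T>): Promise<T> {
    return tracer.startActiveSpan(name, async (span) => {
        try {
            return await fn();
        } finally {
            span.end();
        }
    });
}

You could then wrap a call such as the CRUD create operation in the service, for example withSpan('create-subscriber', () => this.subscribedUserCRUD.create(email)), to make it show up as a dedicated span.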

To utilize the instrumentation files, update the starting scripts in your package.json file:

    "start": "tsx --import ./src/instrumentation/instrumentation.producer.ts ./src/index.ts",
    "start:worker": "tsx --import ./src/instrumentation/instrumentation.consumer.ts ./src/worker.ts"

package.json

Now, run the application as before. You should see OpenTelemetry spans displayed in the console.

Jaeger setup

While the additional logs provide some insight into BullMQ's internal operations, to fully leverage the power of observability, we need a centralized system for storing and visualizing traces. This will allow us to generate insightful diagrams and understand the precise execution order of different application components. OpenTelemetry's popularity brings a wide array of options, both commercial and open source, for achieving this.

Using the console alone for observability can be overwhelming and difficult to interpret. A more effective approach is to utilize a dedicated tool like Jaeger, specifically designed for trace storage and visualization.

To export spans to Jaeger, we need to install the necessary packages:

  • @opentelemetry/exporter-metrics-otlp-proto
  • @opentelemetry/exporter-trace-otlp-proto

Now, update your docker-compose.yaml file with a new Jaeger service:

services:
  pg:
    image: postgres:16
    container_name: opentelemetry_bullmq_pg
    ports:
      - '5432:5432'
    environment:
      POSTGRES_USER: "${POSTGRES_USER}"
      POSTGRES_PASSWORD: "${POSTGRES_PASSWORD}"
      POSTGRES_DB: "${POSTGRES_DB}"

  redis:
    image: redis:latest
    container_name: opentelemetry_bullmq_redis
    ports:
      - '6379:6379'

  jaeger:
    image: jaegertracing/all-in-one:latest
    container_name: opentelemetry_bullmq_jaeger
    ports:
      - '16686:16686'
      - '4318:4318'

docker-compose.yaml

This configuration exposes two ports:

  • 4318: The OTLP over HTTP endpoint where traces and metrics are exported (as protobuf payloads).
  • 16686: The port for accessing the Jaeger UI.

Note that I'm exporting OTLP over HTTP with protobuf encoding, which uses port 4318. If you switch to OTLP over gRPC instead, you'll need to expose port 4317 and use the matching gRPC exporter packages.

Now, let's update the instrumentation files:

import { NodeSDK } from '@opentelemetry/sdk-node';
import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
import { PgInstrumentation } from '@opentelemetry/instrumentation-pg';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { IORedisInstrumentation } from '@opentelemetry/instrumentation-ioredis';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-proto';

const sdk = new NodeSDK({
  serviceName: 'producer',
  traceExporter: new OTLPTraceExporter({
    url: 'http://127.0.0.1:4318/v1/traces'
  }),
  metricReader: new PeriodicExportingMetricReader({
    exporter: new OTLPMetricExporter({
      url: 'http://127.0.0.1:4318/v1/metrics'
    }),
  }),
  instrumentations: [
    new ExpressInstrumentation(),
    new HttpInstrumentation(),
    new PgInstrumentation(),
    new IORedisInstrumentation()
  ],
});

sdk.start();

src/instrumentation/instrumentation.producer.ts

import { NodeSDK } from '@opentelemetry/sdk-node';
import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
import { PgInstrumentation } from '@opentelemetry/instrumentation-pg';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { IORedisInstrumentation } from '@opentelemetry/instrumentation-ioredis';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-proto';

const sdk = new NodeSDK({
  serviceName: 'consumer',
  traceExporter: new OTLPTraceExporter({
    url: 'http://127.0.0.1:4318/v1/traces'
  }),
  metricReader: new PeriodicExportingMetricReader({
    exporter: new OTLPMetricExporter({
      url: 'http://127.0.0.1:4318/v1/metrics'
    }),
  }),
  instrumentations: [
    new ExpressInstrumentation(),
    new HttpInstrumentation(),
    new PgInstrumentation(),
    new IORedisInstrumentation()
  ],
});

sdk.start();

src/instrumentation/instrumentation.consumer.ts

Here, we've replaced the console exporter with OTLP, directing the telemetry data to Jaeger.

Run the starting scripts again and navigate to http://localhost:16686 in your browser.

In the left-hand menu, you'll find an option to view traces from various services. You should see at least two services listed: producer and consumer. Select producer and click Find Traces to view the observed BullMQ operations.

This view displays comprehensive information about the observed application components, starting from the initial HTTP call, through Express and PostgreSQL, to BullMQ and Redis.

The left-hand menu allows you to filter traces by their origin, such as by consumer:

...or by other operations specific to your application:

One of the powerful capabilities of telemetry is the ability to gain insights into various parameters passed within your application. This can be invaluable for debugging and further development:
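
If the automatically captured attributes are not enough, you can also attach your own data to whatever span is currently active, so it shows up alongside the rest in Jaeger. A minimal sketch (the attribute key and helper name are invented for this example):

import { trace } from '@opentelemetry/api';

// Tags the currently active span (HTTP request, BullMQ job, ...) with
// application-specific data; a no-op if no span is active.
export function tagActiveSpan(email: string) {
    trace.getActiveSpan()?.setAttribute('newsletter.email', email);
}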

Furthermore, if any errors occur, they will be displayed as special events within the span. You can expand the Logs section of a span to view detailed information about the error:

It's crucial to remember that spans must be explicitly ended before they are sent to the telemetry backend. This means that if a worker process crashes mid-process, preventing the span from being closed, the entire trace will be lost and won't appear in Jaeger (or any other telemetry system you might be using).

For instance, if a worker dies in our example application, the complete trace, from the initial HTTP call to the Redis operation, will not be saved. This is a critical limitation to be aware of to avoid misinterpreting missing traces.
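
One way to reduce this risk on the worker side is to shut down gracefully, letting in-flight jobs finish (and their spans end) before the process exits. A rough sketch of what src/worker.ts could look like, assuming initWorker is changed to return the Worker instance:

import { initWorker } from './nodemailer/worker';

// Assumption: initWorker() is modified to return the created Worker.
const worker = initWorker();

console.log('worker listening');

const shutdown = async () => {
    // Waits for active jobs to complete so their spans are ended and exported.
    await worker.close();
    process.exit(0);
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

If you also keep a reference to the NodeSDK instance, awaiting sdk.shutdown() here flushes any spans that are still buffered in the exporter.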

And there you have it! You've successfully built a simple BullMQ application with OpenTelemetry integration.



Follow me on Twitter if you want to be the first to know when I publish new tutorials and tips for Bull/BullMQ.

And remember, subscribing to Taskforce.sh is the best way to help support future BullMQ development!