NodeJS Microservice with Kafka and TypeScript
In my previous blog, the microservices communicated synchronously via gRPC. In this blog, we will explore asynchronous communication using events.
I will not go into much detail about asynchronous vs. synchronous communication. Both have their pros and cons, and, most importantly, their own use cases.
In event-driven architectures, services communicate by publishing and consuming events. One event can be consumed by multiple consumers. These events remain available in the stream even after a consumer consumes them, so other consumers can also consume them later.
There are different technologies for event-driven architectures like RabbitMQ, Redis, AWS SQS, Kafka, etc. But in this blog, we will use Kafka for event streaming. The microservices will publish and consume events.
We will use Docker to run Kafka locally, making development easier.
Let's add the kafka service to our Docker Compose file.
services:
  kafka:
    image: apache/kafka:4.1.1
    ports:
      - "9092:9092"
    environment:
      - KAFKA_NODE_ID=1
      - KAFKA_PROCESS_ROLES=broker,controller
      - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
Let's start the Kafka broker with the docker compose up kafka command.
Before writing the services, let's have a quick introduction to Kafka terminology.
- Event -> A record of an action that happened in the business domain. Events are immutable, like entries in a log.
- Event Stream -> A continuous, ordered sequence of events.
- Topic -> A named group in which related events are stored and organized in Kafka.
- Partition -> Each topic is subdivided into partitions. This enables parallel processing, as different consumers can consume from different partitions at the same time.
- Producer -> A service that publishes events to the event stream.
- Consumer -> A service that subscribes to and consumes events.
- Broker -> A Kafka server that stores events and serves them to producers and consumers.
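These ideas can be sketched with a tiny in-memory model. This is an illustration only, not Kafka's API: a topic holds partitions, each partition is an append-only log, and reading never removes events, so multiple consumers can read the same records independently.

```typescript
// Minimal in-memory sketch of the terms above (illustration only, not Kafka's API).
interface EventRecord {
  key: string;
  value: string;
  offset: number; // position within a partition, assigned on append
}

class Topic {
  private partitions: EventRecord[][];

  constructor(partitionCount: number) {
    this.partitions = Array.from({ length: partitionCount }, () => []);
  }

  // A producer appends; the record is never mutated or removed afterwards.
  append(partition: number, key: string, value: string): EventRecord {
    const log = this.partitions[partition];
    const record = { key, value, offset: log.length };
    log.push(record);
    return record;
  }

  // A consumer reads from a given offset onward; reading does not delete
  // events, so other consumers can read the same records later.
  read(partition: number, fromOffset: number): EventRecord[] {
    return this.partitions[partition].slice(fromOffset);
  }
}

const topic = new Topic(3);
topic.append(0, "p1", "product created");
topic.append(0, "p1", "product updated");
console.log(topic.read(0, 0).length); // both events are still available
```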
Now, let's create a test service in our services directory to test publishing and consuming events from Kafka locally. We will not have this in production.
npm init -w services/test-kafka --scope=@rsbh-nodejs-microservices -y
cd services/test-kafka
We will create a producer, a consumer, and an admin client to create topics in this service.
We will use @platformatic/kafka as the Kafka client for this service.
Initially, the plan was to use kafkajs, but it is no longer maintained - link. You can read more about it here.
We will set the type as module in package.json and create the tsconfig.json file in the service directory.
npm pkg set type=module
npm i @platformatic/kafka uuid
npm i -D typescript @types/node @types/uuid ts-node
npx tsc --init
Let's create a script to add topics to our Kafka broker. The @platformatic/kafka producer also supports autocreateTopics: true, but it creates topics with default broker settings for partitions and replicas. Here, we will create the topics manually. Ideally, in a production system, topics are created via scripts or IaC tools like Terraform.
Let's create a create-topics.ts file in the test-kafka service. We are creating two topics here: product.created and product.updated.
import { Admin } from "@platformatic/kafka";

const BROKERS = (process.env.KAFKA_BROKERS || "localhost:9092").split(",");
const SERVICE_NAME = process.env.SERVICE_NAME || "test-kafka-admin";
const HOSTNAME = process.env.HOSTNAME || "local";

const TOPICS = ["product.created", "product.updated"];

const admin = new Admin({
  clientId: `${SERVICE_NAME}-${HOSTNAME}`,
  bootstrapBrokers: BROKERS,
});

async function main() {
  try {
    await admin.createTopics({
      topics: TOPICS,
      partitions: 3,
      replicas: 1,
    });
    console.log("Topics created successfully");
  } catch (err: unknown) {
    console.error(err);
  } finally {
    await admin.close();
  }
}

main();
Here we used 3 partitions when creating the topics. Locally we will run a single consumer, but in production there will be multiple replicas of the consumer belonging to the same consumer group, and each consumer in the group reads from its own subset of partitions.
The key is used to decide which partition an event is published to. Events with the same key are published to the same partition, which guarantees they are consumed in order.
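The key-to-partition mapping boils down to hashing the key modulo the partition count. The toy function below only illustrates the idea; Kafka's default partitioner uses the murmur2 hash, not this rolling hash.

```typescript
// Toy key-based partitioner: the same key always maps to the same partition.
// (Kafka's default partitioner hashes keys with murmur2; this sketch only
// illustrates the idea, not the exact algorithm.)
function partitionFor(key: string, partitionCount: number): number {
  let hash = 0;
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0; // simple rolling hash, kept unsigned
  }
  return hash % partitionCount;
}

const productId = "fb49627f-b0f9-4a23-a26a-c98a21ae9526";
// Every event keyed by this product id lands on the same partition,
// so its created/updated events are consumed in order.
console.log(partitionFor(productId, 3) === partitionFor(productId, 3)); // true
```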
After running the Kafka broker via Docker, we can run the create-topics.ts script with the npx ts-node create-topics.ts command. If it runs successfully, it will print the success log. And if we run the script multiple times, we will get a TOPIC_ALREADY_EXISTS error.
Now, let's create a script to publish an event to the Kafka broker. We will use a plain string message for now.
publish-event.ts
import { Producer, ProduceAcks, stringSerializers } from "@platformatic/kafka";
import { v4 as uuid } from "uuid";

const BROKERS = (process.env.KAFKA_BROKERS || "localhost:9092").split(",");
const SERVICE_NAME = process.env.SERVICE_NAME || "test-producer";
const HOSTNAME = process.env.HOSTNAME || "local";

const TOPIC = "product.created";

const producer = new Producer({
  clientId: `${SERVICE_NAME}-${HOSTNAME}`,
  bootstrapBrokers: BROKERS,
  serializers: stringSerializers,
  acks: ProduceAcks.ALL,
});

async function main() {
  const product = {
    id: uuid(),
    name: "Test",
  };
  try {
    const { offsets } = await producer.send({
      messages: [
        {
          topic: TOPIC,
          key: product.id,
          value: JSON.stringify(product),
        },
      ],
    });
    offsets?.forEach(({ topic, partition, offset }) => {
      console.log(
        `Published to ${topic} partition ${partition} at offset ${offset}`
      );
    });
  } catch (err) {
    console.error(err);
  } finally {
    await producer.close();
  }
}

main();
After running the script with the npx ts-node publish-event.ts command, it should publish a new event to Kafka. Every time we run this script, it creates a new product object and publishes the event.
Now let's work on the consumer script, consume-events.ts.
import { Consumer, stringDeserializers } from "@platformatic/kafka";

const BROKERS = (process.env.KAFKA_BROKERS || "localhost:9092").split(",");
const CONSUMER_GROUP_ID = process.env.CONSUMER_GROUP_ID || "my-consumer-group";
const SERVICE_NAME = process.env.SERVICE_NAME || "test-consumer";
const HOSTNAME = process.env.HOSTNAME || "local";

const TOPIC = "product.created";

const consumer = new Consumer({
  groupId: CONSUMER_GROUP_ID,
  clientId: `${SERVICE_NAME}-${HOSTNAME}`,
  bootstrapBrokers: BROKERS,
  deserializers: stringDeserializers,
});

async function main() {
  try {
    const stream = await consumer.consume({
      topics: [TOPIC],
      mode: "committed",
    });
    for await (const message of stream) {
      console.log(`Received: ${message.key} ->`, message.value);
      await message.commit();
    }
  } catch (err) {
    console.error(err);
  }
}

async function shutdown() {
  try {
    await consumer.close();
  } catch (err) {
    console.error("Error during shutdown", err);
    process.exit(1);
  }
}

process.once("SIGTERM", shutdown);
process.once("SIGINT", shutdown);

main();
We can run this script with the npx ts-node consume-events.ts command, which will start consuming events. The mode in the consume options can be earliest, latest, or committed. In earliest mode, the consumer consumes from the first message still available for the topic in the stream. In latest mode, it consumes only new events and skips the previous ones. In committed mode, it consumes from after the last committed event. We will use committed in our local testing.
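The difference between the three modes can be sketched in terms of plain partition offsets. The helper below is only an illustration of the idea, not part of the @platformatic/kafka API, and the fallback to earliest for a group with no committed offset is an assumption for this sketch.

```typescript
// Sketch of where each consume mode starts reading in a partition.
// earliestOffset: first retained event; logEndOffset: next offset to be written;
// committedOffset: last offset this consumer group committed (if any).
type ConsumeMode = "earliest" | "latest" | "committed";

function startOffset(
  mode: ConsumeMode,
  earliestOffset: number,
  logEndOffset: number,
  committedOffset?: number
): number {
  switch (mode) {
    case "earliest":
      return earliestOffset; // replay everything still retained
    case "latest":
      return logEndOffset; // skip history, only new events
    case "committed":
      // resume after the last commit; fall back to earliest for a fresh group
      // (the fallback behaviour here is an assumption of this sketch)
      return committedOffset !== undefined ? committedOffset + 1 : earliestOffset;
  }
}

console.log(startOffset("committed", 0, 10, 4)); // resumes at offset 5
```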
We can see the output of the consumed events in the consumer:
Received: fb49627f-b0f9-4a23-a26a-c98a21ae9526 -> {"id":"fb49627f-b0f9-4a23-a26a-c98a21ae9526","name":"Test"}
Received: b734dee7-8f44-4eb7-8178-764284ed0bbb -> {"id":"b734dee7-8f44-4eb7-8178-764284ed0bbb","name":"Test"}
Received: 436eeee7-2091-4e79-b50a-a00a81529413 -> {"id":"436eeee7-2091-4e79-b50a-a00a81529413","name":"Test"}
Now our producer and consumer are working, but this setup is not production-ready. There is no shared contract for the event schema between the producer and the consumers, so the producer can change the shape of the event message and break the consumers.
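A contrived example of what can go wrong without such a contract. The field names here are made up for illustration: the producer renames a field, and a consumer that still expects the old shape fails silently rather than loudly.

```typescript
// Without a shared schema, a producer-side rename silently breaks consumers.
// Version 1 of the producer publishes:
const v1Payload = JSON.stringify({ id: "42", name: "Test" });

// Later the producer renames `name` to `title` without telling anyone:
const v2Payload = JSON.stringify({ id: "42", title: "Test" });

// A consumer still written against version 1:
function productName(payload: string): string | undefined {
  return JSON.parse(payload).name;
}

console.log(productName(v1Payload)); // "Test"
console.log(productName(v2Payload)); // undefined -- the consumer breaks silently
```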
There are multiple ways to have schema sharing for events in our services, like JSON-Schema, Avro, or protobuf. We will use protobuf here, as we already use it for gRPC services.
There are many benefits to using protobuf for event message schemas. It encodes messages in binary rather than plain strings, which reduces the payload size. Consumers can be written in any language, since protobuf implementations exist for all major languages. And if we follow protobuf best practices, schemas remain backward- and forward-compatible: adding a new field or reserving a removed field will not break the consumer or the producer.
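As a sketch of that last point: if a field ever has to be removed from a message, protobuf best practice is to reserve its tag number (and optionally its name) so neither is ever reused by a future field. The message below is hypothetical, not part of our product.proto:

```proto
// Hypothetical message showing how to retire a field safely.
message Inventory {
  int32 id = 1;
  string name = 2;
  reserved 3;              // tag of the removed `location` field can never be reused
  reserved "location";     // nor can its name
  string warehouse_id = 4; // replacement data goes in a brand-new field
}
```

Old readers simply ignore the missing field, and new fields never collide with stale data written under the retired tag.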
Let's update the existing product.proto file. Add ProductEvent message and ProductEventType enum. The publisher will publish the event using this protobuf message.
syntax = "proto3";

package product;

import "google/protobuf/timestamp.proto";

message Product {
  int32 id = 1;
  string name = 2;
  string description = 3;
  string image = 4;
  repeated string tags = 5;
  google.protobuf.Timestamp created_at = 6;
  google.protobuf.Timestamp updated_at = 7;
}

enum ProductEventType {
  PRODUCT_EVENT_TYPE_UNSPECIFIED = 0;
  PRODUCT_EVENT_TYPE_CREATED = 1;
  PRODUCT_EVENT_TYPE_UPDATED = 2;
  PRODUCT_EVENT_TYPE_DELETED = 3;
}

message ProductEvent {
  string id = 1;
  ProductEventType type = 2;
  Product product = 3;
  google.protobuf.Timestamp timestamp = 4;
}

Let's build the protos package
cd packages/protos
npm run build
And install the protos package in the test-kafka service
cd services/test-kafka
npm i @rsbh-nodejs-microservices/protos
Let's update the publish-event.ts file and add the import at the top of the file. Update the serializers in the producer to serialize values using protobuf.
Also, update the product with Product.create and create an event with ProductEvent.create.
import { Producer, ProduceAcks, stringSerializer } from "@platformatic/kafka";
import { v4 as uuid } from "uuid";
import {
  Product,
  ProductEvent,
  ProductEventType,
} from "@rsbh-nodejs-microservices/protos/product/product";

const BROKERS = (process.env.KAFKA_BROKERS || "localhost:9092").split(",");
const SERVICE_NAME = process.env.SERVICE_NAME || "test-producer";
const HOSTNAME = process.env.HOSTNAME || "local";

const TOPIC = "product.created";

const producer = new Producer({
  clientId: `${SERVICE_NAME}-${HOSTNAME}`,
  bootstrapBrokers: BROKERS,
  serializers: {
    key: stringSerializer,
    value: (value?: ProductEvent) => {
      const event = value ?? ProductEvent.create();
      return Buffer.from(ProductEvent.encode(event).finish());
    },
  },
  acks: ProduceAcks.ALL,
});

async function main() {
  const product = Product.create({
    // Product.id is an int32, so make sure we pass an integer
    id: Math.floor(Math.random() * 1000),
    name: "Test Product",
  });
  const event = ProductEvent.create({
    id: uuid(),
    type: ProductEventType.PRODUCT_EVENT_TYPE_CREATED,
    product: product,
    timestamp: new Date(),
  });
  try {
    const { offsets } = await producer.send({
      messages: [
        {
          topic: TOPIC,
          key: event.id,
          value: event,
        },
      ],
    });
    offsets?.forEach(({ topic, partition, offset }) => {
      console.log(
        `Published to ${topic} partition ${partition} at offset ${offset}`
      );
    });
  } catch (err) {
    console.error(err);
  } finally {
    await producer.close();
  }
}

main();
After running the script again, it will publish the event in binary format. The consumer will log something, but will be unable to decode it.
Received: d31e20b5-98ee-4506-89d1-029a033f5914 ->
$d31e20b5-98ee-4506-89d1-029a033f591�
Test Product"
ۣ������
Received: 099cea80-95e7-4489-bd7b-37d28ad51950 ->
$099cea80-95e7-4489-bd7b-37d28ad5195�
Test Product"
����큰
Received: e5907757-1440-4a7f-932d-ff22febf8ea1 ->
$e5907757-1440-4a7f-932d-ff22febf8ea�
Test Product"
������
Now, add protobuf to the consumer to decode the events. We import the protos package, similar to the producer script, and update the deserializers to decode the binary event into a protobuf message.
import { Consumer, stringDeserializer } from "@platformatic/kafka";
import { ProductEvent } from "@rsbh-nodejs-microservices/protos/product/product";

const BROKERS = (process.env.KAFKA_BROKERS || "localhost:9092").split(",");
const CONSUMER_GROUP_ID = process.env.CONSUMER_GROUP_ID || "my-consumer-group";
const SERVICE_NAME = process.env.SERVICE_NAME || "test-consumer";
const HOSTNAME = process.env.HOSTNAME || "local";

const TOPIC = "product.created";

const consumer = new Consumer({
  groupId: CONSUMER_GROUP_ID,
  clientId: `${SERVICE_NAME}-${HOSTNAME}`,
  bootstrapBrokers: BROKERS,
  deserializers: {
    key: stringDeserializer,
    value: (value?: Buffer) => {
      return value ? ProductEvent.decode(value) : ProductEvent.create();
    },
  },
});

async function main() {
  try {
    const stream = await consumer.consume({
      topics: [TOPIC],
      mode: "committed",
    });
    for await (const message of stream) {
      console.log(
        `Received: ${message.key} ->`,
        ProductEvent.toJSON(message.value)
      );
      await message.commit();
    }
  } catch (err) {
    console.error(err);
  }
}

async function shutdown() {
  try {
    await consumer.close();
  } catch (err) {
    console.error("Error during shutdown", err);
    process.exit(1);
  }
}

process.once("SIGTERM", shutdown);
process.once("SIGINT", shutdown);

main();
After running the updated consume-events.ts script, you will see the logs of successfully consumed events.
Received: 465abab8-0916-420a-bca7-c779a3c8782f -> {
  id: '465abab8-0916-420a-bca7-c779a3c8782f',
  type: 'PRODUCT_EVENT_TYPE_CREATED',
  product: {
    id: 591,
    name: 'Test Product',
    description: '',
    image: '',
    tags: []
  },
  timestamp: '2026-03-15T15:49:22.883Z'
}
Received: f8d445d8-4f9f-4c2b-bf7a-ab0d7ce1923d -> {
  id: 'f8d445d8-4f9f-4c2b-bf7a-ab0d7ce1923d',
  type: 'PRODUCT_EVENT_TYPE_CREATED',
  product: {
    id: 166,
    name: 'Test Product',
    description: '',
    image: '',
    tags: []
  },
  timestamp: '2026-03-15T15:49:24.817Z'
}
Now we can implement this setup in our product service. When the service creates or updates a product in the database, it will publish an event. The events can be consumed by different services, e.g., a notification service, an inventory service, and an analytics service.
Note: We need to migrate the product-service module system from CommonJS to ESM. @platformatic/kafka supports only ESM.
cd services/product-service
npm pkg set type=module
npm i @rsbh-nodejs-microservices/protos @platformatic/kafka uuid
npm i -D @types/uuid
mkdir src/clients
touch src/clients/kafka.client.ts
Now create the KafkaClient class in kafka.client.ts. We will not add a value serializer, as this client can be used for publishing any event type. Instead of hardcoding a protobuf message in the serializer, the caller has to encode the message into binary and pass it to the publishEvent method.
import { Producer, ProduceAcks, stringSerializer } from "@platformatic/kafka";

interface KafkaConfig {
  brokers: string[];
  clientId: string;
}

export class KafkaClient {
  private readonly producer: Producer<string, Buffer, Buffer, Buffer>;

  constructor(config: KafkaConfig) {
    this.producer = new Producer({
      clientId: config.clientId,
      bootstrapBrokers: config.brokers,
      serializers: {
        key: stringSerializer,
      },
      acks: ProduceAcks.ALL,
    });
  }

  public async publishEvent(topic: string, key: string, value: Buffer) {
    try {
      const { offsets } = await this.producer.send({
        messages: [{ topic, key, value }],
      });
      offsets?.forEach(({ topic, partition, offset }) => {
        console.log(
          `Published to ${topic} partition ${partition} at offset ${offset}`
        );
      });
    } catch (err) {
      console.error("Unable to publish event", err);
      throw err;
    }
  }

  public async close() {
    return this.producer.close();
  }
}
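A byte-oriented client like this trades away type safety: nothing stops a caller from publishing the wrong message shape to a topic. One way to get the safety back is a small typed helper that pairs a topic with its encoder. This is a sketch, not part of the service's code; the `EventPublisher` interface, the `typedTopic` helper, and the fake client below are all hypothetical, and a real encoder would be the generated `ProductEvent.encode(...).finish()`.

```typescript
// Sketch: a typed layer over a generic byte-oriented Kafka client.
// `EventPublisher` mirrors the publishEvent signature of the client above.
interface EventPublisher {
  publishEvent(topic: string, key: string, value: Buffer): Promise<void>;
}

// Pairs a topic with the encoder for its event type, so callers can no
// longer publish the wrong message shape to that topic.
function typedTopic<T>(
  client: EventPublisher,
  topic: string,
  encode: (event: T) => Uint8Array
) {
  return {
    publish: (key: string, event: T) =>
      client.publishEvent(topic, key, Buffer.from(encode(event))),
  };
}

// Usage with a fake in-memory client (a real one would be the KafkaClient above):
const published: string[] = [];
const fakeClient: EventPublisher = {
  async publishEvent(topic, key, value) {
    published.push(`${topic}:${key}:${value.length} bytes`);
  },
};

interface ProductCreated {
  id: string;
  name: string;
}

const productCreated = typedTopic<ProductCreated>(
  fakeClient,
  "product.created",
  // stand-in encoder; a real one would be ProductEvent.encode(...).finish()
  (e) => Buffer.from(JSON.stringify(e))
);

productCreated.publish("42", { id: "42", name: "Test" });
console.log(published[0]); // the fake client records the publish
```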
Now update the main.ts file to import the KafkaClient and create an instance before starting the gRPC server.
import "reflect-metadata";
import dataSource from "./db/index.js";
import { Server, ServerCredentials } from "@grpc/grpc-js";
import { getProductServer } from "./server.js";
import { ProductServiceService } from "@rsbh-nodejs-microservices/protos/product/product";
import { KafkaClient } from "./clients/kafka.client.js";

const HOST = process.env.HOST || "0.0.0.0";
const PORT = Number(process.env.PORT) || 50051;
const BROKERS = (process.env.KAFKA_BROKERS || "localhost:9092").split(",");
const SERVICE_NAME = process.env.SERVICE_NAME || "test-producer";
const HOSTNAME = process.env.HOSTNAME || "local";

const address = `${HOST}:${PORT}`;

async function main() {
  const db = await dataSource.initialize();
  const kafkaClient = new KafkaClient({
    clientId: `${SERVICE_NAME}-${HOSTNAME}`,
    brokers: BROKERS,
  });
  const server = new Server();
  server.addService(ProductServiceService, getProductServer(db, kafkaClient));
  await new Promise<void>((resolve, reject) => {
    server.bindAsync(
      address,
      ServerCredentials.createInsecure(),
      (error, port) => {
        if (error) {
          return reject(error);
        }
        console.log("server is running on", port);
        server.start();
        resolve();
      }
    );
  });

  const shutdown = async () => {
    console.log("Shutting down...");
    await new Promise<void>((resolve) => {
      server.tryShutdown((err) => {
        if (err) server.forceShutdown();
        resolve();
      });
    });
    await kafkaClient.close();
    await db.destroy();
  };

  process.once("SIGTERM", shutdown);
  process.once("SIGINT", shutdown);
}

main().catch((error) => {
  console.error(error);
  process.exit(1);
});
And in the server, we will call publishEvent inside createProduct.
import { sendUnaryData, ServerUnaryCall, status } from "@grpc/grpc-js";
import {
  CreateProductRequest,
  CreateProductResponse,
  GetProductRequest,
  GetProductResponse,
  ListProductsRequest,
  ListProductsResponse,
  Product,
  ProductEvent,
  ProductEventType,
  ProductServiceServer,
} from "@rsbh-nodejs-microservices/protos/product/product";
import { DataSource } from "typeorm";
import * as ProductController from "./controllers/product.controller.js";
import { v4 as uuid } from "uuid";
import type { KafkaClient } from "./clients/kafka.client.js";

const TOPICS = {
  PRODUCT_CREATED: "product.created",
} as const;

export function getProductServer(
  db: DataSource,
  kafkaClient: KafkaClient
): ProductServiceServer {
  async function createProduct(
    call: ServerUnaryCall<CreateProductRequest, CreateProductResponse>,
    callback: sendUnaryData<CreateProductResponse>
  ) {
    try {
      const product = await ProductController.createProduct(db, call.request);
      const productPB = Product.fromJSON(product);
      const response: CreateProductResponse = {
        product: productPB,
      };
      const event = ProductEvent.create({
        id: uuid(),
        type: ProductEventType.PRODUCT_EVENT_TYPE_CREATED,
        product: productPB,
        timestamp: new Date(),
      });
      await kafkaClient.publishEvent(
        TOPICS.PRODUCT_CREATED,
        product.id.toString(),
        Buffer.from(ProductEvent.encode(event).finish())
      );
      callback(null, response);
    } catch (err) {
      console.error(err);
      callback({ code: status.INTERNAL }, null);
    }
  }

  // ...the remaining handlers (getProduct, listProducts) and the return
  // statement stay the same as before.
Now we can test our integration by running all our services. Docker Compose will start PostgreSQL and the Kafka broker.
docker compose up
Start the Kafka consumer in a separate terminal window/pane
cd services/test-kafka
npx ts-node consume-events.ts
Start the product service in another terminal window/pane
cd services/product-service
npm run build
npm run start
Now run the gRPC client code in another terminal. It will call the product service, which will publish the product.created event to the Kafka broker, and the consumer script will log it.
cd services/test-client
npm run dev
All the code used in this blog is available in this GitHub repo.