This example provides both a kafkajs producer and a background consumer.

Note: This application requires Node.js v20+ and Docker.
- Clone or fork this repository.
- Set up the Kafka containers:

```bash
docker compose up -d
```
- Install dependencies and run the application:

```bash
npm ci
cp env.sample .env
# Fill out `NEW_RELIC_LICENSE_KEY` in .env and save
# Start the producer
npm start
# Start the consumer in a different shell
npm run start:consumer
```
- Make requests to the application. The consumer subscribes to two topics: `test-topic` and `other-topic`.

```bash
curl --location 'http://localhost:3000/message' \
--header 'Content-Type: application/json' \
--data '{
"topic": "test-topic",
"messages": [
{
"key":"key1",
"value":"This is a sample message",
"headers": {
"x-custom-header": "custom-value"
}
},
{
"key": "key1",
"value": "Hello from the other side"
},
{
"key": "key2",
"value": "Greetings!"
}
]
}'
```

You can change the number of messages sent by editing the curl request above.
After sending a few requests, navigate to your application in APM & Services and select Distributed tracing. The producer runs within a Fastify handler; a transaction is created, with spans for the middleware handler as well as for the sending of messages. Because the consumer is running and handling message consumption, distributed tracing links the two entities.
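For reference, here is a minimal sketch of what the producer handler might look like. The broker address, client id, and route shape are assumptions based on the curl example above, not the repository's exact code:

```js
// producer.js -- hypothetical sketch of the Fastify route that produces messages.
// The New Relic agent must be loaded first (e.g. require('newrelic')) so the
// handler runs inside a transaction and trace headers are injected.
const fastify = require('fastify')({ logger: true })
const { Kafka } = require('kafkajs')

// Assumed client id and broker address.
const kafka = new Kafka({ clientId: 'kafka-producer', brokers: ['localhost:9092'] })
const producer = kafka.producer()

fastify.post('/message', async (request) => {
  // `topic` and `messages` mirror the shape of the curl payload above.
  const { topic, messages } = request.body
  await producer.send({ topic, messages })
  return { sent: messages.length }
})

async function main() {
  await producer.connect()
  await fastify.listen({ port: 3000 })
}

main()
```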
A transaction is created for every message the consumer processes.
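A hedged sketch of the consumer side, assuming a group id and broker address that may differ from the repository's:

```js
// consumer.js -- hypothetical sketch of the background consumer.
const { Kafka } = require('kafkajs')

// Assumed client id, group id, and broker address.
const kafka = new Kafka({ clientId: 'kafka-consumer', brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'example-group' })

async function main() {
  await consumer.connect()
  // Subscribe to both topics mentioned above.
  await consumer.subscribe({ topics: ['test-topic', 'other-topic'], fromBeginning: true })

  // The agent wraps `eachMessage`, starting a transaction per message and
  // linking it to the producer's trace via the message headers.
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`${topic}[${partition}]: ${message.key} => ${message.value.toString()}`)
    },
  })
}

main()
```

No explicit New Relic calls should be needed in either sketch; the agent instruments kafkajs automatically once loaded.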
Metrics are captured for the number of messages received and their byte size. You can view them with this NRQL query:

```sql
SELECT count(newrelic.timeslice.value) FROM Metric WHERE metricTimesliceName LIKE 'Message/Kafka/Topic/Named/%/Received/%' AND `entity.guid` = '<entity-guid>' FACET metricTimesliceName TIMESERIES SINCE 1 day ago
```
`kafka.consume.byteCount` and `kafka.consume.client_id` are tracked per transaction. Run this query:

```sql
FROM Transaction SELECT name, kafka.consume.byteCount, kafka.consume.client_id WHERE appName = 'kafka-consumer'
```