-
Notifications
You must be signed in to change notification settings - Fork 23
/
consumer.js
39 lines (36 loc) · 1.07 KB
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
* Copyright 2024 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
'use strict'
// Logger shared by the whole script, configured with level 'debug'.
const pino = require('pino')
const logger = pino({ level: 'debug' })
const { Kafka } = require('kafkajs')
// Kafka client identified as 'test-consumer', pointed at a single broker on localhost:9093.
const kafka = new Kafka({
clientId: 'test-consumer',
brokers: ['localhost:9093']
})
// Topics the consumer subscribes to in main().
const topics = ['test-topic', 'other-topic']
// Consumer created from the client above, joining consumer group 'test-group'.
const consumer = kafka.consumer({ groupId: 'test-group' })
/**
 * Connects the consumer, subscribes it to every entry in `topics`, and starts
 * the message-processing loop.
 *
 * Each handled message is logged, followed by a random delay of up to two
 * seconds, after which the message's offset is committed.
 *
 * @returns {Promise<void>} Settles once `consumer.run()` has settled; rejects
 * if connecting, subscribing, or starting the run loop fails.
 */
async function main() {
  await consumer.connect()

  // Subscribe one topic at a time and await each call. An async callback
  // handed to `forEach` is never awaited, so `run()` could start before the
  // subscriptions completed and a failed subscribe would surface as an
  // unhandled rejection instead of rejecting main().
  for (const topic of topics) {
    await consumer.subscribe({ topic })
  }

  await consumer.run({
    eachMessage: async function processMessage({ topic, partition, message }) {
      // Guard against a missing payload so `toString()` cannot throw inside the handler.
      const value = message.value == null ? message.value : message.value.toString()
      logger.info('Handling message %s from topic: %s', value, topic)

      // Wait a random 0-2000 ms before committing.
      await new Promise((resolve) => {
        setTimeout(resolve, Math.floor(Math.random() * 2000))
      })

      // NOTE(review): Kafka convention is to commit the offset of the NEXT
      // message to read (last processed + 1); this commits the message's own
      // offset. Confirm whether that is intended before changing it.
      await consumer.commitOffsets([{ topic, partition, offset: message.offset }])
    }
  })
}
// Start the consumer and log whether startup succeeded or failed.
main()
  .then(() => {
    logger.info('Consumer is running')
  })
  .catch((err) => {
    // Pass the error as the first argument so pino serializes it (type,
    // message, stack). Formatting it with `%j` JSON-stringifies the Error,
    // which drops its non-enumerable `message` and `stack` properties.
    logger.error(err, 'Failed to start consumer')
  })