-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.coffee
58 lines (47 loc) · 1.39 KB
/
app.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env coffee
commandLineArgs = require('command-line-args')
readline = require('readline')
kafka = require('kafka-node')
# Shorthand for the high-level producer class exported by kafka-node.
HighLevelProducer = kafka.HighLevelProducer
# The client is built with no arguments, so connection settings are whatever
# kafka-node defaults to.
# NOTE(review): presumably a local ZooKeeper endpoint — confirm for the
# kafka-node version in use.
client = new kafka.Client()
# Single shared producer; produceMessage sends every payload through it.
producer = new HighLevelProducer(client)
# Command-line option definitions:
#   --topic / -t     (String)   topic handed to the interactive prompt
#   --readline / -r  (Boolean)  when set, start the interactive prompt once
#                               the producer is ready
cli = commandLineArgs([
{ name: 'topic', alias: 't', type: String },
{ name: 'readline', alias: 'r', type: Boolean }
])
# Parsed option values.
# NOTE(review): presumably taken from process.argv — confirm against the
# command-line-args version this project pins.
parsedCli = cli.parse()
# Create a fresh readline interface bound to the process's standard input
# and standard output. Callers are responsible for closing it.
buildReadLine = ->
  streams =
    input: process.stdin
    output: process.stdout
  readline.createInterface streams
# Send one message to a topic through the shared producer.
#
# topic    - name of the topic to publish to
# message  - value placed in the payload's `messages` field
# callback - optional; invoked with no arguments once the send has reported
#            back, whether it succeeded or failed
#
# The outcome of the send is only logged; errors are not propagated.
produceMessage = (topic, message, callback) ->
  payloads = [{topic, messages: message}]
  producer.send payloads, (err, data) ->
    unless err
      console.log "Produced message: ", data
    else
      console.log "Got error", err
    callback() if callback
# Prompt for one line of input, publish it to `topic`, then prompt again.
#
# A new readline interface is created for every round and closed as soon as
# a line arrives; the next round starts only after produceMessage calls back.
#
# topic - name of the topic each entered line is published to
queryMessage = (topic) ->
  lineReader = buildReadLine()

  # Handle a single entered line: release the interface, send, then loop.
  onLine = (text) ->
    lineReader.close()
    produceMessage topic, text, -> queryMessage(topic)

  lineReader.prompt()
  lineReader.on 'line', onLine
# Ask the user which topic to publish to, then start the message prompt loop
# for whatever they answer.
queryTopic = ->
  asker = buildReadLine()

  # Receives the user's answer; the interface is closed before the message
  # loop opens its own.
  onAnswer = (topic) ->
    asker.close()
    queryMessage topic

  asker.question 'Topic to send payload: ', onAnswer
# Entry point for the interactive mode.
#
# topic - topic name to publish to; when falsy (e.g. not supplied on the
#         command line) the user is asked for one first.
queryProducerMessage = (topic) ->
  if topic
    # Pass the argument itself. The previous `options.topic` referenced an
    # identifier that is not defined in this file, so supplying a topic
    # raised a ReferenceError instead of starting the prompt.
    queryMessage(topic)
  else
    queryTopic()
# Once the producer reports it is ready, announce it and, if the readline
# flag was given on the command line, start the interactive prompt with the
# topic option (which may be absent).
producer.on 'ready', ->
  console.log "Ready to produce messages"
  queryProducerMessage(parsedCli.topic) if parsedCli.readline
# Public API: publish `message` to `topic`.
#
# topic   - name of the topic to publish to
# message - any JSON-serialisable value; it is logged as given and sent as
#           its JSON string form
#
# No completion callback is passed on, so the result is only logged.
exports.send = (topic, message) ->
  console.log "Sending message", message
  serialized = JSON.stringify message
  produceMessage topic, serialized