This repository was archived by the owner on Dec 12, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
227 lines (188 loc) · 7.67 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!/usr/bin/env python3
import configparser
import gi
import kafka
import os
import uuid
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk, Gio, GObject
from components import SettingsDialog, KafkaTopicPanel, KafkaPanelHeader
# Consumer group id with a random UUID suffix, generated once per import of
# this module.
KAFKA_GROUP = "kafka-dashboard-" + str(uuid.uuid4())
# Per-user settings file.
CONFIG_FILE = os.path.expanduser("~/.samsa.ini")

# Create config file if not exists.
# Mode 'x' creates the file atomically and fails if it is already there, so
# an existing file can never be truncated by a check-then-write race.
try:
    with open(CONFIG_FILE, 'x') as f:
        t = configparser.ConfigParser()
        t.add_section("samsa")
        t.write(f)
except FileExistsError:
    # Already present: leave the user's settings untouched.
    pass
class PickTopicDialog(Gtk.Dialog):
    """
    Dialog with Cancel/OK buttons and an editable combo box for choosing
    (or typing) a topic name.
    """

    def __init__(self, parent):
        """
        Build the dialog, fill the combo box and show everything.

        parent -- window the dialog is transient for. It must expose a
                  ``kafka_consumer`` attribute; the sorted result of its
                  ``topics()`` call becomes the combo box entries.
        """
        # Keyword arguments plus add_buttons() replace the positional
        # (title, parent, flags, buttons) form, which PyGObject deprecates.
        Gtk.Dialog.__init__(self, title="Topic", transient_for=parent)
        self.add_buttons(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
                         Gtk.STOCK_OK, Gtk.ResponseType.OK)
        self.set_default_size(150, 100)

        # new_with_entry() lets the user type a name that is not listed.
        self.combo = Gtk.ComboBoxText.new_with_entry()
        for topic in sorted(parent.kafka_consumer.topics()):
            self.combo.append_text(topic)

        box = self.get_content_area()
        box.add(self.combo)
        self.show_all()

    def get_value(self):
        """
        Return the combo box's active text (selected or typed).
        """
        return self.combo.get_active_text()
class RebalanceHandler(kafka.consumer.subscription_state.ConsumerRebalanceListener):
    """
    Rebalance listener that re-applies the application's paused topics
    each time partitions are assigned.
    """

    def __init__(self, application):
        """
        application -- object providing ``paused_topics`` (iterable of
                       topic names) and a ``pause(topic)`` method.
        """
        self.application = application

    def on_partitions_revoked(self, revoked):
        """
        Nothing needs doing when partitions are revoked.
        """
        return None

    def on_partitions_assigned(self, assigned):
        """
        Call the application's pause() for every topic it lists as paused.
        """
        app = self.application
        for paused_topic in app.paused_topics:
            app.pause(paused_topic)
class SamsaWindow(Gtk.Window):
    """
    Main dashboard window: one panel per subscribed Kafka topic, filled by
    a consumer that is polled from a GObject timeout.
    """

    def __init__(self, config):
        """
        Build the window, create the Kafka consumer and open the
        configured topics.

        config -- mapping with the keys read here:
                  'kafka_servers' (comma-separated string),
                  'view_mode' ('tabs' or 'tiles'),
                  'topics' (comma-separated string, may be empty),
                  'polling_freq' (handed to GObject.timeout_add as the
                  interval; presumably an int in milliseconds -- confirm
                  against SettingsDialog).

        Raises ValueError if 'view_mode' is neither 'tabs' nor 'tiles'.
        """
        # Fail early, before a consumer connection is opened, instead of
        # hitting an AttributeError on the missing container further down.
        if config['view_mode'] not in ('tabs', 'tiles'):
            raise ValueError(
                "Unsupported view_mode: {!r}".format(config['view_mode']))

        Gtk.Window.__init__(self)
        self.connect('destroy', self.on_destroy)
        self.config = config

        self.kafka_servers = [
            k.strip()
            for k in self.config['kafka_servers'].split(',')
        ]
        self.kafka_consumer = kafka.KafkaConsumer(bootstrap_servers=self.kafka_servers,
                                                  group_id=KAFKA_GROUP,
                                                  enable_auto_commit=True)

        # topic -> widget held by the container (the panel itself in
        # 'tabs' mode, a VBox wrapping header + panel in 'tiles' mode).
        self.pages = {}
        # topic -> KafkaPanelHeader
        self.tabs = {}
        # Topics the user paused. Initialised before any subscribe() call
        # because the rebalance listener reads it.
        self.paused_topics = set()

        hb = Gtk.HeaderBar()
        hb.set_show_close_button(True)
        hb.props.title = "Kafka Dashboard: " + self.config['kafka_servers']
        self.set_titlebar(hb)

        newButton = Gtk.Button()
        image = Gtk.Image.new_from_gicon(Gio.ThemedIcon(name="gtk-add"),
                                         Gtk.IconSize.BUTTON)
        box = Gtk.HBox()
        box.add(image)
        box.add(Gtk.Label(label="Add Topic"))
        newButton.add(box)
        newButton.connect('clicked', self.on_add_clicked)
        hb.pack_end(newButton)

        self.set_default_size(800, 600)

        if self.config['view_mode'] == 'tabs':
            self.topic_panel_container = Gtk.Notebook()
            self.topic_panel_container.set_tab_pos(Gtk.PositionType.LEFT)
            self.topic_panel_container.connect('switch-page', self.on_switch_page)
        else:  # 'tiles' (validated above)
            self.topic_panel_container = Gtk.HBox()
        self.add(self.topic_panel_container)

        if config['topics']:
            for topic in config['topics'].split(","):
                self.add_page(topic)

        GObject.timeout_add(self.config['polling_freq'], self.update)
        self.show_all()

    def add_page(self, topic):
        """
        Create a new page for ``topic`` and subscribe to it.

        Blank names and topics that already have a page are ignored.
        """
        topic = topic.strip()
        # A blank name (e.g. from a trailing comma in the config) is not a
        # topic, and a duplicate would orphan the page already registered.
        if not topic or topic in self.pages:
            return

        page = KafkaTopicPanel(self, topic)
        header = KafkaPanelHeader(topic,
                                  close=lambda b: self.remove_page(topic),
                                  pause=lambda b: self.pause(topic),
                                  resume=lambda b: self.resume(topic))

        # TODO: This should be abstracted a little further so that
        # I'm always adding something with the same interface to self.pages
        # Currently there's some hackery in the update function
        if self.config['view_mode'] == 'tabs':
            self.topic_panel_container.append_page(page, header)
            self.topic_panel_container.set_current_page(-1)
            self.pages[topic] = page
        elif self.config['view_mode'] == 'tiles':
            vbox = Gtk.VBox()
            vbox.pack_start(header, False, False, 0)
            vbox.pack_start(page, True, True, 0)
            vbox.show_all()
            self.topic_panel_container.pack_start(vbox, True, True, 5)
            self.pages[topic] = vbox
        page.show_all()
        self.tabs[topic] = header
        self.kafka_consumer.subscribe(list(self.pages),
                                      listener=RebalanceHandler(self))

    def remove_page(self, topic):
        """
        Remove the page for ``topic`` and update the subscription.

        Does nothing if the topic has no page.
        """
        page = self.pages.pop(topic, None)
        if page is None:
            return
        self.topic_panel_container.remove(page)
        # Drop the per-topic bookkeeping too, so update() and the
        # rebalance listener no longer see the closed topic.
        self.tabs.pop(topic, None)
        self.paused_topics.discard(topic)

        if self.pages:
            self.kafka_consumer.subscribe(list(self.pages),
                                          listener=RebalanceHandler(self))
        else:
            self.kafka_consumer.unsubscribe()

    def pause(self, topic):
        """
        Pause the assigned partitions of ``topic`` and mark its header
        as paused.
        """
        self.paused_topics.add(topic)
        # partitions_for_topic() can return None; treat that as "no
        # partitions" rather than failing on the membership test below.
        partitions = self.kafka_consumer.partitions_for_topic(topic) or set()
        assigned_partitions = self.kafka_consumer.assignment()
        to_pause = [p for p in assigned_partitions
                    if p.topic == topic and p.partition in partitions]
        self.kafka_consumer.pause(*to_pause)
        header = self.tabs.get(topic)
        if header is not None:
            header.set_paused(True)

    def resume(self, topic):
        """
        Resume the paused partitions of ``topic`` and mark its header
        as running.
        """
        # discard(): resuming a topic that was never paused is a no-op,
        # not a KeyError.
        self.paused_topics.discard(topic)
        paused = self.kafka_consumer.paused()
        to_resume = [p for p in paused if p.topic == topic]
        self.kafka_consumer.resume(*to_resume)
        header = self.tabs.get(topic)
        if header is not None:
            header.set_paused(False)

    def on_switch_page(self, notebook, page, page_num, *args, **kwargs):
        """
        Clear the bold markup if present when switching to a tab.
        """
        tab_box = notebook.get_tab_label(page)
        # Takes the tab label's second child to be the Gtk.Label carrying
        # the topic name -- depends on KafkaPanelHeader's layout.
        label = tab_box.get_children()[1]
        label.set_markup(label.get_text())

    def on_add_clicked(self, *args, **kwargs):
        """
        Prompt the user for a new topic and add a page for it.
        """
        dialog = PickTopicDialog(self)
        response = dialog.run()
        if response == Gtk.ResponseType.OK:
            topic = dialog.get_value()
            if topic:
                self.add_page(topic)
        dialog.destroy()

    def on_destroy(self, *args, **kwargs):
        """
        Close and clean up.
        """
        self.kafka_consumer.close()
        Gtk.main_quit()

    def update(self):
        """
        Check for new messages and distribute them to their
        appropriate pages.

        Returns True so the GObject timeout keeps calling it.
        """
        response = self.kafka_consumer.poll()

        # Flag only the topics that actually received something; the
        # urgency hint is raised only when there is news.
        updated_topics = {partition.topic for partition in response}
        for topic in updated_topics:
            header = self.tabs.get(topic)
            if header is None:
                continue
            self.set_urgency_hint(True)
            header.get_children()[1].set_markup("<b>{}</b>".format(topic))

        for messages in response.values():
            for message in messages:
                page = self.pages.get(message.topic)
                if page is None:
                    # The page was closed; drop the message.
                    continue
                if isinstance(page, KafkaTopicPanel):
                    page.append_message(message)
                else:
                    # 'tiles' mode: page is the VBox of [header, panel].
                    page.get_children()[1].append_message(message)
        return True
if __name__ == '__main__':
    # The settings dialog is given a throwaway window as its parent.
    placeholder = Gtk.Window()
    settings = SettingsDialog(placeholder, CONFIG_FILE)
    accepted = settings.run() == Gtk.ResponseType.OK

    # An empty config (dialog not confirmed) means the dashboard is
    # never opened below.
    config = settings.get_value() if accepted else {}
    if accepted:
        # Write the chosen values back to the config file.
        saved = configparser.ConfigParser()
        saved.add_section("samsa")
        for key, value in config.items():
            saved.set('samsa', key, str(value))
        with open(os.path.expanduser(CONFIG_FILE), 'w') as out:
            saved.write(out)

    settings.destroy()
    placeholder.destroy()

    if config.get('kafka_servers'):
        win = SamsaWindow(config)
        Gtk.main()