-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka_event.py
76 lines (57 loc) · 2.28 KB
/
kafka_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from queue import Queue
import threading
from threading import Thread
from kafka import KafkaConsumer, KafkaProducer
from event import *
MESSAGE_ENCODING = 'utf-8'
class KafkaEventReader:
'''
Класс, который считывает сообщения с кафки и сохраняет их в виде ивентов
'''
def __init__(self, kafka_consumer: KafkaConsumer, event_queue: Queue):
'''
Подготовка к считыванию сообщений
'''
self.consumer = kafka_consumer
self.running = threading.Event()
self.running.set()
self.event_queue = event_queue
self.reading_thread = Thread(target=self.read_events)
self.reading_thread.start()
def read_events(self):
'''
Считывание сообщений от кафки, превращение их в ивенты, сохранение в очередь ивентов
Запущен на отдельном потоке для постоянного считывания новых сообщений
'''
while self.running.is_set():
message_pack = self.consumer.poll(timeout_ms=1000)
for _topic, messages in message_pack.items():
for message in messages:
self.event_queue.put(EventFromMessage(message.value.decode(MESSAGE_ENCODING)))
def release(self):
'''
Отключение от кафки
'''
self.running.clear()
self.consumer.close()
class KafkaEventWriter:
'''
Класс, который превращает ивенты в сообщения и отправляет их в кафку
'''
def __init__(self, kafka_producer: KafkaProducer, topic: str):
'''
Инициализация класса
'''
self.topic = topic
self.producer = kafka_producer
def send_event(self, event: Event):
'''
Отправка ивента превращенного в сообщение
'''
message = str(event)
self.producer.send(self.topic, message.encode(MESSAGE_ENCODING))
def release(self):
'''
Отключение от кафки
'''
self.producer.close()