-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
56 lines (46 loc) · 1.21 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import os
from queue import Queue
from observer_manager import ObserverManager
from events import *
from kafka import KafkaConsumer, KafkaProducer
# Kafka bootstrap address, read from the process environment; the value is
# meant to be supplied through the k8s manifest.
kafka_bootstrap_server: str | None = os.environ.get('KAFKA_BOOTSTRAP_SERVER')
if not kafka_bootstrap_server:
    # An unset and an empty variable are equally unusable, so both abort here.
    # Raising SystemExit with a message writes it to stderr and terminates
    # with exit status 1. Unlike the bare `exit()` helper, it does not rely
    # on the `site` module having been loaded.
    raise SystemExit('Environment variable KAFKA_BOOTSTRAP_SERVER is not set')
# Queue handed to the reader below, so both sides work on the same object.
event_queue: Queue = Queue()

# Reader wrapping a Kafka consumer for 'om'; with auto_offset_reset='latest'
# the consumer is configured to start from the newest offset when no
# committed position exists (see kafka-python docs).
reader = KafkaEventReader(
    KafkaConsumer(
        'om',
        auto_offset_reset='latest',
        bootstrap_servers=kafka_bootstrap_server,
    ),
    event_queue,
)
# Writers to the other microservices, keyed by name. Every entry gets its own
# KafkaProducer, and the key is also passed to KafkaEventWriter as its second
# argument (presumably the destination topic -- confirm against events).
writers = {
    name: KafkaEventWriter(
        KafkaProducer(bootstrap_servers=kafka_bootstrap_server),
        name,
    )
    for name in ('mc', 'ta', 'dmm')
}
# Initialise the microservice with the shared event queue and the outbound
# writers, then wait on the manager's main_thread so the script does not
# fall off the end while that thread is still running.
observer_manager = ObserverManager(event_queue, writers)
observer_manager.main_thread.join()