-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathevent_producer.py
145 lines (111 loc) · 3.19 KB
/
event_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import functools
import random
import datetime
import uuid
import argparse
import json
from kafka import KafkaProducer
from faker import Faker
# Module-level Faker instance shared by the event generators below.
fake = Faker()
def random_browser() -> str:
    """
    Pick a web browser name at random.

    Returns one of "Chrome", "Firefox" or "Safari".
    """
    candidates = ("Chrome", "Firefox", "Safari")
    return random.choice(candidates)
def event_wrapper(event_type):
    """
    Decorator factory that wraps raw event data in a message envelope.

    The decorated function's return value is stored under the "event" key
    of a new dict that also carries a freshly generated "eventId" (UUID4
    string), the given "eventType" and an "eventTimestamp" string.

    Parameters:
        event_type (str): the inner event type

    Returns:
        A decorator. The function it produces forwards any positional and
        keyword arguments to the wrapped function and returns the envelope
        dict.
    """

    def decorator_with_type(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Forward arguments so the decorator also works on event
            # factories that take parameters, not only zero-argument ones.
            event = func(*args, **kwargs)
            # Named "envelope" so the local does not shadow the enclosing
            # event_wrapper function.
            envelope = {
                "eventId": str(uuid.uuid4()),
                "eventType": event_type,
                # NOTE(review): this is naive local time without an offset;
                # kept as-is to preserve the emitted format. Confirm whether
                # consumers expect UTC.
                "eventTimestamp": str(datetime.datetime.now()),
                "event": event,
            }
            return envelope

        return wrapper

    return decorator_with_type
def create_common_event_attrs() -> dict:
    """
    Build the attributes shared by every generated event.

    Returns a dict with the keys "userAgent", "sessionId", "url",
    "browser" and "ip", each holding a freshly generated value.
    """
    attrs = {}
    # Values are generated in the same order as the keys are inserted.
    attrs["userAgent"] = fake.user_agent()
    attrs["sessionId"] = fake.uuid4()
    attrs["url"] = fake.url()
    attrs["browser"] = random_browser()
    attrs["ip"] = fake.ipv4()
    return attrs
@event_wrapper("page_viewed.v1")
def create_pageview_event() -> dict:
    """
    Build a random page viewed event.

    The payload holds only the shared attributes under the "common" key;
    the decorator wraps it with the "page_viewed.v1" event type.
    """
    return {"common": create_common_event_attrs()}
@event_wrapper("link_clicked.v1")
def create_link_clicked_event() -> dict:
    """
    Build a random link clicked event.

    The payload holds the shared attributes under "common" plus a random
    "link_url"; the decorator wraps it with the "link_clicked.v1" event
    type.
    """
    common_attrs = create_common_event_attrs()
    clicked_url = fake.url()
    return {"common": common_attrs, "link_url": clicked_url}
def create_events(num_events) -> list:
    """
    Build a list of random events.

    Produces num_events page viewed events followed by num_events link
    clicked events, so the result holds twice num_events items.

    Parameters:
        num_events (int): number of random events of each type to generate
    """
    generated = [create_pageview_event() for _ in range(num_events)]
    generated.extend(create_link_clicked_event() for _ in range(num_events))
    return generated
def send_events_to_kafka(events, topic, brokers):
    """
    Send a list of events to the specified Kafka topic.

    Each event is serialized to UTF-8 encoded JSON and sent synchronously:
    the call waits up to 60 seconds for the result of every send and
    prints it before moving on to the next event.

    Parameters:
        events (list): events to send to Kafka
        topic (str): Kafka topic to send events to
        brokers (str): Kafka brokers

    Any exception raised while sending propagates to the caller after the
    producer has been closed.
    """
    producer = KafkaProducer(
        bootstrap_servers=brokers,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    try:
        for event in events:
            # Wait on each send so a delivery failure surfaces at the event
            # that caused it.
            record_metadata = producer.send(topic, event).get(timeout=60)
            print(record_metadata)
    finally:
        # Always release the producer's connections, even when a send fails.
        producer.close()
if __name__ == "__main__":
    # Command-line entry point: parse the options, generate the events and
    # publish them to Kafka.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--num_events",
        type=int,
        default=10,
        help="number of events of each type to produce",
    )
    cli.add_argument("--topic", type=str, default="raw_events", help="Kafka topic")
    cli.add_argument(
        "--brokers", type=str, default="localhost:9092", help="Kafka broker list"
    )
    options = cli.parse_args()

    send_events_to_kafka(
        create_events(options.num_events), options.topic, options.brokers
    )