-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipe_stream.py
109 lines (95 loc) · 3.91 KB
/
pipe_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import argparse
from datetime import datetime
import logging
import random
from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
class GroupMessagesByFixedWindows(PTransform):
"""A composite transform that groups Pub/Sub messages based on publish time
and outputs a list of typles, each containg a message and its publish time.
"""
def __init__(self, window_size, num_shards=5):
# Set window size to 60 seconds.
self.window_size = int(window_size*60)
self.num_shards = num_shards
def expand(self, pcoll):
return (
pcoll
# Bind window info to each element using element timestamp (or publish time).
| "Window into fixed intrervals" >> WindowInto(FixedWindows(self.window_size))
| "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
# Assign a random key to each windowed element based on the number of shards.
| "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards-1))
# Group windowed elements by key. All the elements in the same eindow must fit
# memory for this. If not, you need to use 'beam.util.BatchElements'.
| "Group by key" >> GroupByKey()
)
class AddTimestamp(DoFn):
def process(self, element, publish_time=DoFn.TimestampParam):
"""Processes each windowed element by extracting the message body and its
publish time into a tuple.
"""
yield (
element.decode('utf-8'),
datetime.utcfromtimestamp(float(publish_time)).strftime(
"%Y-%m-%d %H:%M:%S.%f"
),
)
class WriteToGCS(DoFn):
def __init__(self, output_path):
self.output_path = output_path
def process(self, key_value, window=DoFn.WindowParam):
"""Write messages in a batch to Google Cloud Storage."""
ts_format = "%H:%M"
window_start = window.start.to_utc_time().strftime(ts_format)
window_end = window.end.to_utc_time().strftime(ts_format)
shard_id, batch = key_value
filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])
with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
for message_body, publish_time in batch:
f.write(f"{message_body}, {publish_time}\n".encode("utf-8"))
def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
# Set `save_main_session` to True so DoFns can access globally imported modules.
pipeline_options = PipelineOptions(
pipeline_args, streaming = True, save_main_session = True
)
with Pipeline(options=pipeline_options) as pipeline:
(
pipeline
# Because `timestamp_attriute` is unspecified in `ReadFromPubSub`, Beam
# binds the publish time returned by the Pub/Sub server for each message
# to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
# http://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
| "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
| "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
| "Write to GCS" >> ParDo(WriteToGCS(output_path))
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_topic",
help="The Cloud Pub/Sub topic to read from."
'"projects/<PROJECT_ID>/topics/<TOPIC_ID>"',
)
parser.add_argument(
"--window_size",
type=float,
default=1.0,
help="Output file's window size in minutes.",
)
parser.add_argument(
"--output_path",
type=int,
default=5,
help="Number of shards to use when writing windowed elements to GCS",
)
known_args, pipeline_args = parser.parse_known_args()
run(
known_args.input_topic,
known_args.output_path,
known_args.window_size,
known_args.num_shards,
pipeline_args,
)