-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflink.sql
159 lines (148 loc) · 4.99 KB
/
flink.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
-- This script runs in the Flink sql-client, which can reach Kafka on the
-- broker's unencrypted internal listener (broker:29092).

-- Source table: raw FHV trip events from Kafka. The computed `proctime`
-- column is the processing-time attribute required by the TUMBLE windows
-- used downstream.
DROP TABLE IF EXISTS fhv_trips;
CREATE TABLE fhv_trips (
    pickup_location_id INT,
    pickup_datetime TIMESTAMP(3),
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'fhv-trips',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'tripsdata',
    -- Explicit start position. The previous 'properties.auto.offset.reset'
    -- = 'earliest' only applied as the fallback of the default
    -- 'group-offsets' mode when no committed offsets exist; this states the
    -- intent ("read from the beginning") deterministically.
    'scan.startup.mode' = 'earliest-offset',
    'json.timestamp-format.standard' = 'ISO-8601',
    'format' = 'json'
);
-- Source table: green taxi trip events from Kafka; schema mirrors fhv_trips
-- so the two streams can be combined with UNION ALL.
DROP TABLE IF EXISTS green_trips;
CREATE TABLE green_trips (
    pickup_location_id INT,
    pickup_datetime TIMESTAMP(3),
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'green-trips',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'tripsdata',
    -- Read from the beginning of the topic, regardless of committed group
    -- offsets (replaces the 'properties.auto.offset.reset' fallback).
    'scan.startup.mode' = 'earliest-offset',
    'json.timestamp-format.standard' = 'ISO-8601',
    'format' = 'json'
);
-- test: union + count
-- SELECT
-- TUMBLE_START(proctime, INTERVAL '1' MINUTE) AS window_start,
-- TUMBLE_END(proctime, INTERVAL '1' MINUTE) AS window_end,
-- pickup_location_id,
-- COUNT(*) AS pickup_count
-- FROM (
-- SELECT pickup_location_id, proctime
-- FROM fhv_trips
-- UNION ALL
-- SELECT pickup_location_id, proctime
-- FROM green_trips
-- )
-- GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE), pickup_location_id;
-- test - read from topic
-- SELECT
-- TUMBLE_START(proctime, INTERVAL '1' MINUTE) AS window_start,
-- pickup_location_id,
-- COUNT(*) AS pickup_count
-- FROM fhv_trips
-- GROUP BY
-- TUMBLE(proctime, INTERVAL '1' MINUTE),
-- pickup_location_id;
-- DROP TABLE IF EXISTS rolling_pickup_counts;
-- CREATE TABLE rolling_pickup_counts (
-- window_start TIMESTAMP(3),
-- window_end TIMESTAMP(3),
-- pickup_location_id BIGINT,
-- pickup_count BIGINT,
-- proctime AS PROCTIME()
-- ) WITH (
-- 'connector' = 'filesystem',
-- 'format' = 'csv',
-- 'path' = '/data/rolling_pickup_counts',
-- 'sink.rolling-policy.file-size' = '128MB',
-- 'sink.rolling-policy.rollover-interval' = '1h',
-- 'sink.rolling-policy.check-interval' = '10s',
-- 'sink.parallelism' = '4'
-- );
-- popular location window
-- Dual-role table: written by the 10-minute count job below and read back
-- (same Kafka topic) by the hourly rollup further down.
DROP TABLE IF EXISTS popular_pickup_location;
CREATE TABLE popular_pickup_location (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    pickup_location_id INT,
    pickup_count BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'popular-pickup-location',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'tripsdata',
    -- Read from the beginning of the topic, regardless of committed group
    -- offsets (replaces the 'properties.auto.offset.reset' fallback).
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    -- Tolerate malformed or partial records rather than failing the job.
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
-- Count pickups per location over 10-minute tumbling (non-overlapping)
-- processing-time windows, combining both trip streams.
INSERT INTO popular_pickup_location
SELECT
    TUMBLE_START(proctime, INTERVAL '10' MINUTE) AS window_start,
    TUMBLE_END(proctime, INTERVAL '10' MINUTE) AS window_end,
    pickup_location_id,
    -- COUNT(*) already returns BIGINT in Flink; the former CAST was redundant.
    COUNT(*) AS pickup_count
FROM (
    -- UNION ALL: the two topics are disjoint sources, so no dedup is needed.
    SELECT pickup_location_id, proctime
    FROM fhv_trips
    UNION ALL
    SELECT pickup_location_id, proctime
    FROM green_trips
) trips
GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTE), pickup_location_id;
-- highest count location id for each interval
-- SELECT window_start, window_end, pickup_location_id, pickup_count
-- FROM (
-- SELECT TUMBLE_START(proctime, INTERVAL '30' MINUTE) AS window_start,
-- TUMBLE_END(proctime, INTERVAL '30' MINUTE) AS window_end,
-- pickup_location_id,
-- CAST(COUNT(*) AS BIGINT) AS pickup_count,
-- ROW_NUMBER() OVER (PARTITION BY TUMBLE_START(proctime, INTERVAL '30' MINUTE), pickup_location_id
-- ORDER BY COUNT(*) DESC) as row_num
-- FROM (
-- SELECT pickup_location_id, proctime
-- FROM fhv_trips
-- UNION ALL
-- SELECT pickup_location_id, proctime
-- FROM green_trips
-- )
-- GROUP BY TUMBLE(proctime, INTERVAL '30' MINUTE), pickup_location_id
-- )
-- WHERE row_num = 1;
-- create table for top locations aggregated over previous data
-- Sink for the hourly rollup; also queryable interactively from the
-- sql-client (see final SELECT).
DROP TABLE IF EXISTS top_pickup_location;
CREATE TABLE top_pickup_location (
    pickup_location_id INT,
    pickup_count BIGINT,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'top-pickup-location',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'tripsdata',
    -- Read from the beginning of the topic, regardless of committed group
    -- offsets (replaces the 'properties.auto.offset.reset' fallback).
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    -- Tolerate malformed or partial records rather than failing the job.
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
-- Hourly rollup: sum the 10-minute counts per location over 60-minute
-- tumbling processing-time windows.
INSERT INTO top_pickup_location
SELECT
    pickup_location_id,
    SUM(pickup_count) AS total_pickup_count,
    -- PROCTIME() (uppercase, matching the rest of the file) supplies a
    -- processing-time attribute inline, since popular_pickup_location
    -- declares no time attribute of its own. NOTE(review): processing time
    -- here reflects when records are *re-read*, not when they were produced
    -- — an event-time attribute on window_end would be more accurate; confirm
    -- intent.
    TUMBLE_START(PROCTIME(), INTERVAL '60' MINUTE) AS window_start,
    TUMBLE_END(PROCTIME(), INTERVAL '60' MINUTE) AS window_end
FROM popular_pickup_location
GROUP BY
    pickup_location_id,
    TUMBLE(PROCTIME(), INTERVAL '60' MINUTE);
-- Interactive sanity check from the sql-client; explicit column list
-- instead of SELECT *.
SELECT pickup_location_id, pickup_count, window_start, window_end
FROM top_pickup_location;