Enhance the data pipeline by introducing Apache Flink SQL for complex event processing with Apache Pulsar as the streaming system.
-
Ensure Docker Compose is running with Apache Pulsar, Apache Flink, and other necessary services
-
Navigate to the 05_pinot-pulsar-flink directory, which contains the required files and scripts
-
Description: Initialize all necessary services by running the make command
Action:
make
-
Description: Start the Apache Flink SQL client to manage streaming jobs
-
Action:
docker-compose run sql-client
-
Description: Use the Flink SQL client to create tables and perform data transformations
CREATE TABLE Movies (
  movieId INT,
  title STRING,
  releaseYear INT,
  country STRING,
  rating DOUBLE,
  genres ARRAY<STRING>,
  actors ARRAY<STRING>,
  directors ARRAY<STRING>,
  composers ARRAY<STRING>,
  screenwriters ARRAY<STRING>,
  productionCompanies ARRAY<STRING>,
  cinematographer STRING
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/movies',
  'service-url' = 'pulsar://pulsar:6650',
  'value.format' = 'json',
  'source.subscription-name' = 'flink-movies-subscription',
  'source.subscription-type' = 'Shared'
);

CREATE TABLE MovieRatings (
  movieId INT,
  rating DOUBLE,
  ratingTimeMillis BIGINT,
  ratingTime AS TO_TIMESTAMP_LTZ(ratingTimeMillis, 3)
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/ratings',
  'service-url' = 'pulsar://pulsar:6650',
  'value.format' = 'json',
  'source.subscription-name' = 'flink-ratings-subscription',
  'source.subscription-type' = 'Shared'
);

CREATE TABLE RatedMoviesSink (
  movieId INT,
  title STRING,
  releaseYear INT,
  actors ARRAY<STRING>,
  rating DOUBLE,
  ratingTime TIMESTAMP(3)
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/rated_movies',
  'service-url' = 'pulsar://pulsar:6650',
  'value.format' = 'json'
);
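The two source tables read JSON messages whose keys match their column names, and ratingTime is derived from epoch milliseconds. To feed them test data, a minimal Python sketch follows. It assumes the pulsar-client package is installed and that the broker is reachable from wherever the script runs (from the host that is often pulsar://localhost:6650, inside the Compose network pulsar://pulsar:6650; both are assumptions about your setup). The helper names and sample values are hypothetical.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def movie_payload(movie_id, title, release_year, genres=()):
    """JSON message for the movies topic; keys match Movies column names.
    Columns left out (country, actors, ...) are read as NULL by Flink's JSON format."""
    record = {"movieId": movie_id, "title": title, "releaseYear": release_year, "genres": list(genres)}
    return json.dumps(record).encode("utf-8")


def rating_payload(movie_id, rating, rating_time_millis):
    """JSON message for the ratings topic; keys match MovieRatings column names."""
    record = {"movieId": movie_id, "rating": rating, "ratingTimeMillis": rating_time_millis}
    return json.dumps(record).encode("utf-8")


def rating_time(rating_time_millis):
    """Plain-Python counterpart of TO_TIMESTAMP_LTZ(ratingTimeMillis, 3):
    epoch milliseconds to a UTC instant, computed without float rounding."""
    return EPOCH + timedelta(milliseconds=rating_time_millis)


def send(service_url, topic, payloads):
    """Publish raw payloads to a topic (requires the pulsar-client package)."""
    import pulsar  # imported here so the helpers above work without pulsar-client

    client = pulsar.Client(service_url)
    try:
        producer = client.create_producer(topic)
        for payload in payloads:
            producer.send(payload)
        producer.flush()
    finally:
        client.close()
```

For example, send("pulsar://localhost:6650", "persistent://public/default/ratings", [rating_payload(1, 4.5, 1700000000000)]) publishes one rating; Flink then shows it with ratingTime rendered in the SQL client's session time zone.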
-
Query movies:
SELECT * FROM Movies WHERE LOWER(title) LIKE '%lethal weapon%';
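The filter lowercases the title and matches it against a LIKE pattern, where % stands for any run of characters and _ for exactly one. A small Python model of that predicate, for checking which titles should match (the function names are hypothetical, and ESCAPE clauses are not handled):

```python
import re


def like_to_regex(pattern):
    """Translate a SQL LIKE pattern: % -> any run of characters, _ -> one character,
    everything else matched literally."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.DOTALL)


def title_matches(title, pattern="%lethal weapon%"):
    """Mirror LOWER(title) LIKE pattern; a NULL title never matches."""
    if title is None:
        return False
    return like_to_regex(pattern).fullmatch(title.lower()) is not None
```

Because the whole pattern must match (fullmatch), the leading and trailing % are what turn it into a substring search.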
-
Calculate per-movie rating statistics (count, average, minimum, and maximum):
SELECT movieId, COUNT(*) as ratingCount, AVG(rating) as avgRating, MIN(rating) as minRating, MAX(rating) as maxRating FROM MovieRatings GROUP BY movieId;
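In streaming mode this GROUP BY never finishes: every new rating changes one movie's row, and Flink represents that as an update to the previous result for that movieId. A plain-Python model of the per-key state the query maintains (an illustration of the semantics, not Flink's implementation):

```python
from dataclasses import dataclass


@dataclass
class RatingStats:
    rating_count: int = 0
    total: float = 0.0
    min_rating: float = float("inf")
    max_rating: float = float("-inf")

    @property
    def avg_rating(self):
        return self.total / self.rating_count


def running_stats(ratings):
    """For each incoming (movieId, rating), update that movie's aggregate and
    yield the new row: (movieId, ratingCount, avgRating, minRating, maxRating)."""
    state = {}
    for movie_id, rating in ratings:
        stats = state.setdefault(movie_id, RatingStats())
        stats.rating_count += 1
        stats.total += rating
        stats.min_rating = min(stats.min_rating, rating)
        stats.max_rating = max(stats.max_rating, rating)
        yield movie_id, stats.rating_count, stats.avg_rating, stats.min_rating, stats.max_rating
```

The third emitted row in the test below replaces the first one for movie 1, which is why the SQL client's result view changes in place rather than appending.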
-
View recent ratings:
SELECT movieId, rating, ratingTime FROM MovieRatings ORDER BY ratingTime DESC LIMIT 10;
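On a stream, ORDER BY ... DESC LIMIT 10 is a continuously updated top-N: the query has to remember the ten newest ratings seen so far and evict the oldest of them when a newer one arrives. A minimal Python sketch of that bookkeeping with a bounded min-heap (the class name is hypothetical):

```python
import heapq
from itertools import count


class LatestRatings:
    """Keep the `limit` most recent ratings as events arrive."""

    def __init__(self, limit=10):
        self.limit = limit
        self._heap = []  # min-heap on ratingTimeMillis: the oldest kept rating sits at index 0
        self._seq = count()  # tie-breaker so equal timestamps never compare the rest of the tuple

    def add(self, movie_id, rating, rating_time_millis):
        entry = (rating_time_millis, next(self._seq), movie_id, rating)
        if len(self._heap) < self.limit:
            heapq.heappush(self._heap, entry)
        elif rating_time_millis > self._heap[0][0]:
            heapq.heapreplace(self._heap, entry)  # evict the oldest, keep the newer one

    def snapshot(self):
        """Current result, newest first, as (movieId, rating, ratingTimeMillis)."""
        return [(m, r, t) for t, _, m, r in sorted(self._heap, reverse=True)]
```

Each add is O(log limit), and ratings older than everything already kept are dropped without touching the heap.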
-
Description: Verify that data is flowing through the pipeline
-
Actions:
Check table counts:
-- Check Movies table
SELECT COUNT(*) FROM Movies;
-- Check MovieRatings table
SELECT COUNT(*) FROM MovieRatings;
-- Check processed results
SELECT COUNT(*) FROM RatedMoviesSink;
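The RatedMoviesSink count only grows while a job that writes into it (an INSERT INTO RatedMoviesSink ... statement) is running, and no such statement appears among the steps above. Its columns suggest each rating enriched with its movie's details. The Python model below sketches that enrichment as a streaming inner join on movieId, under stated assumptions: each movieId appears once in Movies, and the sink's rating column takes the individual rating from MovieRatings (the Movies table also has a rating column). State is kept on both sides, so a rating that arrives before its movie is still emitted once the movie shows up.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def _sink_row(movie, rating):
    """Build a dict shaped like a RatedMoviesSink row."""
    return {
        "movieId": movie["movieId"],
        "title": movie["title"],
        "releaseYear": movie["releaseYear"],
        "actors": movie.get("actors", []),
        "rating": rating["rating"],
        "ratingTime": EPOCH + timedelta(milliseconds=rating["ratingTimeMillis"]),
    }


def enrich(events):
    """events: ("movie", record) or ("rating", record) tuples in arrival order.
    Yields joined rows whenever either side finds a match on movieId."""
    movies = {}            # movieId -> latest movie record
    ratings_by_movie = {}  # movieId -> every rating seen so far
    for kind, record in events:
        movie_id = record["movieId"]
        if kind == "movie":
            movies[movie_id] = record
            for rating in ratings_by_movie.get(movie_id, []):
                yield _sink_row(record, rating)
        else:
            ratings_by_movie.setdefault(movie_id, []).append(record)
            if movie_id in movies:
                yield _sink_row(movies[movie_id], record)
```

Note that both dictionaries grow without bound, which mirrors why an unbounded Flink regular join may need a state TTL in long-running jobs.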
Check Pulsar topics:
# Check topic stats
docker-compose exec pulsar bin/pulsar-admin topics stats persistent://public/default/rated_movies
# Consume messages from the sink topic (-n 0 keeps consuming until you press Ctrl+C)
docker-compose exec pulsar bin/pulsar-client consume persistent://public/default/rated_movies -s "test-sub" -n 0
-
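Optionally tune the sink with the extra options below. 'sink.parallelism' sets how many parallel sink instances write to the topic, and 'sink.message-delay-interval' enables Pulsar delayed delivery, so each message becomes visible to consumers only after the interval (1000 ms here). The delay adds latency rather than improving throughput, so include it only if you want that behavior. RatedMoviesSink already exists, so drop it before recreating it (DROP TABLE removes only the Flink table definition, not the Pulsar topic):
DROP TABLE RatedMoviesSink;
CREATE TABLE RatedMoviesSink (
  movieId INT,
  title STRING,
  releaseYear INT,
  actors ARRAY<STRING>,
  rating DOUBLE,
  ratingTime TIMESTAMP(3)
) WITH (
  'connector' = 'pulsar',
  'topics' = 'persistent://public/default/rated_movies',
  'service-url' = 'pulsar://pulsar:6650',
  'value.format' = 'json',
  'sink.parallelism' = '2',
  'sink.message-delay-interval' = '1000'
);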
-
If Pulsar topics aren’t created automatically:
docker-compose exec pulsar bin/pulsar-admin topics create persistent://public/default/movies
docker-compose exec pulsar bin/pulsar-admin topics create persistent://public/default/ratings
docker-compose exec pulsar bin/pulsar-admin topics create persistent://public/default/rated_movies
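The same topics can be created through the Pulsar admin REST API, where a PUT on /admin/v2/persistent/{tenant}/{namespace}/{topic} creates a non-partitioned topic. A minimal stdlib sketch, assuming the broker's admin HTTP port is published on the host as localhost:8080 (an assumption about the Compose file; adjust ADMIN_URL otherwise). The function names are hypothetical.

```python
import urllib.request

ADMIN_URL = "http://localhost:8080"  # assumption: broker admin port published on the host


def topic_admin_path(topic):
    """Map 'persistent://tenant/namespace/name' to its admin REST path."""
    scheme, sep, rest = topic.partition("://")
    if not sep or scheme not in ("persistent", "non-persistent") or rest.count("/") != 2:
        raise ValueError(f"unexpected topic name: {topic}")
    return f"/admin/v2/{scheme}/{rest}"


def create_topic(topic, admin_url=ADMIN_URL):
    """Create a non-partitioned topic with an HTTP PUT; urllib raises
    urllib.error.HTTPError on failure, for example if the topic already exists."""
    request = urllib.request.Request(admin_url + topic_admin_path(topic), method="PUT")
    with urllib.request.urlopen(request) as response:
        return response.status
```

Usage: for name in ("movies", "ratings", "rated_movies"): create_topic(f"persistent://public/default/{name}").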
-
If you need to reset a topic:
docker-compose exec pulsar bin/pulsar-admin topics delete persistent://public/default/<topic_name>
docker-compose exec pulsar bin/pulsar-admin topics create persistent://public/default/<topic_name>
-
To view topic statistics:
docker-compose exec pulsar bin/pulsar-admin topics stats persistent://public/default/<topic_name>
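The stats command prints JSON. For checking flow, the useful parts are msgInCounter (messages published to the topic), msgOutCounter (messages delivered), and each subscription's msgBacklog; a backlog that keeps growing usually means the consumer (for example the Flink job) is not running or not keeping up. A small Python sketch that extracts those fields; fetch_stats shells out to the same command and, like both function names, is a hypothetical helper.

```python
import json
import subprocess


def fetch_stats(topic):
    """Run pulsar-admin inside the pulsar service (-T disables the TTY so output can be captured)."""
    cmd = ["docker-compose", "exec", "-T", "pulsar", "bin/pulsar-admin", "topics", "stats", topic]
    return subprocess.run(cmd, capture_output=True, text=True, check=True).stdout


def summarize_stats(stats_json):
    """Keep only the flow-related fields from topic stats JSON."""
    stats = json.loads(stats_json)
    return {
        "msgInCounter": stats.get("msgInCounter", 0),
        "msgOutCounter": stats.get("msgOutCounter", 0),
        "backlog": {name: sub.get("msgBacklog", 0) for name, sub in stats.get("subscriptions", {}).items()},
    }
```

For example, summarize_stats(fetch_stats("persistent://public/default/rated_movies")) shows whether the sink topic is receiving messages and how far the test-sub subscription lags behind.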