
Directory

  • 05_pinot-pulsar-flink

Objective

Enhance the data pipeline by introducing Apache Flink SQL for complex event processing with Apache Pulsar as the streaming system.

Setup Requirements

  • Ensure Docker and Docker Compose are installed; the Compose setup provides Apache Pulsar, Apache Flink, and the other necessary services

  • Navigate to the 05_pinot-pulsar-flink directory containing the required files and scripts

Step 1: Start Services Using Make

  • Description: Initialize all necessary services by running the make command

  • Action:

    make

Step 2: Start the Flink SQL Client

  • Description: Start the Apache Flink SQL client to manage streaming jobs

  • Action:

    docker-compose run sql-client
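Once the SQL client prompt appears, a few session settings can make interactive output easier to follow. This is a sketch using standard Flink configuration keys with the quoted SET syntax (Flink 1.13 and later, assumed here). Whether you prefer tableau output over the default table view is a matter of taste:

```sql
-- Run queries as unbounded streaming jobs (the default runtime mode)
SET 'execution.runtime-mode' = 'streaming';

-- Print results inline in the terminal instead of the interactive table view
SET 'sql-client.execution.result-mode' = 'tableau';

-- List tables registered in the current session (empty until the tables are created)
SHOW TABLES;
```

In tableau mode, continuously updating queries print changelog rows (for example +I, -U, +U), which makes the streaming aggregates below easier to interpret.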

Step 3: Create Tables and Process Data

  • Description: Use the Flink SQL client to create the source and sink tables, run example queries, and submit the processing job

Create Movies Table
CREATE TABLE Movies (
    movieId             INT,
    title               STRING,
    releaseYear         INT,
    country             STRING,
    rating              DOUBLE,
    genres              ARRAY<STRING>,
    actors              ARRAY<STRING>,
    directors           ARRAY<STRING>,
    composers           ARRAY<STRING>,
    screenwriters       ARRAY<STRING>,
    productionCompanies ARRAY<STRING>,
    cinematographer     STRING
) WITH (
    'connector' = 'pulsar',
    'topics' = 'persistent://public/default/movies',
    'service-url' = 'pulsar://pulsar:6650',
    'value.format' = 'json',
    'source.subscription-name' = 'flink-movies-subscription',
    'source.subscription-type' = 'Shared'
);
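If the movies topic is still empty, you can write a test record through the Movies table itself. The Pulsar connector is also used as a sink later in this walkthrough. This is a minimal sketch: the movieId and all field values are illustrative and not taken from the project's dataset. It also assumes the connector accepts the table's source.* options when the table is written to:

```sql
-- Hypothetical test record; movieId 1 must match the ratings you produce
INSERT INTO Movies VALUES (
    1,                                    -- movieId (hypothetical)
    'Lethal Weapon',                      -- title
    1987,                                 -- releaseYear
    'USA',                                -- country
    CAST(7.6 AS DOUBLE),                  -- rating (illustrative value)
    ARRAY['Action', 'Crime', 'Thriller'], -- genres
    ARRAY['Mel Gibson', 'Danny Glover'],  -- actors
    ARRAY['Richard Donner'],              -- directors
    ARRAY['Michael Kamen'],               -- composers
    ARRAY['Shane Black'],                 -- screenwriters
    ARRAY['Warner Bros.'],                -- productionCompanies
    'Stephen Goldblatt'                   -- cinematographer
);
```

The title matches the LIKE '%lethal weapon%' filter used in the example queries, so the record can be found with that query.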
Create Ratings Table
CREATE TABLE MovieRatings (
    movieId          INT,
    rating          DOUBLE,
    ratingTimeMillis BIGINT,
    ratingTime AS TO_TIMESTAMP_LTZ(ratingTimeMillis, 3)
) WITH (
    'connector' = 'pulsar',
    'topics' = 'persistent://public/default/ratings',
    'service-url' = 'pulsar://pulsar:6650',
    'value.format' = 'json',
    'source.subscription-name' = 'flink-ratings-subscription',
    'source.subscription-type' = 'Shared'
);
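ratingTime is a computed column derived from ratingTimeMillis, so it is not part of the schema that gets written. An INSERT supplies only movieId, rating, and ratingTimeMillis. This sketch produces two test ratings for the hypothetical movieId 1, stamped with the current time at second precision. It assumes the connector accepts the table's source options when writing:

```sql
-- Only the three physical columns are written; ratingTime is computed on read
INSERT INTO MovieRatings VALUES
    (1, CAST(4.5 AS DOUBLE), UNIX_TIMESTAMP() * 1000),  -- hypothetical rating
    (1, CAST(3.0 AS DOUBLE), UNIX_TIMESTAMP() * 1000);  -- hypothetical rating
```

UNIX_TIMESTAMP() returns seconds, so it is multiplied by 1000 to match the millisecond value that TO_TIMESTAMP_LTZ(ratingTimeMillis, 3) expects.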
Create Rated Movies Sink Table
CREATE TABLE RatedMoviesSink (
    movieId     INT,
    title       STRING,
    releaseYear INT,
    actors      ARRAY<STRING>,
    rating      DOUBLE,
    ratingTime  TIMESTAMP(3)
) WITH (
    'connector' = 'pulsar',
    'topics' = 'persistent://public/default/rated_movies',
    'service-url' = 'pulsar://pulsar:6650',
    'value.format' = 'json'
);
Example Queries
  • Query movies:

    SELECT *
    FROM Movies
    WHERE LOWER(title) LIKE '%lethal weapon%';
  • Calculate average ratings:

    SELECT
        movieId,
        COUNT(*) as ratingCount,
        AVG(rating) as avgRating,
        MIN(rating) as minRating,
        MAX(rating) as maxRating
    FROM MovieRatings
    GROUP BY movieId;
  • View recent ratings:

    SELECT movieId, rating, ratingTime
    FROM MovieRatings
    ORDER BY ratingTime DESC
    LIMIT 10;
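The average-ratings query above aggregates over the entire stream, so its results keep updating forever. Time-bounded, event-time processing requires a watermark on the ratings table. This sketch derives a watermarked copy with CREATE TABLE ... LIKE and computes per-minute statistics with a tumbling window table-valued function (Flink 1.13 and later). The table name, subscription name, 5-second out-of-orderness bound, and 1-minute window size are all assumptions:

```sql
-- Copy MovieRatings' schema and options, add a watermark on ratingTime,
-- and use a separate (hypothetical) subscription so both tables can read the topic
CREATE TABLE MovieRatingsWithWatermark (
    WATERMARK FOR ratingTime AS ratingTime - INTERVAL '5' SECOND
) WITH (
    'source.subscription-name' = 'flink-ratings-windowed-subscription'
) LIKE MovieRatings;

-- Per-movie rating statistics in 1-minute tumbling event-time windows
SELECT
    window_start,
    window_end,
    movieId,
    COUNT(*)    AS ratingCount,
    AVG(rating) AS avgRating
FROM TABLE(
    TUMBLE(TABLE MovieRatingsWithWatermark, DESCRIPTOR(ratingTime), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, movieId;
```

Each window emits a single final row once the watermark passes window_end. As a result, a window's results appear only after newer ratings have arrived.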
Process and Send Data
INSERT INTO RatedMoviesSink
SELECT
    m.movieId,
    m.title,
    m.releaseYear,
    m.actors,
    r.rating,
    r.ratingTime
FROM MovieRatings r
JOIN Movies m ON r.movieId = m.movieId;
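The join above is a regular (unbounded) streaming join. Flink keeps both inputs in state, so a rating can still be matched when its movie arrives later. By default, however, that state grows without limit. This sketch shows session settings to apply before submitting the INSERT. The job name and the 1-hour retention are assumptions to tune for your data:

```sql
-- Give the submitted job a recognizable name in the Flink web UI
SET 'pipeline.name' = 'rated-movies-join';

-- Expire join state that has not been updated for one hour; a movie not
-- re-sent within that time will no longer match newly arriving ratings
SET 'table.exec.state.ttl' = '1 h';
```

The trade-off is bounded memory in exchange for possibly missed matches. A short TTL suits a movies topic that is periodically refreshed, while a static catalog needs a longer one.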

Step 4: Verify Data Flow

  • Description: Verify that data is flowing through the pipeline

  • Actions:

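Check table counts (in streaming mode, each count keeps updating as new messages arrive):

-- Check Movies table
SELECT COUNT(*) FROM Movies;

-- Check MovieRatings table
SELECT COUNT(*) FROM MovieRatings;

-- Check processed results (reads the sink topic back as a source)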
SELECT COUNT(*) FROM RatedMoviesSink;
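You can also use the SQL client to check that the INSERT INTO job is still running and to inspect a few enriched records. This is a sketch; SHOW JOBS requires Flink 1.17 or later, which is an assumption about the version in this setup:

```sql
-- List jobs on the Flink cluster with their IDs, names, and status
SHOW JOBS;

-- Read a handful of joined records back from the sink topic
SELECT movieId, title, rating, ratingTime
FROM RatedMoviesSink
LIMIT 5;
```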

Check Pulsar topics:

# Check topic stats
docker-compose exec pulsar bin/pulsar-admin topics stats persistent://public/default/rated_movies

# Consume messages from the sink topic (-n 0 keeps consuming until interrupted with Ctrl+C)
docker-compose exec pulsar bin/pulsar-client consume persistent://public/default/rated_movies -s "test-sub" -n 0

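Optional Sink Configurations

  • Additional connector options can be set on the sink table. Because RatedMoviesSink already exists in the session, run DROP TABLE RatedMoviesSink; before recreating it. Note that sink.message-delay-interval sets a delivery delay on each message (Pulsar delayed delivery), so it increases end-to-end latency rather than improving throughput. Keep it only if delayed delivery is intended:

    CREATE TABLE RatedMoviesSink (
        movieId     INT,
        title       STRING,
        releaseYear INT,
        actors      ARRAY<STRING>,
        rating      DOUBLE,
        ratingTime  TIMESTAMP(3)
    ) WITH (
        'connector' = 'pulsar',
        'topics' = 'persistent://public/default/rated_movies',
        'service-url' = 'pulsar://pulsar:6650',
        'value.format' = 'json',
        -- Number of parallel sink instances
        'sink.parallelism' = '2',
        -- Per-message delivery delay in milliseconds (delayed delivery, not a throughput setting)
        'sink.message-delay-interval' = '1000'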
    );
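If the sink table already exists in the session, you can change its options in place instead of dropping and recreating it. This sketch uses Flink SQL's ALTER TABLE ... SET; the change affects only jobs submitted afterwards. SHOW CREATE TABLE (available in recent Flink versions) confirms the result:

```sql
-- Change an option on the existing table definition in the session catalog
ALTER TABLE RatedMoviesSink SET ('sink.parallelism' = '2');

-- Print the updated DDL to confirm the new option
SHOW CREATE TABLE RatedMoviesSink;
```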

Clean Up

  • To stop and remove all services:

    make destroy

Troubleshooting

Check Service Logs
docker-compose logs <service_name>
Common Issues and Solutions
  • If Pulsar topics aren’t created automatically:

    docker-compose exec pulsar bin/pulsar-admin topics create persistent://public/default/movies
    docker-compose exec pulsar bin/pulsar-admin topics create persistent://public/default/ratings
    docker-compose exec pulsar bin/pulsar-admin topics create persistent://public/default/rated_movies
  • If you need to reset a topic (stop any Flink jobs that use it first, or add -f to the delete command to force deletion):

    docker-compose exec pulsar bin/pulsar-admin topics delete persistent://public/default/<topic_name>
    docker-compose exec pulsar bin/pulsar-admin topics create persistent://public/default/<topic_name>
  • To view topic statistics:

    docker-compose exec pulsar bin/pulsar-admin topics stats persistent://public/default/<topic_name>
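After a topic is deleted and recreated, the Flink job that reads from or writes to it should usually be restarted as well. This sketch does that from the SQL client and assumes Flink 1.17 or later for STOP JOB. <job_id> is a placeholder for an ID reported by SHOW JOBS:

```sql
-- Stop the running INSERT INTO job (replace the placeholder with a real job ID)
STOP JOB '<job_id>';

-- Then resubmit the INSERT INTO RatedMoviesSink statement from "Process and Send Data"
```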