Add example of data streaming with Apache Kafka + Apache Flink #353

Merged
merged 3 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions application/apache-kafka-flink-streaming/.env
@@ -0,0 +1,20 @@
CRATEDB_HOST=crate
CRATEDB_PORT=4200
CRATEDB_PG_PORT=5432

WEATHER_PRODUCER_CITY=Vienna
WEATHER_PRODUCER_API_KEY=#GET THE API KEY - https://www.weatherapi.com/
WEATHER_PRODUCER_FETCH_EVERY_SECONDS=30
WEATHER_PRODUCER_KAFKA_TOPIC=weather_topic
WEATHER_PRODUCER_KAFKA_BOOTSTRAP_SERVER=kafka

FLINK_CONSUMER_KAFKA_TOPIC=weather_topic
FLINK_CONSUMER_BOOTSTRAP_SERVER=kafka
FLINK_CONSUMER_CRATEDB_PG_URI=jdbc:postgresql://crate:5432/crate
FLINK_CONSUMER_CRATE_USER=crate
FLINK_CONSUMER_CRATE_PASSWORD=empty

# Jar versions.
POSTGRESQL_JAR_VERSION=42.7.2
FLINK_CONNECTOR_JDBC_VERSION=3.1.2-1.18
FLINK_KAFKA_JAR_URL_VERSION=3.1.0-1.18
7 changes: 7 additions & 0 deletions application/apache-kafka-flink-streaming/Dockerfile
@@ -0,0 +1,7 @@
FROM python:3.10

WORKDIR /app
COPY * /app

RUN pip install poetry
RUN poetry config virtualenvs.create false && poetry install
135 changes: 135 additions & 0 deletions application/apache-kafka-flink-streaming/README.md
@@ -0,0 +1,135 @@
# Streaming data with Apache Kafka, Apache Flink and CrateDB.

## About

This example showcases what a data-streaming architecture leveraging Kafka and Flink could look
like.

We use:

- Kafka (Confluent)
- Apache Flink
- CrateDB
- Python (>=3.7, <=3.11)

[img of the thing]

## Overview

An HTTP call is scheduled to run every `WEATHER_PRODUCER_FETCH_EVERY_SECONDS` seconds (30 in the
provided `.env`) in `weather_producer`; the API returns a JSON document with the specified city's
weather, which is then sent through `Kafka`.

`flink_consumer` is a Flink application consuming the same Kafka topic;
upon receiving data, it sends the resulting datastream to the sink, which is `CrateDB`.

Both `flink_consumer` and `weather_producer` are written using their respective Python wrappers:

- [kafka-python](https://kafka-python.readthedocs.io/en/master/)
- [apache-flink](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/overview/)
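
For orientation, here is a condensed, hypothetical sketch of the producer side, pieced together
from the `.env` variables and the dependencies in `pyproject.toml`; the shipped `weather_producer`
module is the authoritative version, and the exact API URL is an assumption:

```python
import json
import os

import requests
from kafka import KafkaProducer

API_KEY = os.getenv('WEATHER_PRODUCER_API_KEY')
CITY = os.getenv('WEATHER_PRODUCER_CITY', 'Vienna')
TOPIC = os.getenv('WEATHER_PRODUCER_KAFKA_TOPIC', 'weather_topic')
BOOTSTRAP_SERVER = os.getenv('WEATHER_PRODUCER_KAFKA_BOOTSTRAP_SERVER', 'kafka')

# Serialize every message as a UTF-8 encoded JSON string.
producer = KafkaProducer(
    bootstrap_servers=f'{BOOTSTRAP_SERVER}:9092',
    value_serializer=lambda m: json.dumps(m).encode('utf-8'),
)


def fetch_weather_data():
    # weatherapi.com's "current weather" endpoint (URL assumed).
    response = requests.get(
        'https://api.weatherapi.com/v1/current.json',
        params={'key': API_KEY, 'q': CITY},
        timeout=10,
    )
    response.raise_for_status()
    # Ship the raw JSON document to Kafka; Flink unpacks it downstream.
    producer.send(TOPIC, response.json())
```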

Everything is customizable via environment variables: the API schedule, the topic, credentials,
and so on.

See `.env` for more details.

## How to use

There is a ready-to-use Docker Compose setup; fill in the `.env` with the API
key (get it here: https://www.weatherapi.com/).

### Run the docker compose (and build the images)

`docker compose up -d --build`

### Stop the docker compose

`docker compose down`

### Poetry

`poetry install`

### Pip

`pip install -r requirements.txt`

## Notes

### CrateDB initial settings.

CrateDB stores the shard indexes on the file system by mapping a file into memory (mmap).
You might need to set `max_map_count` to something higher than the usual default, like `262144`.

You can do that by running `sysctl -w vm.max_map_count=262144` (add `vm.max_map_count=262144` to
`/etc/sysctl.conf` to persist it across reboots); for more information, see the
[bootstrap checks documentation](https://cratedb.com/docs/guide/admin/bootstrap-checks.html#linux).

### Mock API call.

If you don't want to register with the weather API we use, you can use the
provided `mock_fetch_weather_data` and call it instead in the scheduler call.

This is what it'd look like:

```python
# Re-schedule the mocked fetch to run again in RUN_EVERY_SECONDS seconds.
scheduler.enter(
    RUN_EVERY_SECONDS,
    1,
    schedule_every,
    (RUN_EVERY_SECONDS, mock_fetch_weather_data, scheduler)
)
```

*After changing this, re-build the images: `docker compose up -d --build`.*

### Initial Kafka topic.

In this example the `Kafka` topic is only created when data is first sent to it; because of this,
the Flink job could fail if it exceeds the default timeout (60 seconds). This should only happen
if the API takes too long to respond *the very first time this project runs*.

To solve this, you should [configure](https://kafka.apache.org/quickstart#quickstart_createtopic)
the topics at boot time, as sketched below. This is recommended for production scenarios.
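
For illustration, here is a minimal sketch of creating the topic up front with kafka-python's
admin client; the broker address and topic name follow the provided `.env`, and this snippet is
not part of the example itself:

```python
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers='kafka:9092')
try:
    # One partition and replication factor 1 match this single-broker setup.
    admin.create_topics([NewTopic(name='weather_topic', num_partitions=1, replication_factor=1)])
except TopicAlreadyExistsError:
    pass  # The topic was already created on a previous run.
finally:
    admin.close()
```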

If you are just testing things, you can also work around this by re-running `docker compose up -d`:
it will only start `flink_job`, and assuming everything went OK, the topic should already exist and
everything should work as expected.

If it still fails, check whether any other container/service is down;
it could be a symptom of a wrong API token or an unresponsive Kafka server, for example.

## Data and schema

See `example.json` for the schema. As you can see in `weather_producer` and `flink_consumer`,
schema manipulation is minimal:
thanks to CrateDB's dynamic objects we only need to map the `location` and `current` keys.

For more information on dynamic objects,
see [Handling Dynamic Objects in CrateDB](https://cratedb.com/blog/handling-dynamic-objects-in-cratedb).
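
Note that the JDBC sink does not create the target table for you. As a hedged sketch (not part of
this example), the table could be created up front through CrateDB's HTTP endpoint using the
`requests` dependency this project already ships with; the dynamic `OBJECT` column types are an
assumption matching the producer's JSON:

```python
import requests

# Create the sink table with dynamic OBJECT columns so CrateDB infers the
# nested schema from the incoming JSON documents.
response = requests.post(
    'http://localhost:4200/_sql',
    json={'stmt': """
        CREATE TABLE IF NOT EXISTS doc.weather_flink_sink (
            location OBJECT(DYNAMIC),
            current OBJECT(DYNAMIC)
        )
    """},
)
response.raise_for_status()
```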

In `weather_producer` the `Kafka` producer directly serializes the JSON document into a string:

```python
KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER,
              value_serializer=lambda m: json.dumps(m).encode('utf-8'))
```

In `flink_consumer` we use a `JSON` deserialization schema and only specify the two top-level keys,
`location` and `current`:

```python
row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()])
json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()
```
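
Each deserialized row then feeds the two-parameter `INSERT` statement used by the JDBC sink in
`flink_consumer.py`.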

If your format is not JSON, or if you want to specify the whole schema, adapt it as needed.

[Here](https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/datastream/connectors.html)
you can find examples of other formats like `csv` or `avro`.

## Jars and versions.

Jars are downloaded at build time to `/app/jars`; the versions are pinned in the `.env`.

There is a `JARS_PATH` in `flink_consumer` (used by `env.add_jars(...)`); change it if you keep
the jars somewhere else.
58 changes: 58 additions & 0 deletions application/apache-kafka-flink-streaming/docker-compose.yml
@@ -0,0 +1,58 @@
services:
  weather_producer:
    env_file:
      - .env
    build:
      context: .
      dockerfile: Dockerfile
    command: python -m weather_producer
    depends_on:
      - kafka

  flink_job:
    env_file:
      - .env
    build:
      context: .
      dockerfile: flink_job.Dockerfile
      args:
        - POSTGRESQL_JAR_URL=https://jdbc.postgresql.org/download/postgresql-${POSTGRESQL_JAR_VERSION}.jar
        - FLINK_SQL_JAR_URL=https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/${FLINK_CONNECTOR_JDBC_VERSION}/flink-connector-jdbc-${FLINK_CONNECTOR_JDBC_VERSION}.jar
        - FLINK_KAFKA_JAR_URL=https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/${FLINK_KAFKA_JAR_URL_VERSION}/flink-sql-connector-kafka-${FLINK_KAFKA_JAR_URL_VERSION}.jar
    command: python -m flink_consumer
    depends_on:
      - kafka

  crate:
    image: crate:5.6.2
    ports:
      - "4200:4200"
    command: [ "crate",
               "-Cdiscovery.type=single-node",
    ]
    environment:
      - CRATE_HEAP_SIZE=2g

  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-server:6.2.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
41 changes: 41 additions & 0 deletions application/apache-kafka-flink-streaming/example.json
@@ -0,0 +1,41 @@
{
  "location": {
    "localtime": "2024-03-07 18:20",
    "country": "France",
    "localtime_epoch": 1709832024,
    "name": "Nonette",
    "lon": 3.28,
    "region": "Auvergne",
    "lat": 45.48,
    "tz_id": "Europe/Paris"
  },
  "current": {
    "feelslike_c": 11,
    "uv": 3,
    "last_updated": "2024-03-07 18:15",
    "feelslike_f": 51.7,
    "wind_degree": 30,
    "last_updated_epoch": 1709831700,
    "is_day": 1,
    "precip_in": 0,
    "wind_dir": "NNE",
    "gust_mph": 12.1,
    "temp_c": 12,
    "pressure_in": 29.83,
    "gust_kph": 19.5,
    "temp_f": 53.6,
    "precip_mm": 0,
    "cloud": 0,
    "wind_kph": 6.8,
    "condition": {
      "code": 1000,
      "icon": "//cdn.weatherapi.com/weather/64x64/day/113.png",
      "text": "Sunny"
    },
    "wind_mph": 4.3,
    "vis_km": 10,
    "humidity": 50,
    "pressure_mb": 1010,
    "vis_miles": 6
  }
}
67 changes: 67 additions & 0 deletions application/apache-kafka-flink-streaming/flink_consumer.py
@@ -0,0 +1,67 @@
import os
import logging

from pathlib import Path

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

logging.basicConfig(
    format='[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s',
    level=logging.DEBUG
)

JARS_PATH = Path(__file__).parent / 'jars'

KAFKA_BOOTSTRAP_SERVER = os.getenv('FLINK_CONSUMER_BOOTSTRAP_SERVER')
KAFKA_TOPIC = os.getenv('FLINK_CONSUMER_KAFKA_TOPIC')
CRATEDB_PG_URI = os.getenv('FLINK_CONSUMER_CRATEDB_PG_URI', 'jdbc:postgresql://localhost:5432/crate')
CRATEDB_USER = os.getenv('FLINK_CONSUMER_CRATE_USER')
CRATEDB_PASSWORD = os.getenv('FLINK_CONSUMER_CRATE_PASSWORD')


def kafka_to_cratedb(env: StreamExecutionEnvironment):
    row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()])
    json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()

    # Consumes data from Kafka.
    kafka_consumer = FlinkKafkaConsumer(
        topics=KAFKA_TOPIC,
        deserialization_schema=json_format,
        properties={'bootstrap.servers': f'{KAFKA_BOOTSTRAP_SERVER}:9092'}
    )
    kafka_consumer.set_start_from_latest()

    ds = env.add_source(kafka_consumer, source_name='kafka')

    # Writes data to CrateDB.
    ds.add_sink(
        JdbcSink.sink(
            "insert into doc.weather_flink_sink (location, current) values (?, ?)",
            row_type_info,
            JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .with_url(CRATEDB_PG_URI)
            .with_driver_name('org.postgresql.Driver')
            .with_user_name(CRATEDB_USER)
            .with_password(CRATEDB_PASSWORD)
            .build(),
            JdbcExecutionOptions.builder()
            .with_batch_interval_ms(1000)
            .with_batch_size(200)
            .with_max_retries(5)
            .build()
        )
    )
    env.execute()


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    # Register the connector jars downloaded at image build time (see flink_job.Dockerfile).
    jars = [f'file://{jar}' for jar in JARS_PATH.glob('*.jar')]
    env.add_jars(*jars)

    logging.info("Reading data from Kafka")
    kafka_to_cratedb(env)
18 changes: 18 additions & 0 deletions application/apache-kafka-flink-streaming/flink_job.Dockerfile
@@ -0,0 +1,18 @@
FROM python:3.10
# The Python version is important: as of today (2024-03-04), apache-flink is only
# supported on Python <= 3.10.

ARG POSTGRESQL_JAR_URL
ARG FLINK_SQL_JAR_URL
ARG FLINK_KAFKA_JAR_URL

WORKDIR /app
COPY * /app
RUN wget ${POSTGRESQL_JAR_URL} --directory-prefix=/app/jars
RUN wget ${FLINK_SQL_JAR_URL} --directory-prefix=/app/jars
RUN wget ${FLINK_KAFKA_JAR_URL} --directory-prefix=/app/jars

RUN apt update && apt install -y openjdk-11-jdk
RUN pip install poetry

RUN poetry config virtualenvs.create false && poetry install
16 changes: 16 additions & 0 deletions application/apache-kafka-flink-streaming/pyproject.toml
@@ -0,0 +1,16 @@
[tool.poetry]
name = "cratedb-weather-data"
version = "0.1.0"
description = ""
authors = ["ivan.sanchez <[email protected]>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.9"
requests = "^2.31.0"
kafka-python = "^2.0.2"
apache-flink = "^1.18.1"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"