feat: introducing a new demo for ingesting data from kafka (#31)
* feat: introducing a new demo for ingesting data from kafka

* feat: add some more demo components

* feat: initial kafka topic

* feat: corrected producer and vector

* chore: add pipeline init (#33)

* feat: update greptimedb to 0.8.2

* doc: add a readme

* doc: update readme

* doc: update readme for web ui

* doc: rename example to kafka-ingestion

* doc: add docs link to configurations

---------

Co-authored-by: localhost <[email protected]>
sunng87 and paomian authored Aug 20, 2024
1 parent 927e9b4 commit aa92fec
Showing 8 changed files with 273 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -24,6 +24,7 @@ Scripts and samples to support Greptime Demos and Talks. Might be rough around the edges.
* [InfluxDB Line Protocol + GreptimeDB + Web UI](influxdb-lineprotocol) ([🎥 tutorial](https://www.youtube.com/watch?v=JZuq0inSO9Q))
* [Prometheus Node Exporter + GreptimeDB + Web Dashboard or Grafana Dashboard](node-exporter)
* [Telegraf + GreptimeDB + Web UI](telegraf-ingestion)
* [Kafka + Vector + GreptimeDB + Web UI](kafka-ingestion)

### Data Migrations

102 changes: 102 additions & 0 deletions kafka-ingestion/README.md
@@ -0,0 +1,102 @@
# GreptimeDB Kafka Ingestion Demo

This docker-compose file demonstrates how to ingest log data from Kafka into
GreptimeDB.

It uses [Vector](https://vector.dev) as the Kafka consumer that bridges Kafka
and GreptimeDB. Vector is an observability data pipeline with built-in support
for Kafka as a source and GreptimeDB as a sink.

## How to run this demo

Ensure you have `git`, `docker`, `docker-compose`, and the `mysql` client
installed. To run this demo:

```shell
git clone git@github.com:GreptimeTeam/demo-scene.git
cd demo-scene/kafka-ingestion
docker compose up
```

The first run can take a while, since it pulls down images and builds the
necessary components.

You can access GreptimeDB using the `mysql` client. Run `mysql -h 127.0.0.1 -P
4002` to connect to the database, then start with a query like `SHOW TABLES`.

```
$ mysql -h 127.0.0.1 -P 4002
mysql: Deprecated program name. It will be removed in a future release, use '/usr/bin/mariadb' instead
WARNING: option --ssl-verify-server-cert is disabled, because of an insecure passwordless login.
Welcome to the MariaDB monitor. Commands end with ; or \g.
Your MySQL connection id is 8
Server version: 8.4.2 Greptime
Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
MySQL [(none)]> show tables;
+-----------+
| Tables |
+-----------+
| demo_logs |
| numbers |
+-----------+
2 rows in set (0.009 sec)
MySQL [(none)]> select * from demo_logs order by timestamp desc limit 10;
+------------------------+----------------------------+
| message | timestamp |
+------------------------+----------------------------+
| this is a test message | 2024-08-20 08:46:50.700000 |
| this is a test message | 2024-08-20 08:46:47.696000 |
| this is a test message | 2024-08-20 08:46:44.693000 |
| this is a test message | 2024-08-20 08:46:41.689000 |
| this is a test message | 2024-08-20 08:46:38.685000 |
| this is a test message | 2024-08-20 08:46:35.682000 |
| this is a test message | 2024-08-20 08:46:32.679000 |
| this is a test message | 2024-08-20 08:46:29.675000 |
| this is a test message | 2024-08-20 08:46:26.671000 |
| this is a test message | 2024-08-20 08:46:23.668000 |
+------------------------+----------------------------+
10 rows in set (0.005 sec)
MySQL [(none)]> Bye
```

You can also open your browser at http://localhost:4000/dashboard for the Web
UI.

## How it works

The topology is illustrated in the diagram below; one-shot containers are omitted.

```mermaid
flowchart LR
greptimedb[(GreptimeDB)]
message_producer --> kafka
kafka --> vector
vector --> greptimedb
```

All generated logs are stored in GreptimeDB using the pipeline definition in
[pipeline.yaml](./config_data/pipeline.yaml). It extracts the timestamp and
message fields from the data Vector sends to GreptimeDB and stores them as
table columns.
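
For reference, each event Vector sends to GreptimeDB is a JSON object; with
this demo's producer, the payload looks roughly like this (values are
illustrative):

```json
{
  "message": "this is a test message",
  "timestamp": "2024-08-20T08:46:50.700Z"
}
```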

Once GreptimeDB starts, an init container, `init_pipeline`, sends an HTTP POST
request to store the pipeline definition under the name `demo_pipeline`.
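
The container's request is equivalent to running something like the following
from the host (port 4000 is published by docker compose):

```shell
curl -X POST 'http://localhost:4000/v1/events/pipelines/demo_pipeline' \
  -F 'file=@./config_data/pipeline.yaml'
```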

In the [Vector configuration](./config_data/vector.toml), we specify the
pipeline name `demo_pipeline` and the table name `demo_logs` (both can be
customized).
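
The relevant sink section (reproduced from the full config in this commit) is:

```toml
[sinks.sink_greptime_logs]
type = "greptimedb_logs"
table = "demo_logs"
pipeline_name = "demo_pipeline"
compression = "gzip"
inputs = [ "mq" ]
endpoint = "http://greptimedb:4000"
```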

To learn more about logs and pipeline definition, [see our
docs](https://docs.greptime.com/user-guide/logs/overview).

## Note

If you are going to restart this demo, press `Ctrl-C`, then remember to run
`docker compose down` to clean up the data before running `docker compose up`
again.
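
In other words:

```shell
docker compose down  # remove the demo containers and their data
docker compose up    # start again from a clean state
```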
24 changes: 24 additions & 0 deletions kafka-ingestion/build_vector/Dockerfile
@@ -0,0 +1,24 @@
# Use Rust as the base image
FROM rust:1.79 AS builder

# Install system dependencies
RUN apt-get update && apt-get install -y cmake pkg-config libssl-dev protobuf-compiler libsasl2-dev

# Clone the Vector source code
RUN git clone --depth 1 -b chore/greptime_log https://github.com/paomian/vector.git /vector
WORKDIR /vector

# Build Vector
RUN cargo build --release --no-default-features --features=sinks-greptimedb_logs,sinks-greptimedb_metrics,sinks-console,sources-kafka

# Use a minimal base image for the final image
FROM debian:bookworm-slim

# Copy the Vector binary from the builder stage
COPY --from=builder /vector/target/release/vector /usr/local/bin/vector

# Set up the config directory
ENV VECTOR_CONFIG_LOCATION "/etc/vector/vector.toml"

# Define the entry point
CMD vector -c $VECTOR_CONFIG_LOCATION
15 changes: 15 additions & 0 deletions kafka-ingestion/config_data/pipeline.yaml
@@ -0,0 +1,15 @@
processors:
  # Parse the `timestamp` string (e.g. 2024-08-20T08:46:50.700Z) into a timestamp
  - date:
      fields:
        - timestamp
      formats:
        - "%Y-%m-%dT%H:%M:%S%.3fZ"

transform:
  # Keep the raw log line as a string column
  - fields:
      - message
    type: string
  # Use the parsed timestamp as the table's time index
  - fields:
      - timestamp
    type: timestamp
    index: timestamp
18 changes: 18 additions & 0 deletions kafka-ingestion/config_data/vector.toml
@@ -0,0 +1,18 @@
# Consume log messages from the Kafka topic created by kafka-init
[sources.mq]
type = "kafka"
group_id = "vector0"
topics = ["test_topic"]
bootstrap_servers = "kafka:9092"

# Echo consumed messages to stdout, handy for debugging
[sinks.console]
type = "console"
inputs = [ "mq" ]
encoding.codec = "text"

# Ship messages to GreptimeDB's log endpoint; the `demo_pipeline` pipeline
# parses them into the `demo_logs` table
[sinks.sink_greptime_logs]
type = "greptimedb_logs"
table = "demo_logs"
pipeline_name = "demo_pipeline"
compression = "gzip"
inputs = [ "mq" ]
endpoint = "http://greptimedb:4000"
88 changes: 88 additions & 0 deletions kafka-ingestion/docker-compose.yml
@@ -0,0 +1,88 @@
services:
  kafka:
    image: docker.io/bitnami/kafka:3.6.0
    container_name: kafka
    ports:
      - 9092
    networks:
      - demo-network
    environment:
      # KRaft settings
      KAFKA_KRAFT_CLUSTER_ID: Kmp-xkTnSf-WWXhWmiorDg
      KAFKA_ENABLE_KRAFT: "yes"
      KAFKA_CFG_NODE_ID: "1"
      KAFKA_CFG_PROCESS_ROLES: broker,controller
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:2181
      # Listeners
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://:9092
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_BROKER_ID: "1"

  kafka-init:
    image: docker.io/bitnami/kafka:3.6.0
    networks:
      - demo-network
    command: ["/opt/bitnami/kafka/bin/kafka-topics.sh", "--create", "--topic", "test_topic", "--bootstrap-server", "kafka:9092"]
    depends_on:
      kafka:
        condition: service_started
    init: true

  msg_gen:
    build:
      context: ./producer
      dockerfile: ./Dockerfile
    networks:
      - demo-network
    environment:
      - KAFKA_TOPIC_NAME=test_topic
    depends_on:
      kafka-init:
        condition: service_completed_successfully

  init_pipeline:
    image: docker.io/alpine/curl
    networks:
      - demo-network
    depends_on:
      greptimedb:
        condition: service_started
    volumes:
      - ./config_data:/config_data
    command: sh -c "curl -X 'POST' 'http://greptimedb:4000/v1/events/pipelines/demo_pipeline' -F 'file=@/config_data/pipeline.yaml' -v"

  vector:
    build:
      context: ./build_vector
      dockerfile: ./Dockerfile
    networks:
      - demo-network
    volumes:
      - ./config_data:/config_data
    depends_on:
      kafka-init:
        condition: service_completed_successfully
    environment:
      VECTOR_CONFIG_LOCATION: /config_data/vector.toml

  greptimedb:
    image: docker.io/greptime/greptimedb:v0.9.2
    command: standalone start --http-addr=0.0.0.0:4000 --rpc-addr=0.0.0.0:4001 --mysql-addr=0.0.0.0:4002 --postgres-addr 0.0.0.0:4003
    ports:
      - 4000:4000
      - 4001:4001
      - 4002:4002
      - 4003:4003
    networks:
      - demo-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://127.0.0.1:4000/health"]
      interval: 3s
      timeout: 3s
      retries: 5

networks:
  demo-network:
9 changes: 9 additions & 0 deletions kafka-ingestion/producer/Dockerfile
@@ -0,0 +1,9 @@
FROM docker.io/python:3.10

WORKDIR /app

RUN pip install kafka-python

COPY app.py .

CMD ["python3", "app.py"]
16 changes: 16 additions & 0 deletions kafka-ingestion/producer/app.py
@@ -0,0 +1,16 @@
from kafka import KafkaProducer
import time
import os

print("starting producer", flush=True)

topic = os.environ["KAFKA_TOPIC_NAME"]
print("using topic: ", topic, flush=True)

producer = KafkaProducer(bootstrap_servers="kafka:9092")
print("using producer: ", producer, flush=True)

while True:
    # Produce a test message every 3 seconds
    producer.send(topic, b"this is a test message")
    print("message sent", flush=True)
    time.sleep(3)
