diff --git a/README.md b/README.md index 4b6c5bd..75e4590 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Scripts and samples to support Greptime Demos and Talks. Might be rough around t * [InfluxDB Line Protocol + GreptimeDB + Web UI](influxdb-lineprotocol) ([🎥 tutorial](https://www.youtube.com/watch?v=JZuq0inSO9Q)) * [Prometheus Node Exporter + GreptimeDB + Web Dashboard or Grafana Dashboard](node-exporter) * [Telegraf + GreptimeDB + Web UI](telegraf-ingestion) +* [Kafka + Vector + GreptimeDB + Web UI](kafka-ingestion) ### Data Migrations diff --git a/kafka-ingestion/README.md b/kafka-ingestion/README.md new file mode 100644 index 0000000..06052e2 --- /dev/null +++ b/kafka-ingestion/README.md @@ -0,0 +1,102 @@ +# GreptimeDB Kafka Ingestion Demo + +This docker-compose file demos how to ingest log data from Kafka to GreptimeDB. + +It uses [Vector](https://vector.dev) as a Kafka consumer to bridge Kafka and +GreptimeDB. Vector is an observability data pipeline that has built-in support +for Kafka as a source and GreptimeDB as a sink. + +## How to run this demo + +Ensure you have `git`, `docker`, `docker-compose` and `mysql` client +installed. To run this demo: + +```shell +git clone git@github.com:GreptimeTeam/demo-scene.git +cd demo-scene/kafka-ingestion +docker compose up +``` + +It can take a while for the first run to pull down images and also build +necessary components. + +You can access GreptimeDB using `mysql` client. Just run `mysql -h 127.0.0.1 -P +4002` to connect to the database and use SQL query like `SHOW TABLES` as a +start. + +``` +$ mysql -h 127.0.0.1 -P 4002 +mysql: Deprecated program name. It will be removed in a future release, use '/usr/bin/mariadb' instead +WARNING: option --ssl-verify-server-cert is disabled, because of an insecure passwordless login. +Welcome to the MariaDB monitor. Commands end with ; or \g. 
+Your MySQL connection id is 8 +Server version: 8.4.2 Greptime + +Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others. + +Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. + +MySQL [(none)]> show tables; ++-----------+ +| Tables | ++-----------+ +| demo_logs | +| numbers | ++-----------+ +2 rows in set (0.009 sec) + +MySQL [(none)]> select * from demo_logs order by timestamp desc limit 10; ++------------------------+----------------------------+ +| message | timestamp | ++------------------------+----------------------------+ +| this is a test message | 2024-08-20 08:46:50.700000 | +| this is a test message | 2024-08-20 08:46:47.696000 | +| this is a test message | 2024-08-20 08:46:44.693000 | +| this is a test message | 2024-08-20 08:46:41.689000 | +| this is a test message | 2024-08-20 08:46:38.685000 | +| this is a test message | 2024-08-20 08:46:35.682000 | +| this is a test message | 2024-08-20 08:46:32.679000 | +| this is a test message | 2024-08-20 08:46:29.675000 | +| this is a test message | 2024-08-20 08:46:26.671000 | +| this is a test message | 2024-08-20 08:46:23.668000 | ++------------------------+----------------------------+ +10 rows in set (0.005 sec) + +MySQL [(none)]> Bye +``` + +You can also open your browser at http://localhost:4000/dashboard for the Web +UI. + +## How it works + +The topology is illustrated in this diagram. One-shot containers are ignored. + +```mermaid +flowchart LR + greptimedb[(GreptimeDB)] + + message_producer --> kafka + kafka --> vector + vector --> greptimedb +``` + +All the generated logs are stored in GreptimeDB using a pipeline definition at +[pipeline.yaml](./config_data/pipeline.yaml). Basically, it extracts timestamp +and message fields from the data Vector sends to GreptimeDB and stores them as +table columns. + +Once greptimedb starts, we use an init container `init_pipeline` to send an HTTP +POST request to store the pipeline definition named `demo_pipeline`. 
+ +In the [vector configuration](./config_data/vector.toml), we specify the +pipeline name `demo_pipeline` and table name `demo_logs` (you can customize it). + +To learn more about logs and pipeline definition, [see our +docs](https://docs.greptime.com/user-guide/logs/overview). + +## Note + +If you are going to restart this demo, press `Ctrl-C` and remember to call +`docker compose down` to clean up the data before you run `docker compose up` +again. diff --git a/kafka-ingestion/build_vector/Dockerfile b/kafka-ingestion/build_vector/Dockerfile new file mode 100644 index 0000000..ff2f463 --- /dev/null +++ b/kafka-ingestion/build_vector/Dockerfile @@ -0,0 +1,24 @@ +# Use Rust as the base image +FROM rust:1.79 AS builder + +# Install system dependencies +RUN apt-get update && apt-get install -y cmake pkg-config libssl-dev protobuf-compiler libsasl2-dev + +# Clone the Vector source code +RUN git clone --depth 1 -b chore/greptime_log https://github.com/paomian/vector.git /vector +WORKDIR /vector + +# Build Vector +RUN cargo build --release --no-default-features --features=sinks-greptimedb_logs,sinks-greptimedb_metrics,sinks-console,sources-kafka + +# Use a minimal base image for the final image +FROM debian:bookworm-slim + +# Copy the Vector binary from the builder stage +COPY --from=builder /vector/target/release/vector /usr/local/bin/vector + +# Set up the config directory +ENV VECTOR_CONFIG_LOCATION "/etc/vector/vector.toml" + +# Define the entry point +CMD vector -c $VECTOR_CONFIG_LOCATION diff --git a/kafka-ingestion/config_data/pipeline.yaml b/kafka-ingestion/config_data/pipeline.yaml new file mode 100644 index 0000000..a5d0a15 --- /dev/null +++ b/kafka-ingestion/config_data/pipeline.yaml @@ -0,0 +1,15 @@ +processors: + - date: + fields: + - timestamp + formats: + - "%Y-%m-%dT%H:%M:%S%.3fZ" + +transform: + - fields: + - message + type: string + - fields: + - timestamp + type: timestamp + index: timestamp diff --git a/kafka-ingestion/config_data/vector.toml 
b/kafka-ingestion/config_data/vector.toml new file mode 100644 index 0000000..5911c15 --- /dev/null +++ b/kafka-ingestion/config_data/vector.toml @@ -0,0 +1,18 @@ +[sources.mq] +type = "kafka" +group_id = "vector0" +topics = ["test_topic"] +bootstrap_servers = "kafka:9092" + +[sinks.console] +type = "console" +inputs = [ "mq" ] +encoding.codec = "text" + +[sinks.sink_greptime_logs] +type = "greptimedb_logs" +table = "demo_logs" +pipeline_name = "demo_pipeline" +compression = "gzip" +inputs = [ "mq" ] +endpoint = "http://greptimedb:4000" diff --git a/kafka-ingestion/docker-compose.yml b/kafka-ingestion/docker-compose.yml new file mode 100644 index 0000000..a721d06 --- /dev/null +++ b/kafka-ingestion/docker-compose.yml @@ -0,0 +1,88 @@ +services: + kafka: + image: docker.io/bitnami/kafka:3.6.0 + container_name: kafka + ports: + - 9092 + networks: + - demo-network + environment: + # KRaft settings + KAFKA_KRAFT_CLUSTER_ID: Kmp-xkTnSf-WWXhWmiorDg + KAFKA_ENABLE_KRAFT: "yes" + KAFKA_CFG_NODE_ID: "1" + KAFKA_CFG_PROCESS_ROLES: broker,controller + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:2181 + # Listeners + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://:9092 + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:2181 + ALLOW_PLAINTEXT_LISTENER: "yes" + KAFKA_BROKER_ID: "1" + + kafka-init: + image: docker.io/bitnami/kafka:3.6.0 + networks: + - demo-network + command: ["/opt/bitnami/kafka/bin/kafka-topics.sh", "--create", "--topic", "test_topic", "--bootstrap-server", "kafka:9092"] + depends_on: + kafka: + condition: service_started + init: true + + msg_gen: + build: + context: ./producer + dockerfile: ./Dockerfile + networks: + - demo-network + environment: + - KAFKA_TOPIC_NAME=test_topic + depends_on: + kafka-init: + condition: service_completed_successfully + + init_pipeline: + image: docker.io/alpine/curl + networks: + - 
demo-network + depends_on: + greptimedb: + condition: service_started + volumes: + - ./config_data:/config_data + command: sh -c "curl -X 'POST' 'http://greptimedb:4000/v1/events/pipelines/demo_pipeline' -F 'file=@/config_data/pipeline.yaml' -v" + + vector: + build: + context: ./build_vector + dockerfile: ./Dockerfile + networks: + - demo-network + volumes: + - ./config_data:/config_data + depends_on: + kafka-init: + condition: service_completed_successfully + environment: + VECTOR_CONFIG_LOCATION: /config_data/vector.toml + + greptimedb: + image: docker.io/greptime/greptimedb:v0.9.2 + command: standalone start --http-addr=0.0.0.0:4000 --rpc-addr=0.0.0.0:4001 --mysql-addr=0.0.0.0:4002 --postgres-addr 0.0.0.0:4003 + ports: + - 4000:4000 + - 4001:4001 + - 4002:4002 + - 4003:4003 + networks: + - demo-network + healthcheck: + test: ["CMD", "curl", "-f", "http://127.0.0.1:4000/health"] + interval: 3s + timeout: 3s + retries: 5 + +networks: + demo-network: diff --git a/kafka-ingestion/producer/Dockerfile b/kafka-ingestion/producer/Dockerfile new file mode 100644 index 0000000..fc40d15 --- /dev/null +++ b/kafka-ingestion/producer/Dockerfile @@ -0,0 +1,9 @@ +FROM docker.io/python:3.10 + +WORKDIR /app + +RUN pip install kafka-python + +COPY app.py . + +CMD ["python3", "app.py"] diff --git a/kafka-ingestion/producer/app.py b/kafka-ingestion/producer/app.py new file mode 100644 index 0000000..c516b00 --- /dev/null +++ b/kafka-ingestion/producer/app.py @@ -0,0 +1,16 @@ +from kafka import KafkaProducer +import time +import os + +print("starting producer", flush=True) + +topic = os.environ["KAFKA_TOPIC_NAME"] +print("using topic: ", topic, flush=True) + +producer = KafkaProducer(bootstrap_servers="kafka:9092") +print("using producer: ", producer, flush=True) + +while True: + producer.send(topic, b"this is a test message") + print("message sent", flush=True) + time.sleep(3)