AutoMQ with Apache Doris: Real Time Data Streaming and Analytics
Apache Doris is a high-performance, real-time analytical database that leverages an MPP architecture for lightning-fast query performance. It delivers sub-second query results even on enormous data volumes. Apache Doris excels at both high-concurrency point queries and high-throughput complex analytical workloads. As a result, it effectively satisfies the requirements of report analysis, ad-hoc queries, unified data warehouse construction, and accelerated federated queries over data lakes. Users can build applications for user behavior analysis, A/B testing platforms, log search and analysis, user profiling, and order analysis on top of it.
This article will explore how to use Apache Doris Routine Load to import data from AutoMQ into Apache Doris. For more information on Routine Load, please refer to the Routine Load Fundamental Principles document.
Ensure that a functional Apache Doris cluster is ready. For demonstration purposes, we used the Docker Deployment for Doris document to set up a test environment of Apache Doris on Linux.
Create the database and a test table:
CREATE DATABASE automq_db;
CREATE TABLE automq_db.users (
    id bigint NOT NULL,
    name string NOT NULL,
    timestamp string NULL,
    status string NULL
) DISTRIBUTED BY HASH(id) PROPERTIES ('replication_num' = '1');
Download the latest TGZ package from AutoMQ Releases and extract it. Assume the extraction directory is $AUTOMQ_HOME, and tools from $AUTOMQ_HOME/bin will be used in this article to create topics and generate test data.
Refer to the AutoMQ Official Deployment Document to deploy a functional cluster, ensuring network connectivity between AutoMQ and Apache Doris.
Quickly create a topic named example_topic in AutoMQ and write test JSON data to it by following the steps below.
Use the Apache Kafka® command-line tool to create topics, ensuring you have access to a Kafka environment and the Kafka service is running. Here is an example command to create a topic:
$AUTOMQ_HOME/bin/kafka-topics.sh --create --topic example_topic --bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1
When executing the command, replace the `--topic` and `--bootstrap-server` values with your actual topic name and AutoMQ Bootstrap Server address.
After creating the topic, you can use the following command to verify that the topic has been successfully created.
$AUTOMQ_HOME/bin/kafka-topics.sh --describe --topic example_topic --bootstrap-server 127.0.0.1:9092
Create JSON-formatted test data corresponding to the table defined earlier.
{
"id": 1,
"name": "Test User",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}
Use Kafka's command line tool or a programming method to write the test data into a topic named example_topic. Here is an example using the command line tool:
echo '{"id": 1, "name": "Test User", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | $AUTOMQ_HOME/bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic example_topic
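The record can also be produced programmatically. Below is a minimal sketch using the third-party kafka-python client (an assumption; any Kafka-compatible client works with AutoMQ). The broker address and topic name match the examples above.

```python
import json

# Build a JSON payload matching the automq_db.users table schema.
def make_record(user_id, name, timestamp, status):
    return json.dumps({
        "id": user_id,
        "name": name,
        "timestamp": timestamp,
        "status": status,
    }).encode("utf-8")

# Send one record to AutoMQ. kafka-python is a third-party client
# (install with `pip install kafka-python`); this import is an
# assumption, not part of AutoMQ itself.
def produce_example(brokers="127.0.0.1:9092", topic="example_topic"):
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=brokers)
    producer.send(topic, make_record(1, "Test User", "2023-11-10T12:00:00", "active"))
    producer.flush()
    producer.close()
```

Calling `produce_example()` against a running AutoMQ cluster writes the same record as the console-producer command above.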
To view the data just written into the topic, use the following command:
$AUTOMQ_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic example_topic --from-beginning
When executing the command, replace the `--topic` and `--bootstrap-server` values with your actual topic name and AutoMQ Bootstrap Server address.
In the Apache Doris command line, set up a Routine Load job that accepts JSON data to continuously import data from an AutoMQ Kafka topic. For details on the Routine Load parameters, please refer to Doris Routine Load.
CREATE ROUTINE LOAD automq_db.automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "127.0.0.1:9092",
"kafka_topic" = "example_topic",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
When running the command, replace `kafka_broker_list` with the actual AutoMQ Bootstrap Server address.
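Since the broker list, topic, and column list are usually the only parts that change between environments, it can help to template the statement. The helper below is a hypothetical sketch (not part of Doris or AutoMQ) that renders the Routine Load DDL above for a given broker address:

```python
# Hypothetical helper: render the Routine Load statement shown above,
# parametrized by job name, target table, brokers, topic, and columns.
def routine_load_ddl(job, table, brokers, topic, columns):
    # Each jsonpath must appear as \"$.col\" inside the SQL string literal.
    jsonpaths = ",".join('\\"$.%s\\"' % c for c in columns)
    return (
        f"CREATE ROUTINE LOAD {job} ON {table}\n"
        f"COLUMNS({', '.join(columns)})\n"
        "PROPERTIES\n(\n"
        '    "format" = "json",\n'
        f'    "jsonpaths" = "[{jsonpaths}]"\n'
        ")\nFROM KAFKA\n(\n"
        f'    "kafka_broker_list" = "{brokers}",\n'
        f'    "kafka_topic" = "{topic}",\n'
        '    "property.kafka_default_offsets" = "OFFSET_BEGINNING"\n'
        ");"
    )

print(routine_load_ddl("automq_example_load", "users",
                       "127.0.0.1:9092", "example_topic",
                       ["id", "name", "timestamp", "status"]))
```

Swapping in your real Bootstrap Server address for `127.0.0.1:9092` yields a statement you can paste into the Doris command line.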
First, verify the status of the Routine Load job to ensure it is actively running.
SHOW ROUTINE LOAD\G
Then, query the relevant tables in the Apache Doris database to confirm that the data has been successfully imported.
SELECT * FROM automq_db.users;
+------+-----------+---------------------+--------+
| id   | name      | timestamp           | status |
+------+-----------+---------------------+--------+
|    1 | Test User | 2023-11-10T12:00:00 | active |
|    2 | Test User | 2023-11-10T12:00:00 | active |
+------+-----------+---------------------+--------+
2 rows in set (0.01 sec)