-
Notifications
You must be signed in to change notification settings - Fork 206
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* CDC blog post Signed-off-by: Matt Lord <[email protected]> * Abandon full walkthrough Signed-off-by: Matt Lord <[email protected]> * Apply suggestions from code review Co-authored-by: Deepthi Sigireddi <[email protected]> Signed-off-by: Matt Lord <[email protected]> * Apply review suggestion Signed-off-by: Matt Lord <[email protected]> * Minor tweaks Signed-off-by: Matt Lord <[email protected]> * Update date Signed-off-by: Matt Lord <[email protected]> --------- Signed-off-by: Matt Lord <[email protected]> Co-authored-by: Deepthi Sigireddi <[email protected]>
- Loading branch information
Showing
1 changed file
with
93 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
--- | ||
author: 'Matt Lord' | ||
date: 2024-07-29 | ||
slug: '2024-07-29-cdc-vstream' | ||
tags: ['Vitess', 'PlanetScale', 'MySQL', 'VReplication', 'CDC', 'Debezium'] | ||
title: 'Building Data Pipelines With Vitess' | ||
description: "Learn the basics of Change Data Capture (CDC) and how to leverage Vitess's VStream API to build data pipelines." | ||
--- | ||
|
||
Vitess is a popular [CNCF project](https://www.cncf.io/projects/vitess/) that is used to scale some of the largest MySQL installations in the world — by [companies like Slack, Square, Shopify, and GitHub](https://vitess.io). It provides [sharding](https://en.wikipedia.org/wiki/Shard_(database_architecture)), | ||
[connection pooling](../../docs/reference/features/connection-pools/), and many other [features that make it easy to scale MySQL horizontally](../../docs/reference/features/). | ||
|
||
Vitess and MySQL are ideally suited for use as an [Online Transaction Processing (OLTP)](https://aws.amazon.com/compare/the-difference-between-olap-and-oltp) system — where the end-user interacts directly with the system and fast response times are essential as they get product | ||
and service information, generating critical business records such as orders, user profiles, and more. They are not optimized for [Online Analytical Processing (OLAP)](https://aws.amazon.com/compare/the-difference-between-olap-and-oltp) workloads and other use cases and needs | ||
that you will encounter as your product, company, and data needs grow. This is where [Change Data Capture (CDC)](https://www.confluent.io/learn/change-data-capture/), AKA [ETL or Extract-Transform-Load](https://aws.amazon.com/what-is/etl/), and [Data Pipelines](https://www.ibm.com/topics/data-pipeline) | ||
more generally come into play as they allow you to maintain in-sync copies of data across various systems that serve specific needs, with CDC being a technique used to track changes in a database and propagate them to other systems. This is useful for a variety of use cases, | ||
including data replication, data warehousing, and data integration. This allows you to e.g. maintain a [Data Warehouse](https://cloud.google.com/learn/what-is-a-data-warehouse) and/or [Data Lake](https://azure.microsoft.com/en-us/resources/cloud-computing-dictionary/what-is-a-data-lake) | ||
for analytics and reporting purposes (e.g. quarterly and yearly sales reports), or to integrate data with other systems where you need to work with data that is initially created and updated by your OLTP system. | ||
|
||
## Vitess Primitives | ||
|
||
Vitess has a number of primitives or building blocks that make it easy to build your data pipelines. These are features of [VReplication](../../docs/reference/vreplication/vreplication/), a powerful system that allows for various types of data replication and transformation. | ||
For CDC and similar use cases, VReplication provides the [VStream API](../../docs/reference/vreplication/vstream/) in [VTGates](../../docs/concepts/vtgate/) (Vitess Gateways) that allows you to stream changes from a Vitess cluster in real-time. | ||
|
||
This low-level [VStream](../../docs/reference/vreplication/vstream/) primitive is then used by popular CDC tools like [Debezium](https://debezium.io/) to [capture changes in Vitess and propagate them](https://debezium.io/documentation/reference/connectors/vitess.html) to other systems. | ||
[PlanetScale](https://planetscale.com) also uses the VStream API to build the [Connect feature](https://planetscale.com/blog/extract-load-and-transform-your-data-with-planetscale-connect), using additional open source drivers for popular CDC/ETL services such as [AirByte](https://airbyte.com) | ||
([source](https://github.com/planetscale/airbyte-source)) and [FiveTran](https://www.fivetran.com) ([source](https://github.com/planetscale/fivetran-source)). | ||
|
||
### A Look Under the Hood at VStream | ||
|
||
[VStream](https://github.com/vitessio/vitess/blob/eb29999a3f4730b2ebe2e0abe1d1996ce2d861d9/proto/queryservice.proto#L103-L113) is a low-level component, provided via [gRPC](https://grpc.io), that is used internally by [VReplication](../../docs/reference/vreplication/vreplication/) | ||
to replicate data within Vitess for various workflow types such as [`MoveTables`](../../docs/reference/vreplication/movetables/) and [`Reshard`](../../docs/reference/vreplication/reshard/) — typically from one [VTTablet](../../docs/concepts/tablet/) to another. The | ||
[VTGate VStream RPC](https://github.com/vitessio/vitess/blob/eb29999a3f4730b2ebe2e0abe1d1996ce2d861d9/proto/vtgateservice.proto#L55-L56) leverages this low-level component to stream data from the [Shards](../../docs/concepts/shard/) within a Vitess [Keyspace](../../docs/concepts/keyspace/), | ||
providing a single unified change stream spanning the logical database which may consist of hundreds or even thousands of shards. You can see a [simple example client that uses the VStream API directly here](https://github.com/vitessio/vitess/blob/main/examples/local/vstream_client.go). | ||
|
||
This is what the output looks like, with commands that you can run yourself if you are interested in the lower-level aspects (not necessary if you're going to use an existing connector/driver such as the [Debezium Connector for Vitess](https://debezium.io/documentation/reference/connectors/vitess.html)): | ||
|
||
```shell | ||
git clone [email protected]:vitessio/vitess.git | ||
cd vitess | ||
git checkout main | ||
make build | ||
cd examples/local | ||
|
||
./101_initial_cluster.sh; mysql < ../common/insert_commerce_data.sql; ./201_customer_tablets.sh; ./202_move_tables.sh; ./203_switch_reads.sh; ./204_switch_writes.sh; ./205_clean_commerce.sh; ./301_customer_sharded.sh; ./302_new_shards.sh; ./303_reshard.sh; ./304_switch_reads.sh; ./305_switch_writes.sh; ./306_down_shard_0.sh; ./307_delete_shard_0.sh | ||
|
||
go run vstream_client.go | ||
|
||
# In another terminal, connecting to the VTGate that was started | ||
for i in {1..10}; do | ||
command mysql --no-defaults -h 127.0.0.1 -P 15306 customer -e "insert into customer (email) values ('${i}@foo.com')" | ||
done | ||
|
||
# Cleanup whenever you're done testing | ||
./401_teardown.sh | ||
``` | ||
|
||
The VStream client will output the changes that are being streamed from the VTGate that look like this — first snapshotting the current state of the `customer` table in the sharded `customer` keyspace, before then streaming the subsequent changes to the table as they happen in real-time: | ||
|
||
```shell | ||
$ go run vstream_client.go | ||
[type:BEGIN keyspace:"customer" shard:"80-" type:FIELD field_event:{table_name:"customer.customer" fields:{name:"customer_id" type:INT64 table:"customer" org_table:"customer" database:"vt_customer" org_name:"customer_id" column_length:20 charset:63 flags:53251 column_type:"bigint"} fields:{name:"email" type:VARBINARY table:"customer" org_table:"customer" database:"vt_customer" org_name:"email" column_length:128 charset:63 flags:128 column_type:"varbinary(128)"} keyspace:"customer" shard:"80-" enum_set_string_values:true} keyspace:"customer" shard:"80-"] | ||
[type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"80-"] | ||
[type:ROW row_event:{table_name:"customer.customer" row_changes:{after:{lengths:1 lengths:14 values:"[email protected]"}} keyspace:"customer" shard:"80-"} keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58" table_p_ks:{table_name:"customer" lastpk:{fields:{name:"customer_id" type:INT64 charset:63 flags:53251} rows:{lengths:1 values:"4"}}}}} keyspace:"customer" shard:"80-" type:COMMIT keyspace:"customer" shard:"80-"] | ||
[type:BEGIN keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"80-" type:COMMIT keyspace:"customer" shard:"80-"] | ||
[type:COPY_COMPLETED keyspace:"customer" shard:"80-"] | ||
[type:BEGIN keyspace:"customer" shard:"-80" type:FIELD field_event:{table_name:"customer.customer" fields:{name:"customer_id" type:INT64 table:"customer" org_table:"customer" database:"vt_customer" org_name:"customer_id" column_length:20 charset:63 flags:53251 column_type:"bigint"} fields:{name:"email" type:VARBINARY table:"customer" org_table:"customer" database:"vt_customer" org_name:"email" column_length:128 charset:63 flags:128 column_type:"varbinary(128)"} keyspace:"customer" shard:"-80" enum_set_string_values:true} keyspace:"customer" shard:"-80"] | ||
[type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-58"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"-80"] | ||
[type:ROW row_event:{table_name:"customer.customer" row_changes:{after:{lengths:1 lengths:16 values:"[email protected]"}} keyspace:"customer" shard:"-80"} keyspace:"customer" shard:"-80" type:ROW row_event:{table_name:"customer.customer" row_changes:{after:{lengths:1 lengths:14 values:"[email protected]"}} keyspace:"customer" shard:"-80"} keyspace:"customer" shard:"-80" type:ROW row_event:{table_name:"customer.customer" row_changes:{after:{lengths:1 lengths:18 values:"[email protected]"}} keyspace:"customer" shard:"-80"} keyspace:"customer" shard:"-80" type:ROW row_event:{table_name:"customer.customer" row_changes:{after:{lengths:1 lengths:14 values:"[email protected]"}} keyspace:"customer" shard:"-80"} keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-58" table_p_ks:{table_name:"customer" lastpk:{fields:{name:"customer_id" type:INT64 charset:63 flags:53251} rows:{lengths:1 values:"5"}}}} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"-80" type:COMMIT keyspace:"customer" shard:"-80"] | ||
[type:BEGIN keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-58"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"-80" type:COMMIT keyspace:"customer" shard:"-80"] | ||
[type:COPY_COMPLETED keyspace:"customer" shard:"-80" type:COPY_COMPLETED] | ||
[type:BEGIN timestamp:1720544456 current_time:1720544456166536000 keyspace:"customer" shard:"-80" type:FIELD timestamp:1720544456 field_event:{table_name:"customer.customer" fields:{name:"customer_id" type:INT64 table:"customer" org_table:"customer" database:"vt_customer" org_name:"customer_id" column_length:20 charset:63 flags:53251 column_type:"bigint"} fields:{name:"email" type:VARBINARY table:"customer" org_table:"customer" database:"vt_customer" org_name:"email" column_length:128 charset:63 flags:128 column_type:"varbinary(128)"} keyspace:"customer" shard:"-80"} current_time:1720544456168488000 keyspace:"customer" shard:"-80" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"[email protected]"}} keyspace:"customer" shard:"-80" flags:1} current_time:1720544456168646000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-59"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-58"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1720544456 current_time:1720544456168652000 keyspace:"customer" shard:"-80"] | ||
[type:BEGIN timestamp:1720544456 current_time:1720544456182035000 keyspace:"customer" shard:"80-" type:FIELD timestamp:1720544456 field_event:{table_name:"customer.customer" fields:{name:"customer_id" type:INT64 table:"customer" org_table:"customer" database:"vt_customer" org_name:"customer_id" column_length:20 charset:63 flags:53251 column_type:"bigint"} fields:{name:"email" type:VARBINARY table:"customer" org_table:"customer" database:"vt_customer" org_name:"email" column_length:128 charset:63 flags:128 column_type:"varbinary(128)"} keyspace:"customer" shard:"80-"} current_time:1720544456183630000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"[email protected]"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456183642000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-59"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-59"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456183649000 keyspace:"customer" shard:"80-"] | ||
[type:BEGIN timestamp:1720544456 current_time:1720544456197796000 keyspace:"customer" shard:"-80" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"[email protected]"}} keyspace:"customer" shard:"-80" flags:1} current_time:1720544456197810000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-59"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1720544456 current_time:1720544456197814000 keyspace:"customer" shard:"-80"] | ||
[type:BEGIN timestamp:1720544456 current_time:1720544456211383000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"[email protected]"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456211392000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-60"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456211398000 keyspace:"customer" shard:"80-"] | ||
[type:BEGIN timestamp:1720544456 current_time:1720544456224248000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"[email protected]"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456224258000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-61"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456224261000 keyspace:"customer" shard:"80-"] | ||
[type:BEGIN timestamp:1720544456 current_time:1720544456237018000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"[email protected]"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456237029000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-62"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456237031000 keyspace:"customer" shard:"80-"] | ||
[type:BEGIN timestamp:1720544456 current_time:1720544456249777000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"[email protected]"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456250142000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-63"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456250150000 keyspace:"customer" shard:"80-"] | ||
[type:BEGIN timestamp:1720544456 current_time:1720544456263391000 keyspace:"customer" shard:"80-" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"[email protected]"}} keyspace:"customer" shard:"80-" flags:1} current_time:1720544456263407000 keyspace:"customer" shard:"80-" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-60"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-64"}} keyspace:"customer" shard:"80-" type:COMMIT timestamp:1720544456 current_time:1720544456263411000 keyspace:"customer" shard:"80-"] | ||
[type:BEGIN timestamp:1720544456 current_time:1720544456276388000 keyspace:"customer" shard:"-80" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:9 values:"[email protected]"}} keyspace:"customer" shard:"-80" flags:1} current_time:1720544456276398000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-61"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-64"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1720544456 current_time:1720544456276402000 keyspace:"customer" shard:"-80"] | ||
[type:BEGIN timestamp:1720544456 current_time:1720544456289697000 keyspace:"customer" shard:"-80" type:ROW timestamp:1720544456 row_event:{table_name:"customer.customer" row_changes:{after:{lengths:4 lengths:10 values:"[email protected]"}} keyspace:"customer" shard:"-80" flags:1} current_time:1720544456289711000 keyspace:"customer" shard:"-80" type:VGTID vgtid:{shard_gtids:{keyspace:"customer" shard:"-80" gtid:"MySQL56/90a3e2d2-3e14-11ef-bb33-30b3ef9417b6:1-62"} shard_gtids:{keyspace:"customer" shard:"80-" gtid:"MySQL56/97257a80-3e14-11ef-b24b-2eaacc019d10:1-64"}} keyspace:"customer" shard:"-80" type:COMMIT timestamp:1720544456 current_time:1720544456289714000 keyspace:"customer" shard:"-80"] | ||
``` | ||
|
||
If you are interested in additional lower level details, you can check out the [VStream API documentation](../../docs/reference/vreplication/vstream/). | ||
|
||
## An Example Setup | ||
|
||
You could use a similar setup to [the one described here](https://debezium.io/blog/2017/09/25/streaming-to-another-database/), but using the [Debezium Connector for Vitess](https://debezium.io/documentation/reference/connectors/vitess.html) rather than the | ||
[Debezium Connector for MySQL](https://debezium.io/documentation/reference/connectors/mysql.html) and an [AWS RedShift](https://aws.amazon.com/redshift/) instance rather than PostgreSQL as the target | ||
(with [RedShift being based on PostgreSQL](https://docs.aws.amazon.com/redshift/latest/dg/c_redshift-and-postgres-sql.html)). This also demonstrates the general rule that in setting these kinds of systems up you would use a Vitess variant of the | ||
connector/driver rather than the MySQL one — with things otherwise being the same. | ||
|
||
Happy streaming! |