From 96ffe3d08dd9f30000107d5c00007d1c4dcace2c Mon Sep 17 00:00:00 2001
From: April M <36110273+aimurphy@users.noreply.github.com>
Date: Mon, 11 Nov 2024 13:34:52 -0800
Subject: [PATCH] remove duplicate content from streaming (#122)

---
 modules/ROOT/pages/astream-faq.adoc           |   2 +-
 modules/developing/pages/astream-cdc.adoc     | 477 +-----------------
 modules/operations/pages/astream-regions.adoc |  10 +-
 3 files changed, 8 insertions(+), 481 deletions(-)

diff --git a/modules/ROOT/pages/astream-faq.adoc b/modules/ROOT/pages/astream-faq.adoc
index d2c66fb..f892a66 100644
--- a/modules/ROOT/pages/astream-faq.adoc
+++ b/modules/ROOT/pages/astream-faq.adoc
@@ -30,7 +30,7 @@ And finally {product} offers a user-friendly interface and free tier to satisfy
 
 == What is CDC for {astra_db}?
 
-Change Data Capture (CDC) for {astra_db} uses a Pulsar IO source connector that processes changes from the Cassandra Change Agent via a Pulsar topic.
+Change Data Capture (CDC) for {astra_db} uses a Pulsar IO source connector that processes changes from the Cassandra Change Agent through a Pulsar topic.
 For more information, see xref:developing:astream-cdc.adoc[].
 
 == What are tenants, namespaces, topics, and sinks?
diff --git a/modules/developing/pages/astream-cdc.adoc b/modules/developing/pages/astream-cdc.adoc
index cf46cc6..4778442 100644
--- a/modules/developing/pages/astream-cdc.adoc
+++ b/modules/developing/pages/astream-cdc.adoc
@@ -3,9 +3,7 @@
 
 [IMPORTANT]
 ====
-* CDC connectors are only available for {db-serverless} deployments.
-
-* Enabling CDC for {db-serverless} databases increases costs based on your {product} usage.
+Enabling CDC for {db-serverless} databases incurs billed charges based on your {product} usage.
 See https://www.datastax.com/pricing/astra-streaming[{product} pricing] and https://www.datastax.com/products/datastax-astra/cdc-for-astra-db[CDC metering rates].
 ====
 
@@ -13,475 +11,4 @@ CDC for {astra_db} automatically captures changes in real time, de-duplicates th
 
 {product} processes data changes via a Pulsar topic.
 By design, the Change Data Capture (CDC) component is simple, with a 1:1 correspondence between the table and a single Pulsar topic.
-This guide explains how to connect your {astra_db} database to CDC and send change data to an Elasticsearch sink.
-
-== Supported data structures
-
-The following data types and corresponding AVRO or logical types are supported for CDC for {db-serverless} databases:
-
-[cols="1,1"]
-|===
-| Data type | AVRO type
-
-| ascii
-| string
-
-| bigint
-| long
-
-| blob
-| bytes
-
-| boolean
-| boolean
-
-| counter
-| long
-
-| date
-| int
-
-| decimal
-| cql_decimal
-
-| double
-| double
-
-| duration
-| cql_duration
-
-| float
-| float
-
-| inet
-| string
-
-| int
-| int
-
-| list
-| array
-
-| map
-| map (only string-type keys are supported)
-
-| set
-| array
-
-| smallint
-| int
-
-| text
-| string
-
-| time
-| long
-
-| timestamp
-| long
-
-| timeuuid
-| string
-
-| tinyint
-| int
-
-| uuid
-| string
-
-| varchar
-| string
-
-| varint
-| cql_varint / bytes
-|===
-
-Cassandra static columns are supported:
-
-* On row-level updates, static columns are included in the message value.
-* On partition-level updates, the clustering keys are null in the message key.
-The message value only has static columns on `INSERT` and `UPDATE` operations.
-
-For columns using data types that are not supported, the data types are omitted from the events sent to the data topic.
-If a row update contains both supported and unsupported data types, the event includes only columns with supported data types.
-
-=== AVRO interpretation
-
-{db-serverless} database keys are strings, while CDC produces AVRO messages which are structures. The conversion for some AVRO structures requires additional tooling that can result in unexpected output.
-
-The table below describes the conversion of AVRO logical types. The `record` type is a schema containing the listed fields.
-
-.AVRO complex types
-[cols="1,1,1,1"]
-|===
-|Name |AVRO type |Fields |Explanation
-
-|collections
-|array
-|lists, sets
-|Sets and Lists are treated as AVRO type `array`, with the attribute `items` containing the schema of the array's items.
-
-|decimal
-|record
-|BIG_INT, DECIMAL_SCALE
-|The Cassandra DECIMAL type is converted to a `record` with the `cql_decimal` logical type.
-
-|duration
-|record
-|CQL_DURATION_MONTHS, CQL_DURATION_DAYS, CQL_DURATION_NANOSECONDS
-|The Cassandra DURATION type is converted to a `record` with the `cql_duration` logical type.
-
-|maps
-|map
-|KEYS_CONVERTED_TO_STRINGS, VALUE_SCHEMA
-|The Cassandra MAP type is converted to the AVRO map type, but the keys are converted to strings.
-For complex types, the key is represented in JSON.
-
-|===
-
-== Limitations
-
-CDC for {db-serverless} databases has the following limitations:
-
-* Does not manage table truncates.
-* Does not sync data available before starting the CDC agent.
-* Does not replay logged batches.
-* Does not manage time-to-live.
-* Does not support range deletes.
-* CQL column names must not match a Pulsar primitive type name (ex: INT32).
-* Does not support multi-region.
-* Does not support multi-table mutations.
-
-== Prerequisites
-
-You need the following items to complete this procedure:
-
-* An active {url-astra}[{astra_db} account^].
-* An https://docs.datastax.com/en/astra-db-serverless/databases/create-database.html#create-a-serverless-non-vector-database[{db-serverless} database] created in the {link-astra-portal}.
-* An https://docs.datastax.com/en/astra-db-serverless/databases/manage-keyspaces.html[keyspace] created in the {link-astra-portal}.
-* An active https://cloud.elastic.co/login[Elasticsearch] account.
-* An Elasticsearch endpoint, index name, and API key retrieved from your https://cloud.elastic.co/[Elasticsearch Deployment].
-
-[[create-tenant]]
-== Create a streaming tenant
-
-. In the {link-astra-portal} navigation menu, click *Streaming*.
-
-. Select *Create Tenant*.
-
-. Enter a name for your new streaming tenant.
-
-. Select a provider and region.
-+
-{product} CDC can only be used in a region that supports both {product} and {db-serverless} databases.
-See xref:operations:astream-regions.adoc[] for more information.
-
-. Select *Create Tenant*.
-
-[[create-table]]
-== Create a table
-
-. Select *Databases* from the main navigation.
-
-. Select the name of the active database that you would like to use.
-
-. Select the *CQL Console* tab.
-
-. Create a table with a primary key column using the following command. Edit the command to add your *`KEYSPACE_NAME`* and choose a *`TABLE_NAME`*.
-+
-[source,cql,subs="+quotes"]
-----
-CREATE TABLE IF NOT EXISTS *KEYSPACE_NAME*.*TABLE_NAME* (key text PRIMARY KEY, c1 text);
-----
-
-. Confirm that your table was created:
-+
-[source,cql,subs="+quotes"]
-----
-select * from *KEYSPACE_NAME*.*TABLE_NAME*;
-----
-+
-.Result
-[%collapsible]
-====
-[source,console,subs="verbatim,quotes"]
-----
- key | c1
------+----
-
-(0 rows)
-----
-====
-
-== Connect to CDC for {db-serverless} databases
-
-Complete the following steps after you have created a <> and a <>.
-
-. In the {astra_ui}, go to your database.
-
-. Click the *CDC* tab.
-
-. Click *Enable CDC*.
-
-. Select a tenant, keyspace, and table.
-
-. Click *Enable CDC*.
-
-Enabling CDC creates a new `astracdc` namespace with two new topics: `data-` and `log-`.
-The `log-` topic consumes schema changes, processes them, and then writes clean data to the `data-` topic.
-The `log-` topic is for mandatory CDC functionality and should not be used.
-The `data-` topic is used to consume CDC data in {product}.
-For more information, see <>.
-
-== Connect Elasticsearch sink
-
-Connect an Elasticsearch sink to CDC that consumes messages from the `data-` topic and sends them to your Elasticsearch deployment.
-
-. Go to your database's *CDC* tab.
-
-. Under *Change Data Capture*, select the name of the CDC-enabled table you would like to use.
-You should still be in the CDC tab after selecting a name, but the header becomes *CDC for `TABLE_NAME`* with a green *Active* icon next to it.
-
-. Select *Add Elastic Search Sink* to select your settings.
-
-. Select the `astracdc` namespace.
-
-. Select *Elastic Search* for the sink type.
-
-. Enter a name for your sink.
-
-. Under *Connect Topics*, select a `data-` topic in the `astracdc` namespace for the input topic.
-
-. Complete *Sink-Specific Configuration* with the *Elasticsearch URL*, *Index name*, and *API key* found in your https://cloud.elastic.co/[Elasticsearch deployment portal].
-Leave username, password, and token blank.
-+
-Default values auto-populate.
-These values are recommended:
-+
-* `Ignore Record Key` as `false`
-* `Null Value Action` as `DELETE`
-* `Enable Schema` as `true`
-
-. Click *Create*.
-
-. Confirm that your new sink was created on the *Sinks* tab.
-
-== Send messages
-
-Process some changes with CDC:
-
-. Go to your database's *CQL Console* tab.
-
-. Modify the table you created:
-+
-[source,cql,subs="+quotes"]
-----
-INSERT INTO *KEYSPACE_NAME*.*TABLE_NAME* (key,c1) VALUES ('32a','bob3123');
-INSERT INTO *KEYSPACE_NAME*.*TABLE_NAME* (key,c1) VALUES ('32b','bob3123b');
-----
-
-. Confirm the changes you made:
-+
-[source,cql,subs="+quotes"]
-----
-select * from *KEYSPACE_NAME*.*TABLE_NAME*;
-----
-+
-.Result
-[%collapsible]
-====
-[source,console]
-----
- key | c1
------+----------
- 32a | bob3123
- 32b | bob3123b
-
-(2 rows)
-----
-====
-
-Your processed changes in the resulting table verify that the messages sent successfully.
-
-== Confirm Elasticsearch receives change data
-
-Send a `GET` request to your Elasticsearch deployment to confirm Elasticsearch is receiving changes from your database via CDC:
-
-[source,curl,subs="+quotes"]
-----
-curl -sS --location -X POST "*ELASTIC_URL*/*INDEX_NAME*/_search?pretty"
---header "Authorization: ApiKey '*API_KEY*'"
-----
-
-Make sure the response includes your changes to the index:
-
-[source,json,subs="+quotes"]
-----
-{
-  "took": 1,
-  "timed_out": false,
-  "_shards": {
-    "total": 1,
-    "successful": 1,
-    "skipped": 0,
-    "failed": 0
-  },
-  "hits": {
-    "total": {
-      "value": 3,
-      "relation": "eq"
-    },
-    "max_score": 1.0,
-    "hits": [
-      {
-        "_index": "*INDEX_NAME*",
-        "_id": "khl_hI0Bh25AUvCHghQo",
-        "_score": 1.0,
-        "_source": {
-          "name": "foo",
-          "title": "bar"
-        }
-      },
-      {
-        "_index": "*INDEX_NAME*",
-        "_id": "32a",
-        "_score": 1.0,
-        "_source": {
-          "c1": "bob3123"
-        }
-      },
-      {
-        "_index": "*INDEX_NAME*",
-        "_id": "32b",
-        "_score": 1.0,
-        "_source": {
-          "c1": "bob3123b"
-        }
-      }
-    ]
-  }
-}
-----
-
-[[increase-partitions]]
-== Increase the CDC data-topic partitions
-
-After enabling CDC, 3 data and 3 log partitions are created under the `astracdc` namespace.
-Increasing the number of partitions will create new partitions, but existing data will remain in the old partitions.
-New messages will be distributed across the new partitions.
-
-. Confirm the current state of the topic before making changes:
-+
-[source,bash]
-----
-bin/pulsar-admin topics list-partitioned-topics astracdc
-----
-+
-.Result
-[%collapsible]
-====
-[source,console]
-----
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
-persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
-persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
-persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
-----
-====
-
-. Use the `update-partitioned-topic` command to change the number of partitions for a specified topic:
-+
-[source,bash]
-----
-bin/pulsar-admin topics update-partitioned-topic ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1 --partitions 10
-----
-+
-This example increases the number of partitions to 10.
-You can only increase the number of partitions.
-Decreasing is not supported due to potential data loss and message ordering issues.
-
-. Verify the update:
-+
-[source,bash]
-----
-bin/pulsar-admin topics list ten01/astracdc
-----
-+
-.Result
-[%collapsible]
-====
-[source,console]
-----
-persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
-persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
-persistent://ten01/astracdc/log-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-9
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-8
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-7
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-6
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-1
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-0
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-5
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-4
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-3
-persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1-partition-2
-----
-====
-
-. Check the topic to confirm it has been updated to have 10 partitions:
-+
-[source,bash]
-----
-bin/pulsar-admin topics partitioned-stats persistent://ten01/astracdc/data-7e3a1b2c-4d5e-6f7a-8b9c-0d1e2f3a4b5c-keysp.table1
-----
-+
-.Result
-[%collapsible]
-====
-[source,json]
-----
-{
-  "msgRateIn" : 0.0,
-  "msgThroughputIn" : 0.0,
-  "msgRateOut" : 0.0,
-  "msgThroughputOut" : 0.0,
-  "bytesInCounter" : 0,
-  "msgInCounter" : 0,
-  "bytesOutCounter" : 0,
-  "msgOutCounter" : 0,
-  "averageMsgSize" : 0.0,
-  "msgChunkPublished" : false,
-  "storageSize" : 0,
-  "backlogSize" : 0,
-  "publishRateLimitedTimes" : 0,
-  "earliestMsgPublishTimeInBacklogs" : 0,
-  "offloadedStorageSize" : 0,
-  "lastOffloadLedgerId" : 0,
-  "lastOffloadSuccessTimeStamp" : 0,
-  "lastOffloadFailureTimeStamp" : 0,
-  "publishers" : [ ],
-  "waitingPublishers" : 0,
-  "subscriptions" : { },
-  "replication" : { },
-  "nonContiguousDeletedMessagesRanges" : 0,
-  "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
-  "compaction" : {
-    "lastCompactionRemovedEventCount" : 0,
-    "lastCompactionSucceedTimestamp" : 0,
-    "lastCompactionFailedTimestamp" : 0,
-    "lastCompactionDurationTimeInMills" : 0
-  },
-  "metadata" : {
-    "partitions" : 10
-  },
-  "partitions" : { }
-}
-----
-====
-
-== See also
-
-* xref:ROOT:astream-faq.adoc[]
-* xref:developing:clients/index.adoc[]
\ No newline at end of file
+For instructions and more information about CDC for {astra_db}, see xref:astra-db-serverless:databases:change-data-capture.adoc[].
diff --git a/modules/operations/pages/astream-regions.adoc b/modules/operations/pages/astream-regions.adoc
index 0fe6b88..91b3cb6 100644
--- a/modules/operations/pages/astream-regions.adoc
+++ b/modules/operations/pages/astream-regions.adoc
@@ -1,12 +1,12 @@
 = {product} regions
 :page-tag: astra-streaming,admin,manage,pulsar
 
-When creating a tenant, select a region for your tenant.
-Choose a region that is geographically close to your users to optimize performance.
-
+When you create a tenant, you must choose a region for your tenant.
 {product} supports AWS, Microsoft Azure, and Google Cloud regions.
-These regions are also supported by CDC for {astra_db}.
-{product} CDC can only be used in a region that supports both {product} and {astra_db}.
+For optimal performance, choose a region that is geographically close to your users.
+
+These regions also support CDC for {astra_db}.
+You can only use xref:developing:astream-cdc.adoc[CDC for {astra_db}] in regions that support both {product} and {astra_db}.
 
 ElasticSearch and Snowflake can be in different regions than {product}.