diff --git a/CHANGES.md b/CHANGES.md
index c58595f..247fd2e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -10,6 +10,7 @@
 - DynamoDB CDC: Add `ctk load table` interface for processing CDC events
 - DynamoDB CDC: Accept a few more options for the Kinesis Stream:
   batch-size, create, create-shards, start, seqno, idle-sleep, buffer-time
+- DynamoDB Full: Improve error handling around bulk operations, balancing strictness and usability
 
 ## 2024/09/10 v0.0.22
 - MongoDB: Rename columns with leading underscores to use double leading underscores
diff --git a/cratedb_toolkit/io/dynamodb/backlog.md b/cratedb_toolkit/io/dynamodb/backlog.md
deleted file mode 100644
index 1361357..0000000
--- a/cratedb_toolkit/io/dynamodb/backlog.md
+++ /dev/null
@@ -1,35 +0,0 @@
-# DynamoDB Backlog
-
-## Iteration +1
-- Pagination / Batch Getting.
-  https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/programming-with-python.html#programming-with-python-pagination
-
-- Use `batch_get_item`.
-  https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_get_item.html
-
-- Scan by query instead of full.
-
-
-## Iteration +2
-
-### Resumption on errors?
-Another variant to scan the table, probably for resuming on errors?
-```python
-key = None
-while True:
-    if key is None:
-        response = table.scan()
-    else:
-        response = table.scan(ExclusiveStartKey=key)
-    key = response.get("LastEvaluatedKey", None)
-```
-
-### Item transformations?
-That's another item transformation idea picked up from an example program.
-Please advise if this is sensible in all situations, or if it's just a
-special case.
-
-```python
-if 'id' in item and not isinstance(item['id'], str):
-    item['id'] = str(item['id'])
-```
diff --git a/cratedb_toolkit/io/dynamodb/copy.py b/cratedb_toolkit/io/dynamodb/copy.py
index c059faa..4f5a59d 100644
--- a/cratedb_toolkit/io/dynamodb/copy.py
+++ b/cratedb_toolkit/io/dynamodb/copy.py
@@ -1,13 +1,16 @@
 # ruff: noqa: S608
 import logging
+import typing as t
 
 import sqlalchemy as sa
 from commons_codec.transform.dynamodb import DynamoDBFullLoadTranslator
 from tqdm import tqdm
 from yarl import URL
 
+from cratedb_toolkit.io.core import BulkProcessor
 from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
 from cratedb_toolkit.model import DatabaseAddress
+from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
 from cratedb_toolkit.util import DatabaseAdapter
 from cratedb_toolkit.util.data import asbool
 
@@ -23,9 +26,12 @@ def __init__(
         self,
         dynamodb_url: str,
         cratedb_url: str,
+        on_error: t.Literal["ignore", "raise"] = "ignore",
         progress: bool = False,
         debug: bool = True,
     ):
+        monkeypatch_executemany()
+
         cratedb_address = DatabaseAddress.from_string(cratedb_url)
         cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
         cratedb_table = cratedb_table_address.fullname
@@ -37,6 +43,7 @@ def __init__(
         self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
 
         self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table)
+        self.on_error = on_error
         self.progress = progress
         self.debug = debug
 
@@ -49,9 +56,7 @@ def start(self):
         """
         records_in = self.dynamodb_adapter.count_records(self.dynamodb_table)
         logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}")
-        logger_on_error = logger.warning
-        if self.debug:
-            logger_on_error = logger.exception
+
         with self.cratedb_adapter.engine.connect() as connection:
             if not self.cratedb_adapter.table_exists(self.cratedb_table):
                 connection.execute(sa.text(self.translator.sql_ddl))
@@ -59,26 +64,35 @@ def start(self):
             records_target = self.cratedb_adapter.count_records(self.cratedb_table)
             logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
             progress_bar = tqdm(total=records_in)
-            records_out = 0
-            for result in self.dynamodb_adapter.scan(
-                table_name=self.dynamodb_table,
-                consistent_read=self.consistent_read,
-                batch_size=self.batch_size,
-            ):
-                result_size = len(result["Items"])
-                try:
-                    operation = self.translator.to_sql(result["Items"])
-                except Exception as ex:
-                    logger_on_error(f"Transforming query failed: {ex}")
-                    continue
-                try:
-                    connection.execute(sa.text(operation.statement), operation.parameters)
-                    records_out += result_size
-                    progress_bar.update(n=result_size)
-                except Exception as ex:
-                    logger_on_error(f"Executing query failed: {ex}")
-            progress_bar.close()
-            connection.commit()
-            logger.info(f"Number of records written: {records_out}")
-            if records_out == 0:
+
+            processor = BulkProcessor(
+                connection=connection,
+                data=self.fetch(),
+                batch_to_operation=self.translator.to_sql,
+                progress_bar=progress_bar,
+                on_error=self.on_error,
+                debug=self.debug,
+            )
+            metrics = processor.start()
+            logger.info(f"Bulk processor metrics: {metrics}")
+
+            logger.info(
+                "Number of records written: "
+                f"success={metrics.count_success_total}, error={metrics.count_error_total}"
+            )
+            if metrics.count_success_total == 0:
                 logger.warning("No data has been copied")
+
+        return True
+
+    def fetch(self) -> t.Generator[t.List[t.Dict[str, t.Any]], None, None]:
+        """
+        Fetch data from DynamoDB, generating batches of items.
+        """
+        data = self.dynamodb_adapter.scan(
+            table_name=self.dynamodb_table,
+            consistent_read=self.consistent_read,
+            batch_size=self.batch_size,
+        )
+        for result in data:
+            yield result["Items"]
diff --git a/cratedb_toolkit/io/kinesis/relay.py b/cratedb_toolkit/io/kinesis/relay.py
index 6cb4fa4..274eb7f 100644
--- a/cratedb_toolkit/io/kinesis/relay.py
+++ b/cratedb_toolkit/io/kinesis/relay.py
@@ -63,8 +63,10 @@ def start(self, once: bool = False):
             self.kinesis_adapter.consume_forever(self.process_event)
 
     def stop(self):
-        self.progress_bar.close()
-        self.kinesis_adapter.stop()
+        if hasattr(self, "progress_bar"):
+            self.progress_bar.close()
+        if hasattr(self, "kinesis_adapter"):
+            self.kinesis_adapter.stop()
 
     def process_event(self, event):
         try:
diff --git a/tests/io/dynamodb/conftest.py b/tests/io/dynamodb/conftest.py
index c21d0e8..ae74fef 100644
--- a/tests/io/dynamodb/conftest.py
+++ b/tests/io/dynamodb/conftest.py
@@ -1,14 +1,24 @@
 import logging
+import time
+import typing
 
+import botocore
 import pytest
 from yarl import URL
 
+from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
+from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter
 from tests.io.dynamodb.manager import DynamoDBTestManager
 
 logger = logging.getLogger(__name__)
 
-# Define databases to be deleted before running each test case.
+# Define streams to be deleted before running each test case.
+RESET_STREAMS = [
+    "demo",
+]
+
+# Define tables to be deleted before running each test case.
 RESET_TABLES = [
     "ProductCatalog",
 ]
 
@@ -17,11 +27,15 @@
 class DynamoDBFixture:
     """
     A little helper wrapping Testcontainer's `LocalStackContainer`.
+
+    TODO: Generalize into `LocalStackFixture`.
""" def __init__(self): self.container = None self.url = None + self.dynamodb_adapter: typing.Union[DynamoDBAdapter, None] = None + self.kinesis_adapter: typing.Union[KinesisAdapter, None] = None self.setup() def setup(self): @@ -32,17 +46,49 @@ def setup(self): self.container.with_services("dynamodb", "kinesis") self.container.start() + self.dynamodb_adapter = DynamoDBAdapter(URL(f"{self.get_connection_url_dynamodb()}/?region=us-east-1")) + self.kinesis_adapter = KinesisAdapter( + URL(f"{self.get_connection_url_kinesis_dynamodb_cdc()}/?region=us-east-1") + ) + def finalize(self): self.container.stop() def reset(self): """ - Drop all databases used for testing. + Reset all resources to provide each test case with a fresh canvas. + """ + self.reset_streams() + self.reset_tables() + + def reset_streams(self): + """ + Drop all Kinesis streams used for testing. + """ + kinesis_client = self.kinesis_adapter.kinesis_client + for stream_name in RESET_STREAMS: + try: + kinesis_client.delete_stream(StreamName=stream_name) + except botocore.exceptions.ClientError as error: + if error.response["Error"]["Code"] != "ResourceNotFoundException": + raise + waiter = kinesis_client.get_waiter("stream_not_exists") + waiter.wait(StreamName=stream_name, WaiterConfig={"Delay": 0.3, "MaxAttempts": 15}) + time.sleep(0.25) + + def reset_tables(self): + """ + Drop all DynamoDB tables used for testing. """ - # FIXME - return - for database_name in RESET_TABLES: - self.client.drop_database(database_name) + dynamodb_client = self.dynamodb_adapter.dynamodb_client + for table_name in RESET_TABLES: + try: + dynamodb_client.delete_table(TableName=table_name) + except botocore.exceptions.ClientError as error: + if error.response["Error"]["Code"] != "ResourceNotFoundException": + raise + waiter = dynamodb_client.get_waiter("table_not_exists") + waiter.wait(TableName=table_name, WaiterConfig={"Delay": 0.3, "MaxAttempts": 15}) def get_connection_url_dynamodb(self): url = URL(self.container.get_url()) diff --git a/tests/io/dynamodb/test_copy.py b/tests/io/dynamodb/test_copy.py index 609f29b..984cfea 100644 --- a/tests/io/dynamodb/test_copy.py +++ b/tests/io/dynamodb/test_copy.py @@ -5,22 +5,24 @@ pytestmark = pytest.mark.dynamodb -RECORD = { - "Id": {"N": "101"}, -} - - -def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager): +def test_dynamodb_copy_basic_success(caplog, cratedb, dynamodb, dynamodb_test_manager): """ - Verify `DynamoDBFullLoad` works as expected. + Verify a basic `DynamoDBFullLoad` works as expected. """ + data_in = { + "Id": {"N": "101"}, + } + data_out = { + "Id": 101.0, + } + # Define source and target URLs. dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database with data. - dynamodb_test_manager.load_records(table_name="demo", records=[RECORD]) + dynamodb_test_manager.load_records(table_name="demo", records=[data_in]) # Run transfer command. 
     table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url)
@@ -32,4 +34,41 @@ def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager)
     assert cratedb.database.count_records("testdrive.demo") == 1
 
     results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True)  # noqa: S608
-    assert results[0]["data"] == {"Id": 101.0}
+    assert results[0]["data"] == data_out
+
+
+def test_dynamodb_copy_basic_warning(caplog, cratedb, dynamodb, dynamodb_test_manager):
+    """
+    Verify a basic `DynamoDBFullLoad` works as expected, this time emitting a warning for an invalid record.
+    """
+
+    data_in = [
+        {"Id": {"N": "1"}, "name": {"S": "Foo"}},
+        {"Id": {"N": "2"}, "name": {"S": "Bar"}, "nested_array": {"L": [{"L": [{"N": "1"}, {"N": "2"}]}]}},
+        {"Id": {"N": "3"}, "name": {"S": "Baz"}},
+    ]
+    data_out = [
+        {"data": {"Id": 1, "name": "Foo"}, "aux": {}},
+        {"data": {"Id": 3, "name": "Baz"}, "aux": {}},
+    ]
+
+    # Define source and target URLs.
+    dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1"
+    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
+
+    # Populate source database with data.
+    dynamodb_test_manager.load_records(table_name="demo", records=data_in)
+
+    # Run transfer command.
+    table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url)
+    table_loader.start()
+
+    # Verify data in target database.
+    assert cratedb.database.table_exists("testdrive.demo") is True
+    assert cratedb.database.refresh_table("testdrive.demo") is True
+    assert cratedb.database.count_records("testdrive.demo") == 2
+
+    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo ORDER BY data['Id'];", records=True)  # noqa: S608
+    assert results == data_out
+
+    assert "Dynamic nested arrays are not supported" in caplog.text
diff --git a/tests/io/dynamodb/test_relay.py b/tests/io/dynamodb/test_relay.py
index 7576cf2..ac30a60 100644
--- a/tests/io/dynamodb/test_relay.py
+++ b/tests/io/dynamodb/test_relay.py
@@ -1,7 +1,6 @@
 import threading
 import time
 
-import botocore
 import pytest
 
 from cratedb_toolkit.io.kinesis.relay import KinesisRelay
@@ -45,17 +44,6 @@ def test_kinesis_earliest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
     # Initialize table loader.
     table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url)
 
-    # Delete stream for blank canvas.
-    try:
-        table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo", EnforceConsumerDeletion=True)
-    except botocore.exceptions.ClientError as error:
-        if error.response["Error"]["Code"] != "ResourceNotFoundException":
-            raise
-
-    # LocalStack needs a while when deleting the Stream.
-    # FIXME: Can this be made more efficient?
-    time.sleep(0.5)
-
     # Populate source database with data.
     for event in events:
         table_loader.kinesis_adapter.produce(event)
@@ -98,17 +86,6 @@ def test_kinesis_latest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
     # Initialize table loader.
     table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url)
 
-    # Delete stream for blank canvas.
-    try:
-        table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo")
-    except botocore.exceptions.ClientError as error:
-        if error.response["Error"]["Code"] != "ResourceNotFoundException":
-            raise
-
-    # LocalStack needs a while when deleting the Stream.
-    # FIXME: Can this be made more efficient instead of waiting multiple times to orchestrate this sequence?
-    time.sleep(0.5)
-
     # Start event processor / stream consumer in separate thread, consuming forever.
     thread = threading.Thread(target=table_loader.start)
     thread.start()
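
The new `on_error` option on `DynamoDBFullLoad` accepts `"ignore"` (default) or `"raise"`. A minimal usage sketch of the revised loader, assuming placeholder connection URLs; only the constructor keywords and the `start()` call are taken from the patch above:

```python
from cratedb_toolkit.io.dynamodb.copy import DynamoDBFullLoad

# Placeholder URLs for a local LocalStack + CrateDB setup; adjust credentials,
# hosts, and table names to your environment.
table_loader = DynamoDBFullLoad(
    dynamodb_url="dynamodb://AWS_ACCESS_KEY:AWS_SECRET_KEY@localhost:4566/demo?region=us-east-1",
    cratedb_url="crate://crate@localhost:4200/testdrive/demo",
    on_error="raise",  # default is "ignore", which presumably logs failed batches and continues
)
table_loader.start()
```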