Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Storing all donations in single table #240

Merged
merged 3 commits into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions scripts/donations-new-table-reload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""
Module to read data from DynamoDB tables and populate into a single DynamoDB table, with logging and dry-run feature.

This module provides functions to read items from multiple DynamoDB tables that were storing donations data for
individual funds, and populate these items into a single DynamoDB table that will hold donations.
"""

import sys
import boto3
from botocore.exceptions import ClientError
import logging

def get_dynamodb_tables(dynamodb_client, exclude_table):
"""
Get a list of DynamoDB table names, excluding a specific table.

Args:
dynamodb_client: A boto3 DynamoDB client.
exclude_table: The name of the table to exclude from the list.

Returns:
A list of table names.
"""
try:
response = dynamodb_client.list_tables()
return [table for table in response['TableNames'] if table != exclude_table]
except ClientError as error:
logging.error(f"Error fetching table names: {error}")
return []

def read_table_items(dynamodb_client, table_name):
"""
Read all items from a DynamoDB table.

Args:
dynamodb_client: A boto3 DynamoDB client.
table_name: The name of the DynamoDB table.

Returns:
A list of items from the table.
"""
try:
response = dynamodb_client.scan(TableName=table_name)
items = response['Items']
logging.info(f"Read {len(items)} items from table '{table_name}'")
return items
except ClientError as error:
logging.error(f"Error reading items from table {table_name}: {error}")
return []

def write_items_to_table(dynamodb_client, target_table, items, original_table_name, dry_run):
"""
Write items to a DynamoDB table with an additional attribute.

Args:
dynamodb_client: A boto3 DynamoDB client.
target_table: The name of the target DynamoDB table.
items: A list of items to write.
original_table_name: The name of the original table to use for the 'fund_id' attribute.
dry_run: If True, do not write to the table, only log the actions.
"""
for item in items:
item['fund_id'] = {'S': original_table_name}
if not dry_run:
try:
dynamodb_client.put_item(TableName=target_table, Item=item)
except ClientError as error:
logging.error(f"Error writing item to table {target_table}: {error}")

def main(dry_run=False, region_name='us-east-1'):
"""
Main function to migrate DynamoDB items from multiple tables to a single table.

Args:
dry_run: If True, do not write to the target table, only read and log the actions.
region_name: AWS region where the DynamoDB tables are located.
"""
dynamodb_client = boto3.client('dynamodb', region_name=region_name)
source_tables = get_dynamodb_tables(dynamodb_client, 'funds')
target_table = 'donations-all'

for table_name in source_tables:
items = read_table_items(dynamodb_client, table_name)
write_items_to_table(dynamodb_client, target_table, items, table_name, dry_run)

if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
stream=sys.stdout
)
main(dry_run=True, region_name='us-east-1') # Set to False to execute data writing
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.yuriytkach.tracker.fundraiser;

import static com.yuriytkach.tracker.fundraiser.DynamoDbTestResource.DONATIONS_TABLE;
import static com.yuriytkach.tracker.fundraiser.DynamoDbTestResource.FUND;
import static com.yuriytkach.tracker.fundraiser.DynamoDbTestResource.FUNDS_TABLE;
import static com.yuriytkach.tracker.fundraiser.DynamoDbTestResource.FUND_1_TABLE;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.ALL_ATTRIBUTES;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.ALL_ATTRIBUTES_WITHOUT_FUND_ID;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_FUND_ID;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_ID;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -47,21 +48,23 @@ public abstract class AbstractFundOperationsTestCommon {
@AfterEach
void cleanUp() {
log.info("----- CLEANUP ------");
deleteItemByIdDirectly(FUND_1_TABLE, COL_ID, ITEM_ID_1, ITEM_ID_2);
deleteItemByIdDirectly(DONATIONS_TABLE, COL_ID, ITEM_ID_1, ITEM_ID_2);
deleteItemByIdDirectly(FUNDS_TABLE, DynamoDbFundStorageClient.COL_NAME, FUND_2_NAME);
deleteTableIfExists(FundService.FUND_TABLE_PREFIX + FUND_2_NAME);
fundStorageClient.save(FUND);
}

protected Optional<Donation> getDonationDirectlyById(final String donationId) {
protected Optional<DonationWithFundId> getDonationDirectlyById(final String donationId) {
final GetItemRequest dbGetItemRequest = GetItemRequest.builder()
.tableName(FUND_1_TABLE)
.tableName(DONATIONS_TABLE)
.key(Map.of(COL_ID, AttributeValue.builder().s(donationId).build()))
.attributesToGet(ALL_ATTRIBUTES)
.attributesToGet(StreamEx.of(ALL_ATTRIBUTES_WITHOUT_FUND_ID, 0, ALL_ATTRIBUTES_WITHOUT_FUND_ID.length)
.append(COL_FUND_ID).toArray(String.class))
.build();
final GetItemResponse response = dynamoDB.getItem(dbGetItemRequest);
assertThat(response.item()).isNotEmpty();
return DynamoDbDonationClientDonation.parseDonation(response.item());
return DynamoDbDonationClientDonation.parseDonation(response.item())
.map(donation -> new DonationWithFundId(response.item().get(COL_FUND_ID).s(), donation));
}

protected Optional<Fund> getFundDirectlyByName(final String name) {
Expand All @@ -75,7 +78,7 @@ protected Optional<Fund> getFundDirectlyByName(final String name) {
return DynamoDbFundStorageClient.parseFund(response.item());
}

protected void deleteItemByIdDirectly(final String fundTable, final String keyColumn, final Object... ids) {
protected void deleteItemByIdDirectly(final String table, final String keyColumn, final Object... ids) {
final var requests = StreamEx.of(ids)
.map(id -> AttributeValue.builder().s(id.toString()).build())
.map(attrValue -> Map.of(keyColumn, attrValue))
Expand All @@ -84,7 +87,7 @@ protected void deleteItemByIdDirectly(final String fundTable, final String keyCo
.toList();

dynamoDB.batchWriteItem(BatchWriteItemRequest.builder()
.requestItems(Map.of(fundTable, requests))
.requestItems(Map.of(table, requests))
.build());
}

Expand All @@ -95,4 +98,6 @@ private void deleteTableIfExists(final String tableName) {
}
}

public record DonationWithFundId(String fundId, Donation donation) { }

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.yuriytkach.tracker.fundraiser;

import static com.yuriytkach.tracker.fundraiser.DynamoDbTestResource.FUND;
import static com.yuriytkach.tracker.fundraiser.DynamoDbTestResource.FUNDS_TABLE;
import static com.yuriytkach.tracker.fundraiser.DynamoDbTestResource.FUND_1_TABLE;
import static com.yuriytkach.tracker.fundraiser.DynamoDbTestResource.FUND_1_ID;
import static com.yuriytkach.tracker.fundraiser.DynamoDbTestResource.FUND_OWNER;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_AMOUNT;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_CURR;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_FUND_ID;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_ID;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_PERSON;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_TIME;
Expand Down Expand Up @@ -55,7 +55,6 @@
import one.util.streamex.StreamEx;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

@Slf4j
Expand Down Expand Up @@ -145,11 +144,6 @@ void shouldCreateFund(final String cmdTextSuffix, final String expectedDesc, fin
assertThat(fund2.getCurrency()).isEqualTo(fund2Currency);
assertThat(fund2.getCreatedAt()).isCloseTo(Instant.now(), within(1, SECONDS));
assertThat(fund2.getUpdatedAt()).isCloseTo(Instant.now(), within(1, SECONDS));

final ListTablesResponse allTablesResponse = dynamoDB.listTables();
assertThat(allTablesResponse.tableNames()).containsExactlyInAnyOrder(
expectedFund2TableName, FUNDS_TABLE, FUND_1_TABLE
);
}

@Test
Expand Down Expand Up @@ -190,11 +184,6 @@ void shouldDeleteFund() {
.responseType(SlackResponse.RESPONSE_PRIVATE)
.text(":white_check_mark: Deleted fund `" + FUND_2_NAME + "`")
.build()));

final ListTablesResponse allTablesResponse = dynamoDB.listTables();
assertThat(allTablesResponse.tableNames()).containsExactlyInAnyOrder(
FUNDS_TABLE, FUND_1_TABLE
);
}

@ParameterizedTest
Expand Down Expand Up @@ -225,14 +214,17 @@ void shouldReturnOKIfTrackSuccessful(final String person, final String expectedP
+ " - :open_book: `fundy` 22.30% [223 of 1000] EUR - :bank:-1")
.build()));

final Optional<Donation> donation = getDonationDirectlyById(ITEM_ID_1.toString());
assertThat(donation).hasValue(Donation.builder()
.id(ITEM_ID_1.toString())
.currency(FUND.getCurrency())
.amount(123)
.dateTime(Instant.parse("2022-02-01T12:13:00Z"))
.person(expectedPerson)
.build());
final Optional<DonationWithFundId> donation = getDonationDirectlyById(ITEM_ID_1.toString());
assertThat(donation).hasValue(new DonationWithFundId(
FUND_1_ID,
Donation.builder()
.id(ITEM_ID_1.toString())
.currency(FUND.getCurrency())
.amount(123)
.dateTime(Instant.parse("2022-02-01T12:13:00Z"))
.person(expectedPerson)
.build()
));

final Optional<Fund> fund = getFundDirectlyByName(FUND.getName());
assertThat(fund).hasValue(FUND.toBuilder()
Expand Down Expand Up @@ -417,13 +409,14 @@ private void addDonationDirectly(
) {
final Map<String, AttributeValue> item = new HashMap<>();
item.put(COL_ID, AttributeValue.builder().s(itemId.toString()).build());
item.put(COL_FUND_ID, AttributeValue.builder().s(FUND_1_ID).build());
item.put(COL_CURR, AttributeValue.builder().s(curr).build());
item.put(COL_PERSON, AttributeValue.builder().s(person).build());
item.put(COL_AMOUNT, AttributeValue.builder().n(String.valueOf(amount)).build());
item.put(COL_TIME, AttributeValue.builder().s(dateTime.toString()).build());

final var putRequest = PutItemRequest.builder()
.tableName(FUND_1_TABLE)
.tableName(DynamoDbTestResource.DONATIONS_TABLE)
.item(item)
.build();
dynamoDB.putItem(putRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@
public class DynamoDbTestResource implements QuarkusTestResourceLifecycleManager {

public static final String FUNDS_TABLE = "all-funds-table";
public static final String ENABLED_INDEX = "test-mono-index";
public static final String FUND_1_TABLE = "donations-table";
public static final String FUND_2_TABLE = "disabled-table";
public static final String ENABLED_INDEX = "test-enabled-index";
public static final String FUND_ID_INDEX = "test-fund-id-index";
public static final String DONATIONS_TABLE = "donations-table";
public static final String FUND_1_ID = "fund-id-1";
public static final String FUND_2_ID = "fund-id-2";
public static final String FUND_OWNER = "owner";
public static final String FUND_RED = "red";
public static final String FUND_DESC = "description";
Expand All @@ -51,7 +53,7 @@ public class DynamoDbTestResource implements QuarkusTestResourceLifecycleManager
public static final String FUND_DISABLED_NAME = "dis-fund";

public static final Fund FUND = Fund.builder()
.id(FUND_1_TABLE)
.id(FUND_1_ID)
.enabled(true)
.name(FUND_1_NAME)
.goal(1000)
Expand All @@ -66,7 +68,7 @@ public class DynamoDbTestResource implements QuarkusTestResourceLifecycleManager
.build();

public static final Fund FUND_DISABLED = Fund.builder()
.id(FUND_2_TABLE)
.id(FUND_2_ID)
.enabled(false)
.name(FUND_DISABLED_NAME)
.goal(2000)
Expand Down Expand Up @@ -99,16 +101,25 @@ public Map<String, String> start() {
dynamoDB,
FUNDS_TABLE,
DynamoDbFundStorageClient.COL_NAME,
buildSecondaryIndex(ENABLED_INDEX, DynamoDbFundStorageClient.COL_ENABLED)
buildSecondaryIndex(ENABLED_INDEX, DynamoDbFundStorageClient.COL_ENABLED),
ScalarAttributeType.N
);
createTable(
dynamoDB,
DONATIONS_TABLE,
DynamoDbDonationClientDonation.COL_ID,
buildSecondaryIndex(FUND_ID_INDEX, DynamoDbDonationClientDonation.COL_FUND_ID),
ScalarAttributeType.S
);
createTable(dynamoDB, FUND_1_TABLE, DynamoDbDonationClientDonation.COL_ID, null);

createFundItem(dynamoDB, FUND);
createFundItem(dynamoDB, FUND_DISABLED);

return Map.of(
"app.funds-table", FUNDS_TABLE,
"app.funds-enabled-index", ENABLED_INDEX,
"app.donations-table", DONATIONS_TABLE,
"app.donations-fund-id-index", FUND_ID_INDEX,
"quarkus.dynamodb.endpoint-override", url
);
}
Expand Down Expand Up @@ -166,7 +177,8 @@ private void createTable(
final AmazonDynamoDB dynamoDB,
final String tableName,
final String keyColumn,
@Nullable final GlobalSecondaryIndex index
@Nullable final GlobalSecondaryIndex index,
final ScalarAttributeType secondKeyType
) {
final CreateTableRequest request = new CreateTableRequest();
request.setTableName(tableName);
Expand All @@ -181,7 +193,7 @@ private void createTable(
if (index != null) {
request.withGlobalSecondaryIndexes(List.of(index));
request.withAttributeDefinitions(new AttributeDefinition(
index.getKeySchema().get(0).getAttributeName(), ScalarAttributeType.N));
index.getKeySchema().get(0).getAttributeName(), secondKeyType));
}

final CreateTableResult table = dynamoDB.createTable(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_AMOUNT;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_CURR;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_FUND_ID;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_ID;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_PERSON;
import static com.yuriytkach.tracker.fundraiser.service.dynamodb.DynamoDbDonationClientDonation.COL_TIME;
Expand Down Expand Up @@ -261,13 +262,14 @@ private void addDonationDirectly(
) {
final Map<String, AttributeValue> item = new HashMap<>();
item.put(COL_ID, AttributeValue.builder().s(itemId.toString()).build());
item.put(COL_FUND_ID, AttributeValue.builder().s(DynamoDbTestResource.FUND_1_ID).build());
item.put(COL_CURR, AttributeValue.builder().s(curr).build());
item.put(COL_PERSON, AttributeValue.builder().s(person).build());
item.put(COL_AMOUNT, AttributeValue.builder().n(String.valueOf(amount)).build());
item.put(COL_TIME, AttributeValue.builder().s(dateTime.toString()).build());

final var putRequest = PutItemRequest.builder()
.tableName(DynamoDbTestResource.FUND_1_TABLE)
.tableName(DynamoDbTestResource.DONATIONS_TABLE)
.item(item)
.build();
dynamoDB.putItem(putRequest);
Expand All @@ -282,13 +284,13 @@ private void deleteItemByIdDirectly(final UUID... ids) {
.toList();

dynamoDB.batchWriteItem(BatchWriteItemRequest.builder()
.requestItems(Map.of(DynamoDbTestResource.FUND_1_TABLE, requests))
.requestItems(Map.of(DynamoDbTestResource.DONATIONS_TABLE, requests))
.build());
}

private Fund dummyFund() {
return Fund.builder()
.id(DynamoDbTestResource.FUND_1_TABLE)
.id(DynamoDbTestResource.FUND_1_ID)
.goal(1000)
.currency(FUND_CURR)
.raised(0)
Expand Down
Loading
Loading