Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB stream support #21

Merged
merged 17 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/docs/getting_started/introduction.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ Dynamode is highly influenced by other ORMs/ODMs, such as [TypeORM](https://www.
## Coming soon

- Migrations and automatic migrations generation.
- DynamoDB streams support
- PartiQL support
- Capture DynamoDB errors and make it easier to work with
- Support binary data type

### Road map

* [ ] Query that supports querying different types of entities at once with TS in mind.
* [ ] Add DynamoDB streams support
* [ ] Support binary types [link](https://www.github.com/aws/aws-sdk-js-v3/blob/06417909a3/packages/util-dynamodb/src/convertToAttr.ts#L166) and [link](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/modules/_aws_sdk_util_dynamodb.html)
* [ ] Possibility to have more than one suffix/prefix
* [ ] PartiQL support
Expand Down
93 changes: 93 additions & 0 deletions docs/docs/guide/stream.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
---
title: Stream | Dynamode
description: DynamoDB streams processing with Dynamode
sidebar_label: Stream
hide_title: true
---

# Stream

The `Stream` class is designed to process [DynamoDB streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling) and convert them into corresponding Dynamode entity instances.

This simplifies the process of handling DynamoDB streams and allows you to work with Dynamode entities directly.

:::info
DynamoDB streams of type `KEYS_ONLY` are not supported in Dynamode.
:::

## Stream constructor - new Stream(Entity)

Every stream has to be initialized with data from a DynamoDB stream record. The `Stream` class constructor takes a DynamoDB [`Record`](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Record.html) and initializes a `Stream` instance.

```ts
import Stream from 'dynamode/stream';

const stream = new Stream(dynamoDBRecord);
```

## Stream properties

The `Stream` class has the following properties:
- `streamType`: The type of the stream. It can be one of the following: `newImage`, `oldImage`, or `both`.
- `operation`: The operation that was performed on the entity. It can be one of the following: `insert`, `modify`, or `remove`.
- `newImage`: The new entity image after the operation. Instance of the [entity](/docs/guide/entity/modeling) class.
- `oldImage`: The old entity image before the operation. Instance of the [entity](/docs/guide/entity/modeling) class.
- `entity`: The entity class that the stream corresponds to.

## stream.isEntity(entity)

### Description

A method to check if the stream entity corresponds to the provided entity class. It also narrows down the type of the stream entity to the provided entity class.

### Arguments

`entity: Entity`: The entity class you want to check the stream entity against.

### Returns

Returns `true` if it's the same entity type, otherwise `false`.

### Examples

Let's say you have a stream that you want to check if it corresponds to the [`User`](/docs/guide/entity/modeling#user) entity. This way you can easily check if the stream entity is a `User` entity and work with it accordingly.

```ts
const stream = new Stream(dynamoDBRecord);

// Check that the stream refers to the User entity
if (stream.isEntity(User)) {
// images are narrowed down here (Stream<Entity> -> Stream<User>)
const newUser = stream.newImage;
const oldUser = stream.oldImage;

// Do logic based on custom logic
if(oldUser.username !== newUser.username && newUser.age > 18) {
// Do something
}
}
```

## Support for `aws-lambda`

The `Stream` class is designed to work with AWS Lambda functions that process DynamoDB streams. Here is an example of how you can use the `Stream` class in a Lambda function.

```ts
import type { Context, DynamoDBStreamEvent } from 'aws-lambda';
import Stream from 'dynamode/stream';

async function itemStream(event: DynamoDBStreamEvent, context: Context): void {
const stream = new Stream(event.Records[0]);

// Update list progress when an item is created
if (stream.isEntity(List)) {
if (stream.operation === 'insert' && stream.newImage) {
await ListManager.update(List.getPrimaryKey(stream.newImage.listId), {
increment: {
'progress.checked': 1,
},
});
}
}
}
```
1 change: 1 addition & 0 deletions docs/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ module.exports = {
'guide/query',
'guide/scan',
'guide/transactions',
'guide/stream',
'guide/dynamode',
],
},
Expand Down
16 changes: 16 additions & 0 deletions lib/dynamode/storage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ export default class DynamodeStorage {
return mergeObjects(...entitiesAttributes.reverse());
}

public getEntityClass(entityName: string): typeof Entity {
const entityMetadata = this.entities[entityName];

if (!entityMetadata) {
throw new DynamodeStorageError(`Invalid entity name "${entityName}"`);
}

if (!entityMetadata.entity) {
throw new DynamodeStorageError(
`Entity "${entityName}" not registered, use TableManager.entityManager(${entityName}) first.`,
);
}

return entityMetadata.entity;
}

public getEntityTableName(entityName: string): string {
if (!this.entities[entityName]) {
throw new DynamodeStorageError(`Invalid entity name "${entityName}`);
Expand Down
4 changes: 4 additions & 0 deletions lib/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import Dynamode from '@lib/dynamode/index';
import Entity from '@lib/entity';
import Query from '@lib/query';
import Scan from '@lib/scan';
import Stream from '@lib/stream';
import TableManager from '@lib/table';
import transactionGet from '@lib/transactionGet';
import transactionWrite from '@lib/transactionWrite';
Expand Down Expand Up @@ -44,4 +45,7 @@ export {
//transactions
transactionGet,
transactionWrite,

//Stream
Stream,
};
72 changes: 72 additions & 0 deletions lib/stream/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import Dynamode from '@lib/dynamode/index';
import Entity from '@lib/entity';
import { convertAttributeValuesToEntity } from '@lib/entity/helpers/converters';
import { AttributeValues, DynamodeStreamError, fromDynamo } from '@lib/utils';

import { DynamoDBRecord } from './types';

export default class Stream<E extends typeof Entity = typeof Entity> {
streamType: 'newImage' | 'oldImage' | 'both';
operation: 'insert' | 'modify' | 'remove';
oldImage?: InstanceType<E>;
newImage?: InstanceType<E>;

// Dynamode entity class
entity: E;

constructor({ dynamodb: record, eventName }: DynamoDBRecord) {
switch (eventName) {
case 'INSERT':
this.operation = 'insert';
break;
case 'MODIFY':
this.operation = 'modify';
break;
case 'REMOVE':
this.operation = 'remove';
break;
default:
throw new DynamodeStreamError('Invalid operation');
}

if (!record) {
throw new DynamodeStreamError('Invalid record');
}

switch (record.StreamViewType) {
case 'KEYS_ONLY':
throw new DynamodeStreamError("Stream of 'KEYS_ONLY' type is not supported");
case 'NEW_IMAGE':
this.streamType = 'newImage';
break;
case 'OLD_IMAGE':
this.streamType = 'oldImage';
break;
case 'NEW_AND_OLD_IMAGES':
this.streamType = 'both';
break;
default:
throw new DynamodeStreamError('Invalid streamType');
}

const item = fromDynamo((record.NewImage as AttributeValues) ?? (record.OldImage as AttributeValues) ?? {});
const dynamodeEntity = item?.dynamodeEntity;

if (!dynamodeEntity || typeof dynamodeEntity !== 'string') {
throw new DynamodeStreamError("Processed item isn't a Dynamode entity");
}

this.entity = Dynamode.storage.getEntityClass(dynamodeEntity) as E;

if (record.OldImage) {
this.oldImage = convertAttributeValuesToEntity(this.entity, record.OldImage as AttributeValues);
}
if (record.NewImage) {
this.newImage = convertAttributeValuesToEntity(this.entity, record.NewImage as AttributeValues);
}
}

isEntity<TargetEntity extends E>(entity: TargetEntity): this is Stream<TargetEntity> {
return this.entity === entity;
}
}
26 changes: 26 additions & 0 deletions lib/stream/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { AttributeValue } from '@aws-sdk/client-dynamodb';

// For compatibility with aws lambda: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/master/types/aws-lambda/trigger/dynamodb-stream.d.ts
type AttributeValueAWSLambda = {
B?: string | undefined;
BS?: string[] | undefined;
BOOL?: boolean | undefined;
L?: AttributeValueAWSLambda[] | undefined;
M?: { [id: string]: AttributeValueAWSLambda } | undefined;
N?: string | undefined;
NS?: string[] | undefined;
NULL?: boolean | undefined;
S?: string | undefined;
SS?: string[] | undefined;
};

export type StreamPayload = {
NewImage?: Record<string, AttributeValue | AttributeValueAWSLambda> | undefined;
OldImage?: Record<string, AttributeValue | AttributeValueAWSLambda> | undefined;
StreamViewType?: 'KEYS_ONLY' | 'NEW_IMAGE' | 'OLD_IMAGE' | 'NEW_AND_OLD_IMAGES' | undefined;
};

export type DynamoDBRecord = {
eventName?: 'INSERT' | 'MODIFY' | 'REMOVE' | undefined;
dynamodb?: StreamPayload | undefined;
};
1 change: 1 addition & 0 deletions lib/utils/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ export const InvalidParameter = createError('Invalid Parameter', 'InvalidParamet
export const ValidationError = createError('Validation failed', 'ValidationError');
export const ConflictError = createError('Conflict', 'ConflictError');
export const DynamodeStorageError = createError('Dynamode storage failed', 'DynamodeStorageError');
export const DynamodeStreamError = createError('Dynamode stream failed', 'DynamodeStreamError');
Loading
Loading