Skip to content

Commit

Permalink
feat(postgres): wip first implementation with pg with jsonb
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoselli committed Apr 29, 2024
1 parent e3bcccc commit 2ee797b
Show file tree
Hide file tree
Showing 9 changed files with 587 additions and 0 deletions.
9 changes: 9 additions & 0 deletions packages/ddd-toolkit-postgres/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
services:
rabbitmq:
image: postgres:16
environment:
POSTGRES_USER: guest
POSTGRES_PASSWORD: guest
POSTGRES_DB: guest
ports:
- 5432:5432
71 changes: 71 additions & 0 deletions packages/ddd-toolkit-postgres/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
{
"name": "@fizzbuds/ddd-toolkit-postgres",
"version": "0.1.10",
"description": "",
"author": "Gabriele Toselli, Luca Giovenzana",
"private": false,
"license": "Apache-2.0",
"main": "dist/index",
"types": "dist/index.d.ts",
"scripts": {
"build": "rimraf dist && tsc --project tsconfig.build.json",
"test": "jest",
"test:ci": "jest --coverage --runInBand",
"test:coverage": "jest --coverage",
"check": "cspell lint --quiet src",
"prepublishOnly": "pnpm run build"
},
"peerDependencies": {
"@fizzbuds/ddd-toolkit": "workspace:^",
"pg": "^8.11.5",
"postgres": "^3.4.4"
},
"devDependencies": {
"@fizzbuds/ddd-toolkit": "workspace:^",
"@types/jest": "^29.5.2",
"@types/lodash": "^4.14.195",
"@types/node": "^20.3.1",
"@types/pg": "^8.11.5",
"@types/uuid": "^9.0.2",
"@typescript-eslint/eslint-plugin": "^6.18.1",
"@typescript-eslint/parser": "^6.18.1",
"cspell": "^8.3.2",
"eslint": "^8.56.0",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-prettier": "^5.1.2",
"husky": "^8.0.0",
"jest": "^29.5.0",
"lint-staged": "^14.0.1",
"pg": "^8.11.5",
"postgres": "^3.4.4",
"prettier": "^3.1.1",
"rimraf": "^5.0.5",
"ts-jest": "^29.1.0",
"ts-node": "^10.9.1",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.1.3"
},
"publishConfig": {
"access": "public"
},
"jest": {
"moduleFileExtensions": [
"js",
"json",
"ts"
],
"rootDir": "src",
"testRegex": ".*spec\\.ts$",
"testPathIgnorePatterns": [
".api-spec.ts$"
],
"transform": {
"^.+\\.(t|j)s$": "ts-jest"
},
"collectCoverageFrom": [
"**/*.(t|j)s"
],
"coverageDirectory": "../coverage",
"testEnvironment": "node"
}
}
1 change: 1 addition & 0 deletions packages/ddd-toolkit-postgres/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './postgres-aggregate-repo';
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import { PostgresAggregateRepo } from './postgres-aggregate-repo';
import { AggregateNotFoundError, ISerializer } from '@fizzbuds/ddd-toolkit';
import { Pool } from 'pg';

class TestAggregate {
constructor(
public id: string,
public data: any,
) {}
}

type TestAggregateModel = {
id: string;
data: any;
};

class TestAggregateSerializer implements ISerializer<TestAggregate, TestAggregateModel> {
aggregateToModel(aggregate: TestAggregate): TestAggregateModel {
return {
id: aggregate.id,
data: aggregate.data,
};
}

modelToAggregate(model: TestAggregateModel): TestAggregate {
return new TestAggregate(model.id, model.data);
}
}

describe('PostgresAggregateRepo Integration', () => {
let aggregateRepo: PostgresAggregateRepo<TestAggregate, TestAggregateModel>;
let pool: Pool;

beforeAll(async () => {
pool = new Pool({
user: 'guest',
host: 'localhost',
database: 'guest',
password: 'guest',
port: 5432,
});

aggregateRepo = new PostgresAggregateRepo<TestAggregate, TestAggregateModel>(
new TestAggregateSerializer(),
pool,
'test_aggregate_table',
undefined,
undefined,
);

await aggregateRepo.init();
});

afterEach(async () => {
await pool.query('DELETE FROM test_aggregate_table');
jest.resetAllMocks();
});

afterAll(async () => {
await pool.end();
});

describe('Save and Get', () => {
describe('Given an existing aggregate', () => {
describe('When saving', () => {
const id1 = 'id1';
beforeEach(async () => {
await aggregateRepo.save({ id: id1, data: 'value' });
});

it('should be saved into aggregate write model', async () => {
expect(await aggregateRepo.getById(id1)).toMatchObject({
id: id1,
data: 'value',
});
});
});
});

describe('Given an un-existing aggregate', () => {
describe('When getById', () => {
it('should return null', async () => {
expect(await aggregateRepo.getById('not-existing-id')).toBeNull();
});
});

describe('When getByIdOrThrow', () => {
it('should throw AggregateNotFoundError', async () => {
await expect(() => aggregateRepo.getByIdOrThrow('not-existing-id')).rejects.toThrowError(
AggregateNotFoundError,
);
});
});
});
});

describe('Optimistic Lock', () => {
describe('Given a saved aggregate', () => {
const id1 = 'id1';
beforeEach(async () => {
await aggregateRepo.save({ id: id1, data: 'value' });
});

describe('When getting from db the aggregate', () => {
it('should return an aggregate with version 1', async () => {
expect(await aggregateRepo.getById(id1)).toMatchObject({ __version: 1 });
});
});

describe('When saving a new instance with the same id', () => {
it('should throw due to unique index on id', async () => {
const newAggregate = { id: id1, data: 'newValue' };
await expect(async () => await aggregateRepo.save(newAggregate)).rejects.toThrowError(
'duplicated id',
);
});
});

describe('When saving a new instance with undefined __version', () => {
it('should throw due to optimistic locking', async () => {
const newAggregate = { id: id1, data: 'newValue', __version: undefined };
await expect(async () => await aggregateRepo.save(newAggregate)).rejects.toThrowError(
'duplicated id', // FIXME more precise error for optimistic lock
);
});
});

describe('When saving and getting multiple times the aggregate', () => {
it('should increase the aggregate version', async () => {
const firstInstance = await aggregateRepo.getById(id1);
if (firstInstance === null) throw new Error('Not found');
await aggregateRepo.save(firstInstance);

const secondInstance = await aggregateRepo.getById(id1);
if (secondInstance === null) throw new Error('Not found');

await aggregateRepo.save(secondInstance);

const thirdInstance = await aggregateRepo.getById(id1);
if (thirdInstance === null) throw new Error('Not found');

expect(thirdInstance).toMatchObject({ __version: 3 });
});
});

describe('When saving an outdated aggregate', () => {
it('should throw an optimistic locking error', async () => {
const firstInstance = await aggregateRepo.getById(id1);
if (firstInstance === null) throw new Error('Not found');

const secondInstance = await aggregateRepo.getById(id1);
if (secondInstance === null) throw new Error('Not found');
await aggregateRepo.save(secondInstance);

await expect(async () => await aggregateRepo.save(firstInstance)).rejects.toThrowError(
'optimistic locking',
); // FIXME more precise error for optimistic lock
});
});
});
});
});
101 changes: 101 additions & 0 deletions packages/ddd-toolkit-postgres/src/postgres-aggregate-repo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import {
AggregateNotFoundError,
DuplicatedIdError,
IAggregateRepo,
IInit,
ILogger,
IRepoHooks,
ISerializer,
OptimisticLockError,
WithOptionalVersion,
WithVersion,
} from '@fizzbuds/ddd-toolkit';
import { Pool } from 'pg';
import { merge } from 'lodash';

export class PostgresAggregateRepo<A, AM extends { id: string }> implements IAggregateRepo<A>, IInit {
constructor(
protected readonly serializer: ISerializer<A, AM>,
protected readonly pool: Pool,
protected readonly tableName: string,
protected readonly repoHooks?: IRepoHooks<AM>,
protected readonly logger: ILogger = console,
) {}

async init() {
await this.pool.query(
`CREATE TABLE IF NOT EXISTS ${this.tableName} (
id TEXT NOT NULL,
version INTEGER NOT NULL,
model JSONB NOT NULL,
PRIMARY KEY (id)
);`,
);
}

async save(aggregate: WithOptionalVersion<A>) {
const aggregateModel = this.serializer.aggregateToModel(aggregate);
const aggregateVersion = aggregate.__version || 0;
const client = await this.pool.connect();

try {
await client.query('BEGIN');

const { rows } = await client.query(`SELECT id, version FROM ${this.tableName} WHERE id = $1 FOR UPDATE`, [
aggregateModel.id,
]);

if (rows.length === 0) {
await client.query(
`INSERT INTO ${this.tableName}(id, version, model)
VALUES($1, $2, $3)`,
[aggregateModel.id, aggregateVersion + 1, JSON.stringify(aggregateModel)],
);
} else {
const { id, version } = rows[0];
if (version !== aggregateVersion) {
if (aggregateVersion === 0) {
throw new DuplicatedIdError(
`Cannot save aggregate with id: ${aggregateModel.id} due to duplicated id.`,
);
} else {
throw new OptimisticLockError(
`Cannot save aggregate with id: ${aggregateModel.id} due to optimistic locking.`,
);
}
}
await client.query(`UPDATE ${this.tableName} SET version = $1, model = $2 WHERE id = $3`, [
aggregateVersion + 1,
JSON.stringify(aggregateModel),
id,
]);
}

await this.pool.query('COMMIT');
} catch (e) {
await this.pool.query('ROLLBACK');
throw e;
} finally {
client.release();
}
}

public async getById(id: string): Promise<WithVersion<A> | null> {
const queryText: string = `SELECT * FROM ${this.tableName} WHERE id = $1`;
const { rows } = await this.pool.query(queryText, [id]);
if (rows.length === 0) return null;
const aggregateModel = rows[0].model;
this.logger.debug(
`Retrieving aggregate ${id}. Found: ${JSON.stringify(rows[0].model)} with version ${rows[0].version}`,
);

const aggregate = this.serializer.modelToAggregate(aggregateModel as AM);
return merge<A, { __version: number }>(aggregate, { __version: rows[0].version });
}

public async getByIdOrThrow(id: string): Promise<WithVersion<A>> {
const aggregate = await this.getById(id);
if (!aggregate) throw new AggregateNotFoundError(`Aggregate ${id} not found.`);
return aggregate;
}
}
7 changes: 7 additions & 0 deletions packages/ddd-toolkit-postgres/tsconfig.build.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"extends": "../../tsconfig.build.json",
"compilerOptions": {
"baseUrl": "./src",
"outDir": "./dist"
}
}
7 changes: 7 additions & 0 deletions packages/ddd-toolkit-postgres/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"baseUrl": "./src",
"outDir": "./dist"
}
}
2 changes: 2 additions & 0 deletions packages/ddd-toolkit/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ export * from './query-bus';
export * from './command-bus';
export * from './event-bus';
export * from './outbox';
export * from './init.interface';
export * from './terminate.interface';
Loading

0 comments on commit 2ee797b

Please sign in to comment.