Skip to content

Commit

Permalink
Present and persist updatedAt and createdAt in meta (#16)
Browse files Browse the repository at this point in the history
* When fetching a version or entity, populate entity.meta with
  updatedAt and createdAt based on entity_version created and entity
  entity_created as well as correlationId from entity_version
  correlation_id overriding persistd data in order to not display
  faulty dates for old objects persited with the user supplied
  createdAt and/or updatedAt.
* When upserting an entity, ignore user specified updatedAt and
  createdAt as they will not be used and set the actual updatedAt
  and createdAt on the supplied entity object before persisting.
  I.e. updatedAt = now and createdAt = now or when the entity was
  created if it already exists.
* Change all timestamp fields to use timestamp with tz
* Changed port for pg instance to 5435 to now clash with other running
  instances when testing.
  • Loading branch information
markusn authored Dec 14, 2018
1 parent 8ee336f commit e10c79a
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 28 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ const entity = {
db.upsert(entity, (dbErr, entity) => {
if (dbErr) return dbErr;
// entity.id will contain id, will be created with uuid.v4() if not set
// entity.meta.createdAt and entity.meta.updatedAt will be set/updated with
// the correct update and creation times. User supplied values for
// createdAt and updatedAt are ignored and overwritten.
});
```

### Load a document
```js
const db = require("pg-doc-store").crud;
Expand Down
2 changes: 1 addition & 1 deletion config/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"user": "et",
"password": "ET",
"host": "localhost",
"port": "5432",
"port": "5435",
"database": "entity_test"
}
}
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:
image: postgres:9
restart: always
ports:
- "5432:5432"
- "5435:5432"
environment:
- POSTGRES_USER=et
- POSTGRES_PASSWORD=ET
Expand Down
64 changes: 48 additions & 16 deletions lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function load(id, force, cb) {
}

const q = [
"SELECT doc",
"SELECT doc, created, entity_created, correlation_id",
"FROM entity e, entity_version ev",
"WHERE e.latest_version_id = ev.version_id AND e.entity_id = $1"
];
Expand All @@ -29,14 +29,14 @@ function load(id, force, cb) {
pgClient.query(q.join(" "), [id], (err, res) => {
if (err) return cb(err);
const entity = (res.rows && res.rows.length > 0) ? res.rows[0] : null;
const doc = (entity !== null) ? entity.doc : null;
const doc = getDocWithMeta(entity);
return cb(null, doc);
});
}

function queryByRelationship(options, cb) {
const q = [
"SELECT doc",
"SELECT doc, entity_created, created, correlation_id",
"FROM entity e, entity_version ev",
"WHERE e.latest_version_id = ev.version_id",
"AND e.entity_type = $1",
Expand All @@ -57,14 +57,27 @@ function queryByRelationship(options, cb) {
if (err) return cb(err);
let docs = [];
if (res.rows && res.rows.length > 0) {
docs = res.rows.map((entity) => entity.doc);
docs = res.rows.map(getDocWithMeta);
} else if (options.errorOnNotFound === true) {
return cb(new Error(`DOC_NOT_FOUND: No document found using options: ${JSON.stringify(options)}`));
}
return cb(null, docs);
});
}

function getDocWithMeta(entity) {
const doc = (entity !== null) ? entity.doc : null;
if (doc) {
if (!doc.meta) doc.meta = {};
if (entity.correlation_id && !doc.meta.correlationId) doc.meta.correlationId = entity.correlation_id;
// override persisted value since previous versions used the faulty user supplied date
if (entity.entity_created) doc.meta.createdAt = entity.entity_created.toISOString();
// override persisted value since previous versions used the faulty user supplied date
if (entity.created) doc.meta.updatedAt = entity.created.toISOString();
}
return doc;
}

function queryBySingleRelationship(options, cb) {
queryByRelationship(options, (err, docs) => {
if (err) return cb(err);
Expand All @@ -77,7 +90,7 @@ function queryBySingleRelationship(options, cb) {

function loadByExternalId(options, cb) {
const q = [
"SELECT doc",
"SELECT doc, created, entity_created, correlation_id",
"FROM entity e, entity_version ev",
"WHERE e.latest_version_id = ev.version_id",
"AND e.entity_type = $1",
Expand All @@ -97,7 +110,7 @@ function loadByExternalId(options, cb) {
if (err) return cb(err);
let doc = null;
if (res.rows && res.rows.length === 1) {
doc = res.rows[0].doc;
doc = getDocWithMeta(res.rows[0]);
} else if (res.rows && res.rows.length > 1) {
return cb(new Error(`Found more than one document using options: ${JSON.stringify(options)}`));
} else if (options.errorOnNotFound === true) {
Expand Down Expand Up @@ -127,7 +140,7 @@ function remove(id, correlationId, cb) {
if (upsertErr) return cb(upsertErr);
const q = [
"UPDATE entity",
"SET entity_removed = now()",
"SET entity_removed = now() at time zone 'utc'",
"WHERE entity_id = $1"];
return pgClient.query(q.join(" "), [id], (rmErr, res) => {
if (rmErr) return cb(rmErr);
Expand Down Expand Up @@ -176,7 +189,7 @@ function loadVersion(versionId, force, cb) {
}

const q = [
"SELECT ev.entity_id, created, doc, version_id, correlation_id",
"SELECT ev.entity_id, created, entity_created, doc, version_id, correlation_id",
"FROM entity e, entity_version ev",
"WHERE version_id = $1"
];
Expand All @@ -195,7 +208,7 @@ function loadVersion(versionId, force, cb) {
created: v.created,
versionId: v.version_id,
correlationId: v.correlation_id,
entity: v.doc
entity: getDocWithMeta(v)
};

return cb(null, output);
Expand Down Expand Up @@ -243,17 +256,24 @@ function upsert(entity, done) {

function upsertWithForce(entity, force, done) {
entity.id = entity.id || uuid.v4();
if (!entity.meta) entity.meta = {};

let newVersionId;
async.waterfall([
(cb) => insertVersion(entity, force, cb),
(cb) => getEntityCreated(entity, cb),
(entityCreatedRes, cb) => {
const createdTimestamp = (entityCreatedRes.rows && entityCreatedRes.rows.length === 1) ? entityCreatedRes.rows[0].entity_created : undefined;
return insertVersion(entity, createdTimestamp, force, cb);
},
(versionId, created, cb) => {
if (versionId) {
newVersionId = versionId;
return doUpsert(entity, versionId, created, cb);
}
return cb(null, {
wasConflict: true,
oid: 0
oid: 0,
rows: []
});
}
], (err, upsertRes) => {
Expand All @@ -271,7 +291,7 @@ function doUpsert(entity, versionId, created, done) {
pgClient.query([
"INSERT into entity as e",
"(entity_type, entity_id, entity_created, latest_version_id)",
"VALUES ($1::text, $2::text, $3::timestamp, $4::text)",
"VALUES ($1::text, $2::text, $3::timestamptz, $4::text)",
"ON CONFLICT(entity_id) DO UPDATE",
"SET latest_version_id=$4",
"WHERE e.entity_id=$2"
Expand All @@ -280,22 +300,34 @@ function doUpsert(entity, versionId, created, done) {
);
}

function insertVersion(entity, force, done) {
function getEntityCreated(entity, done) {
pgClient.query([
"SELECT entity_created FROM entity WHERE entity_type = $1::text AND entity_id = $2::text",
].join(" "), [entity.type, entity.id],
done
);
}


function insertVersion(entity, createdTimestamp, force, done) {
const versionId = uuid.v4();
const correlationId = (entity.meta && entity.meta.correlationId) || null;
const now = new Date().toISOString();
entity.meta.createdAt = createdTimestamp || now;
entity.meta.updatedAt = now;

const q = [
"INSERT into entity_version",
"(version_id, entity_id, correlation_id, doc)",
"SELECT $1::text, $2::text, $3::text, $4::jsonb"
"(version_id, entity_id, correlation_id, doc, created)",
"SELECT $1::text, $2::text, $3::text, $4::jsonb, $5::timestamptz"
];

if (!force) {
q.push("WHERE NOT EXISTS (SELECT FROM entity WHERE entity_id = $2 AND entity_removed IS NOT NULL)");
}
q.push("RETURNING created");

pgClient.query(q.join(" "), [versionId, entity.id, correlationId, entity],
pgClient.query(q.join(" "), [versionId, entity.id, correlationId, entity, entity.meta.updatedAt],
(err, res) => {
const responseHasTimestamp = (!err && res && res.rows && res.rows.length === 1);
const timestamp = responseHasTimestamp ? res.rows[0].created : undefined;
Expand Down
2 changes: 2 additions & 0 deletions migrations/005-timestamp-with-tz.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE entity ALTER entity_created TYPE timestamptz USING entity_created AT TIME ZONE 'UTC', ALTER entity_removed TYPE timestamptz USING entity_removed AT TIME ZONE 'UTC', ALTER COLUMN entity_created SET DEFAULT now() at time zone 'utc';
ALTER TABLE entity_version ALTER created TYPE timestamptz USING created AT TIME ZONE 'UTC', ALTER COLUMN created SET DEFAULT now() at time zone 'utc';
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"Markus Ekholm",
"Ivan Malmberg"
],
"version": "2.3.3",
"version": "3.0.0",
"scripts": {
"test": "mocha",
"posttest": "eslint --cache ."
Expand Down
73 changes: 68 additions & 5 deletions test/feature/entity-feature.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,45 @@ Feature("Entity", () => {
}
};

Scenario("Save and load an entity", () => {
Scenario("Save an entity", () => {
before((done) => {
helper.clearAndInit(done);
});

const originalEntity = Object.assign(
JSON.parse(JSON.stringify(entity)), {
meta: {
createdAt: new Date("1917-01-01"),
updatedAt: new Date("1939-01-01")}
});

const beforeCreation = new Date();
let savedEntity;

Given("a new entity is saved with meta createdAt set", (done) => {
query.upsert(originalEntity, done);
});

When("we load it", (done) => {
query.load(entity.id, (err, dbEntity) => {
if (err) return done(err);
savedEntity = dbEntity;
return done();
});
});

Then("the originalEntity should have had its' createdAt and updatedAt correctly overwritten", () => {
(new Date(originalEntity.meta.createdAt)).should.be.above(beforeCreation);
originalEntity.meta.updatedAt.should.eql(originalEntity.meta.createdAt);
});

And("the loaded and saved entity should be identical", () => {
savedEntity.should.deep.eql(originalEntity);
});

});

Scenario("Save and load an entity", () => {
before((done) => {
helper.clearAndInit(done);
});
Expand All @@ -62,6 +99,14 @@ Feature("Entity", () => {
savedEntity.type.should.equal(entity.type);
savedEntity.attributes.name.should.equal(entity.attributes.name);
});

And("a createdAt and updatedAt which are equal and before now", () => {
const createdAt = new Date(savedEntity.meta.createdAt);
const updatedAt = new Date(savedEntity.meta.updatedAt);
const now = new Date();
createdAt.should.be.at.most(now);
createdAt.should.eql(updatedAt);
});
});

Scenario("Save, remove and try to load", () => {
Expand Down Expand Up @@ -245,7 +290,10 @@ Feature("Entity", () => {
});

Then("it should be found and match the one upserted", () => {
savedEntity.should.deep.equal(entity);
for (const key of Object.keys(entity)) {
if (key === "meta") continue;
savedEntity[key].should.deep.equal(entity[key]);
}
});
});

Expand Down Expand Up @@ -275,7 +323,10 @@ Feature("Entity", () => {
});

Then("it should be found and match the one upserted", () => {
savedEntity.should.deep.equal(entity);
for (const key of Object.keys(entity)) {
if (key === "meta") continue;
savedEntity[key].should.deep.equal(entity[key]);
}
});
});

Expand Down Expand Up @@ -309,7 +360,16 @@ Feature("Entity", () => {

Then("it should be found and match the one upserted", () => {
savedEntities.length.should.equal(2);
savedEntities.should.have.deep.members([entity, otherEntity]);
const savedEnt = savedEntities.find((x) => x.id === entity.id);
const savedOtherEnt = savedEntities.find((x) => x.id === otherEntity.id);
for (const key of Object.keys(entity)) {
if (key === "meta") continue;
savedEnt[key].should.deep.equal(entity[key]);
}
for (const key of Object.keys(entity)) {
if (key === "meta") continue;
savedOtherEnt[key].should.deep.equal(otherEntity[key]);
}
});
});

Expand Down Expand Up @@ -338,7 +398,10 @@ Feature("Entity", () => {
});

Then("it should be found and match the one upserted", () => {
savedEntity.should.deep.eql(entity);
for (const key of Object.keys(entity)) {
if (key === "meta") continue;
savedEntity[key].should.deep.equal(entity[key]);
}
});
});

Expand Down
28 changes: 24 additions & 4 deletions test/feature/remove-feature.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,26 @@ Feature("Clean version history for given entity", () => {
And("the entity should have the latest attributes", (done) => {
crud.load(entity.id, (err, anonymousEntity) => {
if (err) return done(err);
anonymousEntity.should.eql(expectedEntity);
for (const key of Object.keys(expectedEntity)) {
if (key === "meta") {
anonymousEntity[key].correlationId.should.eql(entity[key].correlationId);
} else {
anonymousEntity[key].should.deep.equal(entity[key]);
}
}
return done();
});
});

And("the version should have the latest attributes", (done) => {
crud.loadVersion(entityVersions[0].versionId, (err, res) => {
if (err) return done(err);
res.entity.should.eql(expectedEntity);
for (const key of Object.keys(expectedEntity)) {
if (key === "meta") continue;
res.entity[key].should.deep.equal(entity[key]);
}
res.correlationId.should.equal(correlationIds[2]);
res.entity.meta.correlationId.should.eql(correlationIds[2]);
return done();
});
});
Expand Down Expand Up @@ -127,16 +137,26 @@ Feature("Clean version history for given entity", () => {
And("the entity should have the latest attributes and only possible to fetch forcefully", (done) => {
crud.load(entity.id, true, (err, anonymousEntity) => {
if (err) return done(err);
anonymousEntity.should.eql(expectedEntity);
for (const key of Object.keys(expectedEntity)) {
if (key === "meta") {
anonymousEntity[key].correlationId.should.eql(entity[key].correlationId);
} else {
anonymousEntity[key].should.deep.equal(entity[key]);
}
}
return done();
});
});

And("the version should have the latest attributes and only possible to fetch forcefully", (done) => {
crud.loadVersion(entityVersions[0].versionId, true, (err, res) => {
if (err) return done(err);
res.entity.should.eql(expectedEntity);
for (const key of Object.keys(expectedEntity)) {
if (key === "meta") continue;
res.entity[key].should.deep.equal(entity[key]);
}
res.correlationId.should.equal(correlationIds[2]);
res.entity.meta.correlationId.should.eql(correlationIds[2]);
return done();
});
});
Expand Down
Loading

0 comments on commit e10c79a

Please sign in to comment.