From 993fcdbbb19d64db4a2652b5cb5c49075a1bbbef Mon Sep 17 00:00:00 2001 From: Angga Muhammad Date: Sun, 28 Oct 2018 22:20:59 +0700 Subject: [PATCH 1/4] ADD google cloud datastore implementation ADD google cloud datastore index definition [mandatory for test] --- index.yaml | 160 ++++++++ lib/databases/datastore.js | 758 +++++++++++++++++++++++++++++++++++++ package.json | 3 +- test/eventstoreTest.js | 2 +- test/storeTest.js | 10 +- 5 files changed, 926 insertions(+), 7 deletions(-) create mode 100644 index.yaml create mode 100644 lib/databases/datastore.js diff --git a/index.yaml b/index.yaml new file mode 100644 index 00000000..eff2829b --- /dev/null +++ b/index.yaml @@ -0,0 +1,160 @@ +// This is needed for testing google cloud datastore, since a lot of queries are using composite index +// please set up your gcloud-cli, and run "gcloud datastore create-indexes index.yaml" + +indexes: +- kind: snapshots + ancestor: yes + properties: + - name: aggregateId + - name: commitStamp + direction: desc +- kind: snapshots + properties: + - name: aggregate + - name: commitStamp + direction: desc +- kind: snapshots + properties: + - name: context + - name: commitStamp + direction: desc +- kind: snapshots + ancestor: yes + properties: + - name: aggregateId + - name: revision + - name: commitStamp + direction: desc +- kind: snapshots + ancestor: yes + properties: + - name: aggregate + - name: aggregateId + - name: context + - name: commitStamp + direction: desc +- kind: snapshots + ancestor: yes + properties: + - name: aggregate + - name: aggregateId + - name: revision + - name: commitStamp + direction: desc +- kind: snapshots + ancestor: yes + properties: + - name: aggregateId + - name: context + - name: revision + - name: commitStamp + direction: desc +- kind: snapshots + ancestor: yes + properties: + - name: aggregate + - name: aggregateId + - name: context + - name: revision + - name: commitStamp + direction: desc + +- kind: events + ancestor: yes + properties: + - name: commitStamp +- kind: events + ancestor: yes + properties: + - name: dispatched + - name: commitStamp + - name: id +- kind: events + ancestor: yes + properties: + - name: commitStamp + - name: streamRevision +- kind: events + ancestor: yes + properties: + - name: aggregateId + - name: streamRevision + - name: commitStamp +- kind: events + ancestor: yes + properties: + - name: aggregateId + - name: commitStamp + - name: streamRevision +- kind: events + ancestor: yes + properties: + - name: aggregate + - name: commitStamp + - name: streamRevision +- kind: events + ancestor: yes + properties: + - name: context + - name: commitStamp + - name: streamRevision +- kind: events + ancestor: yes + properties: + - name: dispatched + - name: aggregateId + - name: commitStamp + - name: id +- kind: events + ancestor: yes + properties: + - name: dispatched + - name: aggregate + - name: commitStamp + - name: id +- kind: events + ancestor: yes + properties: + - name: dispatched + - name: context + - name: commitStamp + - name: id +- kind: events + ancestor: yes + properties: + - name: aggregateId + - name: context + - name: streamRevision + - name: commitStamp +- kind: events + ancestor: yes + properties: + - name: aggregate + - name: aggregateId + - name: streamRevision + - name: commitStamp +- kind: events + ancestor: yes + properties: + - name: aggregate + - name: aggregateId + - name: commitStamp + - name: streamRevision +- kind: events + ancestor: yes + properties: + - name: aggregateId + - name: commitStamp + direction: desc + - name: streamRevision + direction: desc + - name: commitSequence + direction: desc +- kind: events + ancestor: yes + properties: + - name: aggregate + - name: aggregateId + - name: context + - name: streamRevision + - name: commitStamp diff --git a/lib/databases/datastore.js b/lib/databases/datastore.js new file mode 100644 index 00000000..8481f499 --- /dev/null +++ b/lib/databases/datastore.js @@ -0,0 +1,758 @@ +var Store = require('../base'), + util = require('util'), + _ = require('lodash'), + async = require('async'), + dbg = require('debug'), + DS = require('@google-cloud/datastore'); + +var debug = dbg('eventstore:store:datastore'), + error = dbg("eventstore:store:datastore:error"); + +/* +######################################### +####### Special note on testing ######### +######################################### + +Since query in cloud datastore by default is eventual consistency, to pass the unit test please activate an env variable for test only + +$ export DATASTORE_TEST=true + +What it will do, is forced all query and entity to have common ancestor which will make every query strong consistence, +with the tradeoff of limit of write per sec = 1. +*/ +function Datastore(options) { + options = options || {}; + + var dsConf = { + projectId: "" + }; + + this.options = _.defaults(options, dsConf); + + var defaults = { + eventsTableName: 'events', + snapshotsTableName: 'snapshots' + }; + + this.options = _.defaults(this.options, defaults); +} + +util.inherits(Datastore, Store); + +_.extend(Datastore.prototype, { + + AGGREGATE_KIND: "Aggregate", + + connect: function (callback) { + var self = this; + self.client = new DS(self.options); + self.isConnected = true; + + self.emit('connect'); + if (callback) callback(null, self); + }, + + disconnect: function (callback) { + // do nothing on cloud datastore client + this.emit('disconnect'); + if (callback) callback(null); + }, + + clear: function (done) { + var self = this; + + var clearEvents = function (callback) { + clearKind(self.options.eventsTableName, self.options, self.client, function (err) { + if (err) { + error("clear events kind error: " + err); + return callback(err); + } + + callback(null, "events"); + }); + }; + + var clearSnapshots = function (callback) { + clearKind(self.options.snapshotsTableName, self.options, self.client, function (err) { + if (err) { + error("clear snapshots kind error: " + err); + return callback(err); + } + + callback(null, "snapshots"); + }); + }; + + async.parallel([ + clearEvents, + clearSnapshots + ], function (err, data) { + if (err) { + error("removeKinds error: " + err); + if (done) done(err); + return; + } + if (done) done(null, self); + }); + }, + + addEvents: function (events, callback) { + var self = this; + + var noAggId = _.every(events, function (event) { + return !event.aggregateId + }); + + if (noAggId) { + var errMsg = 'aggregateId not defined!'; + error(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + if (!events || events.length === 0) { + return callback(null); + } + + var exclusions = [ + 'header', + 'payload' + ]; + + var entities = _.map(events, function(event) { + var path = [self.options.eventsTableName, event.id]; + if (process.env.DATASTORE_TEST) { + path.unshift(self.AGGREGATE_KIND, "default"); + } + + return { + key: self.client.key(path), + excludeFromIndexes: exclusions, + data: new StoredEvent(event) + }; + }); + + debug("Saving event to events table: " + JSON.stringify(entities, null, 2)); + self.client.save(entities, function(err, apiResponse) { + if (err) { + error("addEvents error: " + JSON.stringify(err)); + return callback(err); + } + + callback(null, apiResponse); + }); + }, + + getEvents: function (query, skip, limit, callback) { + var self = this; + var client = self.client; + var nextCursor = null; + var results = []; + + var pageSize = skip + limit; + + async.doWhilst(function (end) { + var q = client.createQuery(self.options.eventsTableName); + + if (process.env.DATASTORE_TEST) { + q.hasAncestor(client.key([self.AGGREGATE_KIND, "default"])); + } + + if (query && query.aggregateId) + q.filter("aggregateId", "=", query.aggregateId); + + if (query && query.aggregate) + q.filter("aggregate", "=", query.aggregate); + + if (query && query.context) + q.filter("context", "=", query.context); + + if (limit !== -1) + q.limit(pageSize); + + if (nextCursor) + q.start(nextCursor); + + q.order("commitStamp").order("streamRevision"); + + client.runQuery(q, function(err, entities, info) { + if (err) { + error("getEvents scan error: " + err); + return end(err); + } + + if (info.moreResults !== DS.NO_MORE_RESULTS) { + nextCursor = info.endCursor; + } + results = results.concat(entities); + end(null); + }); + }, function () { + return (results.length < pageSize || pageSize == -1) ? nextCursor !== null : false; + }, function (err) { + if (err) { + error("getEvents error: " + err); + return callback(err); + } + + results = results.map(MapStoredEventToEvent); + + if (limit === -1) { + results = results.slice(skip); + } else { + results = results.slice(skip, skip + limit); + } + + callback(null, results); + }); + }, + + getEventsSince: function (date, skip, limit, callback) { + var self = this; + var client = self.client; + var nextCursor = null; + var results = []; + + var pageSize = skip + limit; + + async.doWhilst(function (end) { + var q = client + .createQuery(self.options.eventsTableName) + .filter("commitStamp", ">=", date); + + if (process.env.DATASTORE_TEST) { + q.hasAncestor(client.key([self.AGGREGATE_KIND, "default"])); + } + + if (limit !== -1) + q.limit(pageSize); + + if (nextCursor) + q.start(nextCursor); + + q.order("commitStamp"); + + client.runQuery(q, function(err, entities, info) { + if (err) { + error("getEventsSince scan error: " + err); + return end(err); + } + + if (info.moreResults !== DS.NO_MORE_RESULTS) { + nextCursor = info.endCursor; + } + results = results.concat(entities); + end(null); + }); + }, function () { + return (results.length < pageSize || pageSize == -1) ? nextCursor !== null : false; + }, function (err) { + if (err) { + error("getEventsSince error: " + err); + return callback(err); + } + + results = results.map(MapStoredEventToEvent); + + if (limit === -1) { + results = results.slice(skip); + } else { + results = results.slice(skip, skip + limit); + } + + callback(null, results); + }); + }, + + getEventsByRevision: function (query, revMin, revMax, callback) { + var self = this; + var client = self.client; + var nextCursor = null; + var results = []; + + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + error(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + async.doWhilst(function (end) { + var q = client + .createQuery(self.options.eventsTableName) + .filter("aggregateId", "=", query.aggregateId) + .filter("streamRevision", ">=", revMin); + + if (process.env.DATASTORE_TEST) { + q.hasAncestor(client.key([self.AGGREGATE_KIND, "default"])); + } + + if (revMax !== -1) + q.filter("streamRevision", "<=", revMax); + + if (query && query.aggregate) + q.filter("aggregate", "=", query.aggregate); + + if (query && query.context) + q.filter("context", "=", query.context); + + if (nextCursor) + q.start(nextCursor); + + q.order("streamRevision").order("commitStamp"); + + client.runQuery(q, function(err, entities, info) { + if (err) { + error("getEventsByRevision scan error: " + err); + return end(err); + } + + if (info.moreResults !== DS.NO_MORE_RESULTS) { + nextCursor = info.endCursor; + } + results = results.concat(entities); + end(null); + }); + }, function () { + return nextCursor !== null; + }, function (err) { + if (err) { + error("getEventsByRevision error: " + err); + return callback(err); + } + + results = results.map(MapStoredEventToEvent); + + results = _.sortBy(results, function (e) { + return e.commitStamp; + }); + + callback(null, results); + }); + }, + + setEventToDispatched: function (id, callback) { + var self = this; + var client = self.client; + + var path = [self.options.eventsTableName, id]; + if (process.env.DATASTORE_TEST) { + path.unshift(self.AGGREGATE_KIND, "default"); + } + + var evtKey = client.key(path); + var tx = client.transaction(); + + tx.run() + .then(function() { return tx.get(evtKey); }) + .then(function(results) { + var evt = results[0]; + if (evt) { + evt.dispatched = true; + + tx.save({ + key: evtKey, + data: evt + }); + return tx.commit(callback); + } else { + // when not found + return tx.rollback(callback); + } + }) + .catch(function() { tx.rollback(callback); }); + }, + + getUndispatchedEvents: function (query, callback) { + var self = this; + var client = self.client; + var nextCursor = null; + var results = []; + + async.doWhilst(function (end) { + var q = client + .createQuery(self.options.eventsTableName) + .filter("dispatched", false); + + if (process.env.DATASTORE_TEST) { + q.hasAncestor(client.key([self.AGGREGATE_KIND, "default"])); + } + + if (query && query.aggregateId) + q.filter("aggregateId", "=", query.aggregateId); + + if (query && query.aggregate) + q.filter("aggregate", "=", query.aggregate); + + if (query && query.context) + q.filter("context", "=", query.context); + + if (nextCursor) + q.start(nextCursor); + + q.order("commitStamp").order("id"); + + client.runQuery(q, function(err, entities, info) { + if (err) { + error("getUndispatchedEvents scan error: " + err); + return end(err); + } + + if (info.moreResults !== DS.NO_MORE_RESULTS) { + nextCursor = info.endCursor; + } + results = results.concat(entities); + end(null); + }); + }, function () { + return nextCursor !== null; + }, function (err) { + if (err) { + error("getUndispatchedEvents error: " + err); + return callback(err); + } + + results = results.map(MapStoredEventToEvent); + + callback(null, results); + }); + }, + + getLastEvent: function (query, callback) { + var self = this; + var client = self.client; + + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + error(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var q = client + .createQuery(self.options.eventsTableName) + .filter("aggregateId", "=", query.aggregateId) + .order("commitStamp", { descending: true }) + .order("streamRevision", { descending: true }) + .order("commitSequence", { descending: true }) + .limit(1); + + if (process.env.DATASTORE_TEST) { + q.hasAncestor(client.key([self.AGGREGATE_KIND, "default"])); + } + + client.runQuery(q, function(err, entities, info) { + if (err) { + error("getLastEvent query error: " + err); + return callback(err); + } + + if ( entities.length < 1 ) { + var errMsg = "Last event #aggr[" + query.aggregateId + "] not found !"; + error("getLastEvent query error: " + errMsg); + return callback(new Error(errMsg)); + } + + callback(null, entities[0]); + }); + }, + + addSnapshot: function (snap, callback) { + var self = this; + var client = self.client; + + if (!snap.aggregateId) { + var errMsg = 'aggregateId not defined!'; + error(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var path = [self.options.snapshotsTableName, snap.id]; + if (process.env.DATASTORE_TEST) { + path.unshift(self.AGGREGATE_KIND, "default"); + } + + var ent = { + key: client.key(path), + data: new StoredSnapshot(snap) + }; + + client.save(ent, function(err, apiResponse) { + if (err) { + error("addSnapshot error: " + err); + return callback(err); + } + callback(null, apiResponse); + }); + }, + + getSnapshot: function (query, revMax, callback) { + var self = this; + var client = self.client; + + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + error(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var q = client + .createQuery(self.options.snapshotsTableName) + .filter("aggregateId", "=", query.aggregateId) + .limit(1); + + if (process.env.DATASTORE_TEST) { + q.hasAncestor(client.key([self.AGGREGATE_KIND, "default"])); + } + + if (query && query.aggregate) + q.filter("aggregate", "=", query.aggregate); + + if (query && query.context) + q.filter("context", "=", query.context); + + if (revMax != -1) { + q.filter("revision", "<=", revMax); + q.order("revision"); + } + q.order("commitStamp", { descending: true }) + + client.runQuery(q, function(err, entities, info) { + if (err) { + error("getSnapshot error: " + err); + return callback(err); + } + + entities = entities.map(MapStoredSnapshotToSnapshot); + + if ( entities.length < 1 ) { + return callback(null, null); + } + + callback(null, entities[0]); + }); + }, + + cleanSnapshots: function (query, callback) { + var self = this; + var client = self.client; + + self.scanSnapshots(query, function(error, keys) { + if (error) { + debug(error); + if (callback) callback(error); + return; + } + + var keysToDelete = keys + .slice(0, -1 * self.options.maxSnapshotsCount); + + if (keysToDelete.length === 0) { + return callback(null, 0); + } + + client.delete(keysToDelete, function(err, apiResponse) { + if (err) { + error("Clear (batchWrite) error): " + JSON.stringify(batch, null, 2)); + return callback(err); + } + + callback(null, keysToDelete.length); + }); + }); + }, + + scanSnapshots: function (query, callback) { + var self = this; + var client = self.client; + var nextCursor = null; + var results = []; + + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + async.doWhilst(function (end) { + var q = client + .createQuery(self.options.snapshotsTableName) + .select('__key__') + .filter("aggregateId", "=", query.aggregateId); + + if (process.env.DATASTORE_TEST) { + q.hasAncestor(client.key([self.AGGREGATE_KIND, "default"])); + } + + if (query && query.aggregate) + q.filter("aggregate", "=", query.aggregate); + + if (query && query.context) + q.filter("context", "=", query.context); + + q.order("commitStamp", { descending: true }); + + if (nextCursor) + q.start(nextCursor); + + client.runQuery(q, function(err, entities, info) { + if (err) { + error("scanSnapshot query error: " + err); + return end(err); + } + + if (info.moreResults !== DS.NO_MORE_RESULTS) { + nextCursor = info.endCursor; + } + results = results.concat(entities); + end(null); + }); + }, function () { + return nextCursor !== null; + }, function (err) { + if (err) { + error("scanSnapshot error: " + err); + return callback(err); + } + + results = results.map(function(entity) { + return entity[client.KEY]; + }); + + callback(null, results); + }); + } + +}); + +var clearKind = function (kind, opts, client, cleared) { + debug("Clearing " + kind + " events table") + + var nextPageCursor = null; + + var read = function (callback) { + var q = client + .createQuery(kind) + .select('__key__') + .limit(100); + + if (nextPageCursor) { + q.start(nextPageCursor); + } + + client.runQuery(q, function(err, entities, info) { + if (err) { + error("clearKind " + kind + " read error: " + err); + return callback(err); + } + + var keys = entities.map(function(entity) { + return entity[DS.KEY]; + }); + + if (info.moreResults !== DS.NO_MORE_RESULTS) { + nextPageCursor = info.endCursor; + } + + callback(null, keys); + }); + }; + + var del = function (batch, callback) { + if (batch && batch.length) { + debug("Clear: calling batchWrites: " + JSON.stringify(batch, null, 2)); + client.delete(batch, function(err, apiResponse) { + if (err) { + error("Clear (batchWrite) error): " + JSON.stringify(batch, null, 2)); + return callback(err); + } + + callback(null, apiResponse); + }); + } else { + callback(null); + } + }; + + async.doWhilst(function (next) { + async.seq(read, del)(function (err, result) { + if (err) next(err); + else next(null, result); + }); + }, function() { + return nextPageCursor !== null; + }, function (err, r) { + if (err) { + error("Error while clearing " + kind + " kind: " + JSON.stringify(err, null, 2)); + return cleared(err); + } + debug(kind + " kind successfully cleared."); + return cleared(); + }); +}; + +var StoredEvent = function (event) { + debug("Converting event to StoredEvent: " + JSON.stringify(event, null, 2)); + this.aggregateId = event.aggregateId; + this.rowKey = (event.context || "") + ":" + (event.aggregate || "") + ":" + _.padStart(event.streamRevision, 16, '0'); + this.id = event.id; + this.context = event.context || null; + this.aggregate = event.aggregate || null; + this.streamRevision = event.streamRevision; + this.commitId = event.commitId; + this.commitSequence = event.commitSequence; + this.commitStamp = new Date(event.commitStamp); + this.header = event.header || null; + this.dispatched = event.dispatched || false; + this.payload = event.payload; + debug("Event converted to StoredEvent: " + JSON.stringify(this, null, 2)); +}; + +function MapStoredEventToEvent(storedEvent) { + var event = { + aggregateId: storedEvent.aggregateId, + id: storedEvent.id, + context: storedEvent.context, + aggregate: storedEvent.aggregate, + streamRevision: storedEvent.streamRevision, + commitId: storedEvent.commitId, + commitSequence: storedEvent.commitSequence, + commitStamp: storedEvent.commitStamp || null, + header: storedEvent.header || null, + dispatched: storedEvent.dispatched, + payload: storedEvent.payload || null + }; + + return event; +} + +var StoredSnapshot = function (snapshot) { + this.id = snapshot.id; + this.aggregateId = snapshot.aggregateId; + this.aggregate = snapshot.aggregate || null; + this.context = snapshot.context || null; + this.revision = snapshot.revision; + this.version = snapshot.version; + this.commitStamp = new Date(snapshot.commitStamp).getTime(); + this.data = snapshot.data; +}; + +function MapStoredSnapshotToSnapshot(storedSnapshot) { + var snapshot = { + id: storedSnapshot.id, + aggregateId: storedSnapshot.aggregateId, + aggregate: storedSnapshot.aggregate || undefined, + context: storedSnapshot.context || undefined, + revision: storedSnapshot.revision, + version: storedSnapshot.version, + commitStamp: new Date(storedSnapshot.commitStamp) || null, + data: storedSnapshot.data || null + }; + + return snapshot; +} + +module.exports = Datastore; \ No newline at end of file diff --git a/package.json b/package.json index dcf5520a..04ae018f 100644 --- a/package.json +++ b/package.json @@ -59,7 +59,8 @@ "mocha": "3.x.x", "mongodb": "2.1.x", "redis": ">=0.10.1", - "tingodb": ">=0.0.1" + "tingodb": ">=0.0.1", + "@google-cloud/datastore": "^2.0.0" }, "scripts": { "test": "mocha" diff --git a/test/eventstoreTest.js b/test/eventstoreTest.js index 159dda5c..366f0364 100644 --- a/test/eventstoreTest.js +++ b/test/eventstoreTest.js @@ -874,7 +874,7 @@ describe('eventstore', function () { describe('with options containing a type property with the value of', function () { - var types = ['inmemory', 'tingodb', 'mongodb', 'redis'/*, 'elasticsearch', 'azuretable', 'dynamodb'*/]; + var types = ['inmemory', 'tingodb', 'mongodb', 'redis', 'datastore'/*, 'elasticsearch', 'azuretable', 'dynamodb'*/]; var streamingApiTypes = ['mongodb']; var positionTypes = ['mongodb', 'inmemory']; diff --git a/test/storeTest.js b/test/storeTest.js index 32223c49..9f7a3afd 100644 --- a/test/storeTest.js +++ b/test/storeTest.js @@ -4,7 +4,7 @@ var expect = require('expect.js'), _ = require('lodash'), crypto = require('crypto'); -var types = ['inmemory', 'tingodb', 'mongodb', 'redis'/*, 'elasticsearch', 'azuretable', 'dynamodb'*/]; +var types = ['inmemory', 'tingodb', 'mongodb', 'redis', 'datastore' /*, 'elasticsearch', 'azuretable', 'dynamodb'*/]; var token = crypto.randomBytes(16).toString('hex'); @@ -269,7 +269,7 @@ types.forEach(function (type) { store.getLastEvent({ aggregateId: event2.aggregateId }, function(err, evt) { expect(err).not.to.be.ok(); - + expect(evt.commitStamp.getTime()).to.eql(event2.commitStamp.getTime()); expect(evt.aggregateId).to.eql(event2.aggregateId); expect(evt.commitId).to.eql(event2.commitId); @@ -2425,7 +2425,7 @@ types.forEach(function (type) { store.setEventToDispatched('119', function (err) { expect(err).not.to.be.ok(); - + store.getUndispatchedEvents(null, function (err, evts) { expect(err).not.to.be.ok(); expect(evts.length).to.eql(1); @@ -2612,7 +2612,7 @@ types.forEach(function (type) { describe('with an aggregateId being used only in one context and aggregate', function () { it('it should return the correct snapshot', function (done) { - + store.getSnapshot({ aggregateId: '142351' }, -1, function (err, shot) { expect(err).not.to.be.ok(); expect(shot.id).to.eql(snap3.id); @@ -2907,7 +2907,7 @@ types.forEach(function (type) { describe('with a revision that already exists but with a newer version', function () { it('it should return the correct snapshot', function (done) { - + store.getSnapshot({ aggregateId: '920193847313131313', aggregate: 'myCoolAggregate2', context: 'myCoolContext' }, -1, function (err, shot) { expect(err).not.to.be.ok(); expect(shot.id).to.eql(snap8.id); From c8c763d5047c7a769fa515222b0f7c2dd876a5cd Mon Sep 17 00:00:00 2001 From: Angga Muhammad Date: Sun, 28 Oct 2018 23:10:50 +0700 Subject: [PATCH 2/4] FIX comment on index.yaml --- index.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/index.yaml b/index.yaml index eff2829b..49b0a69d 100644 --- a/index.yaml +++ b/index.yaml @@ -1,5 +1,5 @@ -// This is needed for testing google cloud datastore, since a lot of queries are using composite index -// please set up your gcloud-cli, and run "gcloud datastore create-indexes index.yaml" +# This is needed for testing google cloud datastore, since a lot of queries are using composite index +# please set up your gcloud-cli, and run "gcloud datastore create-indexes index.yaml" indexes: - kind: snapshots From 984865c57101c7f18f0b8669e9a06a171ebbc347 Mon Sep 17 00:00:00 2001 From: Angga Muhammad Date: Wed, 31 Oct 2018 00:09:02 +0700 Subject: [PATCH 3/4] move index.yaml (gcloud datastore) into test folder --- index.yaml => test/index.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename index.yaml => test/index.yaml (99%) diff --git a/index.yaml b/test/index.yaml similarity index 99% rename from index.yaml rename to test/index.yaml index 49b0a69d..a79c2179 100644 --- a/index.yaml +++ b/test/index.yaml @@ -1,5 +1,5 @@ # This is needed for testing google cloud datastore, since a lot of queries are using composite index -# please set up your gcloud-cli, and run "gcloud datastore create-indexes index.yaml" +# please set up your gcloud-cli, and run "gcloud datastore create-indexes test/index.yaml" indexes: - kind: snapshots From 034942c49f74029f071756a8899dde4dcd73916a Mon Sep 17 00:00:00 2001 From: Angga Muhammad Date: Wed, 31 Oct 2018 07:44:40 +0700 Subject: [PATCH 4/4] use Store.use instead of "require" update README to cover "cloud datastore" --- README.md | 31 ++++++++++++++++++++++++++++++- lib/databases/datastore.js | 2 +- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f7572835..02595d5f 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ The project goal is to provide an eventstore implementation for node.js: - load and store events via EventStream object - event dispatching to your publisher (optional) -- supported Dbs (inmemory, mongodb, redis, tingodb, elasticsearch, azuretable, dynamodb) +- supported Dbs (inmemory, mongodb, redis, tingodb, elasticsearch, azuretable, dynamodb, cloud datastore) - snapshot support - query your events @@ -135,6 +135,34 @@ example with azuretable: timeout: 10000 // optional }); +example with cloud datastore: + + var es = require('eventstore')({ + type: 'datastore', + projectId: 'my-project-id', // optional + eventsTableName: 'events', // optional + snapshotsTableName: 'snapshots' // optional + }); + +Google Cloud Datastore credentials are obtained from your default gcloud project, you need to set it up first: + +1. Go to the [Google Cloud API Manager](https://console.cloud.google.com/apis) and select "Credentials" on the left. +2. Click on "Create credentials" and select "Service account key". +3. Select "New service account" in the "Service account" dropdown. +4. Enter a name for your "Service account name" (e.g. "serverless-framework"). +5. Select "Project" --> "Owner" as the "Role". +6. The "Key type" should be "JSON". +7. Click on "Create" to create your private key. +8. That's your so called `keyfile` which should be downloaded on your machine. +9. Save the `keyfile` somewhere secure. We recommend making a folder in your root folder and putting it there. Like this, `~/.gcloud/keyfile.json`. You can change the file name from `keyfile` to anything. Remember the path you saved it to. + +Lastly, you need to prepare the index for the table since the library will use composite index. Example [here](test/index.yaml). Please note that for test purposes, it uses "Ancestor" query to ensure strong consistency. In real use case, you can remove all lines with `ancestor: yes`. + +Run the command to populate the index. +```bash +$ gcloud datastore create-indexes test/index.yaml +``` + example with dynamodb: var es = require('eventstore')({ @@ -607,6 +635,7 @@ Currently these databases are supported: 4. tingodb ([tingodb](https://github.com/sergeyksv/tingodb)) 5. azuretable ([azure-storage](https://github.com/Azure/azure-storage-node)) 6. dynamodb ([aws-sdk](https://github.com/aws/aws-sdk-js)) +7. datastore ([cloud-datastore](https://github.com/googleapis/nodejs-datastore)) ## own db implementation You can use your own db implementation by extending this... diff --git a/lib/databases/datastore.js b/lib/databases/datastore.js index 8481f499..42553796 100644 --- a/lib/databases/datastore.js +++ b/lib/databases/datastore.js @@ -3,7 +3,7 @@ var Store = require('../base'), _ = require('lodash'), async = require('async'), dbg = require('debug'), - DS = require('@google-cloud/datastore'); + DS = Store.use('@google-cloud/datastore'); var debug = dbg('eventstore:store:datastore'), error = dbg("eventstore:store:datastore:error");