Skip to content

Commit

Permalink
Add node-mongo-kafka-emitter
Browse files Browse the repository at this point in the history
  • Loading branch information
hovoodd committed Oct 14, 2020
1 parent c860b40 commit 331932a
Show file tree
Hide file tree
Showing 14 changed files with 124 additions and 51 deletions.
32 changes: 25 additions & 7 deletions src/kafka-processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,40 @@ class KafkaProcessor {
}
}

on(eventName, listener) {
let listeners = this.listeners.get(eventName);
addListener(event, listener) {
let listeners = this.listeners.get(event);

if (!listeners) {
listeners = new Set();
this.listeners.set(eventName, listeners);
this.listeners.set(event, listeners);
}

listeners.add(listener);
return this;
}

off(eventName, listener) {
const listeners = this.listeners.get(eventName);
if (!listeners) return;
on(event, listener) {
return this.addListener(event, listener);
}

removeListener(event, listener) {
const listeners = this.listeners.get(event);
if (listeners) listeners.delete(listener);
return this;
}

off(event, listener) {
return this.removeListener(event, listener);
}

once(eventName, listener) {
const wrapper = async (...args) => {
this.removeListener(eventName, wrapper);
await listener(...args);
};

listeners.delete(listener);
this.addListener(eventName, wrapper);
return this;
}
}

Expand Down
60 changes: 33 additions & 27 deletions src/kafka.js
Original file line number Diff line number Diff line change
@@ -1,35 +1,41 @@
const { Kafka } = require('kafkajs');
const KafkaJS = require('kafkajs');

const { KafkaProcessor } = require('./kafka-processor');

const kafka = new Kafka({
clientId: 'ship',
brokers: ['kafka:9092'],
});
class Kafka {
constructor(config) {
this.kafka = new KafkaJS.Kafka(config);

const processors = {
user: new KafkaProcessor(kafka, { groupId: 'ship' }, { topic: 'user' }),
};
this.producer = this.kafka.producer();
this.processors = {};
}

async function run() {
await Promise.all(Object.values(processors).map((p) => p.run()));
}
addProcessor(topic, consumerConfig, subscribeConfig = { topic }) {
if (this.processors[topic]) {
throw new Error(`Processor for "${topic}" topic already exists`);
}

this.processors[topic] = new KafkaProcessor(this.kafka, consumerConfig, subscribeConfig);
return this;
}

async run() {
await this.producer.connect();
await Promise.all(Object.values(this.processors).map((p) => p.run()));
}

async function send({ event, data, ...record }) {
const producer = kafka.producer();
await producer.connect();
await producer.send({
...record,
messages: [
{ value: JSON.stringify({ event, data }) },
],
});
await producer.disconnect();
async send({ event, data, ...record }) {
await this.producer.send({
...record,
messages: [
{ value: JSON.stringify({ event, data }) },
],
});
}
}

module.exports = {
kafka,
processors,
run,
send,
};
const kafka = new Kafka({ clientId: 'ship', brokers: ['kafka:9092'] });

kafka.addProcessor('user', { groupId: 'ship' });

module.exports = kafka;
2 changes: 1 addition & 1 deletion src/migrations/migration.schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ const schema = Joi.object({
.required(),
});

module.exports = (obj) => schema.validate(obj, { allowUnknown: false });
module.exports.validate = (obj) => schema.validate(obj, { allowUnknown: false });
4 changes: 2 additions & 2 deletions src/migrations/migration.service.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
const db = require('db');
const fs = require('fs');
const path = require('path');
const validateSchema = require('./migration.schema');
const { validate } = require('./migration.schema');

const service = db.createService('__migrationVersion', { validateSchema });
const service = db.createService('__migrationVersion', { validate });
const migrationsPath = path.join(__dirname, 'migrations');
const _id = 'migration_version';

Expand Down
2 changes: 1 addition & 1 deletion src/migrations/migrations-log/migration-log.schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ const schema = Joi.object({
.required(),
});

module.exports = (obj) => schema.validate(obj, { allowUnknown: false });
module.exports.validate = (obj) => schema.validate(obj, { allowUnknown: false });
4 changes: 2 additions & 2 deletions src/migrations/migrations-log/migration-log.service.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const db = require('db');

const validateSchema = require('./migration-log.schema.js');
const { validate } = require('./migration-log.schema.js');

const service = db.createService('__migrationLog', { validateSchema });
const service = db.createService('__migrationLog', { validate });

service.startMigrationLog = (_id, startTime, migrationVersion) => {
return service.atomic.findOneAndUpdate({ _id }, {
Expand Down
42 changes: 42 additions & 0 deletions src/node-mongo-kafka-emitter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
class NodeMongoKafkaEmitter {
constructor(_topic, _kafka, _logger = console) {
this.topic = _topic;
this.kafka = _kafka;
this.logger = _logger;

this.processor = this.kafka.processors[this.topic];
if (!this.processor) {
throw new Error(`Processor for "${this.topic}" topic doesn't exist`);
}
}

castEvent(event) {
return `${this.topic}:${event}`;
}

async emit(event, data) {
try {
await this.kafka.send({
topic: this.topic,
event: this.castEvent(event),
data,
});
} catch (error) {
this.logger.error(error);
}
}

on(event, listener) {
this.processor.on(this.castEvent(event), async (message) => {
await listener(message.data);
});
}

once(event, listener) {
this.processor.once(this.castEvent(event), async (message) => {
await listener(message.data);
});
}
}

module.exports = NodeMongoKafkaEmitter;
4 changes: 2 additions & 2 deletions src/resources/account/account.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ const db = require('tests/db');
const { USER, ERRORS } = require('tests/constants');
const testsHelper = require('tests/tests.helper');
const UserBuilder = require('resources/user/user.builder');
const validateSchema = require('resources/user/user.schema');
const { validate } = require('resources/user/user.schema');

const userService = db.createService(USER.COLLECTION, { validateSchema });
const userService = db.createService(USER.COLLECTION, { validate });
const app = server.listen();

const request = supertest.agent(app);
Expand Down
2 changes: 1 addition & 1 deletion src/resources/token/token.schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ const schema = Joi.object({
isShadow: Joi.boolean(),
});

module.exports = (obj) => schema.validate(obj, { allowUnknown: false });
module.exports.validate = (obj) => schema.validate(obj, { allowUnknown: false });
4 changes: 2 additions & 2 deletions src/resources/token/token.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ const db = require('db');
const securityUtil = require('security.util');
const { DATABASE_DOCUMENTS, TOKEN_SECURITY_LENGTH, TOKEN_TYPES } = require('app.constants');

const validateSchema = require('./token.schema');
const { validate } = require('./token.schema');

const service = db.createService(DATABASE_DOCUMENTS.TOKENS, { validateSchema });
const service = db.createService(DATABASE_DOCUMENTS.TOKENS, { validate });

const createToken = async (userId, type) => {
const value = await securityUtil.generateSecureToken(TOKEN_SECURITY_LENGTH);
Expand Down
4 changes: 2 additions & 2 deletions src/resources/user/user.builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ const { USER } = require('tests/constants');
const securityUtil = require('tests/security.util');
const BaseBuilder = require('tests/base.builder');

const validateSchema = require('./user.schema');
const { validate } = require('./user.schema');

const userService = db.createService(USER.COLLECTION, { validateSchema });
const userService = db.createService(USER.COLLECTION, { validate });

class UserBuilder extends BaseBuilder {
constructor({
Expand Down
2 changes: 1 addition & 1 deletion src/resources/user/user.handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ kafka.processors.user.on('user:signup', async (message) => {
await userService.create(message.data);
});

userService.on('updated', ({ doc }) => {
kafka.processors.user.on('user:updated', ({ doc }) => {
const roomId = `user-${doc._id}`;
ioEmitter.to(roomId).emit('user:updated', doc);
});
2 changes: 1 addition & 1 deletion src/resources/user/user.schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ const schema = Joi.object({
lastRequest: Joi.date(),
});

module.exports = (obj) => schema.validate(obj, { allowUnknown: false });
module.exports.validate = (obj) => schema.validate(obj, { allowUnknown: false });
11 changes: 9 additions & 2 deletions src/resources/user/user.service.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
const _ = require('lodash');

const db = require('db');
const kafka = require('kafka');
const NodeMongoKafkaEmitter = require('node-mongo-kafka-emitter');

const constants = require('app.constants');

const validateSchema = require('./user.schema');
const { validate } = require('./user.schema');

const service = db.createService(constants.DATABASE_DOCUMENTS.USERS, { validateSchema });
const service = db.createService(
constants.DATABASE_DOCUMENTS.USERS,
{ validate },
new NodeMongoKafkaEmitter('user', kafka),
);

service.updateLastRequest = async (_id) => {
return service.atomic.update({ _id }, {
Expand Down

0 comments on commit 331932a

Please sign in to comment.