diff --git a/src/kafka-processor.js b/src/kafka-processor.js index f6618f3..127e6fc 100644 --- a/src/kafka-processor.js +++ b/src/kafka-processor.js @@ -81,22 +81,40 @@ class KafkaProcessor { } } - on(eventName, listener) { - let listeners = this.listeners.get(eventName); + addListener(event, listener) { + let listeners = this.listeners.get(event); if (!listeners) { listeners = new Set(); - this.listeners.set(eventName, listeners); + this.listeners.set(event, listeners); } listeners.add(listener); + return this; } - off(eventName, listener) { - const listeners = this.listeners.get(eventName); - if (!listeners) return; + on(event, listener) { + return this.addListener(event, listener); + } + + removeListener(event, listener) { + const listeners = this.listeners.get(event); + if (listeners) listeners.delete(listener); + return this; + } + + off(event, listener) { + return this.removeListener(event, listener); + } + + once(eventName, listener) { + const wrapper = async (...args) => { + this.removeListener(eventName, wrapper); + await listener(...args); + }; - listeners.delete(listener); + this.addListener(eventName, wrapper); + return this; } } diff --git a/src/kafka.js b/src/kafka.js index 21c80bd..4508f48 100644 --- a/src/kafka.js +++ b/src/kafka.js @@ -1,35 +1,41 @@ -const { Kafka } = require('kafkajs'); +const KafkaJS = require('kafkajs'); const { KafkaProcessor } = require('./kafka-processor'); -const kafka = new Kafka({ - clientId: 'ship', - brokers: ['kafka:9092'], -}); +class Kafka { + constructor(config) { + this.kafka = new KafkaJS.Kafka(config); -const processors = { - user: new KafkaProcessor(kafka, { groupId: 'ship' }, { topic: 'user' }), -}; + this.producer = this.kafka.producer(); + this.processors = {}; + } -async function run() { - await Promise.all(Object.values(processors).map((p) => p.run())); -} + addProcessor(topic, consumerConfig, subscribeConfig = { topic }) { + if (this.processors[topic]) { + throw new Error(`Processor for "${topic}" topic already exists`); + } + + this.processors[topic] = new KafkaProcessor(this.kafka, consumerConfig, subscribeConfig); + return this; + } + + async run() { + await this.producer.connect(); + await Promise.all(Object.values(this.processors).map((p) => p.run())); + } -async function send({ event, data, ...record }) { - const producer = kafka.producer(); - await producer.connect(); - await producer.send({ - ...record, - messages: [ - { value: JSON.stringify({ event, data }) }, - ], - }); - await producer.disconnect(); + async send({ event, data, ...record }) { + await this.producer.send({ + ...record, + messages: [ + { value: JSON.stringify({ event, data }) }, + ], + }); + } } -module.exports = { - kafka, - processors, - run, - send, -}; +const kafka = new Kafka({ clientId: 'ship', brokers: ['kafka:9092'] }); + +kafka.addProcessor('user', { groupId: 'ship' }); + +module.exports = kafka; diff --git a/src/migrations/migration.schema.js b/src/migrations/migration.schema.js index 0a10a2a..5f6c7c7 100644 --- a/src/migrations/migration.schema.js +++ b/src/migrations/migration.schema.js @@ -8,4 +8,4 @@ const schema = Joi.object({ .required(), }); -module.exports = (obj) => schema.validate(obj, { allowUnknown: false }); +module.exports.validate = (obj) => schema.validate(obj, { allowUnknown: false }); diff --git a/src/migrations/migration.service.js b/src/migrations/migration.service.js index 7fe1e76..56304e4 100644 --- a/src/migrations/migration.service.js +++ b/src/migrations/migration.service.js @@ -1,9 +1,9 @@ const db = require('db'); const fs = require('fs'); const path = require('path'); -const validateSchema = require('./migration.schema'); +const { validate } = require('./migration.schema'); -const service = db.createService('__migrationVersion', { validateSchema }); +const service = db.createService('__migrationVersion', { validate }); const migrationsPath = path.join(__dirname, 'migrations'); const _id = 'migration_version'; diff --git a/src/migrations/migrations-log/migration-log.schema.js b/src/migrations/migrations-log/migration-log.schema.js index 09a74a0..b10f967 100644 --- a/src/migrations/migrations-log/migration-log.schema.js +++ b/src/migrations/migrations-log/migration-log.schema.js @@ -16,4 +16,4 @@ const schema = Joi.object({ .required(), }); -module.exports = (obj) => schema.validate(obj, { allowUnknown: false }); +module.exports.validate = (obj) => schema.validate(obj, { allowUnknown: false }); diff --git a/src/migrations/migrations-log/migration-log.service.js b/src/migrations/migrations-log/migration-log.service.js index 3924d27..820be5e 100644 --- a/src/migrations/migrations-log/migration-log.service.js +++ b/src/migrations/migrations-log/migration-log.service.js @@ -1,8 +1,8 @@ const db = require('db'); -const validateSchema = require('./migration-log.schema.js'); +const { validate } = require('./migration-log.schema.js'); -const service = db.createService('__migrationLog', { validateSchema }); +const service = db.createService('__migrationLog', { validate }); service.startMigrationLog = (_id, startTime, migrationVersion) => { return service.atomic.findOneAndUpdate({ _id }, { diff --git a/src/node-mongo-kafka-emitter.js b/src/node-mongo-kafka-emitter.js new file mode 100644 index 0000000..dbcf83b --- /dev/null +++ b/src/node-mongo-kafka-emitter.js @@ -0,0 +1,42 @@ +class NodeMongoKafkaEmitter { + constructor(_topic, _kafka, _logger = console) { + this.topic = _topic; + this.kafka = _kafka; + this.logger = _logger; + + this.processor = this.kafka.processors[this.topic]; + if (!this.processor) { + throw new Error(`Processor for "${this.topic}" topic doesn't exist`); + } + } + + castEvent(event) { + return `${this.topic}:${event}`; + } + + async emit(event, data) { + try { + await this.kafka.send({ + topic: this.topic, + event: this.castEvent(event), + data, + }); + } catch (error) { + this.logger.error(error); + } + } + + on(event, listener) { + this.processor.on(this.castEvent(event), async (message) => { + await listener(message.data); + }); + } + + once(event, listener) { + this.processor.once(this.castEvent(event), async (message) => { + await listener(message.data); + }); + } +} + +module.exports = NodeMongoKafkaEmitter; diff --git a/src/resources/account/account.spec.js b/src/resources/account/account.spec.js index db5a7f4..339b0c7 100644 --- a/src/resources/account/account.spec.js +++ b/src/resources/account/account.spec.js @@ -6,9 +6,9 @@ const db = require('tests/db'); const { USER, ERRORS } = require('tests/constants'); const testsHelper = require('tests/tests.helper'); const UserBuilder = require('resources/user/user.builder'); -const validateSchema = require('resources/user/user.schema'); +const { validate } = require('resources/user/user.schema'); -const userService = db.createService(USER.COLLECTION, { validateSchema }); +const userService = db.createService(USER.COLLECTION, { validate }); const app = server.listen(); const request = supertest.agent(app); diff --git a/src/resources/token/token.schema.js b/src/resources/token/token.schema.js index cf59105..f0ed7f3 100644 --- a/src/resources/token/token.schema.js +++ b/src/resources/token/token.schema.js @@ -16,4 +16,4 @@ const schema = Joi.object({ isShadow: Joi.boolean(), }); -module.exports = (obj) => schema.validate(obj, { allowUnknown: false }); +module.exports.validate = (obj) => schema.validate(obj, { allowUnknown: false }); diff --git a/src/resources/token/token.service.js b/src/resources/token/token.service.js index 355eaf0..641c6da 100644 --- a/src/resources/token/token.service.js +++ b/src/resources/token/token.service.js @@ -2,9 +2,9 @@ const db = require('db'); const securityUtil = require('security.util'); const { DATABASE_DOCUMENTS, TOKEN_SECURITY_LENGTH, TOKEN_TYPES } = require('app.constants'); -const validateSchema = require('./token.schema'); +const { validate } = require('./token.schema'); -const service = db.createService(DATABASE_DOCUMENTS.TOKENS, { validateSchema }); +const service = db.createService(DATABASE_DOCUMENTS.TOKENS, { validate }); const createToken = async (userId, type) => { const value = await securityUtil.generateSecureToken(TOKEN_SECURITY_LENGTH); diff --git a/src/resources/user/user.builder.js b/src/resources/user/user.builder.js index 11b4c95..c95cef3 100644 --- a/src/resources/user/user.builder.js +++ b/src/resources/user/user.builder.js @@ -5,9 +5,9 @@ const { USER } = require('tests/constants'); const securityUtil = require('tests/security.util'); const BaseBuilder = require('tests/base.builder'); -const validateSchema = require('./user.schema'); +const { validate } = require('./user.schema'); -const userService = db.createService(USER.COLLECTION, { validateSchema }); +const userService = db.createService(USER.COLLECTION, { validate }); class UserBuilder extends BaseBuilder { constructor({ diff --git a/src/resources/user/user.schema.js b/src/resources/user/user.schema.js index d460ff2..f848922 100644 --- a/src/resources/user/user.schema.js +++ b/src/resources/user/user.schema.js @@ -27,4 +27,4 @@ const schema = Joi.object({ lastRequest: Joi.date(), }); -module.exports = (obj) => schema.validate(obj, { allowUnknown: false }); +module.exports.validate = (obj) => schema.validate(obj, { allowUnknown: false }); diff --git a/src/resources/user/user.service.js b/src/resources/user/user.service.js index 2a365b3..674d2ca 100644 --- a/src/resources/user/user.service.js +++ b/src/resources/user/user.service.js @@ -1,11 +1,18 @@ const _ = require('lodash'); const db = require('db'); +const kafka = require('kafka'); +const NodeMongoKafkaEmitter = require('node-mongo-kafka-emitter'); + const constants = require('app.constants'); -const validateSchema = require('./user.schema'); +const { validate } = require('./user.schema'); -const service = db.createService(constants.DATABASE_DOCUMENTS.USERS, { validateSchema }); +const service = db.createService( + constants.DATABASE_DOCUMENTS.USERS, + { validate }, + new NodeMongoKafkaEmitter('user', kafka), +); service.updateLastRequest = async (_id) => { return service.atomic.update({ _id }, {