Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: fix disconnect errors #998

Merged
merged 9 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ const {
wrapWithRetries
} = require('#test/util/helpers')
const TestConsumer = require('#test/integration/helpers/testConsumer')
const KafkaHelper = require('#test/integration/helpers/kafkaHelper')

const ParticipantCached = require('#src/models/participant/participantCached')
const ParticipantCurrencyCached = require('#src/models/participant/participantCurrencyCached')
Expand Down Expand Up @@ -722,9 +721,8 @@ Test('Handlers test', async handlersTest => {

test.pass('done')
test.end()
setupTests.end()
})

await setupTests.end()
})

await handlersTest.test('position batch handler should', async transferPositionPrepare => {
Expand Down Expand Up @@ -1226,10 +1224,9 @@ Test('Handlers test', async handlersTest => {
await Cache.destroyCache()
await Db.disconnect()
assert.pass('database connection closed')
await testConsumer.destroy()
await testConsumer.destroy() // this disconnects the consumers

await KafkaHelper.producers.disconnect()
await KafkaHelper.consumers.disconnect()
await Producer.disconnect()

if (debug) {
const elapsedTime = Math.round(((new Date()) - startTime) / 100) / 10
Expand All @@ -1241,8 +1238,8 @@ Test('Handlers test', async handlersTest => {
Logger.error(`teardown failed with error - ${err}`)
assert.fail()
assert.end()
} finally {
handlersTest.end()
}
})

handlersTest.end()
})
40 changes: 7 additions & 33 deletions test/integration-override/handlers/transfers/handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ const Test = require('tape')
const { randomUUID } = require('crypto')
const Logger = require('@mojaloop/central-services-logger')
const Config = require('#src/lib/config')
const Time = require('@mojaloop/central-services-shared').Util.Time
const sleep = Time.sleep
const Db = require('@mojaloop/database-lib').Db
const Cache = require('#src/lib/cache')
const Producer = require('@mojaloop/central-services-stream').Util.Producer
Expand Down Expand Up @@ -331,13 +329,12 @@ Test('Handlers test', async handlersTest => {
await testConsumer.startListening()
await KafkaHelper.producers.connect()
// TODO: MIG - Disabling these handlers to test running the CL as a separate service independently.
sleep(rebalanceDelay, debug, 'registerAllHandlers', 'awaiting registration of common handlers')
await new Promise(resolve => setTimeout(resolve, rebalanceDelay))

test.pass('done')
test.end()
registerAllHandlers.end()
})

await registerAllHandlers.end()
})

await handlersTest.test('transferPrepare should', async transferPrepare => {
Expand Down Expand Up @@ -425,32 +422,9 @@ Test('Handlers test', async handlersTest => {
await Cache.destroyCache()
await Db.disconnect()
assert.pass('database connection closed')
await testConsumer.destroy()

// TODO: Story to investigate as to why the Producers failed reconnection on the ./transfers/handlers.test.js - https://github.com/mojaloop/project/issues/3067
// const topics = KafkaHelper.topics
// for (const topic of topics) {
// try {
// await Producer.getProducer(topic).disconnect()
// assert.pass(`producer to ${topic} disconnected`)
// } catch (err) {
// assert.pass(err.message)
// }
// }
// Lets make sure that all existing Producers are disconnected
await KafkaHelper.producers.disconnect()

// TODO: Clean this up once the above issue has been resolved.
// for (const topic of topics) {
// try {
// await Consumer.getConsumer(topic).disconnect()
// assert.pass(`consumer to ${topic} disconnected`)
// } catch (err) {
// assert.pass(err.message)
// }
// }
// Lets make sure that all existing Consumers are disconnected
await KafkaHelper.consumers.disconnect()
await testConsumer.destroy() // this disconnects the consumers

await Producer.disconnect()

if (debug) {
const elapsedTime = Math.round(((new Date()) - startTime) / 100) / 10
Expand All @@ -462,8 +436,8 @@ Test('Handlers test', async handlersTest => {
Logger.error(`teardown failed with error - ${err}`)
assert.fail()
assert.end()
} finally {
handlersTest.end()
}
})

handlersTest.end()
})
40 changes: 7 additions & 33 deletions test/integration/handlers/transfers/handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const retry = require('async-retry')
const Logger = require('@mojaloop/central-services-logger')
const Config = require('#src/lib/config')
const Time = require('@mojaloop/central-services-shared').Util.Time
const sleep = Time.sleep
const Db = require('@mojaloop/database-lib').Db
const Cache = require('#src/lib/cache')
const Producer = require('@mojaloop/central-services-stream').Util.Producer
Expand All @@ -54,7 +53,6 @@ const {
sleepPromise
} = require('#test/util/helpers')
const TestConsumer = require('#test/integration/helpers/testConsumer')
const KafkaHelper = require('#test/integration/helpers/kafkaHelper')

const ParticipantCached = require('#src/models/participant/participantCached')
const ParticipantCurrencyCached = require('#src/models/participant/participantCurrencyCached')
Expand Down Expand Up @@ -390,13 +388,12 @@ Test('Handlers test', async handlersTest => {
await testConsumer.startListening()

// TODO: MIG - Disabling these handlers to test running the CL as a separate service independently.
sleep(rebalanceDelay, debug, 'registerAllHandlers', 'awaiting registration of common handlers')
await new Promise(resolve => setTimeout(resolve, rebalanceDelay))

test.pass('done')
test.end()
registerAllHandlers.end()
})

await registerAllHandlers.end()
})

await handlersTest.test('transferPrepare should', async transferPrepare => {
Expand Down Expand Up @@ -1344,32 +1341,9 @@ Test('Handlers test', async handlersTest => {
await Cache.destroyCache()
await Db.disconnect()
assert.pass('database connection closed')
await testConsumer.destroy()

// TODO: Story to investigate as to why the Producers failed reconnection on the ./transfers/handlers.test.js - https://github.com/mojaloop/project/issues/3067
// const topics = KafkaHelper.topics
// for (const topic of topics) {
// try {
// await Producer.getProducer(topic).disconnect()
// assert.pass(`producer to ${topic} disconnected`)
// } catch (err) {
// assert.pass(err.message)
// }
// }
// Lets make sure that all existing Producers are disconnected
await KafkaHelper.producers.disconnect()

// TODO: Clean this up once the above issue has been resolved.
// for (const topic of topics) {
// try {
// await Consumer.getConsumer(topic).disconnect()
// assert.pass(`consumer to ${topic} disconnected`)
// } catch (err) {
// assert.pass(err.message)
// }
// }
// Lets make sure that all existing Consumers are disconnected
await KafkaHelper.consumers.disconnect()
await testConsumer.destroy() // this disconnects the consumers

await Producer.disconnect()

if (debug) {
const elapsedTime = Math.round(((new Date()) - startTime) / 100) / 10
Expand All @@ -1381,8 +1355,8 @@ Test('Handlers test', async handlersTest => {
Logger.error(`teardown failed with error - ${err}`)
assert.fail()
assert.end()
} finally {
handlersTest.end()
}
})

handlersTest.end()
})
4 changes: 3 additions & 1 deletion test/integration/helpers/testConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ class TestConsumer {
*/
async destroy () {
Logger.warn(`TestConsumer.destroy(): destroying ${this.consumers.length} consumers`)
await Promise.all(this.consumers.map(async c => c.disconnect()))
await Promise.all(this.consumers.map(consumer => new Promise((resolve, reject) => {
consumer.disconnect((err) => err ? reject(err) : resolve())
})))
}

/**
Expand Down
Loading