diff --git a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts index 91d9d177c4..5822a1502a 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts @@ -9,6 +9,7 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import validator from 'validator' import waitForExpect from 'wait-for-expect' import { DEFAULT_NUM_TRIES, ProcessInChunksService } from './process-in-chunks.service' +import { sleep } from '../common/sleep' describe('Libp2pService', () => { let module: TestingModule @@ -16,7 +17,7 @@ describe('Libp2pService', () => { let params: Libp2pNodeParams let processInChunks: ProcessInChunksService - beforeAll(async () => { + beforeEach(async () => { module = await Test.createTestingModule({ imports: [TestModule, Libp2pModule], }).compile() @@ -26,7 +27,7 @@ describe('Libp2pService', () => { params = await libp2pInstanceParams() }) - afterAll(async () => { + afterEach(async () => { await libp2pService.libp2pInstance?.stop() await module.close() }) @@ -71,14 +72,19 @@ describe('Libp2pService', () => { libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()), libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()), ] - await libp2pService.createInstance(params) - // @ts-expect-error processItem is private - const spyOnProcessItem = jest.spyOn(processInChunks, 'processItem') + await libp2pService.createInstance(params, false) expect(libp2pService.libp2pInstance).not.toBeNull() + + // @ts-expect-error processItem is private + const processItemSpy = jest.spyOn(processInChunks, 'processItem') + const dialSpy = jest.spyOn(libp2pService.libp2pInstance!, 'dial') + libp2pService.emit(Libp2pEvents.INITIAL_DIAL, addresses) libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses) + await waitForExpect(async () => { - expect(spyOnProcessItem).toBeCalledTimes(addresses.length) - }) + expect(processItemSpy).toBeCalledTimes(6) + expect(dialSpy).toBeCalledTimes(3) + }, 30000) }) it(`Do not dial peer on '${Libp2pEvents.DIAL_PEERS}' event if peer was already dialed`, async () => { @@ -90,15 +96,18 @@ describe('Libp2pService', () => { alreadyDialedAddress, libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()), ] - await libp2pService.createInstance(params) + await libp2pService.createInstance(params, false) expect(libp2pService.libp2pInstance).not.toBeNull() + // @ts-expect-error processItem is private const processItemSpy = jest.spyOn(processInChunks, 'processItem') const dialSpy = jest.spyOn(libp2pService.libp2pInstance!, 'dial') + libp2pService.emit(Libp2pEvents.INITIAL_DIAL, addresses) libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses) + await waitForExpect(async () => { - expect(processItemSpy).toBeCalledTimes(2 * DEFAULT_NUM_TRIES) - expect(dialSpy).toBeCalledTimes(1) - }) + expect(processItemSpy).toBeCalledTimes(4) + expect(dialSpy).toBeCalledTimes(2) + }, 30000) }) }) diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts index c55b131818..97c194f176 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -15,7 +15,7 @@ type ProcessTask = { export type ProcessInChunksServiceOptions = { initialData: T[] - processItem: (arg: T) => Promise + processItem: (arg: T) => Promise chunkSize?: number | undefined startImmediately?: boolean } diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts index 516e28c935..0075ba3371 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts @@ -20,12 +20,13 @@ describe('ProcessInChunks', () => { const mockProcessItem = jest .fn(async a => { console.log('processing', a) + return true }) - .mockResolvedValueOnce() + .mockResolvedValueOnce(true) .mockRejectedValueOnce(new Error('Rejected 1')) - .mockResolvedValueOnce() + .mockResolvedValueOnce(true) .mockRejectedValueOnce(new Error('Rejected 2')) - processInChunks.init({ initialData: ['a', 'b', 'c', 'd'], processItem: mockProcessItem }) + processInChunks.init({ initialData: ['a', 'b', 'c', 'd'], processItem: mockProcessItem, chunkSize: 10 }) await waitForExpect(() => { expect(mockProcessItem).toBeCalledTimes(6) }) @@ -35,10 +36,11 @@ describe('ProcessInChunks', () => { const mockProcessItem = jest .fn(async a => { console.log('processing', a) + return true }) - .mockResolvedValueOnce() + .mockResolvedValueOnce(true) .mockRejectedValueOnce(new Error('Rejected 1')) - processInChunks.init({ initialData: ['a', 'b'], processItem: mockProcessItem }) + processInChunks.init({ initialData: ['a', 'b'], processItem: mockProcessItem, chunkSize: 10 }) processInChunks.updateQueue(['e', 'f']) await waitForExpect(() => { expect(mockProcessItem).toBeCalledTimes(5) @@ -49,10 +51,11 @@ describe('ProcessInChunks', () => { const mockProcessItem = jest .fn(async a => { console.log('processing', a) + return true }) - .mockResolvedValueOnce() + .mockResolvedValueOnce(true) .mockRejectedValueOnce(new Error('Rejected 1')) - .mockResolvedValueOnce() + .mockResolvedValueOnce(true) .mockRejectedValueOnce(new Error('Rejected 2')) const chunkSize = 2 processInChunks.init({ initialData: ['a', 'b', 'c', 'd'], processItem: mockProcessItem, chunkSize }) @@ -63,16 +66,20 @@ describe('ProcessInChunks', () => { }) it('does not process more data if stopped', async () => { - const mockProcessItem = jest.fn(async () => {}) - processInChunks.init({ initialData: [], processItem: mockProcessItem }) + const mockProcessItem = jest.fn(async () => { + return true + }) + processInChunks.init({ initialData: [], processItem: mockProcessItem, chunkSize: 10 }) processInChunks.pause() processInChunks.updateQueue(['a', 'b', 'c', 'd']) expect(mockProcessItem).not.toBeCalled() }) it('processes tasks after resuming from pause', async () => { - const mockProcessItem = jest.fn(async () => {}) - processInChunks.init({ initialData: [], processItem: mockProcessItem }) + const mockProcessItem = jest.fn(async () => { + return true + }) + processInChunks.init({ initialData: [], processItem: mockProcessItem, chunkSize: 10 }) processInChunks.pause() processInChunks.updateQueue(['a', 'b', 'c', 'd']) processInChunks.resume() @@ -82,8 +89,15 @@ describe('ProcessInChunks', () => { }) it('processes tasks when deferred', async () => { - const mockProcessItem = jest.fn(async () => {}) - processInChunks.init({ initialData: ['a', 'b', 'c', 'd'], processItem: mockProcessItem, startImmediately: false }) + const mockProcessItem = jest.fn(async () => { + return true + }) + processInChunks.init({ + initialData: ['a', 'b', 'c', 'd'], + processItem: mockProcessItem, + startImmediately: false, + chunkSize: 10, + }) await waitForExpect(() => { expect(mockProcessItem).toBeCalledTimes(0) }) diff --git a/packages/backend/src/nest/tor/tor.service.ts b/packages/backend/src/nest/tor/tor.service.ts index 7dcd7ccd10..8bbbbb3208 100644 --- a/packages/backend/src/nest/tor/tor.service.ts +++ b/packages/backend/src/nest/tor/tor.service.ts @@ -49,7 +49,6 @@ export class Tor extends EventEmitter implements OnModuleInit { console.warn('No tor binary path, not running the tor service') return } - this.isTorServiceUsed = true await this.init() } @@ -79,6 +78,7 @@ export class Tor extends EventEmitter implements OnModuleInit { } public async init(timeout = 120_000): Promise { + this.isTorServiceUsed = true if (!this.socksPort) this.socksPort = await getPort() this.logger('Initializing tor...') @@ -134,12 +134,11 @@ export class Tor extends EventEmitter implements OnModuleInit { this.logger(`Sending ${SocketActionTypes.INITIAL_DIAL}`) this.emit(SocketActionTypes.INITIAL_DIAL) clearInterval(this.interval) + resolve() } }, 2500) this.logger(`Spawned tor with pid(s): ${this.getTorProcessIds()}`) - - resolve() } catch (e) { this.logger('Killing tor due to error', e) this.clearHangingTorProcess()