Skip to content

Commit

Permalink
fix: Reorder the closing of services, prevent sagas running multiple …
Browse files Browse the repository at this point in the history
…times and close backend server properly (#2499)
  • Loading branch information
Lucas Leblow authored May 9, 2024
1 parent c51d29a commit 134fbcb
Show file tree
Hide file tree
Showing 37 changed files with 418 additions and 265 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Fix issues with recreating general channel when deleted while offline ([#2334](https://github.com/TryQuiet/quiet/issues/2334))
* Fix package.json license inconsistency
* Fixes issue with reconnecting to peers on resume on iOS ([#2424](https://github.com/TryQuiet/quiet/issues/2424))
* Reorder the closing of services, prevent sagas running multiple times and close backend server properly

[2.1.2]

Expand Down
11 changes: 11 additions & 0 deletions packages/backend/ipfs-pubsub-peer-monitor.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
--- node_modules/ipfs-pubsub-peer-monitor/src/ipfs-pubsub-peer-monitor.js 2024-05-08 12:44:48
+++ node_modules/ipfs-pubsub-peer-monitor/src/ipfs-pubsub-peer-monitor.backup.js 2024-05-08 12:44:25
@@ -55,7 +55,7 @@
async _pollPeers () {
try {
const peers = await this._pubsub.peers(this._topic)
- IpfsPubsubPeerMonitor._emitJoinsAndLeaves(new Set(this._peers), new Set(peers), this)
+ IpfsPubsubPeerMonitor._emitJoinsAndLeaves(new Set(this._peers.map(p => p.toString())), new Set(peers.map(p => p.toString())), this)
this._peers = peers
} catch (err) {
clearInterval(this._interval)
2 changes: 1 addition & 1 deletion packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"build": "tsc -p tsconfig.build.json",
"webpack": "webpack --env mode=development && cp ./lib/bundle.cjs ../backend-bundle/bundle.cjs",
"webpack:prod": "webpack --env mode=production && cp ./lib/bundle.cjs ../backend-bundle/bundle.cjs",
"applyPatches": "patch -f -p0 < ./electron-fetch.patch || true && patch -f -p0 --forward --binary < ./parse-duration.patch || true && patch -f -p0 --forward --binary < ./parse-duration-esm.patch || true",
"applyPatches": "patch -f -p0 < ./electron-fetch.patch || true && patch -f -p0 --forward --binary < ./parse-duration.patch || true && patch -f -p0 --forward --binary < ./parse-duration-esm.patch || true && patch -f -p0 < ./ipfs-pubsub-peer-monitor.patch || true",
"prepare": "npm run applyPatches && npm run webpack",
"version": "git add -A src",
"lint:no-fix": "eslint --ext .jsx,.js,.ts,.tsx ./src/",
Expand Down
8 changes: 4 additions & 4 deletions packages/backend/src/backendManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ export const runBackendMobile = async () => {
{ logger: ['warn', 'error', 'log', 'debug', 'verbose'] }
)

rn_bridge.channel.on('close', async () => {
rn_bridge.channel.on('close', () => {
const connectionsManager = app.get<ConnectionsManagerService>(ConnectionsManagerService)
await connectionsManager.pause()
connectionsManager.pause()
})

rn_bridge.channel.on('open', async (msg: OpenServices) => {
rn_bridge.channel.on('open', (msg: OpenServices) => {
const connectionsManager = app.get<ConnectionsManagerService>(ConnectionsManagerService)
const torControl = app.get<TorControl>(TorControl)
const proxyAgent = app.get<{ proxy: { port: string } }>(SOCKS_PROXY_AGENT)
Expand All @@ -123,7 +123,7 @@ export const runBackendMobile = async () => {
torControl.torControlParams.auth.value = msg.authCookie
proxyAgent.proxy.port = msg.httpTunnelPort

await connectionsManager.resume()
connectionsManager.resume()
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,46 +224,44 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
}

public async closeAllServices(options: { saveTor: boolean } = { saveTor: false }) {
this.logger('Closing services')

await this.closeSocket()

if (this.tor && !options.saveTor) {
this.logger('Killing tor')
await this.tor.kill()
} else if (options.saveTor) {
this.logger('Saving tor')
}
if (this.storageService) {
this.logger('Stopping orbitdb')
this.logger('Stopping OrbitDB')
await this.storageService?.stopOrbitDb()
}
if (this.serverIoProvider?.io) {
this.logger('Closing socket server')
this.serverIoProvider.io.close()
}
if (this.localDbService) {
this.logger('Closing local storage')
await this.localDbService.close()
}
if (this.libp2pService) {
this.logger('Stopping libp2p')
await this.libp2pService.close()
}
if (this.localDbService) {
this.logger('Closing local DB')
await this.localDbService.close()
}
}

public closeSocket() {
this.serverIoProvider.io.close()
public async closeSocket() {
await this.socketService.close()
}

public async pause() {
this.logger('Pausing!')
this.logger('Closing socket!')
this.closeSocket()
await this.closeSocket()
this.logger('Pausing libp2pService!')
this.peerInfo = await this.libp2pService?.pause()
this.logger('Found the following peer info on pause: ', this.peerInfo)
}

public async resume() {
this.logger('Resuming!')
this.logger('Reopening socket!')
await this.openSocket()
this.logger('Attempting to redial peers!')
if (this.peerInfo && (this.peerInfo?.connected.length !== 0 || this.peerInfo?.dialed.length !== 0)) {
Expand All @@ -289,21 +287,14 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
public async leaveCommunity(): Promise<boolean> {
this.logger('Running leaveCommunity')

this.logger('Resetting tor')
this.tor.resetHiddenServices()

this.logger('Closing the socket')
this.closeSocket()

this.logger('Purging local DB')
await this.localDbService.purge()

this.logger('Closing services')
await this.closeAllServices({ saveTor: true })

this.logger('Purging data')
await this.purgeData()

this.logger('Resetting Tor')
this.tor.resetHiddenServices()

this.logger('Resetting state')
await this.resetState()

Expand Down
2 changes: 0 additions & 2 deletions packages/backend/src/nest/libp2p/libp2p.module.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { Module } from '@nestjs/common'
import { SocketModule } from '../socket/socket.module'
import { Libp2pService } from './libp2p.service'
import { ProcessInChunksService } from './process-in-chunks.service'

@Module({
imports: [SocketModule],
providers: [Libp2pService, ProcessInChunksService],
exports: [Libp2pService],
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class RegistrationService extends EventEmitter implements OnModuleInit {
// Get the next event.
const event = this.registrationEvents.shift()
if (event) {
this.logger('Processing registration event', event)
this.logger('Processing registration event')
// Event processing in progress
this.registrationEventInProgress = true

Expand Down
81 changes: 69 additions & 12 deletions packages/backend/src/nest/socket/socket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import { CONFIG_OPTIONS, SERVER_IO_PROVIDER } from '../const'
import { ConfigOptions, ServerIoProviderTypes } from '../types'
import { suspendableSocketEvents } from './suspendable.events'
import Logger from '../common/logger'
import type net from 'node:net'

@Injectable()
export class SocketService extends EventEmitter implements OnModuleInit {
private readonly logger = Logger(SocketService.name)

public resolveReadyness: (value: void | PromiseLike<void>) => void
public readyness: Promise<void>
private sockets: Set<net.Socket>

constructor(
@Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes,
Expand All @@ -44,14 +46,15 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.readyness = new Promise<void>(resolve => {
this.resolveReadyness = resolve
})

this.sockets = new Set<net.Socket>()

this.attachListeners()
}

async onModuleInit() {
this.logger('init: Started')

this.attachListeners()
await this.init()

this.logger('init: Finished')
}

Expand All @@ -71,7 +74,9 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.logger('init: Frontend connected')
}

private readonly attachListeners = (): void => {
private readonly attachListeners = () => {
this.logger('Attaching listeners')

// Attach listeners here
this.serverIoProvider.io.on(SocketActionTypes.CONNECTION, socket => {
this.logger('Socket connection')
Expand Down Expand Up @@ -173,7 +178,7 @@ export class SocketService extends EventEmitter implements OnModuleInit {
}
)

socket.on(SocketActionTypes.LEAVE_COMMUNITY, async (callback: (closed: boolean) => void) => {
socket.on(SocketActionTypes.LEAVE_COMMUNITY, (callback: (closed: boolean) => void) => {
this.logger('Leaving community')
this.emit(SocketActionTypes.LEAVE_COMMUNITY, callback)
})
Expand All @@ -195,25 +200,77 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.emit(SocketActionTypes.LOAD_MIGRATION_DATA, data)
})
})

// Ensure the underlying connections get closed. See:
// https://github.com/socketio/socket.io/issues/1602
this.serverIoProvider.server.on('connection', conn => {
this.sockets.add(conn)
conn.on('close', () => {
this.sockets.delete(conn)
})
})
}

public listen = async (port = this.configOptions.socketIOPort): Promise<void> => {
return await new Promise(resolve => {
if (this.serverIoProvider.server.listening) resolve()
public getConnections = (): Promise<number> => {
return new Promise(resolve => {
this.serverIoProvider.server.getConnections((err, count) => {
if (err) throw new Error(err.message)
resolve(count)
})
})
}

// Ensure the underlying connections get closed. See:
// https://github.com/socketio/socket.io/issues/1602
//
// I also tried `this.serverIoProvider.io.disconnectSockets(true)`
// which didn't work for me, but we still call it.
public closeSockets = () => {
this.logger('Disconnecting sockets')
this.serverIoProvider.io.disconnectSockets(true)
this.sockets.forEach(s => s.destroy())
}

public listen = async (): Promise<void> => {
this.logger(`Opening data server on port ${this.configOptions.socketIOPort}`)

if (this.serverIoProvider.server.listening) {
this.logger('Failed to listen. Server already listening.')
return
}

const numConnections = await this.getConnections()

if (numConnections > 0) {
this.logger('Failed to listen. Connections still open:', numConnections)
return
}

return new Promise(resolve => {
this.serverIoProvider.server.listen(this.configOptions.socketIOPort, '127.0.0.1', () => {
this.logger(`Data server running on port ${this.configOptions.socketIOPort}`)
resolve()
})
})
}

public close = async (): Promise<void> => {
this.logger(`Closing data server on port ${this.configOptions.socketIOPort}`)
return await new Promise(resolve => {
this.serverIoProvider.server.close(err => {
public close = (): Promise<void> => {
return new Promise(resolve => {
this.logger(`Closing data server on port ${this.configOptions.socketIOPort}`)

if (!this.serverIoProvider.server.listening) {
this.logger('Data server is not running.')
resolve()
return
}

this.serverIoProvider.io.close(err => {
if (err) throw new Error(err.message)
this.logger('Data server closed')
resolve()
})

this.closeSockets()
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export class CertificatesRequestsStore extends EventEmitter {
write: ['*'],
},
})
await this.store.load()

this.store.events.on('write', async (_address, entry) => {
this.logger('Added CSR to database')
Expand All @@ -40,8 +41,6 @@ export class CertificatesRequestsStore extends EventEmitter {
this.loadedCertificateRequests()
})

// @ts-ignore
await this.store.load({ fetchEntryTimeout: 15000 })
this.logger('Initialized')
}

Expand All @@ -52,9 +51,9 @@ export class CertificatesRequestsStore extends EventEmitter {
}

public async close() {
this.logger('Closing...')
this.logger('Closing certificate requests DB')
await this.store?.close()
this.logger('Closed')
this.logger('Closed certificate requests DB')
}

public getAddress() {
Expand Down Expand Up @@ -91,8 +90,6 @@ export class CertificatesRequestsStore extends EventEmitter {

public async getCsrs() {
const filteredCsrsMap: Map<string, string> = new Map()
// @ts-expect-error - OrbitDB's type declaration of `load` lacks 'options'
await this.store.load({ fetchEntryTimeout: 15000 })
const allEntries = this.store
.iterator({ limit: -1 })
.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ export class CertificatesStore extends EventEmitter {
write: ['*'],
},
})
await this.store.load()

this.store.events.on('ready', async () => {
this.logger('Loaded certificates to memory')
Expand All @@ -59,8 +58,7 @@ export class CertificatesStore extends EventEmitter {
await this.loadedCertificates()
})

// // @ts-expect-error - OrbitDB's type declaration of `load` lacks 'options'
// await this.store.load({ fetchEntryTimeout: 15000 })
await this.store.load()

this.logger('Initialized')
}
Expand All @@ -72,7 +70,9 @@ export class CertificatesStore extends EventEmitter {
}

public async close() {
this.logger('Closing certificates DB')
await this.store?.close()
this.logger('Closed certificates DB')
}

public getAddress() {
Expand Down Expand Up @@ -154,8 +154,6 @@ export class CertificatesStore extends EventEmitter {
return []
}

// @ts-expect-error - OrbitDB's type declaration of `load` lacks 'options'
await this.store.load({ fetchEntryTimeout: 15000 })
const allCertificates = this.store
.iterator({ limit: -1 })
.collect()
Expand Down
Loading

0 comments on commit 134fbcb

Please sign in to comment.