Skip to content

Commit

Permalink
(Maybe) fix the ping issue and mild refactor for my sanity
Browse files Browse the repository at this point in the history
  • Loading branch information
islathehut committed Sep 23, 2024
1 parent 91f2e97 commit 3be6240
Show file tree
Hide file tree
Showing 13 changed files with 593 additions and 564 deletions.
958 changes: 417 additions & 541 deletions packages/backend/package-lock.json

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,24 +118,27 @@
"get-port": "^5.1.1",
"helia": "4.0.2",
"@helia/unixfs": "3.0.1",
"@helia/block-brokers": "2.0.2",
"@helia/block-brokers": "2.0.3",
"http-server": "^0.12.3",
"https-proxy-agent": "^7.0.5",
"image-size": "^1.0.1",
"@ipld/dag-cbor": "^9.2.1",
"@ipld/dag-pb": "^4.1.2",
"it-drain": "^3.0.7",
"it-pipe": "^3.0.1",
"it-ws": "^6.1.5",
"joi": "^17.8.1",
"level": "^8.0.1",
"libp2p": "^1.7.0",
"@libp2p/crypto": "^3.0.4",
"@libp2p/echo": "^1.1.1",
"@libp2p/identify": "^2.1.5",
"@libp2p/interface": "^1.7.0",
"@libp2p/kad-dht": "^12.1.5",
"@libp2p/mplex": "^10.1.5",
"@libp2p/peer-id": "^4.2.4",
"@libp2p/peer-id-factory": "^4.2.4",
"@libp2p/ping": "1.1.6",
"@libp2p/pnet": "1.0.0-3c8dd5bbf",
"@libp2p/utils": "^5.4.9",
"@libp2p/websockets": "^8.2.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import { ServiceState, TorInitState } from './connections-manager.types'
import { DateTime } from 'luxon'
import { createLogger } from '../common/logger'
import { createFromJSON } from '@libp2p/peer-id-factory'
import { PeerId } from '@libp2p/interface'

@Injectable()
export class ConnectionsManagerService extends EventEmitter implements OnModuleInit {
Expand Down Expand Up @@ -599,7 +600,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI

const onionAddress = await this.spawnTorHiddenService(community.id, network)

const peerId = await createFromJSON(network.peerId)
const peerId: PeerId = await createFromJSON(network.peerId)

const peers = community.peerList
this.logger.info(`Launching community ${community.id}: payload peers: ${peers}`)
Expand Down
34 changes: 24 additions & 10 deletions packages/backend/src/nest/libp2p/libp2p.service.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
import { gossipsub } from '@chainsafe/libp2p-gossipsub'
import { noise } from '@chainsafe/libp2p-noise'
import { kadDHT } from '@libp2p/kad-dht'
import { mplex } from '@libp2p/mplex'
import { yamux } from '@chainsafe/libp2p-yamux'
import { type Libp2p } from '@libp2p/interface'

import { echo } from '@libp2p/echo'
import { identify, identifyPush } from '@libp2p/identify'
import { kadDHT } from '@libp2p/kad-dht'
import { CodeError, ERR_INVALID_MESSAGE, ERR_TIMEOUT, PeerId, type Libp2p } from '@libp2p/interface'
import { preSharedKey } from '@libp2p/pnet'
import { peerIdFromString } from '@libp2p/peer-id'
import { identify, identifyPush } from '@libp2p/identify'
import { ping } from '@libp2p/ping'
import * as filters from '@libp2p/websockets/filters'
import { createLibp2p } from 'libp2p'

import { multiaddr } from '@multiformats/multiaddr'
import { Inject, Injectable } from '@nestjs/common'
import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common'
import { ConnectionProcessInfo, type NetworkDataPayload, PeerId, SocketActionTypes, type UserData } from '@quiet/types'
import { getUsersAddresses } from '../common/utils'

import crypto from 'crypto'
import { EventEmitter } from 'events'
import { Agent } from 'https'
import { createLibp2p } from 'libp2p'
import { DateTime } from 'luxon'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { pipe } from 'it-pipe'

import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common'
import { ConnectionProcessInfo, type NetworkDataPayload, SocketActionTypes, type UserData } from '@quiet/types'

import { getUsersAddresses } from '../common/utils'
import { SERVER_IO_PROVIDER, SOCKS_PROXY_AGENT } from '../const'
import { ServerIoProviderTypes } from '../types'
import { webSockets } from '../websocketOverTor'
import { Libp2pConnectedPeer, Libp2pEvents, Libp2pNodeParams } from './libp2p.types'
import { createLogger } from '../common/logger'
import * as filters from '@libp2p/websockets/filters'
import { bitswap } from '@helia/block-brokers'

const KEY_LENGTH = 32
export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n'
Expand Down Expand Up @@ -215,6 +223,10 @@ export class Libp2pService extends EventEmitter {
},
peerId: params.peerId,
addresses: { listen: params.listenAddresses },
connectionMonitor: {
abortConnectionOnPingFailure: false,
pingInterval: 60_000,
},
connectionProtector: preSharedKey({ psk: params.psk }),
streamMuxers: [yamux()],
connectionEncryption: [noise()],
Expand All @@ -232,18 +244,20 @@ export class Libp2pService extends EventEmitter {
dht: kadDHT({
allowQueryWithZeroPeers: true,
}),
echo: echo(),
pubsub: gossipsub({
// neccessary to run a single peer
allowPublishToZeroTopicPeers: true,
fallbackToFloodsub: true,
// emitSelf: false,
doPX: true,
}),
identify: identify(),
identifyPush: identifyPush(),
},
})
} catch (err) {
this.logger.error('Create libp2p:', err)
this.logger.error('Error while creating instance of libp2p', err)
throw err
}
this.libp2pInstance = libp2p
Expand Down
3 changes: 2 additions & 1 deletion packages/backend/src/nest/libp2p/libp2p.types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { PeerId } from '@libp2p/interface'
import { Agent } from 'http'

export enum Libp2pEvents {
Expand All @@ -7,7 +8,7 @@ export enum Libp2pEvents {
}

export interface Libp2pNodeParams {
peerId: any
peerId: PeerId
listenAddresses: string[]
agent: Agent
localAddress: string
Expand Down
126 changes: 126 additions & 0 deletions packages/backend/src/nest/libp2p/ping.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// import { randomBytes } from '@libp2p/crypto';
// import { CodeError, ERR_INVALID_MESSAGE, ERR_TIMEOUT } from '@libp2p/interface';
// import first from 'it-first';
// import { pipe } from 'it-pipe';
// import { Components } from 'libp2p/dist/src/components.js';
// import { equals as uint8ArrayEquals } from 'uint8arrays/equals';
// import { PROTOCOL_PREFIX, PROTOCOL_NAME, PING_LENGTH, PROTOCOL_VERSION, TIMEOUT, MAX_INBOUND_STREAMS, MAX_OUTBOUND_STREAMS, ERR_WRONG_PING_ACK } from './constants.js';
// export class PingService {
// protocol;
// components;
// started;
// timeout;
// maxInboundStreams;
// maxOutboundStreams;
// runOnTransientConnection;
// log;
// constructor(components: Components, init = {}) {
// this.components = components;
// this.log = components.logger.forComponent('libp2p:ping');
// this.started = false;
// this.protocol = `/${init.protocolPrefix ?? PROTOCOL_PREFIX}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`;
// this.timeout = init.timeout ?? TIMEOUT;
// this.maxInboundStreams = init.maxInboundStreams ?? MAX_INBOUND_STREAMS;
// this.maxOutboundStreams = init.maxOutboundStreams ?? MAX_OUTBOUND_STREAMS;
// this.runOnTransientConnection = init.runOnTransientConnection ?? true;
// this.handleMessage = this.handleMessage.bind(this);
// }
// [Symbol.toStringTag] = '@libp2p/ping';
// async start() {
// await this.components.registrar.handle(this.protocol, this.handleMessage, {
// maxInboundStreams: this.maxInboundStreams,
// maxOutboundStreams: this.maxOutboundStreams,
// runOnTransientConnection: this.runOnTransientConnection
// });
// this.started = true;
// }
// async stop() {
// await this.components.registrar.unhandle(this.protocol);
// this.started = false;
// }
// isStarted() {
// return this.started;
// }
// /**
// * A handler to register with Libp2p to process ping messages
// */
// handleMessage(data) {
// this.log('incoming ping from %p', data.connection.remotePeer);
// const { stream } = data;
// const start = Date.now();
// const signal = AbortSignal.timeout(this.timeout);
// signal.addEventListener('abort', () => {
// stream?.abort(new CodeError('ping timeout', ERR_TIMEOUT));
// });
// void pipe(stream, async function* (source) {
// let received = 0;
// for await (const buf of source) {
// received += buf.byteLength;
// if (received > PING_LENGTH) {
// stream?.abort(new CodeError('Too much data received', ERR_INVALID_MESSAGE));
// return;
// }
// yield buf;
// }
// }, stream)
// .catch(err => {
// this.log.error('incoming ping from %p failed with error', data.connection.remotePeer, err);
// stream?.abort(err);
// })
// .finally(() => {
// const ms = Date.now() - start;
// this.log('incoming ping from %p complete in %dms', data.connection.remotePeer, ms);
// });
// }
// /**
// * Ping a given peer and wait for its response, getting the operation latency.
// */
// async ping(peer, options = {}) {
// this.log('pinging %p', peer);
// const start = Date.now();
// const data = randomBytes(PING_LENGTH);
// const connection = await this.components.connectionManager.openConnection(peer, options);
// let stream;
// let onAbort = () => { };
// if (options.signal == null) {
// const signal = AbortSignal.timeout(this.timeout);
// options = {
// ...options,
// signal
// };
// }
// try {
// stream = await connection.newStream(this.protocol, {
// ...options,
// runOnTransientConnection: this.runOnTransientConnection
// });
// onAbort = () => {
// stream?.abort(new CodeError('ping timeout', ERR_TIMEOUT));
// };
// // make stream abortable
// options.signal?.addEventListener('abort', onAbort, { once: true });
// const result = await pipe([data], stream, async (source) => first(source));
// const ms = Date.now() - start;
// if (result == null) {
// throw new CodeError(`Did not receive a ping ack after ${ms}ms`, ERR_WRONG_PING_ACK);
// }
// if (!uint8ArrayEquals(data, result.subarray())) {
// throw new CodeError(`Received wrong ping ack after ${ms}ms`, ERR_WRONG_PING_ACK);
// }
// this.log('ping %p complete in %dms', connection.remotePeer, ms);
// return ms;
// }
// catch (err) {
// this.log.error('error while pinging %p', connection.remotePeer, err);
// stream?.abort(err);
// throw err;
// }
// finally {
// options.signal?.removeEventListener('abort', onAbort);
// if (stream != null) {
// await stream.close();
// }
// }
// }
// }
// //# sourceMappingURL=ping.js.map
12 changes: 10 additions & 2 deletions packages/backend/src/nest/socket/socket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ export class SocketService extends EventEmitter implements OnModuleInit {
public getConnections = (): Promise<number> => {
return new Promise(resolve => {
this.serverIoProvider.server.getConnections((err, count) => {
if (err) throw new Error(err.message)
if (err) {
this.logger.error(`Error occurred while getting connection`, err)
throw new Error(`Error occurred while getting connection: ${err.message}`)
}
resolve(count)
})
})
Expand Down Expand Up @@ -267,7 +270,12 @@ export class SocketService extends EventEmitter implements OnModuleInit {
}

this.serverIoProvider.io.close(err => {
if (err) throw new Error(err.message)
if (err) {
this.logger.error(`Error occurred while closing data server on port ${this.configOptions.socketIOPort}`, err)
throw new Error(
`Error occurred while closing data server on port ${this.configOptions.socketIOPort}: ${err.message}`
)
}
this.logger.info('Data server closed')
resolve()
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class CertificatesRequestsStore extends EventStoreBase<string> {
await parsedCsr.verify()
await this.validateCsrFormat(csr)
} catch (err) {
this.logger.error('Failed to validate user CSR:', csr, err?.message)
this.logger.error('Failed to validate user CSR:', csr, err)
return false
}
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export class CertificatesStore extends EventStoreBase<string> {
await this.validateCertificateAuthority(certificate)
await this.validateCertificateFormat(certificate)
} catch (err) {
this.logger.error('Failed to validate user certificate:', certificate, err?.message)
this.logger.error('Failed to validate user certificate:', certificate, err)
return false
}
return true
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/nest/tor/tor-control.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class TorControl {

this.connection.once('error', err => {
this.disconnect()
reject(new Error(`Connection via Tor control failed: ${err.message}`))
reject(new Error(`Connection via Tor control failed: ${err}`))
})

this.connection.once('data', (data: any) => {
Expand Down
4 changes: 2 additions & 2 deletions packages/backend/src/nest/tor/tor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ export class Tor extends EventEmitter implements OnModuleInit {
try {
process.kill(Number(id.trim()))
} catch (e) {
this.logger.error(`Tried killing hanging tor process with id ${id}. Failed. Reason: ${e.message}`)
this.logger.error(`Tried killing hanging tor process with id ${id}. Failed`, e)
}
}
}
Expand Down Expand Up @@ -273,7 +273,7 @@ export class Tor extends EventEmitter implements OnModuleInit {
})

this.process.on('error', err => {
this.logger.error(`Tor process. Error occurred: ${err.message}`)
this.logger.error(`Tor process. Error occurred`, err)
})

this.process.stdout.on('data', (data: any) => {
Expand Down
4 changes: 2 additions & 2 deletions packages/common/src/invitationCode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ const parseDeepUrl = ({ url, expectedProtocol = `${DEEP_URL_SCHEME}:` }: ParseDe
try {
validUrl = new URL(_url)
} catch (e) {
logger.error(`Could not retrieve invitation code from deep url '${url}'. Reason: ${e.message}`)
logger.error(`Could not retrieve invitation code from deep url '${url}'`, e)
throw e
}
if (!validUrl || validUrl.protocol !== expectedProtocol) {
Expand Down Expand Up @@ -277,7 +277,7 @@ const isParamValid = (param: string, value: string) => {
try {
new URL(value)
} catch (e) {
logger.error(e.message)
logger.error(`Error while URL encoding ${value}`, e)
return false
}
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function* sendFileMessageSaga(
filePath = decodeURIComponent(filePath.startsWith(fileProtocol) ? filePath.slice(fileProtocol.length) : filePath)
tmpPath = tmpPath ? decodeURIComponent(tmpPath.slice(fileProtocol.length)) : undefined
} catch (e) {
logger.error(`Can't send file with path ${filePath}, Details: ${e.message}`)
logger.error(`Can't send file with path ${filePath}`, e)
return
}

Expand Down

0 comments on commit 3be6240

Please sign in to comment.