diff --git a/README.md b/README.md index 448ca5fa2..ccc7f7c3c 100644 --- a/README.md +++ b/README.md @@ -428,6 +428,8 @@ The arguments are: CONNACK is received - `username`: the username required by your broker, if any - `password`: the password required by your broker, if any + - `socksProxy`: establish TCP and TLS connections via a socks proxy (URL, supported protocols are `socks5://`, `socks5h://`, `socks4://`, `socks4a://`) + - `socksTimeout`: timeout for connecting to the socks proxy - `incomingStore`: a [Store](#store) for the incoming packets - `outgoingStore`: a [Store](#store) for the outgoing packets - `queueQoSZero`: if connection is broken, queue outgoing QoS zero messages (default `true`) @@ -485,6 +487,8 @@ The arguments are: - `forceNativeWebSocket`: set to true if you're having detection issues (i.e. the `ws does not work in the browser` exception) to force the use of native WebSocket. It is important to note that if set to true for the first client created, then all the clients will use native WebSocket. And conversely, if not set or set to false, all will use the detection result. - `unixSocket`: if you want to connect to a unix socket, set this to true +Instead of setting `socksProxy` you can also supple the same parameter via the environment variable `MQTTJS_SOCKS_PROXY`. + In case mqtts (mqtt over tls) is required, the `options` object is passed through to [`tls.connect()`](http://nodejs.org/api/tls.html#tls_tls_connect_options_callback). If using a **self-signed certificate**, set `rejectUnauthorized: false`. However, be cautious as this exposes you to potential man in the middle attacks and isn't recommended for production. For those supporting multiple TLS protocols on a single port, like MQTTS and MQTT over WSS, utilize the `ALPNProtocols` option. This lets you define the Application Layer Protocol Negotiation (ALPN) protocol. You can set `ALPNProtocols` as a string array, Buffer, or Uint8Array based on your setup. diff --git a/esbuild.js b/esbuild.js index f157201d7..019418a5f 100644 --- a/esbuild.js +++ b/esbuild.js @@ -44,6 +44,26 @@ const options = { ) } }, + { + name: 'resolve-socks', + setup(build) { + // socks is not supported in the browser and adds several 100kb to the build, so stub it + build.onResolve({ filter: /socks$/ }, args => { + return { + path: args.path, + namespace: 'socks-stub' + } + }) + + build.onLoad({ filter: /.*/, namespace: 'socks-stub' }, args => { + return { + contents: 'module.exports = {}', + loader: 'js' + } + } + ) + } + }, ], } diff --git a/package-lock.json b/package-lock.json index cfc159275..5fd0a5c69 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,6 +22,7 @@ "readable-stream": "^4.7.0", "reinterval": "^1.1.0", "rfdc": "^1.4.1", + "socks": "^2.8.3", "split2": "^4.2.0", "worker-timers": "^7.1.8", "ws": "^8.18.0" @@ -9931,7 +9932,6 @@ "version": "9.0.5", "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-9.0.5.tgz", "integrity": "sha512-zHtQzGojZXTwZTHQqra+ETKd4Sn3vgi7uBmlPoXVWZqYvuKmtI0l/VZTjqGmJY9x88GGOaZ9+G9ES8hC4T4X8g==", - "dev": true, "dependencies": { "jsbn": "1.1.0", "sprintf-js": "^1.1.3" @@ -9943,8 +9943,7 @@ "node_modules/ip-address/node_modules/sprintf-js": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.1.3.tgz", - "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==", - "dev": true + "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==" }, "node_modules/ip-regex": { "version": "4.3.0", @@ -10910,8 +10909,7 @@ "node_modules/jsbn": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/jsbn/-/jsbn-1.1.0.tgz", - "integrity": "sha512-4bYVV3aAMtDTTu4+xsDYa6sy9GyJ69/amsu9sYF2zqjiEoZA5xJi3BrfX3uY+/IekIu7MwdObdbDWpoZdBv3/A==", - "dev": true + "integrity": "sha512-4bYVV3aAMtDTTu4+xsDYa6sy9GyJ69/amsu9sYF2zqjiEoZA5xJi3BrfX3uY+/IekIu7MwdObdbDWpoZdBv3/A==" }, "node_modules/jsesc": { "version": "2.5.2", @@ -15791,7 +15789,6 @@ "version": "4.2.0", "resolved": "https://registry.npmjs.org/smart-buffer/-/smart-buffer-4.2.0.tgz", "integrity": "sha512-94hK0Hh8rPqQl2xXc3HsaBoOXKV20MToPkcXvwbISWLEs+64sBq5kFgn2kJDHb1Pry9yrP0dxrCI9RRci7RXKg==", - "dev": true, "engines": { "node": ">= 6.0.0", "npm": ">= 3.0.0" @@ -15897,7 +15894,6 @@ "version": "2.8.3", "resolved": "https://registry.npmjs.org/socks/-/socks-2.8.3.tgz", "integrity": "sha512-l5x7VUUWbjVFbafGLxPWkYsHIhEvmF85tbIeFZWc8ZPtoMyybuEhL7Jye/ooC4/d48FgOjSJXgsF/AJPYCW8Zw==", - "dev": true, "dependencies": { "ip-address": "^9.0.5", "smart-buffer": "^4.2.0" @@ -16202,10 +16198,11 @@ } }, "node_modules/stream-shift": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.1.tgz", - "integrity": "sha512-AiisoFqQ0vbGcZgQPY1cdP2I76glaVA/RauYR4G4thNFgkTqr90yXTo4LYX60Jl+sIlPNHHdGSwo01AvbKUSVQ==", - "dev": true + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz", + "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==", + "dev": true, + "license": "MIT" }, "node_modules/stream-to-pull-stream": { "version": "1.7.3", @@ -25206,7 +25203,6 @@ "version": "9.0.5", "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-9.0.5.tgz", "integrity": "sha512-zHtQzGojZXTwZTHQqra+ETKd4Sn3vgi7uBmlPoXVWZqYvuKmtI0l/VZTjqGmJY9x88GGOaZ9+G9ES8hC4T4X8g==", - "dev": true, "requires": { "jsbn": "1.1.0", "sprintf-js": "^1.1.3" @@ -25215,8 +25211,7 @@ "sprintf-js": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.1.3.tgz", - "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==", - "dev": true + "integrity": "sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==" } } }, @@ -25855,8 +25850,7 @@ "jsbn": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/jsbn/-/jsbn-1.1.0.tgz", - "integrity": "sha512-4bYVV3aAMtDTTu4+xsDYa6sy9GyJ69/amsu9sYF2zqjiEoZA5xJi3BrfX3uY+/IekIu7MwdObdbDWpoZdBv3/A==", - "dev": true + "integrity": "sha512-4bYVV3aAMtDTTu4+xsDYa6sy9GyJ69/amsu9sYF2zqjiEoZA5xJi3BrfX3uY+/IekIu7MwdObdbDWpoZdBv3/A==" }, "jsesc": { "version": "2.5.2", @@ -29566,8 +29560,7 @@ "smart-buffer": { "version": "4.2.0", "resolved": "https://registry.npmjs.org/smart-buffer/-/smart-buffer-4.2.0.tgz", - "integrity": "sha512-94hK0Hh8rPqQl2xXc3HsaBoOXKV20MToPkcXvwbISWLEs+64sBq5kFgn2kJDHb1Pry9yrP0dxrCI9RRci7RXKg==", - "dev": true + "integrity": "sha512-94hK0Hh8rPqQl2xXc3HsaBoOXKV20MToPkcXvwbISWLEs+64sBq5kFgn2kJDHb1Pry9yrP0dxrCI9RRci7RXKg==" }, "snazzy": { "version": "9.0.0", @@ -29650,7 +29643,6 @@ "version": "2.8.3", "resolved": "https://registry.npmjs.org/socks/-/socks-2.8.3.tgz", "integrity": "sha512-l5x7VUUWbjVFbafGLxPWkYsHIhEvmF85tbIeFZWc8ZPtoMyybuEhL7Jye/ooC4/d48FgOjSJXgsF/AJPYCW8Zw==", - "dev": true, "requires": { "ip-address": "^9.0.5", "smart-buffer": "^4.2.0" @@ -29900,9 +29892,9 @@ "dev": true }, "stream-shift": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.1.tgz", - "integrity": "sha512-AiisoFqQ0vbGcZgQPY1cdP2I76glaVA/RauYR4G4thNFgkTqr90yXTo4LYX60Jl+sIlPNHHdGSwo01AvbKUSVQ==", + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz", + "integrity": "sha512-76ORR0DO1o1hlKwTbi/DM3EXWGf3ZJYO8cXX5RJwnul2DEg2oyoZyjLNoQM8WsvZiFKCRfC1O0J7iCvie3RZmQ==", "dev": true }, "stream-to-pull-stream": { diff --git a/package.json b/package.json index 41c6aec8f..95cdacb7f 100644 --- a/package.json +++ b/package.json @@ -124,6 +124,7 @@ "readable-stream": "^4.7.0", "reinterval": "^1.1.0", "rfdc": "^1.4.1", + "socks": "^2.8.3", "split2": "^4.2.0", "worker-timers": "^7.1.8", "ws": "^8.18.0" diff --git a/src/lib/client.ts b/src/lib/client.ts index 2c0f9623a..0ccbed34c 100644 --- a/src/lib/client.ts +++ b/src/lib/client.ts @@ -140,6 +140,10 @@ export interface IClientOptions extends ISecureClientOptions { query?: Record /** Auth string in the format : */ auth?: string + /** Optional SOCKS proxy to use for TCP / TLS connections , i.e. socks5://localhost:1333, socks4://localhost:1333, socks5h://localhost:1333 . Default is socks5h. */ + socksProxy?: string + /** Timeout for establishing a socks connection */ + socksTimeout?: number /** Custom ack handler */ customHandleAcks?: AckHandler /** Broker port */ diff --git a/src/lib/connect/index.ts b/src/lib/connect/index.ts index cda09ac2b..486ee7191 100644 --- a/src/lib/connect/index.ts +++ b/src/lib/connect/index.ts @@ -104,6 +104,15 @@ function connect( opts.clientId = opts.query.clientId } + if (isBrowser || opts.unixSocket) { + opts.socksProxy = undefined + } else if ( + opts.socksProxy === undefined && + typeof process !== 'undefined' + ) { + opts.socksProxy = process.env['MQTTJS_SOCKS_PROXY'] + } + if (opts.cert && opts.key) { if (opts.protocol) { if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) { diff --git a/src/lib/connect/socks.ts b/src/lib/connect/socks.ts new file mode 100644 index 000000000..21677a392 --- /dev/null +++ b/src/lib/connect/socks.ts @@ -0,0 +1,236 @@ +import _debug from 'debug' +import { Duplex } from 'stream' +import { SocksClient, SocksProxy } from 'socks' +import * as dns from 'dns' +import { SocksProxyType } from 'socks/typings/common/constants' +import { IStream } from '../shared' +import { promisify } from 'util' +import { Socket } from 'net' +import assert from 'assert' + +const debug = _debug('mqttjs:socks') + +export interface SocksConnectionOptions { + timeout?: number + lookup?: (hostname: string) => Promise<{ address: string }> +} + +class ProxyStream extends Duplex { + private _flowing = false + + private _socket?: Socket + + constructor() { + super({ autoDestroy: false }) + + this.cork() + } + + _start(socket: Socket): void { + debug('proxy stream started') + + assert(!this._socket) + + if (this.destroyed) { + socket.destroy(this.errored) + return + } + + this._socket = socket + + if (!this._flowing) socket.pause() + + socket.on('data', this._onData) + socket.on('end', this._onEnd) + socket.on('error', this._onError) + socket.on('close', this._onClose) + + socket.emit('connect') + + this.uncork() + } + + _write( + chunk: any, + encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ): void { + assert(this._socket) + + this._socket.write(chunk, callback) + } + + _read(size: number): void { + this._flowing = true + + this._socket?.resume?.() + } + + _destroy( + error: Error | null, + callback: (error?: Error | null) => void, + ): void { + this._socket?.destroy?.(error) + + callback(error) + } + + private _onData = (chunk: any): void => { + assert(this._socket) + + this._flowing = this.push(chunk) + if (!this._flowing) this._socket.pause() + } + + private _onEnd = (): void => { + debug('proxy stream received EOF') + + this.push(null) + } + + private _onClose = (): void => { + debug('proxy stream closed') + + this.destroy() + } + + private _onError = (err: any): void => { + debug('proxy stream died with error %s', err) + + this.destroy(err) + } +} + +function fatal(e: T): T { + try { + if ((e as any).code === undefined) (e as any).code = 'SOCKS' + return e + } catch { + return e + } +} + +function typeFromProtocol( + proto: string, +): [SocksProxyType | undefined, boolean] { + switch (proto) { + case 'socks5h:': + return [5, true] + + case 'socks4a:': + return [4, true] + + case 'socks5:': + return [5, false] + + case 'socks4:': + return [4, false] + + default: + return [undefined, false] + } +} + +function parseSocksUrl(url: string): [SocksProxy, boolean] { + const parsedUrl = new URL(url) + + if (parsedUrl.pathname || parsedUrl.hash || parsedUrl.search) { + throw fatal(new Error('bad SOCKS URL')) + } + + const [type, resolveThroughProxy] = typeFromProtocol(parsedUrl.protocol) + if (!type) { + throw fatal(new Error('bad SOCKS URL: invalid protocol')) + } + + const port = parseInt(parsedUrl.port, 10) + if (Number.isNaN(port)) { + throw fatal(new Error('bad SOCKS URL: invalid port')) + } + + const proxy: SocksProxy = { + host: parsedUrl.hostname, + port, + type, + } + + return [proxy, resolveThroughProxy] +} + +async function connectSocks( + destinationHost: string, + destinationPort: number, + socksUrl: string, + stream: ProxyStream, + options: SocksConnectionOptions = {}, +): Promise { + const lookup = options.lookup ?? promisify(dns.lookup) + + const [proxy, resolveThroughProxy] = parseSocksUrl(socksUrl) + + if (!resolveThroughProxy) { + debug('resolving %s locally', destinationHost) + + destinationHost = ( + await lookup(destinationHost, { + family: proxy.type === 4 ? 4 : 0, + }) + ).address + } + + debug( + 'establishing SOCKS%d connection to %s:%d via %s:%d', + proxy.type, + destinationHost, + destinationPort, + proxy.host, + proxy.port, + ) + + const socksClient = new SocksClient({ + command: 'connect', + destination: { + host: destinationHost, + port: destinationPort, + }, + proxy: { ...proxy }, + timeout: options.timeout, + }) + socksClient.connect() + + socksClient.on('established', ({ socket }) => stream._start(socket)) + + socksClient.on('error', (e) => { + debug('SOCKS failed: %s', e) + stream.destroy(fatal(e)) + }) +} + +export default function openSocks( + destinationHost: string, + destinationPort: number, + socksUrl: string, + options?: SocksConnectionOptions, +): IStream { + debug( + 'SOCKS connection to %s:%d via %s', + destinationHost, + destinationPort, + socksUrl, + ) + + const stream = new ProxyStream() + + connectSocks( + destinationHost, + destinationPort, + socksUrl, + stream, + options, + ).catch((e) => { + debug('SOCKS failed: %s', e) + stream.destroy(e) + }) + + return stream +} diff --git a/src/lib/connect/tcp.ts b/src/lib/connect/tcp.ts index ef16daba3..40eb0daca 100644 --- a/src/lib/connect/tcp.ts +++ b/src/lib/connect/tcp.ts @@ -2,6 +2,7 @@ import { StreamBuilder } from '../shared' import net from 'net' import _debug from 'debug' +import openSocks from './socks' const debug = _debug('mqttjs:tcp') /* @@ -12,6 +13,12 @@ const buildStream: StreamBuilder = (client, opts) => { opts.port = opts.port || 1883 opts.hostname = opts.hostname || opts.host || 'localhost' + if (opts.socksProxy) { + return openSocks(opts.hostname, opts.port, opts.socksProxy, { + timeout: opts.socksTimeout, + }) + } + const { port, path } = opts const host = opts.hostname diff --git a/src/lib/connect/tls.ts b/src/lib/connect/tls.ts index 877f4bc2d..0a6cdbaaa 100644 --- a/src/lib/connect/tls.ts +++ b/src/lib/connect/tls.ts @@ -1,10 +1,27 @@ -import tls from 'tls' +import tls, { TLSSocket } from 'tls' import net from 'net' import _debug from 'debug' import { StreamBuilder } from '../shared' +import { IClientOptions } from '../client' +import openSocks from './socks' const debug = _debug('mqttjs:tls') +function connect(opts: IClientOptions): TLSSocket { + const { host, port, socksProxy, ...rest } = opts + + return tls.connect( + socksProxy + ? { + ...rest, + socket: openSocks(host, port, socksProxy, { + timeout: opts.socksTimeout, + }), + } + : opts, + ) +} + const buildStream: StreamBuilder = (client, opts) => { opts.port = opts.port || 8883 opts.host = opts.hostname || opts.host || 'localhost' @@ -24,7 +41,7 @@ const buildStream: StreamBuilder = (client, opts) => { opts.rejectUnauthorized, ) - const connection = tls.connect(opts) + const connection = connect(opts) connection.on('secureConnect', () => { if (opts.rejectUnauthorized && !connection.authorized) { connection.emit('error', new Error('TLS not authorized')) diff --git a/test/node/socks.ts b/test/node/socks.ts new file mode 100644 index 000000000..d77a0e59d --- /dev/null +++ b/test/node/socks.ts @@ -0,0 +1,528 @@ +import assert from 'assert' +import { AddressInfo, createServer, Server, Socket } from 'net' +import { describe, it, mock, afterEach, beforeEach } from 'node:test' +import openSocks from 'src/lib/connect/socks' + +const PORT = 6666 + +type State5 = 'new' | 'id' | 'connect' + +class MockServer5 { + readonly connect: Promise + + responseID = Buffer.from([0x05, 0x00]) + + responseREQUEST = Buffer.from([ + 0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x12, 0x34, + ]) + + private server = createServer() + + private onConnect: (socket: Socket) => void + + private onError: (err: any) => void + + private socket?: Socket + + private state: State5 = 'new' + + private destination?: [string, number] + + constructor() { + this.connect = new Promise((resolve, reject) => { + this.onConnect = resolve + this.onError = reject + }) + } + + start(): Promise { + this.server.listen(undefined, 'localhost') + + this.server.on('connection', this.onConnection) + + return new Promise((r) => { + this.server.once('listening', () => r(this.port())) + }) + } + + port(): number { + return (this.server.address() as AddressInfo).port + } + + destroy() { + this.server.close() + this.socket?.end() + this.socket?.destroy() + } + + destinationAddress(): string | undefined { + return this.destination?.[0] + } + + destinationPort(): number | undefined { + return this.destination?.[1] + } + + private onConnection = (socket: Socket) => { + if (this.socket) { + socket.destroy() + return this.onError(new Error('double connect to SOCKS5 server')) + } + + this.socket = socket + + socket.on('data', this.onData) + } + + private onData = (chunk: Buffer) => { + switch (this.state) { + case 'new': { + const [ver, nmethods] = chunk + + if ( + ver !== 0x05 || + nmethods === 0 || + chunk.length !== nmethods + 2 + ) { + return this.onError(new Error('bad ID packet')) + } + + if (chunk.subarray(2, 2 + nmethods).indexOf(0x00) === -1) { + return this.onError(new Error('no supported METHOD')) + } + + this.socket?.write?.(this.responseID) + this.state = 'id' + + break + } + + case 'id': + this.destination = this.parseConnect(chunk) + + if (this.destination === undefined) { + return this.onError(new Error('bad REQUEST packet')) + } + + this.socket?.write(this.responseREQUEST) + + this.state = 'connect' + this.socket.off('data', this.onData) + this.onConnect(this.socket) + + break + } + } + + private parseConnect(buf: Buffer): [string, number] | undefined { + const [ver, cmd, rsv, atyp] = buf + + if (ver !== 0x05 || cmd !== 0x01 || rsv !== 0x00) return undefined + + const port = (buf[buf.length - 2] << 8) | buf[buf.length - 1] + + switch (atyp) { + case 0x01: + if (buf.length !== 10) return undefined + + return [buf.subarray(4, 8).join('.'), port] + + case 0x03: + if (buf.length !== 7 + buf[4]) return undefined + + return [buf.subarray(5, 5 + buf[4]).toString('ascii'), port] + + default: + return undefined + } + } +} + +describe('SOCKS layer', { timeout: 1000 }, () => { + let server5!: MockServer5 + let server4: Server | undefined + + beforeEach(() => { + server5 = new MockServer5() + }) + + afterEach(() => { + server5.destroy() + server4?.close() + }) + + it('should resolve hostnames locally for socks5', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + await server5.connect + + stream.destroy() + + await new Promise((r) => { + stream.once('close', r) + }) + + assert.strictEqual(lookup.mock.callCount(), 1) + assert.strictEqual(lookup.mock.calls[0].arguments[0], 'foo.bar') + assert.strictEqual(server5.destinationAddress(), '1.2.3.4') + assert.strictEqual(server5.destinationPort(), 1883) + }) + + it('should resolve hostnames remotely for socks5h', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5h://localhost:${port}`, + { + lookup, + }, + ) + + await server5.connect + + stream.destroy() + + await new Promise((r) => { + stream.once('close', r) + }) + + assert.strictEqual(lookup.mock.callCount(), 0) + assert.strictEqual(server5.destinationAddress(), 'foo.bar') + assert.strictEqual(server5.destinationPort(), 1883) + }) + + it('errors during name resolution should be emitted on stream', async () => { + const ERROR = new Error() + + const lookup = mock.fn((address) => Promise.reject(ERROR)) + + const stream = openSocks('foo.bar', 1883, 'socks5://localhost:6666', { + lookup, + }) + + const error = await new Promise((r) => { + stream.once('error', r) + }) + + assert.strictEqual(error, ERROR) + }) + + it('errors during SOCKS connect should be emitted on stream', async () => { + const port = await server5.start() + server5.responseID = Buffer.from([0x00, 0x00]) + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + const err = await new Promise((r) => { + stream.once('error', r) + }) + + stream.destroy() + + assert(err instanceof Error) + }) + + it('data flows through the stream after SOCKS has connected', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + const socket = await server5.connect + + socket.once('data', (chunk) => socket.write(`${chunk.toString()} pong`)) + + const response = await new Promise((resolve, reject) => { + stream.once('error', (err) => { + reject(err) + }) + + stream.once('data', (chunk) => { + resolve(chunk.toString()) + }) + + stream.write('ping') + }) + + server5.destroy() + stream.destroy() + + assert.strictEqual(response, 'ping pong') + }) + + it('data written to the stream is buffered until SOCKS has connected', async () => { + const port = await server5.start() + + let startNameResolution!: () => undefined + const resolutionPromise = new Promise((r) => { + startNameResolution = r as () => undefined + }) + + const lookup = mock.fn((_: string) => + resolutionPromise.then(() => ({ + address: '1.2.3.4', + })), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + stream.write('ping') + startNameResolution() + + const socket = await server5.connect + + socket.once('data', (chunk) => socket.write(`${chunk.toString()} pong`)) + + const response = await new Promise((resolve, reject) => { + stream.once('error', (err) => { + reject(err) + }) + + stream.once('data', (chunk) => { + resolve(chunk.toString()) + }) + }) + + server5.destroy() + stream.destroy() + + assert.strictEqual(response, 'ping pong') + }) + + it('closing the stream closes the connection', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + const socket = await server5.connect + + stream.destroy() + + await new Promise((r) => { + socket.once('close', r) + }) + }) + + it('closing the connection closes the stream', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + const socket = await server5.connect + socket.destroy() + + await new Promise((r) => { + stream.once('close', r) + }) + }) + + it('resetting the connection errors the stream', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks5://localhost:${port}`, + { + lookup, + }, + ) + + const socket = await server5.connect + socket.resetAndDestroy() + + const error = await new Promise((r) => { + stream.once('error', r) + }) + + assert(error instanceof Error) + }) + + it('an invalid protocol errors the stream', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks('foo.bar', 1883, `socks://localhost:${port}`, { + lookup, + }) + + const error = await new Promise((r) => { + stream.once('error', r) + }) + + assert(error instanceof Error) + }) + + it('an invalid URL errors the stream', async () => { + const port = await server5.start() + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks('foo.bar', 1883, `socks:localhost:${port}`, { + lookup, + }) + + const error = await new Promise((r) => { + stream.once('error', r) + }) + + assert(error instanceof Error) + }) + + it('should resolve hostnames locally for socks4', async () => { + let onConnect!: (socket: Socket) => void + const connect = new Promise((r) => { + onConnect = mock.fn((socket: Socket) => { + socket.destroy() + r(socket) + }) + }) + + server4 = await new Promise((resolve, reject) => { + const server = createServer(onConnect) + + server.on('listening', () => resolve(server)) + server.on('error', reject) + + server.listen() + }) + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks4://localhost:${(server4.address() as AddressInfo).port}`, + { + lookup, + }, + ) + + const socket = await connect + + socket.destroy() + stream.destroy() + + assert.strictEqual(lookup.mock.callCount(), 1) + assert.strictEqual(lookup.mock.calls[0].arguments[0], 'foo.bar') + }) + + it('should resolve hostnames remotely for socks4a', async () => { + let onConnect!: (socket: Socket) => void + const connect = new Promise((r) => { + onConnect = mock.fn((socket: Socket) => { + socket.destroy() + r(socket) + }) + }) + + server4 = await new Promise((resolve, reject) => { + const server = createServer(onConnect) + + server.on('listening', () => resolve(server)) + server.on('error', reject) + + server.listen() + }) + + const lookup = mock.fn((_: string) => + Promise.resolve({ address: '1.2.3.4' }), + ) + + const stream = openSocks( + 'foo.bar', + 1883, + `socks4a://localhost:${(server4.address() as AddressInfo).port}`, + { + lookup, + }, + ) + + const socket = await connect + + socket.destroy() + stream.destroy() + + assert.strictEqual(lookup.mock.callCount(), 0) + }) +})