Skip to content

Commit

Permalink
tunnel server: ssh connections gauge
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Razon committed Nov 12, 2023
1 parent 9326b37 commit b61ad55
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 78 deletions.
3 changes: 2 additions & 1 deletion tunnel-server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { activeTunnelStoreKey, inMemoryActiveTunnelStore } from './src/tunnel-st
import { getSSHKeys } from './src/ssh-keys'
import { proxy } from './src/proxy'
import { appLoggerFromEnv } from './src/logging'
import { tunnelsGauge, runMetricsServer } from './src/metrics'
import { tunnelsGauge, runMetricsServer, sshConnectionsGauge } from './src/metrics'
import { numberFromEnv, requiredEnv } from './src/env'
import { editUrl } from './src/url'
import { cookieSessionStore } from './src/session'
Expand Down Expand Up @@ -83,6 +83,7 @@ const sshServer = createSshServer({
rootUrl: BASE_URL.toString(),
},
tunnelsGauge,
sshConnectionsGauge,
tunnelUrl: (clientId, remotePath) => tunnelUrl(BASE_URL, clientId, remotePath),
})
.listen(SSH_PORT, LISTEN_HOST, () => {
Expand Down
6 changes: 6 additions & 0 deletions tunnel-server/src/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import fastify from 'fastify'
import { Gauge, Counter, register } from 'prom-client'

export const sshConnectionsGauge = new Gauge({
name: 'sshConnections',
help: 'Current number of open SSH connections',
labelNames: ['envId'],
})

export const tunnelsGauge = new Gauge({
name: 'tunnels',
help: 'Current number of open tunnels',
Expand Down
125 changes: 64 additions & 61 deletions tunnel-server/src/multimap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,91 +4,94 @@ import { MultiMap, multimap } from './multimap'
describe('multimap', () => {
type ObjType = { x: number }
let a: MultiMap<string, ObjType>
const expectedValues = [{ x: 12 }, { x: 13 }] as const
beforeEach(() => {
a = multimap()
a.add('foo', expectedValues[0])
a.add('foo', expectedValues[1])
})

describe('when the key does not exist', () => {
it('returns undefined', () => {
expect(multimap().get('bar')).toBeUndefined()
})
})

describe('when the key exists', () => {
let values: readonly ObjType[] | undefined
describe('after adding a value with length 2', () => {
const expectedValues = [{ x: 12 }, { x: 13 }] as const
beforeEach(() => {
values = a.get('foo')
})
it('returns the values', () => {
expect(values).toBeDefined()
expect(values).toHaveLength(2)
expect(values).toContain(expectedValues[0])
expect(values).toContain(expectedValues[1])
a = multimap()
a.add('foo', expectedValues[0])
a.add('foo', expectedValues[1])
})

describe('when the returned array is mutated', () => {
beforeEach(() => {
(values as ObjType[]).push({ x: 14 })
})
it('does not affect the multimap', () => {
expect(a.get('foo')).toHaveLength(2)
describe('when getting a key which does not exist', () => {
it('returns undefined', () => {
expect(multimap().get('bar')).toBeUndefined()
})
})

describe('when delete is called with a predicate that returns false for everything', () => {
let deleteReturn: boolean
describe('when getting a key which exists', () => {
let values: readonly ObjType[] | undefined
beforeEach(() => {
deleteReturn = a.delete('foo', () => false)
values = a.get('foo')
})

it('returns false', () => {
expect(deleteReturn).toBe(false)
})

it('does not delete the values', () => {
it('returns the values', () => {
expect(values).toBeDefined()
expect(values).toHaveLength(2)
expect(values).toContain(expectedValues[0])
expect(values).toContain(expectedValues[1])
})
})

describe('when delete is called with a predicate that returns true for everything', () => {
let deleteReturn: boolean
beforeEach(() => {
deleteReturn = a.delete('foo', () => true)
values = a.get('foo')
describe('when the returned array is mutated', () => {
beforeEach(() => {
(values as ObjType[]).push({ x: 14 })
})
it('does not affect the multimap', () => {
expect(a.get('foo')).toHaveLength(2)
})
})

it('returns true', () => {
expect(deleteReturn).toBe(true)
})
describe('when delete is called with a predicate that returns false for everything', () => {
let deleteReturn: boolean
beforeEach(() => {
deleteReturn = a.delete('foo', () => false)
values = a.get('foo')
})

it('deletes the values', () => {
expect(values).toBeUndefined()
})
})
it('returns false', () => {
expect(deleteReturn).toBe(false)
})

describe('when delete is called with a predicate that returns true for a specific value', () => {
let deleteReturn: boolean
beforeEach(() => {
deleteReturn = a.delete('foo', ({ x }) => x === expectedValues[0].x)
values = a.get('foo')
it('does not delete the values', () => {
expect(values).toBeDefined()
expect(values).toHaveLength(2)
expect(values).toContain(expectedValues[0])
expect(values).toContain(expectedValues[1])
})
})

it('returns true', () => {
expect(deleteReturn).toBe(true)
describe('when delete is called with a predicate that returns true for everything', () => {
let deleteReturn: boolean
beforeEach(() => {
deleteReturn = a.delete('foo', () => true)
values = a.get('foo')
})

it('returns true', () => {
expect(deleteReturn).toBe(true)
})

it('deletes the values', () => {
expect(values).toBeUndefined()
})
})

it('deletes the specific value', () => {
expect(values).toBeDefined()
expect(values).toHaveLength(1)
expect(values).not.toContain(expectedValues[0])
expect(values).toContain(expectedValues[1])
describe('when delete is called with a predicate that returns true for a specific value', () => {
let deleteReturn: boolean
beforeEach(() => {
deleteReturn = a.delete('foo', ({ x }) => x === expectedValues[0].x)
values = a.get('foo')
})

it('returns true', () => {
expect(deleteReturn).toBe(true)
})

it('deletes the specific value', () => {
expect(values).toBeDefined()
expect(values).toHaveLength(1)
expect(values).not.toContain(expectedValues[0])
expect(values).toContain(expectedValues[1])
})
})
})
})
Expand Down
39 changes: 23 additions & 16 deletions tunnel-server/src/ssh/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Logger } from 'pino'
import { inspect } from 'util'
import { Gauge } from 'prom-client'
import lodash from 'lodash'
import { BaseSshClient, baseSshServer } from './base-server'
import { ActiveTunnelStore, activeTunnelStoreKey } from '../tunnel-store'
import { KeyAlreadyExistsError } from '../memory-store'
Expand All @@ -14,6 +15,7 @@ export const createSshServer = ({
tunnelUrl,
helloBaseResponse,
tunnelsGauge,
sshConnectionsGauge,
}: {
log: Logger
sshPrivateKey: string
Expand All @@ -22,10 +24,12 @@ export const createSshServer = ({
tunnelUrl: (clientId: string, remotePath: string) => string
helloBaseResponse: Record<string, unknown>
tunnelsGauge: Pick<Gauge, 'inc' | 'dec'>
sshConnectionsGauge: Pick<Gauge, 'inc' | 'dec'>
}) => {
const storeKeyToClient = new Map<string, BaseSshClient>()
const onClient = (client: BaseSshClient) => {
const { clientId, publicKey, envId, connectionId, publicKeyThumbprint, log } = client
sshConnectionsGauge.inc({ envId })
const cleanupClient = lodash.once(() => { sshConnectionsGauge.inc({ envId }) })
const tunnels = new Map<string, string>()
client
.on('forward', async (requestId, { path: tunnelPath, access, meta, inject }, localSocketPath, accept, reject) => {
Expand Down Expand Up @@ -67,35 +71,38 @@ export const createSshServer = ({

const setResult = await set().catch(err => { reject(err) })
if (!setResult) {
return undefined
return
}
const { tx: setTx } = setResult
const forward = await accept().catch(async e => {
log.warn('error accepting forward %j: %j', requestId, inspect(e))
await activeTunnelStore.delete(key, setTx)
})
if (!forward) {
return undefined
return
}
storeKeyToClient.set(key, client)
tunnels.set(requestId, tunnelUrl(clientId, tunnelPath))
const onForwardClose = (event: 'close' | 'error') => (err?: Error) => {
if (err) {
log.info('%s: deleting tunnel %s due to forward server error: %j', event, key, inspect(err))
} else {
log.info('%s: deleting tunnel %s', event, key)
}
const cleanupTunnel = lodash.once(() => {
tunnels.delete(requestId)
storeKeyToClient.delete(key)
void activeTunnelStore.delete(key, setTx)
tunnelsGauge.dec({ clientId })
}
forward.on('close', onForwardClose('close'))
forward.on('error', onForwardClose('error'))
})
forward
.on('close', () => {
log.info('forward close: deleting tunnel %s', key)
cleanupTunnel()
})
.on('error', err => {
log.info('forward error: deleting tunnel %s due to forward server error: %j', key, inspect(err))
cleanupTunnel()
})
tunnelsGauge.inc({ clientId })
return undefined
})
.on('error', err => { log.warn('client error %j: %j', clientId, inspect(err)) })
.on('end', () => { cleanupClient() })
.on('error', err => {
log.warn('client error %j: %j', clientId, inspect(err))
cleanupClient()
})
.on('exec', (command, respondWithJson, reject) => {
if (command === 'hello') {
respondWithJson({
Expand Down

0 comments on commit b61ad55

Please sign in to comment.