Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a cleanup mechanism for old db entries #104

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions flottform/forms/src/flottform-channel-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import {

export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
private flottformApi: string | URL;
private baseApi: string;
private endpointId: string = '';
private hostKey: string = '';
private createClientUrl: (params: { endpointId: string }) => Promise<string>;
private rtcConfiguration: RTCConfiguration;
private pollTimeForIceInMs: number;
Expand All @@ -35,6 +38,11 @@ export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
}) {
super();
this.flottformApi = flottformApi;
this.baseApi = (
this.flottformApi instanceof URL ? this.flottformApi : new URL(this.flottformApi)
)
.toString()
.replace(/\/$/, '');
this.createClientUrl = createClientUrl;
this.rtcConfiguration = DEFAULT_WEBRTC_CONFIG;
this.pollTimeForIceInMs = pollTimeForIceInMs;
Expand All @@ -55,14 +63,9 @@ export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
if (this.openPeerConnection) {
this.close();
}
const baseApi = (
this.flottformApi instanceof URL ? this.flottformApi : new URL(this.flottformApi)
)
.toString()
.replace(/\/$/, '');

try {
this.rtcConfiguration.iceServers = await this.fetchIceServers(baseApi);
this.rtcConfiguration.iceServers = await this.fetchIceServers(this.baseApi);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing this.baseApi shouldn't be necessary if we use this.baseApi in the function instead of a parameter

} catch (error) {
// Use the default configuration as a fallback
this.logger.error(error);
Expand All @@ -74,11 +77,13 @@ export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
const session = await this.openPeerConnection.createOffer();
await this.openPeerConnection.setLocalDescription(session);

const { endpointId, hostKey } = await this.createEndpoint(baseApi, session);
const { endpointId, hostKey } = await this.createEndpoint(this.baseApi, session);
this.hostKey = hostKey;
this.endpointId = endpointId;
this.logger.log('Created endpoint', { endpointId, hostKey });

const getEndpointInfoUrl = `${baseApi}/${endpointId}`;
const putHostInfoUrl = `${baseApi}/${endpointId}/host`;
const getEndpointInfoUrl = `${this.baseApi}/${endpointId}`;
const putHostInfoUrl = `${this.baseApi}/${endpointId}/host`;

const hostIceCandidates = new Set<RTCIceCandidateInit>();
await this.putHostInfo(putHostInfoUrl, hostKey, hostIceCandidates, session);
Expand All @@ -103,6 +108,8 @@ export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
this.openPeerConnection = null;
}
this.changeState('disconnected');
// Cleanup old entries.
this.deleteEndpoint(this.baseApi, this.endpointId, this.hostKey);
};

private setupDataChannelListener = () => {
Expand Down Expand Up @@ -223,6 +230,19 @@ export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
return response.json();
};

private deleteEndpoint = async (baseApi: string, endpointId: string, hostKey: string) => {
const response = await fetch(`${baseApi}/${endpointId}`, {
method: 'DELETE',
headers: {
Accept: 'application/json',
'Content-Type': 'application/json'
},
body: JSON.stringify({ hostKey })
});

return response.json();
};

private fetchIceServers = async (baseApi: string) => {
const response = await fetch(`${baseApi}/ice-server-credentials`, {
method: 'GET',
Expand Down
72 changes: 70 additions & 2 deletions flottform/server/src/database.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, expect, it } from 'vitest';
import { beforeEach, afterEach, describe, expect, it, vi } from 'vitest';
import { createFlottformDatabase } from './database';

describe('Flottform database', () => {
Expand Down Expand Up @@ -168,9 +168,77 @@ describe('Flottform database', () => {
session: answer,
iceCandidates: []
})
).rejects.toThrow(/peerkey/i);
).rejects.toThrow(
/clientKey is wrong: Another peer is already connected and you cannot change this info without the correct key anymore. If you lost your key, initiate a new Flottform connection./i
);
Comment on lines +171 to +173
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since peerkey isn't used in this error message but I don't see a change to the error itself, was/is this test not working? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that test was not working since as you've said the string 'peerkey' is not used anymore and is replaced by this string: 'clientKey is wrong: Another peer is already connected and you cannot change this info without the correct key anymore. If you lost your key, initiate a new Flottform connection.'
--> I fixed it in this commit: 481b54b

const infoAfter = await db.getEndpoint({ endpointId });
expect(infoBefore).toStrictEqual(infoAfter);
});
});

describe('startCleanup()', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});

it('Should clean up stale entries after entryTTL', async () => {
const db = await createFlottformDatabase({
cleanupPeriod: 1000,
entryTimeToLive: 500
});
const conn = new RTCPeerConnection();
const offer = await conn.createOffer();
const { endpointId } = await db.createEndpoint({ session: offer });

const connPeer = new RTCPeerConnection();
await connPeer.setRemoteDescription(offer);
const answer = await connPeer.createAnswer();
const clientKey = 'random-key';

await db.putClientInfo({
endpointId,
clientKey,
session: answer,
iceCandidates: [],
lastUpdate: Date.now()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lastUpdate should be taken care of internally only - no need to set it here

});

// Sleep for enough time to trigger the first cleanup
vi.advanceTimersByTime(1100);

// The endpoint should be cleaned by now
expect(async () => await db.getEndpoint({ endpointId })).rejects.toThrow(/endpoint/i);
});

it("Shouldn't clean up entries before entryTTL is expired", async () => {
const db = await createFlottformDatabase({
cleanupPeriod: 1000,
entryTimeToLive: 500
});
const conn = new RTCPeerConnection();
const offer = await conn.createOffer();
const { endpointId } = await db.createEndpoint({ session: offer });

const connPeer = new RTCPeerConnection();
await connPeer.setRemoteDescription(offer);
const answer = await connPeer.createAnswer();
const clientKey = 'random-key';

await db.putClientInfo({
endpointId,
clientKey,
session: answer,
iceCandidates: [],
lastUpdate: Date.now()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lastUpdate should be taken care of internally only - no need to set it here

});

// The endpoint shouldn't be cleaned by now
const retrievedInfo = await db.getEndpoint({ endpointId });
expect(retrievedInfo).toBeDefined();
expect(retrievedInfo?.hostInfo.session).toStrictEqual(offer);
});
});
});
78 changes: 70 additions & 8 deletions flottform/server/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ type EndpointInfo = {
session: RTCSessionDescriptionInit;
iceCandidates: RTCIceCandidateInit[];
};
lastUpdate: number;
};
type SafeEndpointInfo = Omit<EndpointInfo, 'hostKey' | 'clientKey'>;

const DEFAULT_CLEANUP_PERIOD = 30 * 60 * 1000;
const DEFAULT_ENTRY_TIME_TO_LIVE_IN_MS = 25 * 60 * 1000;

function createRandomHostKey(): string {
return crypto.randomUUID();
}
Expand All @@ -25,8 +29,52 @@ function createRandomEndpointId(): string {

class FlottformDatabase {
private map = new Map<EndpointId, EndpointInfo>();
private cleanupTimeoutId: NodeJS.Timeout | null = null;
private cleanupPeriod: number;
private entryTimeToLive: number;

constructor({
cleanupPeriod = DEFAULT_CLEANUP_PERIOD,
entryTimeToLive = DEFAULT_ENTRY_TIME_TO_LIVE_IN_MS
}: {
cleanupPeriod?: number;
entryTimeToLive?: number;
} = {}) {
this.cleanupPeriod = cleanupPeriod;
this.entryTimeToLive = entryTimeToLive;
this.startCleanup();
}

constructor() {}
private startCleanup() {
this.cleanupTimeoutId = setTimeout(this.cleanupFn.bind(this), this.cleanupPeriod);
}

private cleanupFn() {
if (this.map && this.map.size !== 0) {
const now = Date.now();
// Loop over all entries and delete the stale ones.
for (const [endpointId, endpointInfo] of this.map) {
const lastUpdated = endpointInfo.lastUpdate;
if (now - lastUpdated > this.entryTimeToLive) {
this.map.delete(endpointId);
}
}
}
this.cleanupTimeoutId = setTimeout(this.startCleanup.bind(this), this.cleanupPeriod);
}

private stopCleanup() {
// Clear the interval to stop cleanup
if (this.cleanupTimeoutId) {
clearTimeout(this.cleanupTimeoutId);
this.cleanupTimeoutId = null;
}
}

// Stop the cleanup when the database is no longer needed
destroy() {
this.stopCleanup();
}

async createEndpoint({ session }: { session: RTCSessionDescriptionInit }): Promise<EndpointInfo> {
const entry = {
Expand All @@ -35,7 +83,8 @@ class FlottformDatabase {
hostInfo: {
session,
iceCandidates: []
}
},
lastUpdate: Date.now()
};
this.map.set(entry.endpointId, entry);
return entry;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exposing lastUpdate, see comment below

Expand All @@ -46,6 +95,7 @@ class FlottformDatabase {
if (!entry) {
throw Error('Endpoint not found');
}
entry.lastUpdate = Date.now();
const { hostKey: _ignore1, clientKey: _ignore2, ...endpoint } = entry;

return endpoint;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lastUpdate should not be exposed to the client as it should only be used internally for cleanup

Expand All @@ -55,12 +105,14 @@ class FlottformDatabase {
endpointId,
hostKey,
session,
iceCandidates
iceCandidates,
lastUpdate = Date.now()
}: {
endpointId: EndpointId;
hostKey: HostKey;
session: RTCSessionDescriptionInit;
iceCandidates: RTCIceCandidateInit[];
lastUpdate?: number;
}): Promise<SafeEndpointInfo> {
const existingSession = this.map.get(endpointId);
if (!existingSession) {
Expand All @@ -72,7 +124,8 @@ class FlottformDatabase {

const newInfo = {
...existingSession,
hostInfo: { ...existingSession.hostInfo, session, iceCandidates }
hostInfo: { ...existingSession.hostInfo, session, iceCandidates },
lastUpdate
};
this.map.set(endpointId, newInfo);

Expand All @@ -85,12 +138,14 @@ class FlottformDatabase {
endpointId,
clientKey,
session,
iceCandidates
iceCandidates,
lastUpdate = Date.now()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be done in the function directly where it's necessary instead of passing an argument (and potentially overwriting it)

}: {
endpointId: EndpointId;
clientKey: ClientKey;
session: RTCSessionDescriptionInit;
iceCandidates: RTCIceCandidateInit[];
lastUpdate?: number;
}): Promise<Required<SafeEndpointInfo>> {
const existingSession = this.map.get(endpointId);
if (!existingSession) {
Expand All @@ -105,7 +160,8 @@ class FlottformDatabase {
const newInfo = {
...existingSession,
clientKey,
clientInfo: { session, iceCandidates }
clientInfo: { session, iceCandidates },
lastUpdate
};
this.map.set(endpointId, newInfo);

Expand All @@ -130,8 +186,14 @@ class FlottformDatabase {
}
}

export async function createFlottformDatabase(): Promise<FlottformDatabase> {
return new FlottformDatabase();
export async function createFlottformDatabase({
cleanupPeriod = DEFAULT_CLEANUP_PERIOD,
entryTimeToLive = DEFAULT_ENTRY_TIME_TO_LIVE_IN_MS
}: {
cleanupPeriod?: number;
entryTimeToLive?: number;
} = {}): Promise<FlottformDatabase> {
return new FlottformDatabase({ cleanupPeriod, entryTimeToLive });
}

export type { FlottformDatabase };