Skip to content

Commit

Permalink
feat!: add websocket server option
Browse files Browse the repository at this point in the history
BREAKING CHANGE: the subscription server export is now an abstract class
  • Loading branch information
jquense committed Oct 5, 2021
1 parent 030b3df commit 6ef63e8
Show file tree
Hide file tree
Showing 17 changed files with 800 additions and 196 deletions.
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,36 @@
# GraphQL Subscription Server

A subscription server for GraphQL subscriptions. Supports streaming over plain web sockets
or Socket.IO, and integrates with Redis or any other Pub/Sub service.

## Setup

### Socket.IO

```js
import http from 'http';
import {
SocketIOSubscriptionServer, // or WebSocketSubscriptionServer
JwtCredentialManager,
RedisSubscriber,
} from '@4c/graphql-subscription-server';

const server = http.createServer();

const subscriptionServer = new SocketIOSubscriptionServer({
schema,
path: '/socket.io/graphql',
subscriber: new RedisSubscriber(),
hasPermission: (message, credentials) => {
authorize(message, credentials);
},
createCredentialsManager: (req) => new JwtCredentialManager(),
createLogger: () => console.debug,
});

subscriptionServer.attach(server);

server.listen(4000, () => {
console.log('server running');
});
```
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"tdd": "jest --watch",
"test": "yarn lint && yarn typecheck && jest",
"testonly": "jest",
"typecheck": "tsc --noEmit && tsc -p test --noEmit"
"typecheck": "tsc --noEmit && tsc -p test --noEmit",
"update-schema": "NODE_ENV=test babel-node ./update-schema.js"
},
"gitHooks": {
"pre-commit": "lint-staged"
Expand Down Expand Up @@ -47,7 +48,7 @@
"express": "^4.17.1",
"graphql-ws": "^4.3.2",
"redis": "^3.1.2",
"ws": "^7.4.4"
"ws": "^7.4.5"
},
"peerDependencies": {
"graphql": ">=0.12.3",
Expand Down
21 changes: 10 additions & 11 deletions src/AuthorizedSocketConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import {
validate,
} from 'graphql';
import { ExecutionResult } from 'graphql/execution/execute';
import io from 'socket.io';

import * as AsyncUtils from './AsyncUtils';
import { CredentialsManager } from './CredentialsManager';
import { CreateLogger, Logger } from './Logger';
import { Subscriber } from './Subscriber';
import SubscriptionContext from './SubscriptionContext';
import { WebSocket } from './types';

export type CreateValidationRules = ({
query,
Expand Down Expand Up @@ -62,7 +62,7 @@ const acknowledge = (cb?: () => void) => {
* - Rudimentary connection constraints (max connections)
*/
export default class AuthorizedSocketConnection<TContext, TCredentials> {
socket: io.Socket;
socket: WebSocket;

config: AuthorizedSocketOptions<TContext, TCredentials>;

Expand All @@ -76,7 +76,7 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
readonly clientId: string;

constructor(
socket: io.Socket,
socket: WebSocket,
config: AuthorizedSocketOptions<TContext, TCredentials>,
) {
this.socket = socket;
Expand All @@ -85,14 +85,13 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
this.log = config.createLogger('AuthorizedSocket');
this.subscriptionContexts = new Map();

this.clientId = this.socket.id;
this.clientId = this.socket.id!;

this.socket
.on('authenticate', this.handleAuthenticate)
.on('subscribe', this.handleSubscribe)
.on('unsubscribe', this.handleUnsubscribe)
.on('connect', this.handleConnect)
.on('disconnect', this.handleDisconnect);
this.socket.on('authenticate', this.handleAuthenticate);
this.socket.on('subscribe', this.handleSubscribe);
this.socket.on('unsubscribe', this.handleUnsubscribe);
this.socket.on('connect', this.handleConnect);
this.socket.on('disconnect', this.handleDisconnect);
}

emitError(error: { code: string; data?: any }) {
Expand Down Expand Up @@ -125,7 +124,7 @@ export default class AuthorizedSocketConnection<TContext, TCredentials> {
});

await this.config.credentialsManager.authenticate(authorization);
} catch (error) {
} catch (error: any) {
this.log('error', error.message, { error, clientId: this.clientId });
this.emitError({ code: 'invalid_authorization' });
}
Expand Down
110 changes: 0 additions & 110 deletions src/GraphqlSocketSubscriptionServer.ts

This file was deleted.

84 changes: 84 additions & 0 deletions src/SocketIOSubscriptionServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { promisify } from 'util';

import express from 'express';
import type io from 'socket.io';

import SubscriptionServer, {
SubscriptionServerConfig,
} from './SubscriptionServer';

export interface SocketIOSubscriptionServerConfig<TContext, TCredentials>
extends SubscriptionServerConfig<TContext, TCredentials> {
socketIoServer?: io.Server;
}

export default class SocketIOSubscriptionServer<
TContext,
TCredentials,
> extends SubscriptionServer<TContext, TCredentials> {
io: io.Server;

constructor({
socketIoServer,
...config
}: SocketIOSubscriptionServerConfig<TContext, TCredentials>) {
super(config);

this.io = socketIoServer!;
if (!this.io) {
// eslint-disable-next-line global-require, @typescript-eslint/no-var-requires
const IoServer = require('socket.io').Server;
this.io = new IoServer({
serveClient: false,
path: this.config.path,
transports: ['websocket'],
allowEIO3: true,
});
}

this.io.on('connection', (socket: io.Socket) => {
const clientId = socket.id;

const request = Object.create((express as any).request);
Object.assign(request, socket.request);

this.log('debug', 'SubscriptionServer: new socket connection', {
clientId,
numClients: this.io.engine?.clientsCount ?? 0,
});

this.opened(
{
id: clientId,
protocol: 'socket-io',
on: socket.on.bind(socket),
emit(event: string, data: any) {
socket.emit(event, data);
},
close() {
socket.disconnect();
},
},
request,
);

// add after so the logs happen in order
socket.once('disconnect', (reason) => {
this.log('debug', 'socket disconnected', {
reason,
clientId,
numClients: (this.io.engine.clientsCount ?? 0) - 1, // number hasn't decremented at this point for this client
});
});
});
}

attach(httpServer: any) {
this.io.attach(httpServer);
}

async close() {
// @ts-ignore
await promisify((...args) => this.io.close(...args))();
}
}
Loading

0 comments on commit 6ef63e8

Please sign in to comment.