Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an optional filterFn to PubSubAsyncIterator to workaround performance issues found in withFilter #601

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/pubsub-async-iterator.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { PubSubEngine } from 'graphql-subscriptions';
import { FilterFn } from './with-filter';

/**
* A class for digesting PubSubEngine events via the new AsyncIterator interface.
Expand Down Expand Up @@ -31,9 +32,10 @@ import { PubSubEngine } from 'graphql-subscriptions';
*/
export class PubSubAsyncIterator<T> implements AsyncIterableIterator<T> {

constructor(pubsub: PubSubEngine, eventNames: string | string[], options?: unknown) {
constructor(pubsub: PubSubEngine, eventNames: string | string[], options?: unknown, filterFn?: FilterFn) {
this.pubsub = pubsub;
this.options = options;
this.filterFn = filterFn;
this.pullQueue = [];
this.pushQueue = [];
this.listening = true;
Expand Down Expand Up @@ -66,9 +68,14 @@ export class PubSubAsyncIterator<T> implements AsyncIterableIterator<T> {
private listening: boolean;
private pubsub: PubSubEngine;
private options: unknown;
private filterFn: FilterFn | undefined;

private async pushValue(event) {
await this.subscribeAll();
if (this.filterFn) {
const filterResult = await this.filterFn(event, 0);
if (!filterResult) return;
}
if (this.pullQueue.length !== 0) {
this.pullQueue.shift()({ value: event, done: false });
} else {
Expand Down
5 changes: 3 additions & 2 deletions src/redis-pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {Cluster, Redis, RedisOptions} from 'ioredis';
import {PubSubEngine} from 'graphql-subscriptions';
import {PubSubAsyncIterator} from './pubsub-async-iterator';
import { FilterFn } from './with-filter';

type RedisClient = Redis | Cluster;
type OnMessage<T> = (message: T) => void;
Expand Down Expand Up @@ -139,8 +140,8 @@ export class RedisPubSub implements PubSubEngine {
delete this.subscriptionMap[subId];
}

public asyncIterator<T>(triggers: string | string[], options?: unknown): AsyncIterator<T> {
return new PubSubAsyncIterator<T>(this, triggers, options);
public asyncIterator<T>(triggers: string | string[], options?: unknown, filterFn?: FilterFn): AsyncIterator<T> {
return new PubSubAsyncIterator<T>(this, triggers, options, filterFn);
}

public getSubscriber(): RedisClient {
Expand Down
20 changes: 20 additions & 0 deletions src/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,26 @@ describe('PubSubAsyncIterator', () => {
pubSub.publish(eventName, { test: true });
});

it('should only publish filtered events', done => {
const pubSub = new RedisPubSub(mockOptions);
const eventName = 'test';
const iterator = pubSub.asyncIterator(eventName, undefined, (eventData) => eventData.filtered === false);

iterator.next().then(result => {
// tslint:disable-next-line:no-unused-expression
expect(result).to.exist;
// tslint:disable-next-line:no-unused-expression
expect(result.value).to.exist;
// tslint:disable-next-line:no-unused-expression
expect(result.done).to.exist;
expect(result.value.filtered).to.equal(false);
done();
});

pubSub.publish(eventName, { filtered: true });
pubSub.publish(eventName, { filtered: false });
});

it('should not trigger event on asyncIterator when publishing other event', async () => {
const pubSub = new RedisPubSub(mockOptions);
const eventName = 'test2';
Expand Down
5 changes: 5 additions & 0 deletions src/with-filter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
export type FilterFn = (rootValue?: any, args?: any, context?: any, info?: any) => boolean;

/**
* Wraps an async-iterator and filters incoming events based on the provided filter function.
* Note: Due to promise chaining this function can use a large amount of memory when a high percentage of messages are filtered
* If using the PubSubAsyncIterator, use the filterFn property directly
*/
export const withFilter = (asyncIteratorFn: () => AsyncIterableIterator<any>, filterFn: FilterFn) => {
return (rootValue: any, args: any, context: any, info: any): AsyncIterator<any> => {
const asyncIterator = asyncIteratorFn();
Expand Down