Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(windowWith): rename window to windowWith #5335

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs_app/content/guide/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ These are Observable creation operators that also have join functionality -- emi
- [`scan`](/api/operators/scan)
- [`switchMap`](/api/operators/switchMap)
- [`switchMapTo`](/api/operators/switchMapTo)
- [`window`](/api/operators/window)
- [`windowWith`](/api/operators/windowWith)
- [`windowCount`](/api/operators/windowCount)
- [`windowTime`](/api/operators/windowTime)
- [`windowToggle`](/api/operators/windowToggle)
Expand Down
2 changes: 1 addition & 1 deletion docs_app/tools/decision-tree-generator/src/tree.yml
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
- label: buffer
- label: and emit the group as a nested Observable
children:
- label: window
- label: windowWith
- label: based on the emissions of an Observable created on-demand
children:
- label: and emit the group as an array
Expand Down
255 changes: 255 additions & 0 deletions spec/operators/windowWith-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { windowWith, mergeMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { EMPTY, of, Observable } from 'rxjs';

declare const type: Function;
declare const asDiagram: Function;

declare const rxTestScheduler: TestScheduler;

/** @test {window} */
describe('windowWith operator', () => {
asDiagram('windowWith')('should emit windows that close and reopen', () => {
const source = hot('---a---b---c---d---e---f---g---h---i---| ');
const sourceSubs = '^ ! ';
const closings = hot('-------------w------------w----------------|');
const closingSubs = '^ ! ';
const expected = 'x------------y------------z------------| ';
const x = cold( '---a---b---c-| ');
const y = cold( '--d---e---f--| ');
const z = cold( '-g---h---i---| ');
const expectedValues = { x: x, y: y, z: z };

const result = source.pipe(windowWith(closings));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should return a single empty window if source is empty and closings are basic', () => {
const source = cold('|');
const sourceSubs = '(^!)';
const closings = cold('--x--x--|');
const closingSubs = '(^!)';
const expected = '(w|)';
const w = cold('|');
const expectedValues = { w: w };

const result = source.pipe(windowWith(closings));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should return a single empty window if source is empty and closing is empty', () => {
const source = cold('|');
const sourceSubs = '(^!)';
const closings = cold('|');
const closingSubs = '(^!)';
const expected = '(w|)';
const w = cold('|');
const expectedValues = { w: w };

const result = source.pipe(windowWith(closings));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should return a single empty window if source is sync empty and closing is sync empty', () => {
const source = cold('(|)');
const sourceSubs = '(^!)';
const expected = '(w|)';
const w = cold('|');
const expectedValues = { w: w };

const result = source.pipe(windowWith(EMPTY));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
// expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should split a Just source into a single window identical to source, using a Never closing',
() => {
const source = cold('(a|)');
const sourceSubs = '(^!)';
const closings = cold('-');
const closingSubs = '(^!)';
const expected = '(w|)';
const w = cold('(a|)');
const expectedValues = { w: w };

const result = source.pipe(windowWith(closings));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should return a single Never window if source is Never', () => {
const source = cold('------');
const sourceSubs = '^ ';
const closings = cold('------');
const closingSubs = '^ ';
const expected = 'w-----';
const w = cold('------');
const expectedValues = { w: w };

const result = source.pipe(windowWith(closings));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should be able to split a never Observable into timely empty windows', () => {
const source = hot('^--------');
const sourceSubs = '^ !';
const closings = cold('--x--x--|');
const closingSubs = '^ !';
const expected = 'a-b--c--|';
const a = cold('--| ');
const b = cold( '---| ');
const c = cold( '---|');
const expectedValues = { a: a, b: b, c: c };

const result = source.pipe(windowWith(closings));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should emit an error-only window if outer is a simple throw-Observable', () => {
const source = cold('#');
const sourceSubs = '(^!)';
const closings = cold('--x--x--|');
const closingSubs = '(^!)';
const expected = '(w#)';
const w = cold('#');
const expectedValues = { w: w };

const result = source.pipe(windowWith(closings));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should handle basic case with window closings', () => {
const source = hot('-1-2-^3-4-5-6-7-8-9-| ');
const subs = '^ ! ';
const closings = hot('---^---x---x---x---x---x---|');
const closingSubs = '^ ! ';
const expected = 'a---b---c---d--| ';
const a = cold( '-3-4| ');
const b = cold( '-5-6| ');
const c = cold( '-7-8| ');
const d = cold( '-9-| ');
const expectedValues = { a: a, b: b, c: c, d: d };

const result = source.pipe(windowWith(closings));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should handle basic case with window closings, but outer throws', () => {
const source = hot('-1-2-^3-4-5-6-7-8-9-# ');
const subs = '^ ! ';
const closings = hot('---^---x---x---x---x---x---|');
const closingSubs = '^ ! ';
const expected = 'a---b---c---d--# ';
const a = cold( '-3-4| ');
const b = cold( '-5-6| ');
const c = cold( '-7-8| ');
const d = cold( '-9-# ');
const expectedValues = { a: a, b: b, c: c, d: d };

const result = source.pipe(windowWith(closings));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should stop emitting windows when outer is unsubscribed early', () => {
const source = hot('-1-2-^3-4-5-6-7-8-9-| ');
const subs = '^ ! ';
const closings = hot('---^---x---x---x---x---x---|');
const closingSubs = '^ ! ';
const expected = 'a---b---- ';
const a = cold( '-3-4| ');
const b = cold( '-5-6 ');
const unsub = ' ! ';
const expectedValues = { a: a, b: b };

const result = source.pipe(windowWith(closings));

expectObservable(result, unsub).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
const source = hot('-1-2-^3-4-5-6-7-8-9-| ');
const subs = '^ ! ';
const closings = hot('---^---x---x---x---x---x---|');
const closingSubs = '^ ! ';
const expected = 'a---b---- ';
const a = cold( '-3-4| ');
const b = cold( '-5-6- ');
const unsub = ' ! ';
const expectedValues = { a: a, b: b };

const result = source.pipe(
mergeMap((x: string) => of(x)),
windowWith(closings),
mergeMap((x: Observable<string>) => of(x))
);

expectObservable(result, unsub).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should make outer emit error when closing throws', () => {
const source = hot('-1-2-^3-4-5-6-7-8-9-#');
const subs = '^ ! ';
const closings = hot('---^---# ');
const closingSubs = '^ ! ';
const expected = 'a---# ';
const a = cold( '-3-4# ');
const expectedValues = { a: a };

const result = source.pipe(windowWith(closings));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should complete the resulting Observable when window closings completes', () => {
const source = hot('-1-2-^3-4-5-6-7-8-9-|');
const subs = '^ ! ';
const closings = hot('---^---x---x---| ');
const closingSubs = '^ ! ';
const expected = 'a---b---c---| ';
const a = cold( '-3-4| ');
const b = cold( '-5-6| ');
const c = cold( '-7-8| ');
const expectedValues = { a: a, b: b, c: c };

const result = source.pipe(windowWith(closings));

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});
});
1 change: 1 addition & 0 deletions src/internal/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ export { timeoutWith } from './timeoutWith';
export { timestamp } from './timestamp';
export { toArray } from './toArray';
export { window } from './window';
export { windowWith } from './windowWith';
export { windowCount } from './windowCount';
export { windowTime } from './windowTime';
export { windowToggle } from './windowToggle';
Expand Down
86 changes: 4 additions & 82 deletions src/internal/operators/window.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import { Observable } from '../Observable';
import { OperatorFunction } from '../types';
import { Subject } from '../Subject';
import { Subscriber } from '../Subscriber';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { Operator } from '../Operator';
import { windowWith } from './windowWith';

/**
* Branch out the source Observable values as a nested Observable whenever
Expand Down Expand Up @@ -48,82 +43,9 @@ import { Operator } from '../Operator';
* @return {Observable<Observable<T>>} An Observable of windows, which are
* Observables emitting values of the source Observable.
* @name window
*
* @deprecated use {@link windowWith}
*/
export function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T, Observable<T>> {
return function windowOperatorFunction(source: Observable<T>) {
return source.lift(new WindowOperator(windowBoundaries));
};
}

class WindowOperator<T> implements Operator<T, Observable<T>> {

constructor(private windowBoundaries: Observable<any>) {
}

call(subscriber: Subscriber<Observable<T>>, source: any): any {
const windowSubscriber = new WindowSubscriber(subscriber);
const sourceSubscription = source.subscribe(windowSubscriber);
if (!sourceSubscription.closed) {
windowSubscriber.add(subscribeToResult(windowSubscriber, this.windowBoundaries));
}
return sourceSubscription;
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class WindowSubscriber<T> extends OuterSubscriber<T, any> {

private window: Subject<T> = new Subject<T>();

constructor(destination: Subscriber<Observable<T>>) {
super(destination);
destination.next(this.window);
}

notifyNext(outerValue: T, innerValue: any,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, any>): void {
this.openWindow();
}

notifyError(error: any, innerSub: InnerSubscriber<T, any>): void {
this._error(error);
}

notifyComplete(innerSub: InnerSubscriber<T, any>): void {
this._complete();
}

protected _next(value: T): void {
this.window.next(value);
}

protected _error(err: any): void {
this.window.error(err);
this.destination.error(err);
}

protected _complete(): void {
this.window.complete();
this.destination.complete();
}

/** @deprecated This is an internal implementation detail, do not use. */
_unsubscribe() {
this.window = null!;
}

private openWindow(): void {
const prevWindow = this.window;
if (prevWindow) {
prevWindow.complete();
}
const destination = this.destination;
const newWindow = this.window = new Subject<T>();
destination.next(newWindow);
}
return windowWith<T>(windowBoundaries);
}
Loading