Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: isolate reliance on SchedulerAction this context #7322

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/rxjs/spec/scheduled/scheduled-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,29 @@ describe('scheduled', () => {
done();
});
});

it('should handle scheduling a promise that unsubscribes prior to complete', (done) => {
const results: any[] = [];
const input = Promise.resolve('x'); // strings are iterables
const subscription = scheduled(input, testScheduler).subscribe({
next(value) {
results.push(value);
subscription.unsubscribe();
},
complete() { results.push('done'); },
});

expect(results).to.deep.equal([]);

// Promises force async, so we can't schedule synchronously, no matter what.
testScheduler.flush();
expect(results).to.deep.equal([]);

Promise.resolve().then(() => {
// NOW it should work, as the other promise should have resolved.
testScheduler.flush();
expect(results).to.deep.equal(['x']);
done();
});
});
});
10 changes: 7 additions & 3 deletions packages/rxjs/src/internal/observable/range.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { SchedulerLike } from '../types.js';
import { Observable } from '../Observable.js';
import { EMPTY } from './empty.js';
import { executeSchedule } from '../util/executeSchedule.js';

export function range(start: number, count?: number): Observable<number>;

Expand Down Expand Up @@ -72,14 +73,17 @@ export function range(start: number, count?: number, scheduler?: SchedulerLike):
? // The deprecated scheduled path.
(subscriber) => {
let n = start;
return scheduler.schedule(function () {
const emit = () => {
if (n < end) {
subscriber.next(n++);
this.schedule();
if (!subscriber.closed) {
executeSchedule(subscriber, scheduler, emit);
}
} else {
subscriber.complete();
}
});
};
executeSchedule(subscriber, scheduler, emit);
}
: // Standard synchronous range.
(subscriber) => {
Expand Down
25 changes: 19 additions & 6 deletions packages/rxjs/src/internal/observable/timer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { SchedulerLike } from '../types.js';
import { asyncScheduler } from '../scheduler/async.js';
import { isScheduler } from '../util/isScheduler.js';
import { isValidDate } from '../util/isDate.js';
import { executeSchedule } from '../util/executeSchedule.js';

/**
* Creates an observable that will wait for a specified time period, or exact date, before
Expand Down Expand Up @@ -167,20 +168,32 @@ export function timer(
let n = 0;

// Start the timer.
return scheduler.schedule(function () {
if (!subscriber.closed) {
// Emit the next value and increment.
return executeSchedule(
subscriber,
scheduler,
() => {
// Emit the first value and schedule the next.
subscriber.next(n++);

if (0 <= intervalDuration) {
// If we have a interval after the initial timer,
// reschedule with the period.
this.schedule(undefined, intervalDuration);
executeSchedule(
subscriber,
scheduler,
() => {
// Emit the interval values.
subscriber.next(n++);
},
intervalDuration,
true
);
} else {
// We didn't have an interval. So just complete.
subscriber.complete();
}
}
}, due);
},
due
);
});
}
67 changes: 22 additions & 45 deletions packages/rxjs/src/internal/operators/debounceTime.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { asyncScheduler } from '../scheduler/async.js';
import type { Subscription} from '../Observable.js';
import type { Subscription } from '../Observable.js';
import { Observable, operate } from '../Observable.js';
import type { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike } from '../types.js';
import type { MonoTypeOperatorFunction, SchedulerLike } from '../types.js';
import { executeSchedule } from '../util/executeSchedule.js';

/**
* Emits a notification from the source Observable only after a particular time span
Expand Down Expand Up @@ -62,64 +63,40 @@ import type { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike } from '.
export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyncScheduler): MonoTypeOperatorFunction<T> {
return (source) =>
new Observable((destination) => {
let activeTask: Subscription | null = null;
let lastValue: T | null = null;
let lastTime: number | null = null;
let scheduling = false;

const emit = () => {
if (scheduling || activeTask) {
// We have a value! Free up memory first, then emit the value.
if (activeTask) {
activeTask.unsubscribe();
activeTask = null;
}
const value = lastValue!;
lastValue = null;
destination.next(value);
}
};
function emitWhenIdle(this: SchedulerAction<unknown>) {
// This is called `dueTime` after the first value
// but we might have received new values during this window!

const targetTime = lastTime! + dueTime;
const now = scheduler.now();
if (now < targetTime) {
// On that case, re-schedule to the new target
activeTask = this.schedule(undefined, targetTime - now);
destination.add(activeTask);
return;
}

emit();
}
let lastValue: T;
let activeTask: Subscription | void;

source.subscribe(
operate({
destination,
next: (value: T) => {
lastValue = value;
lastTime = scheduler.now();
// Clear any pending task and schedule a new one.
activeTask?.unsubscribe();

// Only set up a task if it's not already up
if (!activeTask) {
scheduling = true;
activeTask = scheduler.schedule(emitWhenIdle, dueTime);
scheduling = false;
// Set activeTask as intermediary Subscription to handle synchronous schedulers
destination.add(activeTask);
}
activeTask = executeSchedule(
destination,
scheduler,
() => {
activeTask = undefined;
const v = lastValue;
lastValue = null!;
destination.next(v);
},
dueTime
);
},
complete: () => {
// Source completed.
// Emit any pending debounced values then complete
emit();
if (activeTask) {
destination.next(lastValue);
}
destination.complete();
},
finalize: () => {
// Finalization.
lastValue = activeTask = null;
lastValue = activeTask = null!;
},
})
);
Expand Down
6 changes: 2 additions & 4 deletions packages/rxjs/src/internal/operators/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ export function timeout<T, O extends ObservableInput<any>, M>(
let originalSourceSubscription: Subscription;
// The subscription for our timeout timer. This changes
// every time we get a new value.
let timerSubscription: Subscription;
let timerSubscription: Subscription | void;
// A bit of state we pass to our with and error factories to
// tell what the last value we saw was.
let lastValue: T | null = null;
Expand Down Expand Up @@ -353,9 +353,7 @@ export function timeout<T, O extends ObservableInput<any>, M>(
each! > 0 && startTimer(each!);
},
finalize: () => {
if (!timerSubscription?.closed) {
timerSubscription?.unsubscribe();
}
timerSubscription?.unsubscribe();
// Be sure not to hold the last value in memory after unsubscription
// it could be quite large.
lastValue = null;
Expand Down
21 changes: 9 additions & 12 deletions packages/rxjs/src/internal/scheduled/scheduleArray.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
import { Observable } from '../Observable.js';
import type { SchedulerLike } from '../types.js';
import { executeSchedule } from '../util/executeSchedule.js';

export function scheduleArray<T>(input: ArrayLike<T>, scheduler: SchedulerLike) {
return new Observable<T>((subscriber) => {
// The current array index.
let i = 0;
// Start iterating over the array like on a schedule.
return scheduler.schedule(function () {
const emit = () => {
// If we have hit the end of the array, complete.
if (i === input.length) {
// If we have hit the end of the array like in the
// previous job, we can complete.
subscriber.complete();
} else {
// Otherwise let's next the value at the current index,
// Otherwise, next the value at the current index,
// then increment our index.
subscriber.next(input[i++]);
// If the last emission didn't cause us to close the subscriber
// (via take or some side effect), reschedule the job and we'll
// make another pass.
if (!subscriber.closed) {
this.schedule();
}
executeSchedule(subscriber, scheduler, emit);
}
});
};

// Start iterating over the array like on a schedule.
return executeSchedule(subscriber, scheduler, emit);
});
}
47 changes: 17 additions & 30 deletions packages/rxjs/src/internal/util/executeSchedule.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,31 @@
import type { Subscription } from '../Observable.js';
import type { SchedulerAction, SchedulerLike } from '../types.js';

export function executeSchedule(
parentSubscription: Subscription,
scheduler: SchedulerLike,
work: () => void,
delay: number,
repeat: true
): void;
export function executeSchedule(
parentSubscription: Subscription,
scheduler: SchedulerLike,
work: () => void,
delay?: number,
repeat?: false
): Subscription;

export function executeSchedule(
parentSubscription: Subscription,
scheduler: SchedulerLike,
work: () => void,
delay = 0,
repeat = false
): Subscription | void {
const scheduleSubscription = scheduler.schedule(function (this: SchedulerAction<any>) {
work();
if (repeat) {
parentSubscription.add(this.schedule(null, delay));
} else {
this.unsubscribe();
}
}, delay);
if (!parentSubscription.closed) {
const scheduleSubscription = scheduler.schedule(function (this: SchedulerAction<any>) {
work();
if (repeat) {
parentSubscription.add(this.schedule(null, delay));
} else {
this.unsubscribe();
}
}, delay);

parentSubscription.add(scheduleSubscription);
parentSubscription.add(scheduleSubscription);

if (!repeat) {
// Because user-land scheduler implementations are unlikely to properly reuse
// Actions for repeat scheduling, we can't trust that the returned subscription
// will control repeat subscription scenarios. So we're trying to avoid using them
// incorrectly within this library.
return scheduleSubscription;
if (!repeat) {
// Because user-land scheduler implementations are unlikely to properly reuse
// Actions for repeat scheduling, we can't trust that the returned subscription
// will control repeat subscription scenarios. So we're trying to avoid using them
// incorrectly within this library.
return scheduleSubscription;
}
}
}