diff --git a/README.md b/README.md index 3f0aaf7..aa30f63 100644 --- a/README.md +++ b/README.md @@ -35,3 +35,5 @@ Extensions for concerns found in System.Reactive that make consuming the library - Shuffle - OnErrorRetry - TakeUntil +- SyncronizeAsync +- SubscribeSynchronus diff --git a/src/ReactiveMarbles.Extensions.Tests/ReactiveExtensionsTests.cs b/src/ReactiveMarbles.Extensions.Tests/ReactiveExtensionsTests.cs index b648710..4304ec5 100644 --- a/src/ReactiveMarbles.Extensions.Tests/ReactiveExtensionsTests.cs +++ b/src/ReactiveMarbles.Extensions.Tests/ReactiveExtensionsTests.cs @@ -5,6 +5,9 @@ using System; using System.Reactive; using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; using FluentAssertions; using Xunit; @@ -62,4 +65,96 @@ public void GivenObservable_WhenAsSignal_ThenNotifiesUnit() .Should() .Be(Unit.Default); } + + /// + /// Syncronizes the asynchronous runs with asynchronous tasks in subscriptions. + /// + [Fact] + public void SubscribeSynchronus_RunsWithAsyncTasksInSubscriptions() + { + // Given, When + var result = 0; + var itterations = 0; + var subject = new Subject(); + using var disposable = subject + .SubscribeSynchronus(async x => + { + if (x) + { + await Task.Delay(1000); + result++; + } + else + { + await Task.Delay(500); + result--; + } + + itterations++; + }); + + subject.OnNext(true); + subject.OnNext(false); + subject.OnNext(true); + subject.OnNext(false); + subject.OnNext(true); + subject.OnNext(false); + + while (itterations < 6) + { + Thread.Yield(); + } + + // Then + result + .Should() + .Be(0); + } + + /// + /// Syncronizes the asynchronous runs with asynchronous tasks in subscriptions. + /// + [Fact] + public void SyncronizeAsync_RunsWithAsyncTasksInSubscriptions() + { + // Given, When + var result = 0; + var itterations = 0; + var subject = new Subject(); + using var disposable = subject + .SynchronizeAsync() + .Subscribe(async x => + { + if (x.Value) + { + await Task.Delay(1000); + result++; + } + else + { + await Task.Delay(500); + result--; + } + + x.Sync.Dispose(); + itterations++; + }); + + subject.OnNext(true); + subject.OnNext(false); + subject.OnNext(true); + subject.OnNext(false); + subject.OnNext(true); + subject.OnNext(false); + + while (itterations < 6) + { + Thread.Yield(); + } + + // Then + result + .Should() + .Be(0); + } } diff --git a/src/ReactiveMarbles.Extensions/Continuation.cs b/src/ReactiveMarbles.Extensions/Continuation.cs new file mode 100644 index 0000000..7eeb834 --- /dev/null +++ b/src/ReactiveMarbles.Extensions/Continuation.cs @@ -0,0 +1,91 @@ +// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace ReactiveMarbles.Extensions; + +/// +/// Continuation. +/// +public class Continuation : IDisposable +{ + private readonly Barrier _phaseSync = new(2); + private bool _disposedValue; + private bool _locked; + + /// + /// Gets the number of completed phases. + /// + /// + /// The completed phases. + /// + public long CompletedPhases => _phaseSync.CurrentPhaseNumber; + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. + /// + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + /// + /// Locks this instance. + /// + /// The type of the elements in the source sequence. + /// The item. + /// The observer. + /// + /// A representing the asynchronous operation. + /// + public Task Lock(T item, IObserver<(T value, IDisposable Sync)> observer) + { + if (_locked) + { + return Task.CompletedTask; + } + + _locked = true; + observer.OnNext((item, this)); + return Task.Run(() => _phaseSync?.SignalAndWait(CancellationToken.None)); + } + + /// + /// Releases unmanaged and - optionally - managed resources. + /// + /// true to release both managed and unmanaged resources; false to release only unmanaged resources. + protected virtual async void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + await UnLock(); + _phaseSync.Dispose(); + } + + _disposedValue = true; + } + } + + /// + /// UnLocks this instance. + /// + /// A representing the asynchronous operation. + private Task UnLock() + { + if (!_locked) + { + return Task.CompletedTask; + } + + _locked = false; + return Task.Run(() => _phaseSync?.SignalAndWait(CancellationToken.None)); + } +} diff --git a/src/ReactiveMarbles.Extensions/ReactiveExtensions.cs b/src/ReactiveMarbles.Extensions/ReactiveExtensions.cs index 4cc8d6e..1b96d58 100644 --- a/src/ReactiveMarbles.Extensions/ReactiveExtensions.cs +++ b/src/ReactiveMarbles.Extensions/ReactiveExtensions.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Reactive; using System.Reactive.Concurrency; @@ -904,6 +905,91 @@ public static IObservable TakeUntil(this IObservable observer.OnError, observer.OnCompleted)); + /// + /// Synchronizes the asynchronous operations in downstream operations. + /// Use SubscribeSynchronus instead for a simpler version. + /// Call Sync.Dispose() to release the lock in the downstream methods. + /// + /// The type of the elements in the source sequence. + /// The source. + /// An Observable of T and a release mechanism. + [SuppressMessage("Roslynator", "RCS1047:Non-asynchronous method name should not end with 'Async'.", Justification = "To avoid naming conflicts.")] + public static IObservable<(T Value, IDisposable Sync)> SynchronizeAsync(this IObservable source) => + Observable.Create<(T Value, IDisposable Sync)>(observer => + { + var gate = new object(); + return source.Synchronize(gate).Subscribe(item => new Continuation().Lock(item, observer).Wait()); + }); + + /// + /// Subscribes to the specified source synchronously. + /// + /// The type of the elements in the source sequence. + /// The source. + /// The on next. + /// The on error. + /// The on completed. + /// object used to unsubscribe from the observable sequence. + public static IDisposable SubscribeSynchronus(this IObservable source, Func onNext, Action onError, Action onCompleted) => + source.SynchronizeAsync().Subscribe( + async observer => + { + await onNext(observer.Value); + observer.Sync.Dispose(); + }, + onError, + onCompleted); + + /// + /// Subscribes an element handler and an exception handler to an observable sequence synchronously. + /// + /// The type of the elements in the source sequence. + /// Observable sequence to subscribe to. + /// Action to invoke for each element in the observable sequence. + /// Action to invoke upon exceptional termination of the observable sequence. + /// object used to unsubscribe from the observable sequence. + public static IDisposable SubscribeSynchronus(this IObservable source, Func onNext, Action onError) => + source.SynchronizeAsync().Subscribe( + async observer => + { + await onNext(observer.Value); + observer.Sync.Dispose(); + }, + onError); + + /// + /// Subscribes an element handler and a completion handler to an observable sequence synchronously. + /// + /// The type of the elements in the source sequence. + /// Observable sequence to subscribe to. + /// Action to invoke for each element in the observable sequence. + /// Action to invoke upon graceful termination of the observable sequence. + /// object used to unsubscribe from the observable sequence. + /// or or is null. + public static IDisposable SubscribeSynchronus(this IObservable source, Func onNext, Action onCompleted) => + source.SynchronizeAsync().Subscribe( + async observer => + { + await onNext(observer.Value); + observer.Sync.Dispose(); + }, + onCompleted); + + /// + /// Subscribes an element handler to an observable sequence synchronously. + /// + /// The type of the elements in the source sequence. + /// Observable sequence to subscribe to. + /// Action to invoke for each element in the observable sequence. + /// object used to unsubscribe from the observable sequence. + public static IDisposable SubscribeSynchronus(this IObservable source, Func onNext) => + source.SynchronizeAsync().Subscribe( + async observer => + { + await onNext(observer.Value); + observer.Sync.Dispose(); + }); + private static void FastForEach(IObserver observer, IEnumerable source) { if (source is List fullList)