Skip to content

Commit

Permalink
Feature add SynchronizeAsync (#27)
Browse files Browse the repository at this point in the history
* Feature add SynchronizeAsync

Add SubscribeSynchronus

* Update ReactiveExtensionsTests.cs

* Update README.md
  • Loading branch information
ChrisPulman authored Jan 14, 2024
1 parent 7b5b4ca commit f7bed15
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ Extensions for concerns found in System.Reactive that make consuming the library
- Shuffle
- OnErrorRetry
- TakeUntil
- SyncronizeAsync
- SubscribeSynchronus
95 changes: 95 additions & 0 deletions src/ReactiveMarbles.Extensions.Tests/ReactiveExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;

Expand Down Expand Up @@ -62,4 +65,96 @@ public void GivenObservable_WhenAsSignal_ThenNotifiesUnit()
.Should()
.Be(Unit.Default);
}

/// <summary>
/// Syncronizes the asynchronous runs with asynchronous tasks in subscriptions.
/// </summary>
[Fact]
public void SubscribeSynchronus_RunsWithAsyncTasksInSubscriptions()
{
// Given, When
var result = 0;
var itterations = 0;
var subject = new Subject<bool>();
using var disposable = subject
.SubscribeSynchronus(async x =>
{
if (x)
{
await Task.Delay(1000);
result++;
}
else
{
await Task.Delay(500);
result--;
}
itterations++;
});

subject.OnNext(true);
subject.OnNext(false);
subject.OnNext(true);
subject.OnNext(false);
subject.OnNext(true);
subject.OnNext(false);

while (itterations < 6)
{
Thread.Yield();
}

// Then
result
.Should()
.Be(0);
}

/// <summary>
/// Syncronizes the asynchronous runs with asynchronous tasks in subscriptions.
/// </summary>
[Fact]
public void SyncronizeAsync_RunsWithAsyncTasksInSubscriptions()
{
// Given, When
var result = 0;
var itterations = 0;
var subject = new Subject<bool>();
using var disposable = subject
.SynchronizeAsync()
.Subscribe(async x =>
{
if (x.Value)
{
await Task.Delay(1000);
result++;
}
else
{
await Task.Delay(500);
result--;
}
x.Sync.Dispose();
itterations++;
});

subject.OnNext(true);
subject.OnNext(false);
subject.OnNext(true);
subject.OnNext(false);
subject.OnNext(true);
subject.OnNext(false);

while (itterations < 6)
{
Thread.Yield();
}

// Then
result
.Should()
.Be(0);
}
}
91 changes: 91 additions & 0 deletions src/ReactiveMarbles.Extensions/Continuation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2019-2023 ReactiveUI Association Incorporated. All rights reserved.
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ReactiveMarbles.Extensions;

/// <summary>
/// Continuation.
/// </summary>
public class Continuation : IDisposable
{
private readonly Barrier _phaseSync = new(2);
private bool _disposedValue;
private bool _locked;

/// <summary>
/// Gets the number of completed phases.
/// </summary>
/// <value>
/// The completed phases.
/// </value>
public long CompletedPhases => _phaseSync.CurrentPhaseNumber;

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Locks this instance.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="item">The item.</param>
/// <param name="observer">The observer.</param>
/// <returns>
/// A <see cref="Task" /> representing the asynchronous operation.
/// </returns>
public Task Lock<T>(T item, IObserver<(T value, IDisposable Sync)> observer)
{
if (_locked)
{
return Task.CompletedTask;
}

_locked = true;
observer.OnNext((item, this));
return Task.Run(() => _phaseSync?.SignalAndWait(CancellationToken.None));
}

/// <summary>
/// Releases unmanaged and - optionally - managed resources.
/// </summary>
/// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
protected virtual async void Dispose(bool disposing)
{
if (!_disposedValue)
{
if (disposing)
{
await UnLock();
_phaseSync.Dispose();
}

_disposedValue = true;
}
}

/// <summary>
/// UnLocks this instance.
/// </summary>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
private Task UnLock()
{
if (!_locked)
{
return Task.CompletedTask;
}

_locked = false;
return Task.Run(() => _phaseSync?.SignalAndWait(CancellationToken.None));
}
}
86 changes: 86 additions & 0 deletions src/ReactiveMarbles.Extensions/ReactiveExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
Expand Down Expand Up @@ -904,6 +905,91 @@ public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource>
observer.OnError,
observer.OnCompleted));

/// <summary>
/// Synchronizes the asynchronous operations in downstream operations.
/// Use SubscribeSynchronus instead for a simpler version.
/// Call Sync.Dispose() to release the lock in the downstream methods.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The source.</param>
/// <returns>An Observable of T and a release mechanism.</returns>
[SuppressMessage("Roslynator", "RCS1047:Non-asynchronous method name should not end with 'Async'.", Justification = "To avoid naming conflicts.")]
public static IObservable<(T Value, IDisposable Sync)> SynchronizeAsync<T>(this IObservable<T> source) =>
Observable.Create<(T Value, IDisposable Sync)>(observer =>
{
var gate = new object();
return source.Synchronize(gate).Subscribe(item => new Continuation().Lock(item, observer).Wait());
});

/// <summary>
/// Subscribes to the specified source synchronously.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The source.</param>
/// <param name="onNext">The on next.</param>
/// <param name="onError">The on error.</param>
/// <param name="onCompleted">The on completed.</param>
/// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
public static IDisposable SubscribeSynchronus<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted) =>
source.SynchronizeAsync().Subscribe(
async observer =>
{
await onNext(observer.Value);
observer.Sync.Dispose();
},
onError,
onCompleted);

/// <summary>
/// Subscribes an element handler and an exception handler to an observable sequence synchronously.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Observable sequence to subscribe to.</param>
/// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
/// <param name="onError">Action to invoke upon exceptional termination of the observable sequence.</param>
/// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
public static IDisposable SubscribeSynchronus<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError) =>
source.SynchronizeAsync().Subscribe(
async observer =>
{
await onNext(observer.Value);
observer.Sync.Dispose();
},
onError);

/// <summary>
/// Subscribes an element handler and a completion handler to an observable sequence synchronously.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Observable sequence to subscribe to.</param>
/// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
/// <param name="onCompleted">Action to invoke upon graceful termination of the observable sequence.</param>
/// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is <c>null</c>.</exception>
public static IDisposable SubscribeSynchronus<T>(this IObservable<T> source, Func<T, Task> onNext, Action onCompleted) =>
source.SynchronizeAsync().Subscribe(
async observer =>
{
await onNext(observer.Value);
observer.Sync.Dispose();
},
onCompleted);

/// <summary>
/// Subscribes an element handler to an observable sequence synchronously.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Observable sequence to subscribe to.</param>
/// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
/// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
public static IDisposable SubscribeSynchronus<T>(this IObservable<T> source, Func<T, Task> onNext) =>
source.SynchronizeAsync().Subscribe(
async observer =>
{
await onNext(observer.Value);
observer.Sync.Dispose();
});

private static void FastForEach<T>(IObserver<T> observer, IEnumerable<T> source)
{
if (source is List<T> fullList)
Expand Down

0 comments on commit f7bed15

Please sign in to comment.