Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Is the conservative parallelization algorithm newly introduced in xUnit 2.8.0 not respected? #15

Open
aomader opened this issue Jun 7, 2024 · 5 comments
Labels
help wanted Extra attention is needed

Comments

@aomader
Copy link

aomader commented Jun 7, 2024

xUnit 2.8 introduced an additional algorithm to handle the parallelization. That new conservative algorithm can limit the amount of tests (or test collections to be more precise I guess) that are run in parallel. In our use case that is very desirable, since otherwise it used to start all async tests immediately. That change now let's us limit that.

However, to still achieve parallelization within a test collection, we would need this framework here. However, it seems that is not respecting that new conservative algorithm, since it still spawns all tests.

I am not deep into the internals that are adjusted here, so I can not really judge what the issue is exactly.

@meziantou meziantou added the help wanted Extra attention is needed label Jun 7, 2024
@meziantou
Copy link
Owner

meziantou commented Jun 7, 2024

I won't implement it myself as I don't have time to spend on this project. However, I would be happy to review a PR if someone wants to implement it.

In our use case that is very desirable, since otherwise it used to start all async tests immediately. That change now let's us limit that.

Even if all tests are run in parallel, you can limit the concurrency. This parameter is respected by this project. The implementation rely on a SynchronizationContext.

@vmandic
Copy link

vmandic commented Aug 27, 2024

I have been looking into this also and my case heavily relies on not launching all tests at once as each test makes an infrastructural dependency which takes host resources and I wish to limit this to eg. 4 or max CPUs at least

So I have been trying to set the xunit.runner.json config but it has no effect, eg.

{
  "maxParallelThreads": 4
}

(I have verified it is copied in the bin output dir.)

The tests are still executed all at once.

I tried creating my own sync context with a SemaphoreSlim and hardcoding it to accept only eg. 4 entries but yet again to no effect. The SyncContext was set before all tests in a fixture CTOR.

Is this really configurable or I am misunderstanding your comment @meziantou:

This parameter is respected by this project.

I guess by "this parameter" you mean the one I tried setting in the config above?

I see this (ref below) in the code ie. check for a sync context but I am not sure if it really works or I do not understand it? My custom sync context should be respected?

Ref: https://github.com/meziantou/Meziantou.Xunit.ParallelTestFramework/blob/main/Meziantou.Xunit.ParallelTestFramework/ParallelTestMethodRunner.cs#L53

Any ideas please on how to limit concurrency here?

e:

Even adding [assembly: CollectionBehavior(MaxParallelThreads = 4)] does not yield effect.

P.S.
Thanks for the great lib and work!

@meziantou
Copy link
Owner

I tried creating my own sync context

I don't think this works. The expected sync context should be created by xunit when you set the maxParallelThreads in the xunit.runner.json file. You can attach a debugger to validate that there is a synchronization context.

I don't know if setting the max concurrency using the attribute is supported.

@vmandic
Copy link

vmandic commented Aug 28, 2024

Thanks, I have debugged and indeed it is null, I do not understand why.

Screenshot 2024-08-28 at 10 40 04

Not to throw my today's efforts into thin air, maybe my next attempt will help someone achieve controlled parallelization.

I have been studying your implementation and I decided to implement my own framework and classes down the stack. I have implemented a very rudimentary SemaphoreSlim controlled ParallelTestCollectionRunner which helps achieve a similar behavior as your library (without the attributes). I needed everything parallelized in my integration tests suit (single lib) anyway.

This code below will respect the xunit.runner.json settings.

Code:

public class CustomTestFramework : XunitTestFramework
{
    public CustomTestFramework(IMessageSink messageSink)
        : base(messageSink)
    {
    }

    protected override ITestFrameworkExecutor CreateExecutor(AssemblyName assemblyName)
    {
        return new CustomTestFrameworkExecutor(assemblyName, SourceInformationProvider, DiagnosticMessageSink);
    }
}


public class CustomTestFrameworkExecutor : XunitTestFrameworkExecutor
{
    public CustomTestFrameworkExecutor(AssemblyName assemblyName, ISourceInformationProvider sourceInformationProvider, IMessageSink diagnosticMessageSink)
        : base(assemblyName, sourceInformationProvider, diagnosticMessageSink)
    {
    }

    protected override async void RunTestCases(IEnumerable<IXunitTestCase> testCases,
        IMessageSink executionMessageSink,
        ITestFrameworkExecutionOptions executionOptions)
    {
        using var assemblyRunner = new CustomTestAssemblyRunner(TestAssembly, testCases, DiagnosticMessageSink, executionMessageSink, executionOptions);
        await assemblyRunner.RunAsync();
    }
}

public class CustomTestAssemblyRunner : XunitTestAssemblyRunner
{
    public CustomTestAssemblyRunner(ITestAssembly testAssembly,
        IEnumerable<IXunitTestCase> testCases,
        IMessageSink diagnosticMessageSink,
        IMessageSink executionMessageSink,
        ITestFrameworkExecutionOptions executionOptions)
        : base(testAssembly, testCases, diagnosticMessageSink, executionMessageSink, executionOptions)
    {
    }

    protected override Task<RunSummary> RunTestCollectionAsync(IMessageBus messageBus, ITestCollection testCollection, IEnumerable<IXunitTestCase> testCases,
        CancellationTokenSource cancellationTokenSource)
    {
        var maxParallelThreads = ExecutionOptions.MaxParallelThreadsOrDefault();
        return new ParallelTestCollectionRunner(
            testCollection,
            testCases,
            DiagnosticMessageSink,
            messageBus,
            TestCaseOrderer,
            new ExceptionAggregator(Aggregator),
            cancellationTokenSource,
            maxParallelThreads).RunAsync();
    }
}

public class ParallelTestCollectionRunner : XunitTestCollectionRunner
{
    private static SemaphoreSlim _semaphore = null!;
    private readonly CancellationTokenSource _cancellationTokenSource;

    public ParallelTestCollectionRunner(ITestCollection testCollection, IEnumerable<IXunitTestCase> testCases,
        IMessageSink diagnosticMessageSink, IMessageBus messageBus, ITestCaseOrderer testCaseOrderer,
        ExceptionAggregator aggregator, CancellationTokenSource cancellationTokenSource, int maxParallelThreads)
        : base(testCollection, testCases, diagnosticMessageSink, messageBus, testCaseOrderer, aggregator, cancellationTokenSource)
    {
        _cancellationTokenSource = cancellationTokenSource;
        _semaphore = new(maxParallelThreads);
    }

    protected override async Task<RunSummary> RunTestClassAsync(
        ITestClass testClass,
        IReflectionTypeInfo @class,
        IEnumerable<IXunitTestCase> testCases)
    {
        var results = new ConcurrentBag<RunSummary>(); // Thread-safe collection for results
        var tasks = new List<Task>();
        var constructorArguments = GetConstructorArguments(testClass.Class.ToRuntimeType());

        foreach (var testCase in testCases)
        {
            await _semaphore.WaitAsync(_cancellationTokenSource.Token);  // Control concurrency here
            var task = Task.Run(async () =>
            {
                try
                {
                    var runner = new XunitTestCaseRunner(
                        testCase,
                        testCase.DisplayName,
                        testCase.SkipReason,
                        constructorArguments,
                        testCase.TestMethodArguments ?? Array.Empty<object>(),
                        MessageBus,
                        new ExceptionAggregator(Aggregator),
                        CancellationTokenSource);

                    var result = await runner.RunAsync();
                    results.Add(result); // Aggregate results from each runner
                }
                finally
                {
                    _semaphore.Release();  // Ensure semaphore is released
                }
            }, _cancellationTokenSource.Token);

            tasks.Add(task);
        }

        // you can implement your own WithCancellation() here or look up Stephen Cleary about it
        await Task.WhenAll(tasks).WithCancellation(_cancellationTokenSource.Token); // Wait for all tasks to complete

        // Aggregate final results
        var summary = new RunSummary();
        foreach (var result in results)
        {
            summary.Aggregate(result);
        }

        return summary;
    }

    private static object[] GetConstructorArguments(Type type)
    {
        // Get the first constructor of the class
        var constructor = type.GetConstructors(BindingFlags.Public | BindingFlags.Instance)
            .FirstOrDefault();

        if (constructor == null)
            return Array.Empty<object>();

        // Get the parameters of the constructor
        var parameters = constructor.GetParameters();

        // Create instances of the parameters' types
        var arguments = parameters.Select(param => Activator.CreateInstance(param.ParameterType))
            .ToArray();

        return arguments!;
    }
}

e: this needs more work, GetConstructorArguments() wont do it properly

Thanks for the input once again.

@vmandic
Copy link

vmandic commented Aug 28, 2024

Hi again, I managed to get this working and respecting the xunit.runner.json config, the key change was in the ParallelTestClassRunner, hope this helps others:

public class ParallelTestClassRunner : XunitTestClassRunner
{
    private static SemaphoreSlim _semaphore = null!;

    public ParallelTestClassRunner(ITestClass testClass, IReflectionTypeInfo @class,
        IEnumerable<IXunitTestCase> testCases, IMessageSink diagnosticMessageSink, IMessageBus messageBus,
        ITestCaseOrderer testCaseOrderer, ExceptionAggregator aggregator,
        CancellationTokenSource cancellationTokenSource, IDictionary<Type, object> collectionFixtureMappings,
        ITestFrameworkExecutionOptions executionOptions)
        : base(testClass, @class, testCases, diagnosticMessageSink, messageBus, testCaseOrderer, aggregator,
            cancellationTokenSource, collectionFixtureMappings)
    {
        // make sure you pass executionOptions additionally from xunit assembly class
        _semaphore = new(executionOptions.MaxParallelThreadsOrDefault());
    }

    // This method has been slightly modified from the original implementation to run tests in parallel
    // https://github.com/xunit/xunit/blob/2.4.2/src/xunit.execution/Sdk/Frameworks/Runners/TestClassRunner.cs#L194-L219
    protected override async Task<RunSummary> RunTestMethodsAsync()
    {
        var disableParallelizationAttribute =
            TestClass.Class.GetCustomAttributes(typeof(DisableParallelizationAttribute)).Any();

        var disableParallelizationOnCustomCollection =
            TestClass.Class.GetCustomAttributes(typeof(CollectionAttribute)).Any()
            && !TestClass.Class.GetCustomAttributes(typeof(EnableParallelizationAttribute)).Any();

        var disableParallelization = disableParallelizationAttribute || disableParallelizationOnCustomCollection;

        if (disableParallelization)
            return await base.RunTestMethodsAsync().ConfigureAwait(false);

        var summary = new RunSummary();
        IEnumerable<IXunitTestCase> orderedTestCases;
        try
        {
            orderedTestCases = TestCaseOrderer.OrderTestCases(TestCases);
        }
        catch (Exception ex)
        {
            var innerEx = Unwrap(ex);
            DiagnosticMessageSink.OnMessage(new DiagnosticMessage(
                $"Test case orderer '{TestCaseOrderer.GetType().FullName}' threw '{innerEx.GetType().FullName}' during ordering: {innerEx.Message}{Environment.NewLine}{innerEx.StackTrace}"));
            orderedTestCases = TestCases.ToList();
        }

        var constructorArguments = CreateTestClassConstructorArguments();
        var methodGroups = orderedTestCases.GroupBy(tc => tc.TestMethod, TestMethodComparer.Instance);

        var methodTasks = methodGroups.Select(m =>
            RunTestMethodAsync(m.Key, (IReflectionMethodInfo)m.Key.Method, m, constructorArguments));

        var methodSummaries = await Task.WhenAll(methodTasks).ConfigureAwait(false);

        foreach (var methodSummary in methodSummaries)
        {
            summary.Aggregate(methodSummary);
        }

        return summary;
    }

    protected override async Task<RunSummary> RunTestMethodAsync(ITestMethod testMethod, IReflectionMethodInfo method,
        IEnumerable<IXunitTestCase> testCases, object[] constructorArguments)
    {
        await _semaphore.WaitAsync(CancellationTokenSource.Token).ConfigureAwait(false);

        try
        {
            var summary = await new ParallelTestMethodRunner(
                testMethod,
                Class,
                method,
                testCases,
                DiagnosticMessageSink,
                MessageBus,
                new ExceptionAggregator(Aggregator),
                CancellationTokenSource,
                constructorArguments).RunAsync();

            return summary;
        }
        finally
        {
            _semaphore.Release();
        }
    }

    private static Exception Unwrap(Exception ex)
    {
        while (true)
        {
            if (ex is not TargetInvocationException tiex || tiex.InnerException == null)
                return ex;

            ex = tiex.InnerException;
        }
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

3 participants