chore: restore parallelism (#317)
Signed-off-by: SdgJlbl <[email protected]>
SdgJlbl authored Feb 16, 2024
1 parent f332036 commit 693bb5b
Showing 8 changed files with 36 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Parallelism in SDK tests is deactivated until we fix the parallel compute plans issues ([#306](https://github.com/Substra/substra-tests/pull/306))
- A bunch of SDK tests are skipped due to regressions following the decoupled builder merge ([#306](https://github.com/Substra/substra-tests/pull/306))
- BREAKING: replace `todo_count` and `waiting_count` by the new counts following the new statuses in the backend ([#319](https://github.com/Substra/substra-tests/pull/319))
+- Reactivated tests and parallelism in SDK tests ([#315](https://github.com/Substra/substra-tests/pull/315), [#317](https://github.com/Substra/substra-tests/pull/317))

### Added

4 changes: 2 additions & 2 deletions Makefile
@@ -14,13 +14,13 @@ test: test-remote test-local
test-remote: test-remote-sdk test-remote-workflows

test-remote-sdk: pyclean
-pytest tests -rs -v --durations=0 -m "not workflows" --log-level=INFO
+pytest tests -rs -v --durations=0 -m "not workflows" -n $(PARALLELISM) --log-level=INFO

test-remote-workflows: pyclean
pytest tests -v --durations=0 -m "workflows" --log-level=INFO

test-minimal: pyclean
-pytest tests -rs -v --durations=0 -m "not slow and not workflows" --log-level=INFO
+pytest tests -rs -v --durations=0 -m "not slow and not workflows" -n $(PARALLELISM) --log-level=INFO

test-local: test-subprocess test-docker test-subprocess-workflows

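Editor's note: the `-n $(PARALLELISM)` flag restored in these recipes comes from the pytest-xdist plugin, which splits the test session across worker processes; `PARALLELISM` is a Makefile variable assumed to hold the worker count. As a minimal sketch (assuming pytest and pytest-xdist are installed, and a hypothetical worker count of 4), the same run can be launched from Python:

import pytest

# Rough equivalent of `make test-remote-sdk` with PARALLELISM=4 (hypothetical value);
# "-n", "4" asks pytest-xdist to distribute tests over 4 worker processes.
exit_code = pytest.main(
    ["tests", "-rs", "-v", "--durations=0", "-m", "not workflows", "-n", "4", "--log-level=INFO"]
)
raise SystemExit(exit_code)
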
8 changes: 3 additions & 5 deletions tests/test_docker_image_build.py
@@ -40,14 +40,12 @@ def test_base_substra_tools_image(factory, cfg, client, default_dataset, worker)
def test_function_build_when_submitted(factory, cfg, client, worker):
substra_tools_image = cfg.substra_tools.image_local
function_category = FunctionCategory.simple
-dockerfile = get_dockerfile(substra_tools_image, function_category, extra_instructions="ENV test=0\nsleep 1")
+dockerfile = get_dockerfile(substra_tools_image, function_category, extra_instructions="ENV test=0\nRUN sleep 1")
spec = factory.create_function(function_category, dockerfile=dockerfile)
function = client.add_function(spec)

-# Cannot use `get_function` as status is not yet exposed through substra SDK
-function = client._backend._client.get("function", function.key)
-
-assert function["status"] == "FUNCTION_STATUS_BUILDING"
+function = client.wait_function(function.key, raise_on_failure=True)
+assert function.status == "FUNCTION_STATUS_READY"


@pytest.mark.remote_only
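
Editor's note: this hunk drops a poll of the private backend client (which could only observe the transient FUNCTION_STATUS_BUILDING state) in favor of the SDK's `wait_function`, which blocks until the image build reaches a terminal status. Conceptually, the wait boils down to a loop like the following sketch (`wait_until_ready` is a hypothetical helper, not part of the SDK):

import time

def wait_until_ready(client, key, timeout=300.0, period=2.0):
    # Poll the function until it leaves the BUILDING state or the timeout expires.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        function = client.get_function(key)  # assumes the status field is now exposed here
        if function.status != "FUNCTION_STATUS_BUILDING":
            return function
        time.sleep(period)
    raise TimeoutError(f"function {key} still building after {timeout}s")
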
26 changes: 12 additions & 14 deletions tests/test_execution.py
@@ -1,8 +1,8 @@
import pytest
import substra
from substra.sdk.exceptions import TaskAssetNotFoundError
+from substra.sdk.models import ComputeTaskStatus
from substra.sdk.models import InputRef
-from substra.sdk.models import Status
from substra.sdk.schemas import AssetKind
from substra.sdk.schemas import ComputeTaskOutputSpec
from substra.sdk.schemas import FunctionOutputSpec
@@ -148,7 +148,6 @@ def test_tasks_execution_on_different_organizations(
default_metric_1,
default_dataset_1,
default_dataset_2,
-channel,
workers,
):
"""Execution of a traintask on organization 1 and the following testtask on organization 2."""
@@ -159,8 +158,8 @@
predict_function_spec = factory.create_function(FunctionCategory.predict)
predict_function_2 = client_2.add_function(predict_function_spec)

-channel.wait_for_asset_synchronized(function_2)
-channel.wait_for_asset_synchronized(predict_function_2)
+client_2.wait_function(function_2.key)
+client_2.wait_function(predict_function_2.key)

# add traintask on organization 2; should execute on organization 2 (dataset located on organization 2)
spec = factory.create_traintask(
@@ -224,7 +223,7 @@ def test_function_build_failure(factory, network, default_dataset_1, worker):
traintask = network.clients[0].add_task(spec)
traintask = network.clients[0].wait_task(traintask.key, raise_on_failure=False)

-assert traintask.status == Status.failed
+assert traintask.status == ComputeTaskStatus.failed
assert traintask.error_type == substra.sdk.models.TaskErrorType.build
with pytest.raises(TaskAssetNotFoundError):
network.clients[0].get_task_output_asset(traintask.key, OutputIdentifiers.shared)
@@ -256,7 +255,7 @@ def test_function_build_failure_different_backend(factory, network, default_data
traintask = network.clients[0].add_task(spec)
traintask = network.clients[0].wait_task(traintask.key, raise_on_failure=False)

-assert traintask.status == Status.failed
+assert traintask.status == ComputeTaskStatus.failed
assert traintask.error_type == substra.sdk.models.TaskErrorType.build
with pytest.raises(TaskAssetNotFoundError):
network.clients[0].get_task_output_asset(traintask.key, OutputIdentifiers.shared)
@@ -283,7 +282,7 @@ def test_task_execution_failure(factory, network, default_dataset_1, worker):
traintask = network.clients[0].add_task(spec)
traintask = network.clients[0].wait_task(traintask.key, raise_on_failure=False)

-assert traintask.status == Status.failed
+assert traintask.status == ComputeTaskStatus.failed
assert traintask.error_type == substra.sdk.models.TaskErrorType.execution
with pytest.raises(TaskAssetNotFoundError):
network.clients[0].get_task_output_asset(traintask.key, OutputIdentifiers.shared)
@@ -398,7 +397,7 @@ def test_composite_traintask_execution_failure(factory, client, default_dataset,
composite_traintask = client.add_task(spec)
composite_traintask = client.wait_task(composite_traintask.key, raise_on_failure=False)

-assert composite_traintask.status == Status.failed
+assert composite_traintask.status == ComputeTaskStatus.failed
assert composite_traintask.error_type == substra.sdk.models.TaskErrorType.execution
with pytest.raises(TaskAssetNotFoundError):
client.get_task_output_asset(composite_traintask.key, OutputIdentifiers.local)
@@ -446,10 +445,10 @@ def test_aggregatetask_execution_failure(factory, client, default_dataset, worke

for composite_traintask_key in composite_traintask_keys:
composite_traintask = client.get_task(composite_traintask_key)
-assert composite_traintask.status == Status.done
+assert composite_traintask.status == ComputeTaskStatus.done
assert composite_traintask.error_type is None

-assert aggregatetask.status == Status.failed
+assert aggregatetask.status == ComputeTaskStatus.failed
assert aggregatetask.error_type == substra.sdk.models.TaskErrorType.execution
with pytest.raises(TaskAssetNotFoundError):
client.get_task_output_asset(aggregatetask.key, OutputIdentifiers.shared)
@@ -507,7 +506,7 @@ def test_composite_traintasks_execution(factory, client, default_dataset, defaul
predicttask = client.add_task(spec)
# `raises = True`, will fail if task not successful
predicttask = client.wait_task(predicttask.key, raise_on_failure=True)
-assert predicttask.status == Status.done
+assert predicttask.status == ComputeTaskStatus.done
assert predicttask.error_type is None

spec = factory.create_testtask(
@@ -527,7 +526,6 @@ def test_composite_traintasks_execution(factory, client, default_dataset, defaul
assert set([composite_traintask_1.key, composite_traintask_2.key]).issubset(composite_traintask_keys)


-@pytest.mark.skip(reason="Linked to decoupled builder merge")
@pytest.mark.slow
def test_aggregatetask(factory, client, default_metric, default_dataset, worker):
"""Execution of aggregatetask aggregating traintasks. (traintasks -> aggregatetask)"""
@@ -679,7 +677,7 @@ def test_aggregatetask_traintask(factory, client, default_dataset, worker):
# `raises = True`, will fail if task not successful
traintask_2 = client.wait_task(traintask_2.key, raise_on_failure=True)

-assert traintask_2.status == Status.done
+assert traintask_2.status == ComputeTaskStatus.done
assert traintask_2.error_type is None


@@ -880,7 +878,7 @@ def test_aggregate_composite_traintasks(factory, network, clients, default_datas
spec = factory.create_traintask(function=function, inputs=dataset.train_data_inputs, worker=workers[0])
traintask = client.add_task(spec)
traintask = client.wait_task(traintask.key)
-assert traintask.status == Status.failed
+assert traintask.status == ComputeTaskStatus.failed
assert traintask.error_type == substra.sdk.models.TaskErrorType.execution


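Editor's note: the recurring change in this file swaps the older `Status` enum for `ComputeTaskStatus` from `substra.sdk.models`, matching the renamed backend statuses mentioned in the changelog. A minimal sketch of the resulting assertion pattern (the client profile and task key below are hypothetical):

import substra
from substra.sdk.models import ComputeTaskStatus

client = substra.Client.from_config_file(profile_name="organization-1")  # assumed profile
task = client.wait_task("some-task-key", raise_on_failure=False)
if task.status == ComputeTaskStatus.failed:
    # On failure, the error type tells build failures apart from execution failures.
    assert task.error_type in (
        substra.sdk.models.TaskErrorType.build,
        substra.sdk.models.TaskErrorType.execution,
    )
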
17 changes: 8 additions & 9 deletions tests/test_execution_compute_plan.py
@@ -21,7 +21,6 @@ def test_compute_plan_simple(
default_dataset_1,
default_dataset_2,
default_metrics,
-channel,
workers,
):
"""Execution of a compute plan containing multiple traintasks:
@@ -40,7 +39,7 @@
predict_function_spec = factory.create_function(FunctionCategory.predict)
predict_function_2 = client_2.add_function(predict_function_spec)

-channel.wait_for_asset_synchronized(simple_function_2)
+client_2.wait_function(simple_function_2.key)

# create compute plan
cp_spec = factory.create_compute_plan(
@@ -106,7 +105,7 @@ def test_compute_plan_simple(

# check all tasks are done and check they have been executed on the expected organization
for t in tasks:
-assert t.status == models.Status.done
+assert t.status == models.ComputeTaskStatus.done
assert t.start_date is not None
assert t.end_date is not None

@@ -253,7 +252,7 @@ def test_compute_plan_single_client_success(factory, client, default_dataset, de
+ client.list_compute_plan_tasks(cp.key)
+ client.list_compute_plan_tasks(cp.key)
):
-assert t.status == models.Status.done
+assert t.status == models.ComputeTaskStatus.done


@pytest.mark.slow
@@ -351,7 +350,7 @@ def test_compute_plan_update(factory, client, default_dataset, default_metric, w
tasks = client.list_compute_plan_tasks(cp.key)
assert len(tasks) == 9
for t in tasks:
-assert t.status == models.Status.done
+assert t.status == models.ComputeTaskStatus.done

# Check tasks metadata
traintask = client.get_task(traintask_spec_2.task_id)
@@ -589,7 +588,7 @@ def test_compute_plan_aggregate_composite_traintasks( # noqa: C901
)

for t in tasks:
-assert t.status == models.Status.done, t
+assert t.status == models.ComputeTaskStatus.done, t

# Check that permissions were correctly set
for task_id in [ct.task_id for ct in composite_traintask_specs]:
@@ -669,7 +668,7 @@ def test_execution_compute_plan_canceled(factory, client, default_dataset, cfg,

# check that the status of the done task has not been updated
first_traintask = [t for t in client.list_compute_plan_tasks(cp.key) if t.rank == 0][0]
-assert first_traintask.status == models.Status.done
+assert first_traintask.status == models.ComputeTaskStatus.done


@pytest.mark.slow
@@ -690,7 +689,7 @@ def test_compute_plan_no_batching(factory, client, default_dataset, worker):

traintasks = client.list_compute_plan_tasks(cp.key)
assert len(traintasks) == 1
-assert all([task_.status == models.Status.done for task_ in traintasks])
+assert all([task_.status == models.ComputeTaskStatus.done for task_ in traintasks])

# Update the compute plan
cp_spec = factory.add_compute_plan_tasks(cp)
@@ -707,7 +706,7 @@ def test_compute_plan_no_batching(factory, client, default_dataset, worker):

traintasks = client.list_compute_plan_tasks(cp.key)
assert len(traintasks) == 2
-assert all([task_.status == models.Status.done for task_ in traintasks])
+assert all([task_.status == models.ComputeTaskStatus.done for task_ in traintasks])


@pytest.mark.slow
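
Editor's note: across this file the pattern is the same — every task of a finished compute plan is expected to report the `done` member of the renamed `ComputeTaskStatus` enum. A condensed sketch of that check (the compute plan key is hypothetical, and `wait_compute_plan` is assumed to be available in the SDK version under test):

from substra.sdk import models

cp = client.wait_compute_plan("some-compute-plan-key")
for task in client.list_compute_plan_tasks(cp.key):
    # Every task of a completed plan should have reached the terminal `done` status.
    assert task.status == models.ComputeTaskStatus.done
    assert task.error_type is None
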
8 changes: 4 additions & 4 deletions tests/test_hybrid_mode.py
@@ -38,7 +38,7 @@ def test_execution_debug(client, hybrid_client, debug_factory, default_dataset):
worker=hybrid_client.organization_info().organization_id,
)
traintask = hybrid_client.add_task(spec)
-assert traintask.status == models.Status.done
+assert traintask.status == models.ComputeTaskStatus.done

# Raises an exception if the output asset has not been created
hybrid_client.get_task_output_asset(traintask.key, OutputIdentifiers.shared)
@@ -52,7 +52,7 @@ def test_execution_debug(client, hybrid_client, debug_factory, default_dataset):
worker=hybrid_client.organization_info().organization_id,
)
predicttask = hybrid_client.add_task(spec)
-assert predicttask.status == models.Status.done
+assert predicttask.status == models.ComputeTaskStatus.done

spec = debug_factory.create_testtask(
function=metric,
@@ -62,7 +62,7 @@ def test_execution_debug(client, hybrid_client, debug_factory, default_dataset):
worker=hybrid_client.organization_info().organization_id,
)
testtask = hybrid_client.add_task(spec)
-assert testtask.status == models.Status.done
+assert testtask.status == models.ComputeTaskStatus.done
performance = hybrid_client.get_task_output_asset(testtask.key, OutputIdentifiers.performance)
assert performance.asset == 3

@@ -150,7 +150,7 @@ def test_debug_compute_plan_aggregate_composite(network, client, hybrid_client,

tasks = traintasks + composite_traintasks + aggregatetasks + predicttasks + testtasks
for t in tasks:
-assert t.status == models.Status.done
+assert t.status == models.ComputeTaskStatus.done


@pytest.mark.remote_only
10 changes: 4 additions & 6 deletions tests/test_permissions.py
@@ -168,7 +168,7 @@ def test_permissions(permissions_1, permissions_2, expected_permissions, factory
outputs=FLTaskOutputGenerator.traintask(authorized_ids=expected_permissions.authorized_ids),
worker=workers[0],
)
-channel.wait_for_asset_synchronized(function_2)
+
traintask = client_1.add_task(spec)
client_1.wait_task(traintask.key, raise_on_failure=True)

@@ -206,7 +206,7 @@ def test_permissions_denied_process(factory, client_1, client_2, channel, worker
category=FunctionCategory.simple, permissions=Permissions(public=False, authorized_ids=[])
)
function_2 = client_2.add_function(spec)
-channel.wait_for_asset_synchronized(function_2)
+client_2.wait_function(function_2.key)

# traintasks

@@ -254,7 +254,7 @@ def test_permissions_model_process(
# function
spec = factory.create_function(category=FunctionCategory.simple, permissions=permissions)
function = client.add_function(spec)
-channel.wait_for_asset_synchronized(function)
+client.wait_function(function.key)
functions.append(function)

dataset_1, dataset_2 = datasets
@@ -327,8 +327,7 @@ def test_merge_permissions_denied_process(factory, clients, channel, workers):

data_sample_key_1 = client_1.add_data_sample(spec)
spec = factory.create_function(category=FunctionCategory.metric, permissions=permissions_1)
-metric_1 = client_1.add_function(spec)
-channel.wait_for_asset_synchronized(metric_1) # used by client_3
+client_1.add_function(spec)

# add function on organization 2
spec = factory.create_function(category=FunctionCategory.simple, permissions=permissions_2)
@@ -384,7 +383,6 @@ def test_permissions_denied_head_model_process(factory, client_1, client_2, chan
# create function
spec = factory.create_function(category=FunctionCategory.composite)
composite_function = client_1.add_function(spec)
-channel.wait_for_asset_synchronized(composite_function) # used by client_2

# create composite task
spec = factory.create_composite_traintask(
2 changes: 2 additions & 0 deletions tests/test_synchronized.py
@@ -17,6 +17,7 @@ def test_synchronized_function(clients, factory, channel, current_client):

spec = factory.create_function(FunctionCategory.simple)
function = current_client.add_function(spec)
+function = current_client.wait_function(function.key, raise_on_failure=True)
channel.wait_for_asset_synchronized(function)


@@ -27,6 +28,7 @@ def test_synchronized_metric(clients, factory, channel, current_client):

spec = factory.create_function(category=FunctionCategory.metric)
metric = current_client.add_function(spec)
+metric = current_client.wait_function(metric.key, raise_on_failure=True)
channel.wait_for_asset_synchronized(metric)


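Editor's note: these two additions make the synchronization tests wait for the function build to complete before waiting for cross-organization synchronization, presumably so that a slow image build is not mistaken for a sync failure. The resulting two-phase pattern, sketched with the fixtures and imports of this test module:

spec = factory.create_function(FunctionCategory.simple)
function = current_client.add_function(spec)
# Phase 1: block until the backend has finished building the function image.
function = current_client.wait_function(function.key, raise_on_failure=True)
# Phase 2: block until every organization on the channel sees the asset.
channel.wait_for_asset_synchronized(function)
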
