From 0b03b00320d1e94d98c5ce6253fb1fc65a83dccd Mon Sep 17 00:00:00 2001 From: Faiyaz Hasan Date: Fri, 14 Oct 2022 16:05:38 -0400 Subject: [PATCH] Offload blocking operations in ECS to threadpool (#42) * make upload task async aware * make task submission async aware * Add thread pool solution to remaining methods and update tests * fix tests * Update changelog * Ensure that metadata field naming convention is consistent * Add test skeleton and licenses * add test for util function * add test for util function * Add final test * make test async compatible * Update submit task test * Minor update * use better threadpoll offloading pattern --- CHANGELOG.md | 4 + covalent_ecs_plugin/__init__.py | 19 ++ covalent_ecs_plugin/ecs.py | 200 +++++++++++--------- covalent_ecs_plugin/utils.py | 28 +++ tests/__init__.py | 19 ++ tests/functional_tests/__init__.py | 19 ++ tests/functional_tests/basic_workflow.py | 21 ++ tests/functional_tests/executor_instance.py | 21 ++ tests/functional_tests/svm_workflow.py | 21 ++ tests/functional_tests/terraform_output.py | 21 ++ tests/test_ecs.py | 44 ++++- tests/test_utils.py | 39 ++++ 12 files changed, 353 insertions(+), 103 deletions(-) create mode 100644 covalent_ecs_plugin/utils.py create mode 100644 tests/test_utils.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 13e1523..8eaac79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [UNRELEASED] +### Changed + +- Updated `boto3` calls to make them compatible with the async library. + ### Operations - Add ref to license checker path diff --git a/covalent_ecs_plugin/__init__.py b/covalent_ecs_plugin/__init__.py index e69de29..523f776 100644 --- a/covalent_ecs_plugin/__init__.py +++ b/covalent_ecs_plugin/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2021 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the GNU Affero General Public License 3.0 (the "License"). +# A copy of the License may be obtained with this software package or at +# +# https://www.gnu.org/licenses/agpl-3.0.en.html +# +# Use of this file is prohibited except in compliance with the License. Any +# modifications or derivative works of this file must retain this copyright +# notice, and modified files must contain a notice indicating that they have +# been altered from the originals. +# +# Covalent is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the License for more details. +# +# Relief from the License may be granted by purchasing a commercial license. diff --git a/covalent_ecs_plugin/ecs.py b/covalent_ecs_plugin/ecs.py index ca2a9e9..b09d7cc 100644 --- a/covalent_ecs_plugin/ecs.py +++ b/covalent_ecs_plugin/ecs.py @@ -24,6 +24,7 @@ import os import re import tempfile +from functools import partial from pathlib import Path from typing import Any, Callable, Dict, List, Tuple @@ -31,9 +32,10 @@ import cloudpickle as pickle from covalent._shared_files.config import get_config from covalent._shared_files.logger import app_log -from covalent._shared_files.util_classes import DispatchInfo from covalent_aws_plugins import AWSExecutor +from .utils import _execute_partial_in_threadpool + _EXECUTOR_PLUGIN_DEFAULTS = { "credentials": os.environ.get("AWS_SHARED_CREDENTIALS_FILE") or os.path.join(os.environ["HOME"], ".aws/credentials"), @@ -146,11 +148,8 @@ def __init__( f"{self.ecs_task_security_group_id} is not a valid security group id. Please set a valid security group id either in the ECS executor definition or in the Covalent config file." ) - async def _upload_task(self, function, args, kwargs, task_metadata) -> None: - - dispatch_id = task_metadata["dispatch_id"] - node_id = task_metadata["node_id"] - + async def _upload_task_to_s3(self, dispatch_id, node_id, function, args, kwargs) -> None: + """Upload task to S3.""" s3 = boto3.Session(**self.boto_session_options()).client("s3") s3_object_filename = FUNC_FILENAME.format(dispatch_id=dispatch_id, node_id=node_id) @@ -160,95 +159,109 @@ async def _upload_task(self, function, args, kwargs, task_metadata) -> None: function_file.flush() s3.upload_file(function_file.name, self.s3_bucket_name, s3_object_filename) - async def submit_task(self, task_metadata: Dict, identity: Dict) -> str: + async def _upload_task( + self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict + ): + """Wrapper to make boto3 s3 upload calls async.""" + dispatch_id = task_metadata["dispatch_id"] + node_id = task_metadata["node_id"] + partial_func = partial( + self._upload_task_to_s3, + dispatch_id, + node_id, + function, + args, + kwargs, + ) + return await _execute_partial_in_threadpool(partial_func) + async def submit_task(self, task_metadata: Dict, identity: Dict) -> Any: + """Submit task to ECS.""" dispatch_id = task_metadata["dispatch_id"] node_id = task_metadata["node_id"] container_name = CONTAINER_NAME.format(dispatch_id=dispatch_id, node_id=node_id) account = identity["Account"] - dispatch_info = DispatchInfo(dispatch_id) - with self.get_dispatch_context(dispatch_info): - - ecs = boto3.Session(**self.boto_session_options()).client("ecs") - - # Register the task definition - self._debug_log("Registering ECS task definition...") - ecs.register_task_definition( - family=self.ecs_task_family_name, - taskRoleArn=self.ecs_task_role_name, - executionRoleArn=f"arn:aws:iam::{account}:role/{self.execution_role}", - networkMode="awsvpc", - requiresCompatibilities=["FARGATE"], - containerDefinitions=[ - { - "name": container_name, - "image": COVALENT_EXEC_BASE_URI, - "essential": True, - "logConfiguration": { - "logDriver": "awslogs", - "options": { - "awslogs-region": self.region, - "awslogs-group": self.log_group_name, - "awslogs-create-group": "true", - "awslogs-stream-prefix": "covalent-fargate", - }, - }, - "environment": [ - {"name": "S3_BUCKET_NAME", "value": self.s3_bucket_name}, - { - "name": "COVALENT_TASK_FUNC_FILENAME", - "value": FUNC_FILENAME.format( - dispatch_id=dispatch_id, node_id=node_id - ), - }, - { - "name": "RESULT_FILENAME", - "value": RESULT_FILENAME.format( - dispatch_id=dispatch_id, node_id=node_id - ), - }, - ], - }, - ], - cpu=str(int(self.vcpu * 1024)), - memory=str(int(self.memory * 1024)), - ) + ecs = boto3.Session(**self.boto_session_options()).client("ecs") - # Run the task - response = ecs.run_task( - taskDefinition=self.ecs_task_family_name, - launchType="FARGATE", - cluster=self.ecs_cluster_name, - count=1, - networkConfiguration={ - "awsvpcConfiguration": { - "subnets": [self.ecs_task_subnet_id], - "securityGroups": [self.ecs_task_security_group_id], - # This is only needed if we're using public subnets - "assignPublicIp": "ENABLED", + # Register the task definition + self._debug_log("Registering ECS task definition...") + partial_func = partial( + ecs.register_task_definition, + family=self.ecs_task_family_name, + taskRoleArn=self.ecs_task_role_name, + executionRoleArn=f"arn:aws:iam::{account}:role/{self.execution_role}", + networkMode="awsvpc", + requiresCompatibilities=["FARGATE"], + containerDefinitions=[ + { + "name": container_name, + "image": COVALENT_EXEC_BASE_URI, + "essential": True, + "logConfiguration": { + "logDriver": "awslogs", + "options": { + "awslogs-region": self.region, + "awslogs-group": self.log_group_name, + "awslogs-create-group": "true", + "awslogs-stream-prefix": "covalent-fargate", + }, }, + "environment": [ + {"name": "S3_BUCKET_NAME", "value": self.s3_bucket_name}, + { + "name": "COVALENT_TASK_FUNC_FILENAME", + "value": FUNC_FILENAME.format( + dispatch_id=dispatch_id, node_id=node_id + ), + }, + { + "name": "RESULT_FILENAME", + "value": RESULT_FILENAME.format( + dispatch_id=dispatch_id, node_id=node_id + ), + }, + ], }, - ) - # Return this task ARN in an async setting - task_arn = response["tasks"][0]["taskArn"] - return task_arn + ], + cpu=str(int(self.vcpu * 1024)), + memory=str(int(self.memory * 1024)), + ) + await _execute_partial_in_threadpool(partial_func) + + # Run the task + self._debug_log("Running task on ECS...") + partial_func = partial( + ecs.run_task, + taskDefinition=self.ecs_task_family_name, + launchType="FARGATE", + cluster=self.ecs_cluster_name, + count=1, + networkConfiguration={ + "awsvpcConfiguration": { + "subnets": [self.ecs_task_subnet_id], + "securityGroups": [self.ecs_task_security_group_id], + # This is only needed if we're using public subnets + "assignPublicIp": "ENABLED", + }, + }, + ) + response = await _execute_partial_in_threadpool(partial_func) + return response["tasks"][0]["taskArn"] def _is_valid_subnet_id(self, subnet_id: str) -> bool: """Check if the subnet is valid.""" - return re.fullmatch(r"subnet-[0-9a-z]{8,17}", subnet_id) is not None def _is_valid_security_group(self, security_group: str) -> bool: """Check if the security group is valid.""" - return re.fullmatch(r"sg-[0-9a-z]{8,17}", security_group) is not None def _debug_log(self, message): app_log.debug(f"AWS ECS Executor: {message}") async def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict): - + """Main run method.""" dispatch_id = task_metadata["dispatch_id"] node_id = task_metadata["node_id"] @@ -266,8 +279,8 @@ async def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: self._debug_log(f"Successfully submitted task with ARN: {task_arn}") await self._poll_task(task_arn) - - return await self.query_result(task_metadata) + partial_func = partial(self.query_result, task_metadata) + return await _execute_partial_in_threadpool(partial_func) async def get_status(self, task_arn: str) -> Tuple[str, int]: """Query the status of a previously submitted ECS task. @@ -281,20 +294,25 @@ async def get_status(self, task_arn: str) -> Tuple[str, int]: """ ecs = boto3.Session(**self.boto_session_options()).client("ecs") paginator = ecs.get_paginator("list_tasks") - page_iterator = paginator.paginate( + partial_func = partial( + paginator.paginate, cluster=self.ecs_cluster_name, family=self.ecs_task_family_name, desiredStatus="STOPPED", ) + page_iterator = await _execute_partial_in_threadpool(partial_func) for page in page_iterator: if len(page["taskArns"]) == 0: break - tasks = ecs.describe_tasks( + partial_func = partial( + ecs.describe_tasks, cluster=self.ecs_cluster_name, tasks=page["taskArns"], - )["tasks"] + ) + future = await _execute_partial_in_threadpool(partial_func) + tasks = future["tasks"] for task in tasks: if task["taskArn"] == task_arn: @@ -310,14 +328,7 @@ async def get_status(self, task_arn: str) -> Tuple[str, int]: return ("TASK_NOT_FOUND", -1) async def _poll_task(self, task_arn: str) -> None: - """Poll an ECS task until completion. - - Args: - task_arn: ARN used to identify an ECS task. - - Returns: - None - """ + """Poll an ECS task until completion.""" self._debug_log(f"Polling task with arn {task_arn}...") status, exit_code = await self.get_status(task_arn) @@ -336,11 +347,13 @@ async def _get_log_events(self, task_arn, task_metadata: Dict): node_id = task_metadata["node_id"] task_id = task_arn.split("/")[-1] - events = logs.get_log_events( + partial_func = partial( + logs.get_log_events, logGroupName=self.log_group_name, logStreamName=f"covalent-fargate/covalent-task-{dispatch_id}-{node_id}/{task_id}", - )["events"] - + ) + future = await _execute_partial_in_threadpool(partial_func) + events = future["events"] return "".join(event["message"] + "\n" for event in events) async def query_result(self, task_metadata: Dict) -> Tuple[Any, str, str]: @@ -374,10 +387,9 @@ async def cancel(self, task_arn: str, reason: str = "None") -> None: Args: task_arn: ARN used to identify an ECS task. reason: An optional string used to specify a cancellation reason. - - Returns: - None """ - ecs = boto3.Session(**self.boto_session_options()).client("ecs") - ecs.stop_task(cluster=self.ecs_cluster_name, task=task_arn, reason=reason) + partial_func = partial( + ecs.stop_task, cluster=self.ecs_cluster_name, task=task_arn, reason=reason + ) + await _execute_partial_in_threadpool(partial_func) diff --git a/covalent_ecs_plugin/utils.py b/covalent_ecs_plugin/utils.py new file mode 100644 index 0000000..c933398 --- /dev/null +++ b/covalent_ecs_plugin/utils.py @@ -0,0 +1,28 @@ +# Copyright 2021 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the GNU Affero General Public License 3.0 (the "License"). +# A copy of the License may be obtained with this software package or at +# +# https://www.gnu.org/licenses/agpl-3.0.en.html +# +# Use of this file is prohibited except in compliance with the License. Any +# modifications or derivative works of this file must retain this copyright +# notice, and modified files must contain a notice indicating that they have +# been altered from the originals. +# +# Covalent is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the License for more details. +# +# Relief from the License may be granted by purchasing a commercial license. + +"""Helper methods for ECS executor plugin.""" + +import asyncio + + +async def _execute_partial_in_threadpool(partial_func): + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, partial_func) diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..523f776 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2021 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the GNU Affero General Public License 3.0 (the "License"). +# A copy of the License may be obtained with this software package or at +# +# https://www.gnu.org/licenses/agpl-3.0.en.html +# +# Use of this file is prohibited except in compliance with the License. Any +# modifications or derivative works of this file must retain this copyright +# notice, and modified files must contain a notice indicating that they have +# been altered from the originals. +# +# Covalent is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the License for more details. +# +# Relief from the License may be granted by purchasing a commercial license. diff --git a/tests/functional_tests/__init__.py b/tests/functional_tests/__init__.py index e69de29..523f776 100644 --- a/tests/functional_tests/__init__.py +++ b/tests/functional_tests/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2021 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the GNU Affero General Public License 3.0 (the "License"). +# A copy of the License may be obtained with this software package or at +# +# https://www.gnu.org/licenses/agpl-3.0.en.html +# +# Use of this file is prohibited except in compliance with the License. Any +# modifications or derivative works of this file must retain this copyright +# notice, and modified files must contain a notice indicating that they have +# been altered from the originals. +# +# Covalent is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the License for more details. +# +# Relief from the License may be granted by purchasing a commercial license. diff --git a/tests/functional_tests/basic_workflow.py b/tests/functional_tests/basic_workflow.py index 6a7c107..fd14091 100644 --- a/tests/functional_tests/basic_workflow.py +++ b/tests/functional_tests/basic_workflow.py @@ -1,3 +1,24 @@ +# Copyright 2021 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the GNU Affero General Public License 3.0 (the "License"). +# A copy of the License may be obtained with this software package or at +# +# https://www.gnu.org/licenses/agpl-3.0.en.html +# +# Use of this file is prohibited except in compliance with the License. Any +# modifications or derivative works of this file must retain this copyright +# notice, and modified files must contain a notice indicating that they have +# been altered from the originals. +# +# Covalent is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the License for more details. +# +# Relief from the License may be granted by purchasing a commercial license. + + import sys import covalent as ct diff --git a/tests/functional_tests/executor_instance.py b/tests/functional_tests/executor_instance.py index f8d5ad6..27a3b20 100644 --- a/tests/functional_tests/executor_instance.py +++ b/tests/functional_tests/executor_instance.py @@ -1,3 +1,24 @@ +# Copyright 2021 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the GNU Affero General Public License 3.0 (the "License"). +# A copy of the License may be obtained with this software package or at +# +# https://www.gnu.org/licenses/agpl-3.0.en.html +# +# Use of this file is prohibited except in compliance with the License. Any +# modifications or derivative works of this file must retain this copyright +# notice, and modified files must contain a notice indicating that they have +# been altered from the originals. +# +# Covalent is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the License for more details. +# +# Relief from the License may be granted by purchasing a commercial license. + + import os import covalent as ct diff --git a/tests/functional_tests/svm_workflow.py b/tests/functional_tests/svm_workflow.py index 4de0977..8f78c17 100644 --- a/tests/functional_tests/svm_workflow.py +++ b/tests/functional_tests/svm_workflow.py @@ -1,3 +1,24 @@ +# Copyright 2021 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the GNU Affero General Public License 3.0 (the "License"). +# A copy of the License may be obtained with this software package or at +# +# https://www.gnu.org/licenses/agpl-3.0.en.html +# +# Use of this file is prohibited except in compliance with the License. Any +# modifications or derivative works of this file must retain this copyright +# notice, and modified files must contain a notice indicating that they have +# been altered from the originals. +# +# Covalent is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the License for more details. +# +# Relief from the License may be granted by purchasing a commercial license. + + import sys import covalent as ct diff --git a/tests/functional_tests/terraform_output.py b/tests/functional_tests/terraform_output.py index ff2d245..d2cdca1 100644 --- a/tests/functional_tests/terraform_output.py +++ b/tests/functional_tests/terraform_output.py @@ -1,3 +1,24 @@ +# Copyright 2021 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the GNU Affero General Public License 3.0 (the "License"). +# A copy of the License may be obtained with this software package or at +# +# https://www.gnu.org/licenses/agpl-3.0.en.html +# +# Use of this file is prohibited except in compliance with the License. Any +# modifications or derivative works of this file must retain this copyright +# notice, and modified files must contain a notice indicating that they have +# been altered from the originals. +# +# Covalent is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the License for more details. +# +# Relief from the License may be granted by purchasing a commercial license. + + import json import os import subprocess diff --git a/tests/test_ecs.py b/tests/test_ecs.py index 5040284..1c028e9 100644 --- a/tests/test_ecs.py +++ b/tests/test_ecs.py @@ -21,11 +21,9 @@ """Unit tests for AWS ECS executor.""" import os -import tempfile -from base64 import b64encode from pathlib import Path from unittest import mock -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock import cloudpickle as pickle import pytest @@ -67,7 +65,7 @@ def MOCK_TASK_METADATA(self): def mock_executor_config(self, tmp_path): MOCK_CREDENTIALS_FILE: Path = tmp_path / "credentials" MOCK_CREDENTIALS_FILE.touch() - config = { + return { "profile": self.MOCK_PROFILE, "s3_bucket_name": self.MOCK_S3_BUCKET_NAME, "ecs_cluster_name": self.MOCK_ECS_CLUSTER_NAME, @@ -81,10 +79,9 @@ def mock_executor_config(self, tmp_path): "memory": self.MOCK_MEMORY, "poll_freq": self.MOCK_POLL_FREQ, } - return config @pytest.fixture - def mock_executor(self, mock_executor_config, mocker): + def mock_executor(self, mock_executor_config): # mocker.patch("tempfile") return ECSExecutor(**mock_executor_config) @@ -115,17 +112,46 @@ def test_init_explicit_values(self, mocker, mock_executor_config): @pytest.mark.asyncio async def test_upload_file_to_s3(self, mock_executor, mocker): - """Test method to upload file to s3.""" + """Test to upload file to s3.""" boto3_mock = mocker.patch("covalent_ecs_plugin.ecs.boto3") def some_function(): pass - await mock_executor._upload_task( - some_function, ("some_arg"), {"some": "kwarg"}, self.MOCK_TASK_METADATA + await mock_executor._upload_task_to_s3( + some_function, + self.MOCK_DISPATCH_ID, + self.MOCK_NODE_ID, + ("some_arg"), + {"some": "kwarg"}, ) boto3_mock.Session().client().upload_file.assert_called_once() + @pytest.mark.asyncio + async def test_upload_task(self, mock_executor, mocker): + """Test for method to call the upload task method.""" + + def some_function(x): + return x + + upload_to_s3_mock = mocker.patch( + "covalent_ecs_plugin.ecs.ECSExecutor._upload_task_to_s3", return_value=AsyncMock() + ) + + await mock_executor._upload_task(some_function, (1), {}, self.MOCK_TASK_METADATA) + upload_to_s3_mock.assert_called_once_with( + self.MOCK_DISPATCH_ID, self.MOCK_NODE_ID, some_function, (1), {} + ) + + @pytest.mark.asyncio + async def test_submit_task(self, mock_executor, mocker): + """Test submit task method.""" + MOCK_IDENTITY = {"Account": 1234} + boto3_mock = mocker.patch("covalent_ecs_plugin.ecs.boto3") + await mock_executor.submit_task(self.MOCK_TASK_METADATA, MOCK_IDENTITY) + boto3_mock.Session().client().register_task_definition.assert_called_once() + boto3_mock.Session().client().run_task.assert_called_once() + def test_is_valid_subnet_id(self, mock_executor): """Test the valid subnet checking method.""" assert mock_executor._is_valid_subnet_id("subnet-871545e1") is True diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..754d424 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,39 @@ +# Copyright 2021 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the GNU Affero General Public License 3.0 (the "License"). +# A copy of the License may be obtained with this software package or at +# +# https://www.gnu.org/licenses/agpl-3.0.en.html +# +# Use of this file is prohibited except in compliance with the License. Any +# modifications or derivative works of this file must retain this copyright +# notice, and modified files must contain a notice indicating that they have +# been altered from the originals. +# +# Covalent is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the License for more details. +# +# Relief from the License may be granted by purchasing a commercial license. + +"""Unit tests for AWS ECS executor utils file.""" + +from functools import partial + +import pytest + +from covalent_ecs_plugin.utils import _execute_partial_in_threadpool + + +@pytest.mark.asyncio +async def test_execute_partial_in_threadpool(): + """Test method to execute partial function in asyncio threadpool.""" + + def test_func(x): + return x + + partial_func = partial(test_func, x=1) + future = await _execute_partial_in_threadpool(partial_func) + assert future == 1