-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
f05f864
commit 9ca2151
Showing
1 changed file
with
98 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,103 @@ | ||
def work(): | ||
import time | ||
import asyncio | ||
import time | ||
|
||
time.sleep(5) | ||
import pytest | ||
|
||
from a_sync import ProcessPoolExecutor, ThreadPoolExecutor, PruningThreadPoolExecutor | ||
|
||
from a_sync import ProcessPoolExecutor | ||
|
||
@pytest.mark.asyncio | ||
async def test_process_pool_executor_run(): | ||
"""Tests the ProcessPoolExecutor by running and submitting the work function asynchronously.""" | ||
executor = ProcessPoolExecutor(1) | ||
coro = executor.run(time.sleep, 0.1) | ||
assert asyncio.iscoroutine(coro) | ||
await coro | ||
|
||
@pytest.marks.asyncio | ||
async def test_executor(): | ||
executor = ProcessPoolExecutor(6) | ||
await executor.run(work()) | ||
await executor.submit(work()) | ||
@pytest.mark.asyncio | ||
async def test_thread_pool_executor_run(): | ||
"""Tests the ThreadPoolExecutor by running and submitting the work function asynchronously.""" | ||
executor = ThreadPoolExecutor(1) | ||
coro = executor.run(time.sleep, 0.1) | ||
assert asyncio.iscoroutine(coro) | ||
await coro | ||
|
||
@pytest.mark.asyncio | ||
async def test_pruning_thread_pool_executor_run(): | ||
"""Tests the PruningThreadPoolExecutor by running and submitting the work function asynchronously.""" | ||
executor = PruningThreadPoolExecutor(1) | ||
coro = executor.run(time.sleep, 0.1) | ||
assert asyncio.iscoroutine(coro) | ||
await coro | ||
|
||
@pytest.mark.asyncio | ||
async def test_process_pool_executor_submit(): | ||
"""Tests the ProcessPoolExecutor by submitting the work function asynchronously.""" | ||
executor = ProcessPoolExecutor(1) | ||
fut = executor.submit(time.sleep, 0.1) | ||
assert isinstance(fut, asyncio.Future) | ||
await fut | ||
|
||
@pytest.mark.asyncio | ||
async def test_thread_pool_executor_submit(): | ||
"""Tests the ThreadPoolExecutor by submitting the work function asynchronously.""" | ||
executor = ThreadPoolExecutor(1) | ||
fut = executor.submit(time.sleep, 0.1) | ||
assert isinstance(fut, asyncio.Future) | ||
await fut | ||
|
||
@pytest.mark.asyncio | ||
async def test_pruning_thread_pool_executor_submit(): | ||
"""Tests the PruningThreadPoolExecutor by submitting the work function asynchronously.""" | ||
executor = PruningThreadPoolExecutor(1) | ||
fut = executor.submit(time.sleep, 0.1) | ||
assert isinstance(fut, asyncio.Future) | ||
await fut | ||
|
||
@pytest.mark.asyncio | ||
async def test_process_pool_executor_sync_run(): | ||
"""Tests the ProcessPoolExecutor by running and submitting the work function synchronously.""" | ||
executor = ProcessPoolExecutor(0) | ||
coro = executor.run(time.sleep, 0.1) | ||
assert asyncio.iscoroutine(coro) | ||
await coro | ||
|
||
@pytest.mark.asyncio | ||
async def test_thread_pool_executor_sync_run(): | ||
"""Tests the ThreadPoolExecutor by running and submitting the work function synchronously.""" | ||
executor = ThreadPoolExecutor(0) | ||
coro = executor.run(time.sleep, 0.1) | ||
assert asyncio.iscoroutine(coro) | ||
await coro | ||
|
||
@pytest.mark.asyncio | ||
async def test_pruning_thread_pool_executor_sync_run(): | ||
"""Tests the PruningThreadPoolExecutor by running and submitting the work function synchronously.""" | ||
executor = PruningThreadPoolExecutor(0) | ||
coro = executor.run(time.sleep, 0.1) | ||
assert asyncio.iscoroutine(coro) | ||
await coro | ||
|
||
@pytest.mark.asyncio | ||
async def test_process_pool_executor_sync_submit(): | ||
"""Tests the ProcessPoolExecutor by submitting the work function synchronously.""" | ||
executor = ProcessPoolExecutor(0) | ||
fut = executor.submit(time.sleep, 0.1) | ||
assert isinstance(fut, asyncio.Future) | ||
await fut | ||
|
||
@pytest.mark.asyncio | ||
async def test_thread_pool_executor_sync_submit(): | ||
"""Tests the ThreadPoolExecutor by submitting the work function synchronously.""" | ||
executor = ThreadPoolExecutor(0) | ||
fut = executor.submit(time.sleep, 0.1) | ||
assert isinstance(fut, asyncio.Future) | ||
await fut | ||
|
||
@pytest.mark.asyncio | ||
async def test_pruning_thread_pool_executor_sync_submit(): | ||
"""Tests the PruningThreadPoolExecutor by submitting the work function synchronously.""" | ||
executor = PruningThreadPoolExecutor(0) | ||
fut = executor.submit(time.sleep, 0.1) | ||
assert isinstance(fut, asyncio.Future) | ||
await fut |