diff --git a/arroyo/processing/strategies/run_task_with_multiprocessing.py b/arroyo/processing/strategies/run_task_with_multiprocessing.py index e72be6ef..a887c1a9 100644 --- a/arroyo/processing/strategies/run_task_with_multiprocessing.py +++ b/arroyo/processing/strategies/run_task_with_multiprocessing.py @@ -37,8 +37,8 @@ TResult = TypeVar("TResult") TBatchValue = TypeVar("TBatchValue") -DEFAULT_INPUT_BLOCK_SIZE = 16 * 1024 * 1024 -DEFAULT_OUTPUT_BLOCK_SIZE = 16 * 1024 * 1024 +DEFAULT_INPUT_BLOCK_SIZE = 25 * 1024 * 1024 +DEFAULT_OUTPUT_BLOCK_SIZE = 25 * 1024 * 1024 LOG_THRESHOLD_TIME = 20 # In seconds diff --git a/tests/processing/strategies/test_run_task_with_multiprocessing.py b/tests/processing/strategies/test_run_task_with_multiprocessing.py index df10c16c..a72c7a87 100644 --- a/tests/processing/strategies/test_run_task_with_multiprocessing.py +++ b/tests/processing/strategies/test_run_task_with_multiprocessing.py @@ -11,6 +11,7 @@ from arroyo.dlq import InvalidMessage from arroyo.processing.strategies import MessageRejected from arroyo.processing.strategies.run_task_with_multiprocessing import ( + DEFAULT_INPUT_BLOCK_SIZE, MessageBatch, MultiprocessingPool, RunTaskWithMultiprocessing, @@ -552,7 +553,7 @@ def test_input_block_resizing_max_size() -> None: pool=pool, input_block_size=None, output_block_size=INPUT_SIZE // 2, - max_input_block_size=16000, + max_input_block_size=DEFAULT_INPUT_BLOCK_SIZE, ) with pytest.raises(MessageRejected): @@ -621,7 +622,7 @@ def test_output_block_resizing_max_size() -> None: pool=pool, input_block_size=INPUT_SIZE, output_block_size=None, - max_output_block_size=16000, + max_output_block_size=DEFAULT_INPUT_BLOCK_SIZE, ) for _ in range(NUM_MESSAGES):