diff --git a/CHANGES.rst b/CHANGES.rst index 654cc5e..cc973f3 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,13 @@ Changelog ~~~~~~~~~ +Version 0.31.0 +-------------- + +Enhancements: + +- Added `always_flush` argument to `dispatch()` (issue #145) + Version 0.30.0 -------------- diff --git a/src/argh/dispatching.py b/src/argh/dispatching.py index fb4b085..a983f7b 100644 --- a/src/argh/dispatching.py +++ b/src/argh/dispatching.py @@ -79,6 +79,7 @@ def dispatch( raw_output: bool = False, namespace: Optional[argparse.Namespace] = None, skip_unknown_args: bool = False, + always_flush: bool = False, ) -> Optional[str]: """ Parses given list of arguments using given parser, calls the relevant @@ -142,6 +143,19 @@ def dispatch( that support for combined default and nested functions may be broken if a different type of object is forced. + :param always_flush: + + If the output stream is not a terminal (i.e. redirected to a file or + another process), it's probably buffered. In most cases it doesn't + matter. + + However, if the output of your program is generated with delays + between the lines and you may want to redirect them to another process + and immediately see the results (e.g. `my_app.py | grep something`), + it's a good idea to force flushing of the buffer. + + .. versionadded:: 0.31 + By default the exceptions are not wrapped and will propagate. The only exception that is always wrapped is :class:`~argh.exceptions.CommandError` which is interpreted as an expected event so the traceback is hidden. @@ -182,6 +196,7 @@ def dispatch( output_file=output_file, errors_file=errors_file, raw_output=raw_output, + always_flush=always_flush, ) @@ -225,20 +240,29 @@ def run_endpoint_function( output_file: IO = sys.stdout, errors_file: IO = sys.stderr, raw_output: bool = False, + always_flush: bool = False, ) -> Optional[str]: """ .. versionadded:: 0.30 Extracts arguments from the namespace object, calls the endpoint function and processes its output. + + :param always_flush: + + Flush the buffer after every line even if `output_file` is not a TTY. + Turn this off if you don't need dynamic output) """ lines = _execute_command(function, namespace_obj, errors_file) - return _process_command_output(lines, output_file, raw_output) + return _process_command_output(lines, output_file, raw_output, always_flush) def _process_command_output( - lines: Iterator[str], output_file: Optional[IO], raw_output: bool + lines: Iterator[str], + output_file: Optional[IO], + raw_output: bool, + always_flush: bool, ) -> Optional[str]: out_io: IO @@ -260,6 +284,14 @@ def _process_command_output( # in most cases user wants one message per line out_io.write("\n") + # If it's not a terminal (i.e. redirected to a file or another + # process), it's probably buffered. In most cases it doesn't matter + # but if the output is generated with delays between the lines and we + # may want to monitor it (e.g. `my_app.py | grep something`), it's a + # good idea to force flushing. + if always_flush: + out_io.flush() + if output_file is None: # user wanted a string; return contents of our temporary file-like obj out_io.seek(0) diff --git a/tests/test_dispatching.py b/tests/test_dispatching.py index 987b8c8..38fbd1c 100644 --- a/tests/test_dispatching.py +++ b/tests/test_dispatching.py @@ -54,6 +54,23 @@ def func(): mock_dispatch.assert_called_with(mock_parser) +@pytest.mark.parametrize("always_flush", [True, False]) +def test_run_endpoint_function__always_flush(always_flush): + def func(): + return ["first line", "second line"] + + out_io = Mock(spec=io.StringIO) + + argh.dispatching.run_endpoint_function( + func, argparse.Namespace(), output_file=out_io, always_flush=always_flush + ) + + if always_flush: + assert out_io.flush.call_count == 2 + else: + out_io.flush.assert_not_called() + + @patch("argh.dispatching.parse_and_resolve") @patch("argh.dispatching.run_endpoint_function") def test_dispatch_command_two_stage(mock_run_endpoint_function, mock_parse_and_resolve): @@ -68,6 +85,7 @@ def func() -> str: mock_errors_file = Mock(io.TextIOBase) raw_output = False skip_unknown_args = False + always_flush = True mock_endpoint_function = Mock() mock_namespace = Mock(argparse.Namespace) mock_namespace_obj = Mock(argparse.Namespace) @@ -83,6 +101,7 @@ def func() -> str: output_file=mock_output_file, errors_file=mock_errors_file, raw_output=raw_output, + always_flush=always_flush, ) mock_parse_and_resolve.assert_called_with( @@ -98,6 +117,7 @@ def func() -> str: output_file=mock_output_file, errors_file=mock_errors_file, raw_output=raw_output, + always_flush=always_flush, ) assert retval == "run_endpoint_function retval"